From 7db0cd2a02f989775a73b9adf0d7c52f839c3ab2 Mon Sep 17 00:00:00 2001 From: "user.mail" Date: Thu, 25 Jun 2026 13:29:52 +0300 Subject: [PATCH] feat(datasets): add generic 'datasets trigger' passthrough command - New faithful wrapper over /datasets/v3/trigger with zero per-dataset knowledge: --dataset/--dataset-id, --type, --discover-by, --input-file/--input (JSON or JSONL), --limit, --include-errors/--no-include-errors, --format, --timeout, --async. - Sends the user's input array verbatim (bypasses build_input), unlocking discovery mode (type=discover_new + discover_by) that the pipelines command couldn't express. - Reuses the existing trigger/poll/format path (poll_until, resolve_format, snapshot fetch); --async hands off to the existing status command. - Exports the shared engine pieces from dataset.ts (additive, non-breaking) and registers the command in index.ts. - Adds 13 tests; full suite green at 352. --- .../commands/datasets-trigger.test.ts | 245 +++++++++++++ src/commands/dataset.ts | 14 +- src/commands/datasets-trigger.ts | 326 ++++++++++++++++++ src/index.ts | 2 + 4 files changed, 586 insertions(+), 1 deletion(-) create mode 100644 src/__tests__/commands/datasets-trigger.test.ts create mode 100644 src/commands/datasets-trigger.ts diff --git a/src/__tests__/commands/datasets-trigger.test.ts b/src/__tests__/commands/datasets-trigger.test.ts new file mode 100644 index 0000000..efb35df --- /dev/null +++ b/src/__tests__/commands/datasets-trigger.test.ts @@ -0,0 +1,245 @@ +import {describe, it, expect, beforeEach, afterEach, vi} from 'vitest'; +import os from 'os'; +import path from 'path'; +import {writeFileSync, rmSync} from 'fs'; + +const mocks = vi.hoisted(()=>({ + post: vi.fn(), + get: vi.fn(), + ensure_authenticated: vi.fn(), + start: vi.fn(), + stop: vi.fn(), + print: vi.fn(), + success: vi.fn(), + info: vi.fn(), + fail: vi.fn((msg: string)=>{ throw new Error(`fail:${msg}`); }), + parse_timeout: vi.fn(), + poll_until: vi.fn(), +})); + +vi.mock('../../utils/client', ()=>({ + post: mocks.post, + get: mocks.get, +})); + +vi.mock('../../utils/auth', ()=>({ + ensure_authenticated: mocks.ensure_authenticated, +})); + +vi.mock('../../utils/spinner', ()=>({ + start: mocks.start, +})); + +vi.mock('../../utils/output', ()=>({ + print: mocks.print, + dim: (s: string)=>s, + fail: mocks.fail, + success: mocks.success, + info: mocks.info, +})); + +vi.mock('../../utils/polling', ()=>({ + parse_timeout: mocks.parse_timeout, + poll_until: mocks.poll_until, +})); + +import {handle_datasets_trigger} from '../../commands/datasets-trigger'; + +const X_POSTS_ID = 'gd_lwxkxvnf1cynvib9co'; + +describe('commands/datasets trigger', ()=>{ + beforeEach(()=>{ + vi.clearAllMocks(); + mocks.ensure_authenticated.mockReturnValue('api_key'); + mocks.start.mockReturnValue({stop: mocks.stop}); + mocks.parse_timeout.mockReturnValue(600); + vi.spyOn(console, 'error').mockImplementation(()=>{}); + }); + + afterEach(()=>{ + vi.restoreAllMocks(); + }); + + it('triggers collect-by-URL by dataset name and sends input verbatim', + async()=>{ + mocks.post.mockResolvedValue({snapshot_id: 'snap_1'}); + await handle_datasets_trigger({ + dataset: 'x_posts', + input: '[{"url":"https://x.com/sama"}]', + async: true, + }); + expect(mocks.post).toHaveBeenCalledWith( + 'api_key', + `/datasets/v3/trigger?dataset_id=${X_POSTS_ID}` + +'&include_errors=true', + [{url: 'https://x.com/sama'}], + {timing: undefined} + ); + expect(mocks.success).toHaveBeenCalledWith( + 'Trigger submitted. Snapshot ID: snap_1' + ); + expect(mocks.poll_until).not.toHaveBeenCalled(); + }); + + it('adds discovery params to the trigger query string', async()=>{ + mocks.post.mockResolvedValue({snapshot_id: 'snap_1'}); + await handle_datasets_trigger({ + dataset: 'x_posts', + type: 'discover_new', + discoverBy: 'profile_url', + limit: '15000', + input: '[{"url":"https://x.com/sama"}]', + async: true, + }); + expect(mocks.post).toHaveBeenCalledWith( + 'api_key', + `/datasets/v3/trigger?dataset_id=${X_POSTS_ID}` + +'&type=discover_new&discover_by=profile_url' + +'&limit_per_input=15000&include_errors=true', + [{url: 'https://x.com/sama'}], + {timing: undefined} + ); + }); + + it('accepts a raw --dataset-id for any dataset', async()=>{ + mocks.post.mockResolvedValue({snapshot_id: 'snap_1'}); + await handle_datasets_trigger({ + datasetId: 'gd_brand_new_dataset', + input: '[{"x":1}]', + async: true, + }); + expect(mocks.post).toHaveBeenCalledWith( + 'api_key', + '/datasets/v3/trigger?dataset_id=gd_brand_new_dataset' + +'&include_errors=true', + [{x: 1}], + {timing: undefined} + ); + }); + + it('fails when both --dataset and --dataset-id are given', async()=>{ + await expect(handle_datasets_trigger({ + dataset: 'x_posts', + datasetId: 'gd_x', + input: '[{}]', + })).rejects.toThrow( + 'fail:Provide either --dataset or --dataset-id, not both.' + ); + expect(mocks.post).not.toHaveBeenCalled(); + }); + + it('fails on an unknown dataset name', async()=>{ + await expect(handle_datasets_trigger({ + dataset: 'not_a_dataset', + input: '[{}]', + })).rejects.toThrow('Unknown dataset "not_a_dataset"'); + expect(mocks.post).not.toHaveBeenCalled(); + }); + + it('fails when no input is provided', async()=>{ + await expect(handle_datasets_trigger({dataset: 'x_posts'})) + .rejects.toThrow('No input provided'); + expect(mocks.post).not.toHaveBeenCalled(); + }); + + it('fails on an empty input array', async()=>{ + await expect(handle_datasets_trigger({ + dataset: 'x_posts', + input: '[]', + })).rejects.toThrow('Input is empty'); + expect(mocks.post).not.toHaveBeenCalled(); + }); + + it('fails when both --input-file and --input are given', async()=>{ + await expect(handle_datasets_trigger({ + dataset: 'x_posts', + inputFile: 'seeds.jsonl', + input: '[{}]', + })).rejects.toThrow( + 'Provide either --input-file or --input, not both.' + ); + expect(mocks.post).not.toHaveBeenCalled(); + }); + + it('fails on an invalid --limit', async()=>{ + await expect(handle_datasets_trigger({ + dataset: 'x_posts', + input: '[{"x":1}]', + limit: '-5', + })).rejects.toThrow('Invalid --limit "-5"'); + expect(mocks.post).not.toHaveBeenCalled(); + }); + + it('--no-include-errors sets include_errors=false', async()=>{ + mocks.post.mockResolvedValue({snapshot_id: 'snap_1'}); + await handle_datasets_trigger({ + dataset: 'x_posts', + input: '[{"x":1}]', + includeErrors: false, + async: true, + }); + expect(mocks.post).toHaveBeenCalledWith( + 'api_key', + `/datasets/v3/trigger?dataset_id=${X_POSTS_ID}` + +'&include_errors=false', + [{x: 1}], + {timing: undefined} + ); + }); + + it('wraps a single inline JSON object into an array', async()=>{ + mocks.post.mockResolvedValue({snapshot_id: 'snap_1'}); + await handle_datasets_trigger({ + dataset: 'x_posts', + input: '{"url":"u"}', + async: true, + }); + expect(mocks.post).toHaveBeenCalledWith( + 'api_key', + expect.any(String), + [{url: 'u'}], + {timing: undefined} + ); + }); + + it('reads and parses a JSONL input file (real temp file)', async()=>{ + const tmp = path.join( + os.tmpdir(), `bd-trigger-${process.pid}.jsonl` + ); + writeFileSync(tmp, '{"url":"u1"}\n{"url":"u2"}\n'); + mocks.post.mockResolvedValue({snapshot_id: 'snap_1'}); + try { + await handle_datasets_trigger({ + dataset: 'x_posts', + inputFile: tmp, + async: true, + }); + } finally { + rmSync(tmp, {force: true}); + } + expect(mocks.post).toHaveBeenCalledWith( + 'api_key', + expect.stringContaining(`dataset_id=${X_POSTS_ID}`), + [{url: 'u1'}, {url: 'u2'}], + {timing: undefined} + ); + }); + + it('sync mode polls until ready and prints the data', async()=>{ + mocks.post.mockResolvedValue({snapshot_id: 'snap_1'}); + mocks.poll_until.mockResolvedValue({ + result: [{a: 1}], + attempts: 1, + }); + await handle_datasets_trigger({ + dataset: 'x_posts', + input: '[{"url":"u"}]', + }); + expect(mocks.poll_until).toHaveBeenCalledTimes(1); + expect(mocks.print).toHaveBeenCalledWith( + [{a: 1}], + {json: undefined, pretty: undefined, output: undefined} + ); + expect(mocks.success).not.toHaveBeenCalled(); + }); +}); diff --git a/src/commands/dataset.ts b/src/commands/dataset.ts index 92fa1a0..a018c93 100644 --- a/src/commands/dataset.ts +++ b/src/commands/dataset.ts @@ -323,4 +323,16 @@ add_examples(pipelines_command, [ }, ]); -export {pipelines_command, handle_pipelines}; +export { + pipelines_command, + handle_pipelines, + DATASET_IDS, + resolve_dataset_type, + resolve_format, + strip_nulls, + extract_status, + TRIGGER_ENDPOINT, + SNAPSHOT_ENDPOINT, + RUNNING_STATUSES, +}; +export type {Dataset_type}; diff --git a/src/commands/datasets-trigger.ts b/src/commands/datasets-trigger.ts new file mode 100644 index 0000000..52e907e --- /dev/null +++ b/src/commands/datasets-trigger.ts @@ -0,0 +1,326 @@ +import fs from 'fs'; +import {Command} from 'commander'; +import {ensure_authenticated} from '../utils/auth'; +import {get, post} from '../utils/client'; +import {print, dim, fail, success, info} from '../utils/output'; +import {start as start_spinner} from '../utils/spinner'; +import {parse_timeout, poll_until} from '../utils/polling'; +import {add_examples} from '../utils/help'; +import { + DATASET_IDS, + resolve_dataset_type, + resolve_format, + strip_nulls, + extract_status, + TRIGGER_ENDPOINT, + SNAPSHOT_ENDPOINT, + RUNNING_STATUSES, +} from './dataset'; +import type {Trigger_response} from '../types/dataset'; + +// A faithful, dataset-agnostic wrapper over /datasets/v3/trigger. Unlike +// `pipelines ` (which hardcodes a bare-{url} body via build_input), +// this command forwards the user's input array verbatim and exposes the +// trigger's mode/filter params (type / discover_by / limit_per_input / +// include_errors). It bakes in NO per-dataset schema knowledge — the user +// owns the input shape, exactly as the REST API expects it. + +type Trigger_opts = { + dataset?: string; + datasetId?: string; + type?: string; + discoverBy?: string; + inputFile?: string; + input?: string; + limit?: string; + includeErrors?: boolean; + format?: string; + timeout?: string; + async?: boolean; + output?: string; + json?: boolean; + pretty?: boolean; + timing?: boolean; + apiKey?: string; +}; + +const resolve_dataset_id = (opts: Trigger_opts): string|undefined=>{ + if (opts.dataset && opts.datasetId) + { + fail('Provide either --dataset or --dataset-id, not both.'); + return undefined; + } + if (opts.datasetId) + return opts.datasetId; + if (opts.dataset) + { + const type_key = resolve_dataset_type(opts.dataset); + if (!type_key) + { + fail( + `Unknown dataset "${opts.dataset}".\n` + +' Run \'brightdata pipelines list\' to see available names,\n' + +' or pass a raw id with --dataset-id .' + ); + return undefined; + } + return DATASET_IDS[type_key]; + } + fail( + 'No dataset specified.\n' + +' Use --dataset (e.g. x_posts) or --dataset-id .' + ); + return undefined; +}; + +const parse_input_text = (text: string, source: string): unknown[]=>{ + const trimmed = text.trim(); + if (!trimmed) + { + fail(`No input found in ${source}.`); + return []; + } + // Prefer whole-document JSON (an array, or a single object we wrap). + try { + const parsed = JSON.parse(trimmed); + return Array.isArray(parsed) ? parsed : [parsed]; + } catch(_e) { + // Fall back to JSONL: one JSON value per non-empty line. + const lines = trimmed.split(/\r?\n/) + .map(line=>line.trim()) + .filter(Boolean); + const out: unknown[] = []; + for (let i=0; i{ + if (opts.inputFile && opts.input !== undefined) + { + fail('Provide either --input-file or --input, not both.'); + return undefined; + } + let array: unknown[]; + if (opts.inputFile) + { + let text: string; + try { + text = fs.readFileSync(opts.inputFile, 'utf8'); + } catch(e) { + fail( + `Cannot read input file "${opts.inputFile}": ` + +`${(e as Error).message}` + ); + return undefined; + } + array = parse_input_text(text, `input file "${opts.inputFile}"`); + } + else if (opts.input !== undefined) + array = parse_input_text(opts.input, '--input'); + else + { + fail( + 'No input provided.\n' + +' Pass --input-file (JSON or JSONL) or --input \'\'.\n' + +' The input is the array sent verbatim to the trigger API.' + ); + return undefined; + } + if (!array.length) + { + fail('Input is empty; provide at least one input object.'); + return undefined; + } + return array; +}; + +const parse_limit = (raw: string|undefined): number|undefined=>{ + if (raw === undefined) + return undefined; + const value = Number(raw); + if (!Number.isInteger(value) || value <= 0) + { + fail( + `Invalid --limit "${raw}".\n` + +' Use a positive integer (maps to limit_per_input).' + ); + return undefined; + } + return value; +}; + +const build_trigger_endpoint = ( + dataset_id: string, + opts: Trigger_opts, + limit_per_input: number|undefined, + include_errors: boolean +): string=>{ + const params = [`dataset_id=${encodeURIComponent(dataset_id)}`]; + if (opts.type) + params.push(`type=${encodeURIComponent(opts.type)}`); + if (opts.discoverBy) + params.push(`discover_by=${encodeURIComponent(opts.discoverBy)}`); + if (limit_per_input !== undefined) + params.push(`limit_per_input=${limit_per_input}`); + params.push(`include_errors=${include_errors}`); + return `${TRIGGER_ENDPOINT}?${params.join('&')}`; +}; + +const handle_datasets_trigger = async(opts: Trigger_opts)=>{ + const dataset_id = resolve_dataset_id(opts); + if (!dataset_id) + return; + const input = load_input(opts); + if (!input) + return; + const limit_per_input = parse_limit(opts.limit); + // include_errors defaults to true (matching `pipelines`); only an + // explicit --no-include-errors turns it off. + const include_errors = opts.includeErrors !== false; + const api_key = ensure_authenticated(opts.apiKey); + let timeout = 600; + try { + timeout = parse_timeout(opts.timeout); + } catch(e) { + fail((e as Error).message); + return; + } + const format = resolve_format(opts.format); + const endpoint = build_trigger_endpoint( + dataset_id, opts, limit_per_input, include_errors + ); + const spinner = start_spinner( + `Triggering dataset collection (${dataset_id})...` + ); + try { + const trigger = await post( + api_key, + endpoint, + input, + {timing: opts.timing} + ); + spinner.stop(); + const snapshot_id = trigger.snapshot_id; + if (!snapshot_id) + { + fail('Failed to trigger collection (missing snapshot_id).'); + return; + } + if (opts.async) + { + success(`Trigger submitted. Snapshot ID: ${snapshot_id}`); + info(`Track it with: brightdata status ${snapshot_id} --wait`); + return; + } + console.error(dim( + `Triggered collection with snapshot ID: ${snapshot_id}` + )); + const poll_result = await poll_until({ + timeout_seconds: timeout, + fetch_once: ()=>{ + const snapshot = `${SNAPSHOT_ENDPOINT}/${snapshot_id}` + +`?format=${format}`; + return get(api_key, snapshot, {timing: opts.timing}); + }, + get_status: extract_status, + running_statuses: RUNNING_STATUSES, + timeout_label: 'data', + on_running: ({attempt, timeout_seconds, status})=>{ + console.error(dim( + `Status: ${status} - polling again ` + +`(attempt ${attempt}/${timeout_seconds})` + )); + }, + }); + console.error(dim( + `Data received after ${poll_result.attempts} attempts` + )); + const result = poll_result.result; + const cleaned = format == 'json' ? strip_nulls(result) : result; + print(cleaned, { + json: opts.json, + pretty: opts.pretty, + output: opts.output, + }); + } catch(e) { + spinner.stop(); + console.error((e as Error).message); + process.exit(1); + } +}; + +const trigger_command = new Command('trigger') + .description( + 'Trigger a Datasets v3 collection job (collect-by-URL or discovery)' + ) + .option('--dataset ', + 'Dataset name (e.g. x_posts); see \'brightdata pipelines list\'') + .option('--dataset-id ', + 'Raw dataset id (gd_...); works for any dataset, no code change') + .option('--type ', + 'Trigger type, e.g. discover_new (omit for collect-by-URL)') + .option('--discover-by ', + 'Discovery seed field, e.g. profile_url or url') + .option('--input-file ', + 'Input array file (JSON or JSONL), sent verbatim to the API') + .option('--input ', + 'Inline JSON input (array or single object), sent verbatim') + .option('--limit ', + 'Max records per input (maps to limit_per_input)') + .option('--include-errors', + 'Include per-row errors in the snapshot (default: true)') + .option('--no-include-errors', + 'Exclude per-row errors from the snapshot') + .option('--format ', + 'Result format: json, csv, ndjson, jsonl (default: json)') + .option('--timeout ', + 'Polling timeout in seconds ' + +'(default: 600 or BRIGHTDATA_POLLING_TIMEOUT)') + .option('--async', + 'Trigger only; print the snapshot ID and exit (no polling)') + .option('-o, --output ', 'Write output to file') + .option('--json', 'Force JSON output') + .option('--pretty', 'Pretty-print JSON output') + .option('--timing', 'Show request timing') + .action(handle_datasets_trigger); + +add_examples(trigger_command, [ + { + description: 'Discovery mode: crawl an X profile\'s posts by date ' + +'range (input array in a JSONL file)', + command: 'brightdata datasets trigger --dataset x_posts ' + +'--type discover_new --discover-by profile_url ' + +'--input-file seeds.jsonl --limit 15000 --format jsonl', + }, + { + description: 'Collect-by-URL with inline input (no per-dataset flags)', + command: 'brightdata datasets trigger --dataset linkedin_posts ' + +'--input \'[{"url":"https://www.linkedin.com/posts/example"}]\'', + }, + { + description: 'Trigger asynchronously by raw dataset id, then poll ' + +'with status', + command: 'brightdata datasets trigger ' + +'--dataset-id gd_lwxkxvnf1cynvib9co ' + +'--input-file seeds.jsonl --async', + }, +]); + +const datasets_command = new Command('datasets') + .description('Trigger Bright Data Datasets v3 collection jobs') + .addCommand(trigger_command); + +export {datasets_command, trigger_command, handle_datasets_trigger}; diff --git a/src/index.ts b/src/index.ts index 7493e74..8d4ccfe 100644 --- a/src/index.ts +++ b/src/index.ts @@ -7,6 +7,7 @@ import {logout_command} from './commands/logout'; import {scrape_command} from './commands/scrape'; import {search_command} from './commands/search'; import {pipelines_command} from './commands/dataset'; +import {datasets_command} from './commands/datasets-trigger'; import {status_command} from './commands/status'; import {zones_command} from './commands/zones'; import {config_command} from './commands/config'; @@ -42,6 +43,7 @@ const build_program = ()=>{ program.addCommand(scrape_command); program.addCommand(search_command); program.addCommand(pipelines_command); + program.addCommand(datasets_command); program.addCommand(status_command); program.addCommand(zones_command); program.addCommand(config_command);