diff --git a/.agents/skills/add-tools/SKILL.md b/.agents/skills/add-tools/SKILL.md index 66f6f88d047..03dbd68456f 100644 --- a/.agents/skills/add-tools/SKILL.md +++ b/.agents/skills/add-tools/SKILL.md @@ -266,9 +266,9 @@ export * from './types' ## Registering Tools -After creating tools, remind the user to: +After creating tools: 1. Import tools in `apps/sim/tools/registry.ts` -2. Add to the `tools` object with snake_case keys: +2. Add to the `tools` object with snake_case keys (alphabetically): ```typescript import { serviceActionTool } from '@/tools/{service}' @@ -278,6 +278,130 @@ export const tools = { } ``` +## Wiring Tools into the Block (Required) + +After registering in `tools/registry.ts`, you MUST also update the block definition at `apps/sim/blocks/blocks/{service}.ts`. This is not optional — tools are only usable from the UI if they are wired into the block. + +### 1. Add to `tools.access` + +```typescript +tools: { + access: [ + // existing tools... + 'service_new_action', // Add every new tool ID here + ], + config: { ... } +} +``` + +### 2. Add operation dropdown options + +If the block uses an operation dropdown, add an option for each new tool: + +```typescript +{ + id: 'operation', + type: 'dropdown', + options: [ + // existing options... + { label: 'New Action', id: 'new_action' }, // id maps to what tools.config.tool returns + ], +} +``` + +### 3. Add subBlocks for new tool params + +For each new tool, add subBlocks covering all its required params (and optional ones where useful). Apply `condition` to show them only for the right operation, and mark required params with `required`: + +```typescript +// Required param for new_action +{ + id: 'someParam', + title: 'Some Param', + type: 'short-input', + placeholder: 'e.g., value', + condition: { field: 'operation', value: 'new_action' }, + required: { field: 'operation', value: 'new_action' }, +}, +// Optional param — put in advanced mode +{ + id: 'optionalParam', + title: 'Optional Param', + type: 'short-input', + condition: { field: 'operation', value: 'new_action' }, + mode: 'advanced', +}, +``` + +### 4. Update `tools.config.tool` + +Ensure the tool selector returns the correct tool ID for every new operation. The simplest pattern: + +```typescript +tool: (params) => `service_${params.operation}`, +// If operation dropdown IDs already match tool IDs, this requires no change. +``` + +If the dropdown IDs differ from tool IDs, add explicit mappings: + +```typescript +tool: (params) => { + const map: Record = { + new_action: 'service_new_action', + // ... + } + return map[params.operation] ?? `service_${params.operation}` +}, +``` + +### 5. Update `tools.config.params` + +Add any type coercions needed for new params (runs at execution time, after variable resolution): + +```typescript +params: (params) => { + const result: Record = {} + if (params.limit != null && params.limit !== '') result.limit = Number(params.limit) + if (params.newParamName) result.toolParamName = params.newParamName // rename if IDs differ + return result +}, +``` + +### 6. Add new outputs + +Add any new fields returned by the new tools to the block `outputs`: + +```typescript +outputs: { + // existing outputs... + newField: { type: 'string', description: 'Description of new field' }, +} +``` + +### 7. Add new inputs + +Add new subBlock param IDs to the block `inputs` section: + +```typescript +inputs: { + // existing inputs... + someParam: { type: 'string', description: 'Param description' }, + optionalParam: { type: 'string', description: 'Optional param description' }, +} +``` + +### Block wiring checklist + +- [ ] New tool IDs added to `tools.access` +- [ ] Operation dropdown has an option for each new tool +- [ ] SubBlocks cover all required params for each new tool +- [ ] SubBlocks have correct `condition` (only show for the right operation) +- [ ] Optional/rarely-used params set to `mode: 'advanced'` +- [ ] `tools.config.tool` returns correct ID for every new operation +- [ ] `tools.config.params` handles any ID remapping or type coercions +- [ ] New outputs added to block `outputs` +- [ ] New params added to block `inputs` + ## V2 Tool Pattern If creating V2 tools (API-aligned outputs), use `_v2` suffix: @@ -299,7 +423,9 @@ All tool IDs MUST use `snake_case`: `{service}_{action}` (e.g., `x_create_tweet` - [ ] All optional outputs have `optional: true` - [ ] No raw JSON dumps in outputs - [ ] Types file has all interfaces -- [ ] Index.ts exports all tools +- [ ] Index.ts exports all tools and re-exports types (`export * from './types'`) +- [ ] Tools registered in `tools/registry.ts` +- [ ] Block wired: `tools.access`, dropdown options, subBlocks, `tools.config`, outputs, inputs ## Final Validation (Required) diff --git a/apps/docs/components/icons.tsx b/apps/docs/components/icons.tsx index cbc3b5edd85..8b0ed3aabb1 100644 --- a/apps/docs/components/icons.tsx +++ b/apps/docs/components/icons.tsx @@ -4929,6 +4929,49 @@ export function SSHIcon(props: SVGProps) { ) } +export function DagsterIcon(props: SVGProps) { + return ( + + + + + + + + + + + + ) +} + export function DatabricksIcon(props: SVGProps) { return ( diff --git a/apps/docs/components/ui/icon-mapping.ts b/apps/docs/components/ui/icon-mapping.ts index c65d55ed9d4..237e5941316 100644 --- a/apps/docs/components/ui/icon-mapping.ts +++ b/apps/docs/components/ui/icon-mapping.ts @@ -32,6 +32,7 @@ import { CloudWatchIcon, ConfluenceIcon, CursorIcon, + DagsterIcon, DatabricksIcon, DatadogIcon, DevinIcon, @@ -218,6 +219,7 @@ export const blockTypeToIconMap: Record = { cloudwatch: CloudWatchIcon, confluence_v2: ConfluenceIcon, cursor_v2: CursorIcon, + dagster: DagsterIcon, databricks: DatabricksIcon, datadog: DatadogIcon, devin: DevinIcon, diff --git a/apps/docs/content/docs/en/tools/dagster.mdx b/apps/docs/content/docs/en/tools/dagster.mdx new file mode 100644 index 00000000000..b82c1a7f4ab --- /dev/null +++ b/apps/docs/content/docs/en/tools/dagster.mdx @@ -0,0 +1,343 @@ +--- +title: Dagster +description: Orchestrate data pipelines and manage job runs with Dagster +--- + +import { BlockInfoCard } from "@/components/ui/block-info-card" + + + +{/* MANUAL-CONTENT-START:intro */} +[Dagster](https://dagster.io/) is an open-source data orchestration platform designed for building, testing, and monitoring data pipelines. It provides a unified model for defining data assets, scheduling jobs, and observing pipeline execution — whether running locally or deployed to Dagster+. + +With Dagster, you can: + +- **Orchestrate data pipelines**: Define and run jobs composed of ops and assets with full dependency tracking +- **Monitor executions**: Track run status, inspect logs, and debug failures step by step +- **Manage schedules and sensors**: Automate pipeline triggers on a cron schedule or in response to external events +- **Reexecute selectively**: Resume failed pipelines from the point of failure without rerunning successful steps + +In Sim, the Dagster integration enables your agents to interact with a Dagster instance programmatically. Agents can launch and monitor job runs, retrieve execution logs, reexecute failed runs, and manage schedules and sensors — all as part of a larger automated workflow. Use Dagster as an orchestration layer your agents can control and observe, enabling data-driven automation that responds dynamically to pipeline outcomes. +{/* MANUAL-CONTENT-END */} + + +## Usage Instructions + +Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, terminate or delete runs, reexecute failed runs, fetch run logs, and manage schedules and sensors. API token only required for Dagster+. + + + +## Tools + +### `dagster_launch_run` + +Launch a job run on a Dagster instance. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3000\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `jobName` | string | Yes | Name of the job to launch | +| `runConfigJson` | string | No | Run configuration as a JSON object \(optional\) | +| `tags` | string | No | Tags as a JSON array of \{key, value\} objects \(optional\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runId` | string | The globally unique ID of the launched run | + +### `dagster_get_run` + +Get the status and details of a Dagster run by its ID. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3000\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `runId` | string | Yes | The ID of the run to retrieve | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runId` | string | Run ID | +| `jobName` | string | Name of the job this run belongs to | +| `status` | string | Run status \(QUEUED, NOT_STARTED, STARTING, MANAGED, STARTED, SUCCESS, FAILURE, CANCELING, CANCELED\) | +| `startTime` | number | Run start time as Unix timestamp | +| `endTime` | number | Run end time as Unix timestamp | +| `runConfigYaml` | string | Run configuration as YAML | +| `tags` | json | Run tags as array of \{key, value\} objects | + +### `dagster_get_run_logs` + +Fetch execution event logs for a Dagster run. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `runId` | string | Yes | The ID of the run to fetch logs for | +| `afterCursor` | string | No | Cursor for paginating through log events \(from a previous response\) | +| `limit` | number | No | Maximum number of log events to return | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `events` | json | Array of log events \(type, message, timestamp, level, stepKey, eventType\) | +| ↳ `type` | string | GraphQL typename of the event | +| ↳ `message` | string | Human-readable log message | +| ↳ `timestamp` | string | Event timestamp as a Unix epoch string | +| ↳ `level` | string | Log level \(DEBUG, INFO, WARNING, ERROR, CRITICAL\) | +| ↳ `stepKey` | string | Step key, if the event is step-scoped | +| ↳ `eventType` | string | Dagster event type enum value | +| `cursor` | string | Cursor for fetching the next page of log events | +| `hasMore` | boolean | Whether more log events are available beyond this page | + +### `dagster_list_runs` + +List recent Dagster runs, optionally filtered by job name. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `jobName` | string | No | Filter runs by job name \(optional\) | +| `statuses` | string | No | Comma-separated run statuses to filter by, e.g. "SUCCESS,FAILURE" \(optional\) | +| `limit` | number | No | Maximum number of runs to return \(default 20\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runs` | json | Array of runs | +| ↳ `runId` | string | Run ID | +| ↳ `jobName` | string | Job name | +| ↳ `status` | string | Run status | +| ↳ `tags` | json | Run tags as array of \{key, value\} objects | +| ↳ `startTime` | number | Start time as Unix timestamp | +| ↳ `endTime` | number | End time as Unix timestamp | + +### `dagster_list_jobs` + +List all jobs across repositories in a Dagster instance. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `jobs` | json | Array of jobs with name and repositoryName | +| ↳ `name` | string | Job name | +| ↳ `repositoryName` | string | Repository name | + +### `dagster_reexecute_run` + +Reexecute an existing Dagster run, optionally resuming only from failed steps. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `parentRunId` | string | Yes | The ID of the run to reexecute | +| `strategy` | string | Yes | Reexecution strategy: ALL_STEPS reruns everything, FROM_FAILURE resumes from failed steps, FROM_ASSET_FAILURE resumes from failed assets | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runId` | string | The ID of the newly launched reexecution run | + +### `dagster_terminate_run` + +Terminate an in-progress Dagster run. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `runId` | string | Yes | The ID of the run to terminate | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `success` | boolean | Whether the run was successfully terminated | +| `runId` | string | The ID of the terminated run | +| `message` | string | Error or status message if termination failed | + +### `dagster_delete_run` + +Permanently delete a Dagster run record. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `runId` | string | Yes | The ID of the run to delete | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runId` | string | The ID of the deleted run | + +### `dagster_list_schedules` + +List all schedules in a Dagster repository, optionally filtered by status. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `scheduleStatus` | string | No | Filter schedules by status: RUNNING or STOPPED \(omit to return all\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `schedules` | json | Array of schedules \(name, cronSchedule, jobName, status, id, description, executionTimezone\) | +| ↳ `name` | string | Schedule name | +| ↳ `cronSchedule` | string | Cron expression for the schedule | +| ↳ `jobName` | string | Job the schedule targets | +| ↳ `status` | string | Schedule status: RUNNING or STOPPED | +| ↳ `id` | string | Instigator state ID — use this to start or stop the schedule | +| ↳ `description` | string | Human-readable schedule description | +| ↳ `executionTimezone` | string | Timezone for cron evaluation | + +### `dagster_start_schedule` + +Enable (start) a schedule in a Dagster repository. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `scheduleName` | string | Yes | Name of the schedule to start | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `id` | string | Instigator state ID of the schedule | +| `status` | string | Updated schedule status \(RUNNING or STOPPED\) | + +### `dagster_stop_schedule` + +Disable (stop) a running schedule in Dagster. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `instigationStateId` | string | Yes | InstigationState ID of the schedule to stop — available from dagster_list_schedules output | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `id` | string | Instigator state ID of the schedule | +| `status` | string | Updated schedule status \(RUNNING or STOPPED\) | + +### `dagster_list_sensors` + +List all sensors in a Dagster repository, optionally filtered by status. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `sensorStatus` | string | No | Filter sensors by status: RUNNING or STOPPED \(omit to return all\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `sensors` | json | Array of sensors \(name, sensorType, status, id, description\) | +| ↳ `name` | string | Sensor name | +| ↳ `sensorType` | string | Sensor type \(ASSET, AUTO_MATERIALIZE, FRESHNESS_POLICY, MULTI_ASSET, RUN_STATUS, STANDARD\) | +| ↳ `status` | string | Sensor status: RUNNING or STOPPED | +| ↳ `id` | string | Instigator state ID — use this to start or stop the sensor | +| ↳ `description` | string | Human-readable sensor description | + +### `dagster_start_sensor` + +Enable (start) a sensor in a Dagster repository. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `sensorName` | string | Yes | Name of the sensor to start | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `id` | string | Instigator state ID of the sensor | +| `status` | string | Updated sensor status \(RUNNING or STOPPED\) | + +### `dagster_stop_sensor` + +Disable (stop) a running sensor in Dagster. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `instigationStateId` | string | Yes | InstigationState ID of the sensor to stop — available from dagster_list_sensors output | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `id` | string | Instigator state ID of the sensor | +| `status` | string | Updated sensor status \(RUNNING or STOPPED\) | + + diff --git a/apps/docs/content/docs/en/tools/meta.json b/apps/docs/content/docs/en/tools/meta.json index a0f99bf6616..e413b867f71 100644 --- a/apps/docs/content/docs/en/tools/meta.json +++ b/apps/docs/content/docs/en/tools/meta.json @@ -27,6 +27,7 @@ "cloudwatch", "confluence", "cursor", + "dagster", "databricks", "datadog", "devin", diff --git a/apps/sim/app/(landing)/integrations/data/icon-mapping.ts b/apps/sim/app/(landing)/integrations/data/icon-mapping.ts index 503242d8c1e..898ee4287ee 100644 --- a/apps/sim/app/(landing)/integrations/data/icon-mapping.ts +++ b/apps/sim/app/(landing)/integrations/data/icon-mapping.ts @@ -32,6 +32,7 @@ import { CloudWatchIcon, ConfluenceIcon, CursorIcon, + DagsterIcon, DatabricksIcon, DatadogIcon, DevinIcon, @@ -218,6 +219,7 @@ export const blockTypeToIconMap: Record = { cloudwatch: CloudWatchIcon, confluence_v2: ConfluenceIcon, cursor_v2: CursorIcon, + dagster: DagsterIcon, databricks: DatabricksIcon, datadog: DatadogIcon, devin: DevinIcon, diff --git a/apps/sim/app/(landing)/integrations/data/integrations.json b/apps/sim/app/(landing)/integrations/data/integrations.json index 1aedf79474f..7108dacc096 100644 --- a/apps/sim/app/(landing)/integrations/data/integrations.json +++ b/apps/sim/app/(landing)/integrations/data/integrations.json @@ -2345,6 +2345,81 @@ "integrationType": "developer-tools", "tags": ["agentic", "automation"] }, + { + "type": "dagster", + "slug": "dagster", + "name": "Dagster", + "description": "Orchestrate data pipelines and manage job runs with Dagster", + "longDescription": "Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, terminate or delete runs, reexecute failed runs, fetch run logs, and manage schedules and sensors. API token only required for Dagster+.", + "bgColor": "#ffffff", + "iconName": "DagsterIcon", + "docsUrl": "https://docs.sim.ai/tools/dagster", + "operations": [ + { + "name": "Launch Run", + "description": "Launch a job run on a Dagster instance." + }, + { + "name": "Get Run", + "description": "Get the status and details of a Dagster run by its ID." + }, + { + "name": "Get Run Logs", + "description": "Fetch execution event logs for a Dagster run." + }, + { + "name": "List Runs", + "description": "List recent Dagster runs, optionally filtered by job name." + }, + { + "name": "List Jobs", + "description": "List all jobs across repositories in a Dagster instance." + }, + { + "name": "Reexecute Run", + "description": "Reexecute an existing Dagster run, optionally resuming only from failed steps." + }, + { + "name": "Terminate Run", + "description": "Terminate an in-progress Dagster run." + }, + { + "name": "Delete Run", + "description": "Permanently delete a Dagster run record." + }, + { + "name": "List Schedules", + "description": "List all schedules in a Dagster repository, optionally filtered by status." + }, + { + "name": "Start Schedule", + "description": "Enable (start) a schedule in a Dagster repository." + }, + { + "name": "Stop Schedule", + "description": "Disable (stop) a running schedule in Dagster." + }, + { + "name": "List Sensors", + "description": "List all sensors in a Dagster repository, optionally filtered by status." + }, + { + "name": "Start Sensor", + "description": "Enable (start) a sensor in a Dagster repository." + }, + { + "name": "Stop Sensor", + "description": "Disable (stop) a running sensor in Dagster." + } + ], + "operationCount": 14, + "triggers": [], + "triggerCount": 0, + "authType": "api-key", + "category": "tools", + "integrationType": "automation", + "tags": ["data-analytics", "automation"] + }, { "type": "databricks", "slug": "databricks", diff --git a/apps/sim/blocks/blocks/dagster.ts b/apps/sim/blocks/blocks/dagster.ts new file mode 100644 index 00000000000..4121950b2e9 --- /dev/null +++ b/apps/sim/blocks/blocks/dagster.ts @@ -0,0 +1,423 @@ +import { DagsterIcon } from '@/components/icons' +import type { BlockConfig } from '@/blocks/types' +import { IntegrationType } from '@/blocks/types' +import type { DagsterResponse } from '@/tools/dagster/types' + +export const DagsterBlock: BlockConfig = { + type: 'dagster', + name: 'Dagster', + description: 'Orchestrate data pipelines and manage job runs with Dagster', + longDescription: + 'Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, terminate or delete runs, reexecute failed runs, fetch run logs, and manage schedules and sensors. API token only required for Dagster+.', + docsLink: 'https://docs.sim.ai/tools/dagster', + category: 'tools', + integrationType: IntegrationType.Automation, + tags: ['data-analytics', 'automation'], + bgColor: '#ffffff', + icon: DagsterIcon, + + subBlocks: [ + // ── Operation selector ───────────────────────────────────────────────────── + { + id: 'operation', + title: 'Operation', + type: 'dropdown', + options: [ + { label: 'Launch Run', id: 'launch_run' }, + { label: 'Get Run', id: 'get_run' }, + { label: 'Get Run Logs', id: 'get_run_logs' }, + { label: 'List Runs', id: 'list_runs' }, + { label: 'List Jobs', id: 'list_jobs' }, + { label: 'Reexecute Run', id: 'reexecute_run' }, + { label: 'Terminate Run', id: 'terminate_run' }, + { label: 'Delete Run', id: 'delete_run' }, + { label: 'List Schedules', id: 'list_schedules' }, + { label: 'Start Schedule', id: 'start_schedule' }, + { label: 'Stop Schedule', id: 'stop_schedule' }, + { label: 'List Sensors', id: 'list_sensors' }, + { label: 'Start Sensor', id: 'start_sensor' }, + { label: 'Stop Sensor', id: 'stop_sensor' }, + ], + value: () => 'launch_run', + }, + + // ── Repository selectors (launch_run + schedule/sensor operations) ───────── + { + id: 'repositoryLocationName', + title: 'Repository Location', + type: 'short-input', + placeholder: 'e.g., my_code_location', + condition: { + field: 'operation', + value: ['launch_run', 'list_schedules', 'start_schedule', 'list_sensors', 'start_sensor'], + }, + required: { + field: 'operation', + value: ['launch_run', 'list_schedules', 'start_schedule', 'list_sensors', 'start_sensor'], + }, + }, + { + id: 'repositoryName', + title: 'Repository Name', + type: 'short-input', + placeholder: 'e.g., __repository__', + condition: { + field: 'operation', + value: ['launch_run', 'list_schedules', 'start_schedule', 'list_sensors', 'start_sensor'], + }, + required: { + field: 'operation', + value: ['launch_run', 'list_schedules', 'start_schedule', 'list_sensors', 'start_sensor'], + }, + }, + + // ── Launch Run ───────────────────────────────────────────────────────────── + { + id: 'jobName', + title: 'Job Name', + type: 'short-input', + placeholder: 'e.g., my_pipeline_job', + condition: { field: 'operation', value: 'launch_run' }, + required: { field: 'operation', value: 'launch_run' }, + }, + { + id: 'runConfigJson', + title: 'Run Config', + type: 'code', + placeholder: '{"ops": {"my_op": {"config": {"key": "value"}}}}', + condition: { field: 'operation', value: 'launch_run' }, + mode: 'advanced', + wandConfig: { + enabled: true, + prompt: `Generate a Dagster run config JSON object based on the user's description. + +Examples: +- "set partition date to 2024-01-15" -> {"ops": {"load_partition": {"config": {"partition_date": "2024-01-15"}}}} +- "run with debug logging" -> {"execution": {"multiprocess": {"config": {"max_concurrent": 1}}}} + +Return ONLY a valid JSON object - no explanations, no extra text.`, + placeholder: 'Describe the run configuration...', + generationType: 'json-object', + }, + }, + { + id: 'tags', + title: 'Tags', + type: 'code', + placeholder: '[{"key": "env", "value": "prod"}]', + condition: { field: 'operation', value: 'launch_run' }, + mode: 'advanced', + wandConfig: { + enabled: true, + prompt: `Generate a Dagster execution tags JSON array based on the user's description. + +Format: [{"key": "string", "value": "string"}, ...] + +Examples: +- "tag env as prod" -> [{"key": "env", "value": "prod"}] +- "mark as nightly run owned by data team" -> [{"key": "schedule", "value": "nightly"}, {"key": "owner", "value": "data-team"}] + +Return ONLY a valid JSON array - no explanations, no extra text.`, + placeholder: 'Describe the tags to attach to this run...', + generationType: 'json-object', + }, + }, + + // ── Run ID (shared: get_run, get_run_logs, terminate_run, delete_run, reexecute_run) ── + { + id: 'runId', + title: 'Run ID', + type: 'short-input', + placeholder: 'e.g., abc123def456', + condition: { + field: 'operation', + value: ['get_run', 'get_run_logs', 'terminate_run', 'delete_run', 'reexecute_run'], + }, + required: { + field: 'operation', + value: ['get_run', 'get_run_logs', 'terminate_run', 'delete_run', 'reexecute_run'], + }, + }, + + // ── Reexecute Run ────────────────────────────────────────────────────────── + { + id: 'strategy', + title: 'Reexecution Strategy', + type: 'dropdown', + options: [ + { label: 'All Steps', id: 'ALL_STEPS' }, + { label: 'From Failure', id: 'FROM_FAILURE' }, + { label: 'From Asset Failure', id: 'FROM_ASSET_FAILURE' }, + ], + value: () => 'ALL_STEPS', + condition: { field: 'operation', value: 'reexecute_run' }, + required: { field: 'operation', value: 'reexecute_run' }, + }, + + // ── Get Run Logs ─────────────────────────────────────────────────────────── + { + id: 'afterCursor', + title: 'After Cursor', + type: 'short-input', + placeholder: 'Cursor from a previous get_run_logs response (for pagination)', + condition: { field: 'operation', value: 'get_run_logs' }, + mode: 'advanced', + }, + { + id: 'logsLimit', + title: 'Limit', + type: 'short-input', + placeholder: '100', + condition: { field: 'operation', value: 'get_run_logs' }, + mode: 'advanced', + }, + + // ── List Runs ────────────────────────────────────────────────────────────── + { + id: 'listRunsJobName', + title: 'Job Name Filter', + type: 'short-input', + placeholder: 'Filter by job name (optional)', + condition: { field: 'operation', value: 'list_runs' }, + }, + { + id: 'statuses', + title: 'Status Filter', + type: 'short-input', + placeholder: 'e.g. SUCCESS,FAILURE (optional)', + condition: { field: 'operation', value: 'list_runs' }, + mode: 'advanced', + wandConfig: { + enabled: true, + prompt: `Generate a comma-separated list of Dagster run statuses to filter by. + +Valid statuses: QUEUED, NOT_STARTED, STARTING, MANAGED, STARTED, SUCCESS, FAILURE, CANCELING, CANCELED + +Examples: +- "only failed runs" -> FAILURE +- "completed runs (success or failure)" -> SUCCESS,FAILURE +- "runs in progress" -> QUEUED,NOT_STARTED,STARTING,STARTED + +Return ONLY the comma-separated status values - no explanations, no extra text.`, + placeholder: 'Describe which run statuses to include...', + }, + }, + { + id: 'limit', + title: 'Limit', + type: 'short-input', + placeholder: '20', + condition: { field: 'operation', value: 'list_runs' }, + mode: 'advanced', + }, + + // ── Schedule operations ──────────────────────────────────────────────────── + { + id: 'scheduleName', + title: 'Schedule Name', + type: 'short-input', + placeholder: 'e.g., my_daily_schedule', + condition: { field: 'operation', value: 'start_schedule' }, + required: { field: 'operation', value: 'start_schedule' }, + }, + { + id: 'scheduleStatus', + title: 'Status Filter', + type: 'dropdown', + options: [ + { label: 'All', id: '' }, + { label: 'Running', id: 'RUNNING' }, + { label: 'Stopped', id: 'STOPPED' }, + ], + value: () => '', + condition: { field: 'operation', value: 'list_schedules' }, + mode: 'advanced', + }, + + // ── Sensor operations ────────────────────────────────────────────────────── + { + id: 'sensorName', + title: 'Sensor Name', + type: 'short-input', + placeholder: 'e.g., my_asset_sensor', + condition: { field: 'operation', value: 'start_sensor' }, + required: { field: 'operation', value: 'start_sensor' }, + }, + { + id: 'sensorStatus', + title: 'Status Filter', + type: 'dropdown', + options: [ + { label: 'All', id: '' }, + { label: 'Running', id: 'RUNNING' }, + { label: 'Stopped', id: 'STOPPED' }, + ], + value: () => '', + condition: { field: 'operation', value: 'list_sensors' }, + mode: 'advanced', + }, + + // ── Stop schedule / sensor (shared) ──────────────────────────────────────── + { + id: 'instigationStateId', + title: 'Instigator State ID', + type: 'short-input', + placeholder: 'ID from list_schedules or list_sensors output', + condition: { field: 'operation', value: ['stop_schedule', 'stop_sensor'] }, + required: { field: 'operation', value: ['stop_schedule', 'stop_sensor'] }, + }, + + // ── Connection (common to all operations) ────────────────────────────────── + { + id: 'host', + title: 'Host', + type: 'short-input', + placeholder: 'http://localhost:3001 or https://myorg.dagster.cloud/prod', + required: true, + }, + { + id: 'apiKey', + title: 'API Token', + type: 'short-input', + placeholder: 'Dagster+ API token (leave blank for OSS / self-hosted)', + password: true, + }, + ], + + tools: { + access: [ + 'dagster_launch_run', + 'dagster_get_run', + 'dagster_get_run_logs', + 'dagster_list_runs', + 'dagster_list_jobs', + 'dagster_reexecute_run', + 'dagster_terminate_run', + 'dagster_delete_run', + 'dagster_list_schedules', + 'dagster_start_schedule', + 'dagster_stop_schedule', + 'dagster_list_sensors', + 'dagster_start_sensor', + 'dagster_stop_sensor', + ], + config: { + tool: (params) => `dagster_${params.operation}`, + params: (params) => { + const result: Record = {} + + // list_runs: type-coerce limit and remap job name filter + if (params.operation === 'list_runs') { + if (params.limit != null && params.limit !== '') result.limit = Number(params.limit) + result.jobName = params.listRunsJobName || undefined + } + + // get_run_logs: remap logsLimit → limit + if (params.operation === 'get_run_logs') { + if (params.logsLimit != null && params.logsLimit !== '') + result.limit = Number(params.logsLimit) + } + + // reexecute_run: remap runId → parentRunId + if (params.operation === 'reexecute_run') { + if (params.runId) result.parentRunId = params.runId + } + + // list_schedules / list_sensors: drop empty status filter + if (params.operation === 'list_schedules' && !params.scheduleStatus) { + result.scheduleStatus = undefined + } + if (params.operation === 'list_sensors' && !params.sensorStatus) { + result.sensorStatus = undefined + } + + return result + }, + }, + }, + + inputs: { + operation: { type: 'string', description: 'Operation to perform' }, + host: { type: 'string', description: 'Dagster host URL' }, + apiKey: { + type: 'string', + description: 'Dagster Cloud API token (optional for self-hosted instances)', + }, + // Launch Run + repositoryLocationName: { type: 'string', description: 'Repository location name' }, + repositoryName: { type: 'string', description: 'Repository name' }, + jobName: { type: 'string', description: 'Job name to launch' }, + runConfigJson: { type: 'string', description: 'Run configuration as JSON' }, + tags: { type: 'string', description: 'Tags as JSON array of {key, value} objects' }, + // Run ID operations + runId: { type: 'string', description: 'Run ID' }, + // Reexecute Run + strategy: { + type: 'string', + description: 'Reexecution strategy (ALL_STEPS, FROM_FAILURE, FROM_ASSET_FAILURE)', + }, + // Get Run Logs + afterCursor: { type: 'string', description: 'Pagination cursor for run logs' }, + logsLimit: { type: 'number', description: 'Maximum log events to return' }, + // List Runs + listRunsJobName: { type: 'string', description: 'Filter list_runs by job name' }, + statuses: { type: 'string', description: 'Comma-separated run statuses to filter by' }, + limit: { type: 'number', description: 'Maximum results to return' }, + // Schedules + scheduleName: { type: 'string', description: 'Schedule name' }, + scheduleStatus: { + type: 'string', + description: 'Filter schedules by status (RUNNING or STOPPED)', + }, + // Sensors + sensorName: { type: 'string', description: 'Sensor name' }, + sensorStatus: { type: 'string', description: 'Filter sensors by status (RUNNING or STOPPED)' }, + // Stop schedule / sensor + instigationStateId: { type: 'string', description: 'InstigationState ID for stop operations' }, + }, + + outputs: { + // Launch Run / Reexecute Run / Delete Run / Get Run + runId: { type: 'string', description: 'Run ID' }, + // Get Run + jobName: { type: 'string', description: 'Job name the run belongs to' }, + status: { type: 'string', description: 'Run or schedule/sensor status' }, + startTime: { type: 'number', description: 'Run start time (Unix timestamp)' }, + endTime: { type: 'number', description: 'Run end time (Unix timestamp)' }, + runConfigYaml: { type: 'string', description: 'Run configuration as YAML' }, + tags: { type: 'json', description: 'Run tags as array of {key, value} objects' }, + // List Runs + runs: { + type: 'json', + description: 'List of runs (runId, jobName, status, tags, startTime, endTime)', + }, + // List Jobs + jobs: { type: 'json', description: 'List of jobs (name, repositoryName)' }, + // Terminate Run + success: { type: 'boolean', description: 'Whether termination succeeded' }, + message: { type: 'string', description: 'Termination status or error message' }, + // Get Run Logs + events: { + type: 'json', + description: 'Log events (type, message, timestamp, level, stepKey, eventType)', + }, + cursor: { type: 'string', description: 'Pagination cursor for the next page of logs' }, + hasMore: { + type: 'boolean', + description: 'Whether more log events are available beyond this page', + }, + // List Schedules + schedules: { + type: 'json', + description: + 'List of schedules (name, cronSchedule, jobName, status, id, description, executionTimezone)', + }, + // List Sensors + sensors: { + type: 'json', + description: 'List of sensors (name, sensorType, status, id, description)', + }, + // Start/Stop schedule or sensor + id: { type: 'string', description: 'Instigator state ID of the schedule or sensor' }, + }, +} diff --git a/apps/sim/blocks/registry.ts b/apps/sim/blocks/registry.ts index e78578b176b..d37f90edfcf 100644 --- a/apps/sim/blocks/registry.ts +++ b/apps/sim/blocks/registry.ts @@ -30,6 +30,7 @@ import { ConditionBlock } from '@/blocks/blocks/condition' import { ConfluenceBlock, ConfluenceV2Block } from '@/blocks/blocks/confluence' import { CredentialBlock } from '@/blocks/blocks/credential' import { CursorBlock, CursorV2Block } from '@/blocks/blocks/cursor' +import { DagsterBlock } from '@/blocks/blocks/dagster' import { DatabricksBlock } from '@/blocks/blocks/databricks' import { DatadogBlock } from '@/blocks/blocks/datadog' import { DevinBlock } from '@/blocks/blocks/devin' @@ -254,6 +255,7 @@ export const registry: Record = { confluence_v2: ConfluenceV2Block, cursor: CursorBlock, cursor_v2: CursorV2Block, + dagster: DagsterBlock, databricks: DatabricksBlock, datadog: DatadogBlock, devin: DevinBlock, diff --git a/apps/sim/components/icons.tsx b/apps/sim/components/icons.tsx index cbc3b5edd85..8b0ed3aabb1 100644 --- a/apps/sim/components/icons.tsx +++ b/apps/sim/components/icons.tsx @@ -4929,6 +4929,49 @@ export function SSHIcon(props: SVGProps) { ) } +export function DagsterIcon(props: SVGProps) { + return ( + + + + + + + + + + + + ) +} + export function DatabricksIcon(props: SVGProps) { return ( diff --git a/apps/sim/tools/dagster/delete_run.ts b/apps/sim/tools/dagster/delete_run.ts new file mode 100644 index 00000000000..202bebecff4 --- /dev/null +++ b/apps/sim/tools/dagster/delete_run.ts @@ -0,0 +1,95 @@ +import type { DagsterDeleteRunParams, DagsterDeleteRunResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface DeleteRunResult { + type: string + runId?: string + message?: string +} + +const DELETE_RUN_MUTATION = ` + mutation DeleteRun($runId: String!) { + deleteRun(runId: $runId) { + type: __typename + ... on DeletePipelineRunSuccess { + runId + } + ... on RunNotFoundError { + message + } + ... on UnauthorizedError { + message + } + ... on PythonError { + message + } + } + } +` + +export const deleteRunTool: ToolConfig = { + id: 'dagster_delete_run', + name: 'Dagster Delete Run', + description: 'Permanently delete a Dagster run record.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + runId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to delete', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: DELETE_RUN_MUTATION, + variables: { runId: params.runId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ deleteRun?: unknown }>(response) + + const result = data.data?.deleteRun as DeleteRunResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'DeletePipelineRunSuccess' && result.runId) { + return { + success: true, + output: { runId: result.runId }, + } + } + + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Delete run failed')}`) + }, + + outputs: { + runId: { + type: 'string', + description: 'The ID of the deleted run', + }, + }, +} diff --git a/apps/sim/tools/dagster/get_run.ts b/apps/sim/tools/dagster/get_run.ts new file mode 100644 index 00000000000..ca22ae2dfba --- /dev/null +++ b/apps/sim/tools/dagster/get_run.ts @@ -0,0 +1,149 @@ +import type { DagsterGetRunParams, DagsterGetRunResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +/** Fields selected on `runOrError` when the union resolves to `Run`. */ +interface DagsterGetRunGraphqlRun { + runId: string + jobName: string | null + status: string + startTime: number | null + endTime: number | null + runConfigYaml: string | null + tags: Array<{ key: string; value: string }> | null +} + +const GET_RUN_QUERY = ` + query GetRun($runId: ID!) { + runOrError(runId: $runId) { + ... on Run { + runId + jobName + status + startTime + endTime + runConfigYaml + tags { + key + value + } + } + ... on RunNotFoundError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } +` + +export const getRunTool: ToolConfig = { + id: 'dagster_get_run', + name: 'Dagster Get Run', + description: 'Get the status and details of a Dagster run by its ID.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3000)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + runId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to retrieve', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: GET_RUN_QUERY, + variables: { runId: params.runId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ runOrError?: unknown }>(response) + + const raw = data.data?.runOrError + if (!raw || typeof raw !== 'object') throw new Error('Unexpected response from Dagster') + + if (!('runId' in raw) || typeof (raw as { runId: unknown }).runId !== 'string') { + throw new Error( + dagsterUnionErrorMessage(raw as { message?: string }, 'Run not found or Dagster error') + ) + } + + const run = raw as DagsterGetRunGraphqlRun + + return { + success: true, + output: { + runId: run.runId, + jobName: run.jobName ?? null, + status: run.status, + startTime: run.startTime ?? null, + endTime: run.endTime ?? null, + runConfigYaml: run.runConfigYaml ?? null, + tags: run.tags ?? null, + }, + } + }, + + outputs: { + runId: { + type: 'string', + description: 'Run ID', + }, + jobName: { + type: 'string', + description: 'Name of the job this run belongs to', + optional: true, + }, + status: { + type: 'string', + description: + 'Run status (QUEUED, NOT_STARTED, STARTING, MANAGED, STARTED, SUCCESS, FAILURE, CANCELING, CANCELED)', + }, + startTime: { + type: 'number', + description: 'Run start time as Unix timestamp', + optional: true, + }, + endTime: { + type: 'number', + description: 'Run end time as Unix timestamp', + optional: true, + }, + runConfigYaml: { + type: 'string', + description: 'Run configuration as YAML', + optional: true, + }, + tags: { + type: 'json', + description: 'Run tags as array of {key, value} objects', + optional: true, + }, + }, +} diff --git a/apps/sim/tools/dagster/get_run_logs.ts b/apps/sim/tools/dagster/get_run_logs.ts new file mode 100644 index 00000000000..accbc24038f --- /dev/null +++ b/apps/sim/tools/dagster/get_run_logs.ts @@ -0,0 +1,166 @@ +import type { DagsterGetRunLogsParams, DagsterGetRunLogsResponse } from '@/tools/dagster/types' +import { parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface DagsterRunEvent { + __typename?: string + message?: string + timestamp?: string + level?: string + stepKey?: string | null + eventType?: string | null +} + +interface DagsterEventConnection { + events?: DagsterRunEvent[] + cursor?: string + hasMore?: boolean +} + +const GET_RUN_LOGS_QUERY = ` + query GetRunLogs($runId: ID!, $afterCursor: String, $limit: Int) { + logsForRun(runId: $runId, afterCursor: $afterCursor, limit: $limit) { + ... on EventConnection { + events { + __typename + ... on MessageEvent { + message + timestamp + level + stepKey + eventType + } + } + cursor + hasMore + } + ... on RunNotFoundError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } +` + +export const getRunLogsTool: ToolConfig = { + id: 'dagster_get_run_logs', + name: 'Dagster Get Run Logs', + description: 'Fetch execution event logs for a Dagster run.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + runId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to fetch logs for', + }, + afterCursor: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Cursor for paginating through log events (from a previous response)', + }, + limit: { + type: 'number', + required: false, + visibility: 'user-or-llm', + description: 'Maximum number of log events to return', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const variables: Record = { runId: params.runId } + if (params.afterCursor) variables.afterCursor = params.afterCursor + if (params.limit != null) variables.limit = params.limit + return { query: GET_RUN_LOGS_QUERY, variables } + }, + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ logsForRun?: unknown }>(response) + + const result = data.data?.logsForRun as + | DagsterEventConnection + | { message?: string } + | undefined + if (!result || typeof result !== 'object') throw new Error('Unexpected response from Dagster') + + if (!('events' in result)) { + const errResult = result as { message?: string } + throw new Error(errResult.message ?? 'Failed to fetch run logs') + } + + const conn = result as DagsterEventConnection + const events = (conn.events ?? []).map((e) => ({ + type: e.__typename ?? 'Unknown', + message: e.message ?? '', + timestamp: e.timestamp ?? '', + level: e.level ?? 'INFO', + stepKey: e.stepKey ?? null, + eventType: e.eventType ?? null, + })) + + return { + success: true, + output: { + events, + cursor: conn.cursor ?? null, + hasMore: conn.hasMore ?? false, + }, + } + }, + + outputs: { + events: { + type: 'json', + description: 'Array of log events (type, message, timestamp, level, stepKey, eventType)', + properties: { + type: { type: 'string', description: 'GraphQL typename of the event' }, + message: { type: 'string', description: 'Human-readable log message' }, + timestamp: { type: 'string', description: 'Event timestamp as a Unix epoch string' }, + level: { type: 'string', description: 'Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)' }, + stepKey: { + type: 'string', + description: 'Step key, if the event is step-scoped', + optional: true, + }, + eventType: { type: 'string', description: 'Dagster event type enum value', optional: true }, + }, + }, + cursor: { + type: 'string', + description: 'Cursor for fetching the next page of log events', + optional: true, + }, + hasMore: { + type: 'boolean', + description: 'Whether more log events are available beyond this page', + }, + }, +} diff --git a/apps/sim/tools/dagster/index.ts b/apps/sim/tools/dagster/index.ts new file mode 100644 index 00000000000..d189f4a4ae0 --- /dev/null +++ b/apps/sim/tools/dagster/index.ts @@ -0,0 +1,31 @@ +import { deleteRunTool } from '@/tools/dagster/delete_run' +import { getRunTool } from '@/tools/dagster/get_run' +import { getRunLogsTool } from '@/tools/dagster/get_run_logs' +import { launchRunTool } from '@/tools/dagster/launch_run' +import { listJobsTool } from '@/tools/dagster/list_jobs' +import { listRunsTool } from '@/tools/dagster/list_runs' +import { listSchedulesTool } from '@/tools/dagster/list_schedules' +import { listSensorsTool } from '@/tools/dagster/list_sensors' +import { reexecuteRunTool } from '@/tools/dagster/reexecute_run' +import { startScheduleTool } from '@/tools/dagster/start_schedule' +import { startSensorTool } from '@/tools/dagster/start_sensor' +import { stopScheduleTool } from '@/tools/dagster/stop_schedule' +import { stopSensorTool } from '@/tools/dagster/stop_sensor' +import { terminateRunTool } from '@/tools/dagster/terminate_run' + +export const dagsterLaunchRunTool = launchRunTool +export const dagsterGetRunTool = getRunTool +export const dagsterListRunsTool = listRunsTool +export const dagsterListJobsTool = listJobsTool +export const dagsterTerminateRunTool = terminateRunTool +export const dagsterGetRunLogsTool = getRunLogsTool +export const dagsterReexecuteRunTool = reexecuteRunTool +export const dagsterDeleteRunTool = deleteRunTool +export const dagsterListSchedulesTool = listSchedulesTool +export const dagsterStartScheduleTool = startScheduleTool +export const dagsterStopScheduleTool = stopScheduleTool +export const dagsterListSensorsTool = listSensorsTool +export const dagsterStartSensorTool = startSensorTool +export const dagsterStopSensorTool = stopSensorTool + +export * from './types' diff --git a/apps/sim/tools/dagster/launch_run.ts b/apps/sim/tools/dagster/launch_run.ts new file mode 100644 index 00000000000..c71cb17d1d3 --- /dev/null +++ b/apps/sim/tools/dagster/launch_run.ts @@ -0,0 +1,222 @@ +import type { DagsterLaunchRunParams, DagsterLaunchRunResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface LaunchRunResult { + type: string + run?: { runId: string } + message?: string + /** Present when type === 'InvalidStepError' */ + invalidStepKey?: string + /** Present when type === 'InvalidOutputError' */ + stepKey?: string + invalidOutputName?: string + /** Present when type === 'RunConfigValidationInvalid' */ + errors?: Array<{ message: string }> +} + +function buildLaunchRunMutation(hasConfig: boolean, hasTags: boolean) { + const varDefs = [ + '$repositoryLocationName: String!', + '$repositoryName: String!', + '$jobName: String!', + ] + if (hasConfig) varDefs.push('$runConfigData: RunConfigData') + if (hasTags) varDefs.push('$tags: [ExecutionTag!]') + + const execParams = [ + `selector: { + repositoryLocationName: $repositoryLocationName + repositoryName: $repositoryName + jobName: $jobName + }`, + ] + if (hasConfig) execParams.push('runConfigData: $runConfigData') + if (hasTags) execParams.push('executionMetadata: { tags: $tags }') + + return ` + mutation LaunchRun(${varDefs.join(', ')}) { + launchRun( + executionParams: { + ${execParams.join('\n ')} + } + ) { + type: __typename + ... on LaunchRunSuccess { + run { + runId + } + } + ... on InvalidStepError { + __typename + invalidStepKey + } + ... on InvalidOutputError { + __typename + stepKey + invalidOutputName + } + ... on RunConfigValidationInvalid { + errors { + message + } + } + ... on PipelineNotFoundError { + message + } + ... on RunConflict { + message + } + ... on UnauthorizedError { + message + } + ... on InvalidSubsetError { + message + } + ... on PresetNotFoundError { + message + } + ... on ConflictingExecutionParamsError { + message + } + ... on NoModeProvidedError { + message + } + ... on PythonError { + message + } + } + } + ` +} + +export const launchRunTool: ToolConfig = { + id: 'dagster_launch_run', + name: 'Dagster Launch Run', + description: 'Launch a job run on a Dagster instance.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3000)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + jobName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Name of the job to launch', + }, + runConfigJson: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Run configuration as a JSON object (optional)', + }, + tags: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Tags as a JSON array of {key, value} objects (optional)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const variables: Record = { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + jobName: params.jobName, + } + + let hasConfig = false + if (params.runConfigJson) { + try { + variables.runConfigData = JSON.parse(params.runConfigJson) + hasConfig = true + } catch { + throw new Error('Invalid JSON in runConfigJson') + } + } + + let hasTags = false + if (params.tags) { + try { + variables.tags = JSON.parse(params.tags) + hasTags = true + } catch { + throw new Error('Invalid JSON in tags') + } + } + + return { + query: buildLaunchRunMutation(hasConfig, hasTags), + variables, + } + }, + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ launchRun?: unknown }>(response) + + const result = data.data?.launchRun as LaunchRunResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'LaunchRunSuccess' && result.run) { + return { + success: true, + output: { runId: result.run.runId }, + } + } + + if (result.type === 'InvalidStepError' && result.invalidStepKey) { + throw new Error(`InvalidStepError: invalid step key "${result.invalidStepKey}"`) + } + if (result.type === 'InvalidOutputError' && result.stepKey) { + throw new Error( + `InvalidOutputError: invalid output "${result.invalidOutputName ?? 'unknown'}" on step "${result.stepKey}"` + ) + } + if (result.type === 'RunConfigValidationInvalid' && result.errors?.length) { + throw new Error( + `RunConfigValidationInvalid: ${result.errors.map((e) => e.message).join('; ')}` + ) + } + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Launch run failed')}`) + }, + + outputs: { + runId: { + type: 'string', + description: 'The globally unique ID of the launched run', + }, + }, +} diff --git a/apps/sim/tools/dagster/list_jobs.ts b/apps/sim/tools/dagster/list_jobs.ts new file mode 100644 index 00000000000..87df1cdae47 --- /dev/null +++ b/apps/sim/tools/dagster/list_jobs.ts @@ -0,0 +1,103 @@ +import type { DagsterBaseParams, DagsterListJobsResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +const LIST_JOBS_QUERY = ` + query ListJobNames { + repositoriesOrError { + ... on RepositoryConnection { + nodes { + name + jobs { + name + } + } + } + ... on RepositoryNotFoundError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } +` + +export const listJobsTool: ToolConfig = { + id: 'dagster_list_jobs', + name: 'Dagster List Jobs', + description: 'List all jobs across repositories in a Dagster instance.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: () => ({ + query: LIST_JOBS_QUERY, + variables: {}, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ repositoriesOrError?: unknown }>(response) + + const result = data.data?.repositoriesOrError as + | { nodes?: Array<{ name: string; jobs?: Array<{ name: string }> }>; message?: string } + | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (!Array.isArray(result.nodes)) { + throw new Error(dagsterUnionErrorMessage(result, 'List jobs failed')) + } + + const jobs: Array<{ name: string; repositoryName: string }> = [] + + for (const repo of result.nodes) { + for (const job of repo.jobs ?? []) { + jobs.push({ + name: job.name, + repositoryName: repo.name, + }) + } + } + + return { + success: true, + output: { jobs }, + } + }, + + outputs: { + jobs: { + type: 'json', + description: 'Array of jobs with name and repositoryName', + properties: { + name: { type: 'string', description: 'Job name' }, + repositoryName: { type: 'string', description: 'Repository name' }, + }, + }, + }, +} diff --git a/apps/sim/tools/dagster/list_runs.ts b/apps/sim/tools/dagster/list_runs.ts new file mode 100644 index 00000000000..49bb06f927c --- /dev/null +++ b/apps/sim/tools/dagster/list_runs.ts @@ -0,0 +1,155 @@ +import type { DagsterListRunsParams, DagsterListRunsResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +/** Shape of each run in the `runsOrError` → `Runs.results` GraphQL selection set. */ +interface DagsterListRunsGraphqlRow { + runId: string + jobName: string | null + status: string + tags: Array<{ key: string; value: string }> | null + startTime: number | null + endTime: number | null +} + +function buildListRunsQuery(hasFilter: boolean) { + return ` + query ListRuns($limit: Int${hasFilter ? ', $filter: RunsFilter' : ''}) { + runsOrError(limit: $limit${hasFilter ? ', filter: $filter' : ''}) { + ... on Runs { + results { + runId + jobName + status + tags { + key + value + } + startTime + endTime + } + } + ... on InvalidPipelineRunsFilterError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } + ` +} + +export const listRunsTool: ToolConfig = { + id: 'dagster_list_runs', + name: 'Dagster List Runs', + description: 'List recent Dagster runs, optionally filtered by job name.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + jobName: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter runs by job name (optional)', + }, + statuses: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Comma-separated run statuses to filter by, e.g. "SUCCESS,FAILURE" (optional)', + }, + limit: { + type: 'number', + required: false, + visibility: 'user-or-llm', + description: 'Maximum number of runs to return (default 20)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const filter: Record = {} + if (params.jobName) filter.pipelineName = params.jobName + if (params.statuses) { + filter.statuses = params.statuses + .split(',') + .map((s: string) => s.trim()) + .filter(Boolean) + } + + const hasFilter = Object.keys(filter).length > 0 + const variables: Record = { limit: params.limit || 20 } + if (hasFilter) variables.filter = filter + + return { + query: buildListRunsQuery(hasFilter), + variables, + } + }, + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ runsOrError?: unknown }>(response) + + const result = data.data?.runsOrError as + | { results?: DagsterListRunsGraphqlRow[]; message?: string } + | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (!Array.isArray(result.results)) { + throw new Error(dagsterUnionErrorMessage(result, 'Dagster returned an error listing runs')) + } + + const runs = result.results.map((r: DagsterListRunsGraphqlRow) => ({ + runId: r.runId, + jobName: r.jobName ?? null, + status: r.status, + tags: r.tags ?? null, + startTime: r.startTime ?? null, + endTime: r.endTime ?? null, + })) + + return { + success: true, + output: { runs }, + } + }, + + outputs: { + runs: { + type: 'json', + description: 'Array of runs', + properties: { + runId: { type: 'string', description: 'Run ID' }, + jobName: { type: 'string', description: 'Job name' }, + status: { type: 'string', description: 'Run status' }, + tags: { type: 'json', description: 'Run tags as array of {key, value} objects' }, + startTime: { type: 'number', description: 'Start time as Unix timestamp' }, + endTime: { type: 'number', description: 'End time as Unix timestamp' }, + }, + }, + }, +} diff --git a/apps/sim/tools/dagster/list_schedules.ts b/apps/sim/tools/dagster/list_schedules.ts new file mode 100644 index 00000000000..ec3dd30aea8 --- /dev/null +++ b/apps/sim/tools/dagster/list_schedules.ts @@ -0,0 +1,161 @@ +import type { + DagsterListSchedulesParams, + DagsterListSchedulesResponse, +} from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface DagsterScheduleGraphql { + name: string + cronSchedule: string | null + pipelineName: string | null + description: string | null + executionTimezone: string | null + scheduleState?: { + id: string + status: string + } | null +} + +function buildListSchedulesQuery(hasStatus: boolean) { + return ` + query ListSchedules($repositorySelector: RepositorySelector!${hasStatus ? ', $scheduleStatus: InstigationStatus' : ''}) { + schedulesOrError(repositorySelector: $repositorySelector${hasStatus ? ', scheduleStatus: $scheduleStatus' : ''}) { + ... on Schedules { + results { + name + cronSchedule + pipelineName + description + executionTimezone + scheduleState { + id + status + } + } + } + ... on RepositoryNotFoundError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } + ` +} + +export const listSchedulesTool: ToolConfig< + DagsterListSchedulesParams, + DagsterListSchedulesResponse +> = { + id: 'dagster_list_schedules', + name: 'Dagster List Schedules', + description: 'List all schedules in a Dagster repository, optionally filtered by status.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + scheduleStatus: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter schedules by status: RUNNING or STOPPED (omit to return all)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const hasStatus = Boolean(params.scheduleStatus) + const variables: Record = { + repositorySelector: { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + }, + } + if (hasStatus) variables.scheduleStatus = params.scheduleStatus + return { query: buildListSchedulesQuery(hasStatus), variables } + }, + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ schedulesOrError?: unknown }>(response) + + const result = data.data?.schedulesOrError as + | { results?: DagsterScheduleGraphql[]; message?: string } + | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (!Array.isArray(result.results)) { + throw new Error(dagsterUnionErrorMessage(result, 'List schedules failed')) + } + + const schedules = result.results.map((s) => ({ + name: s.name, + cronSchedule: s.cronSchedule ?? null, + jobName: s.pipelineName ?? null, + status: s.scheduleState?.status ?? 'UNKNOWN', + id: s.scheduleState?.id ?? null, + description: s.description ?? null, + executionTimezone: s.executionTimezone ?? null, + })) + + return { + success: true, + output: { schedules }, + } + }, + + outputs: { + schedules: { + type: 'json', + description: + 'Array of schedules (name, cronSchedule, jobName, status, id, description, executionTimezone)', + properties: { + name: { type: 'string', description: 'Schedule name' }, + cronSchedule: { type: 'string', description: 'Cron expression for the schedule' }, + jobName: { type: 'string', description: 'Job the schedule targets' }, + status: { type: 'string', description: 'Schedule status: RUNNING or STOPPED' }, + id: { + type: 'string', + description: 'Instigator state ID — use this to start or stop the schedule', + }, + description: { type: 'string', description: 'Human-readable schedule description' }, + executionTimezone: { type: 'string', description: 'Timezone for cron evaluation' }, + }, + }, + }, +} diff --git a/apps/sim/tools/dagster/list_sensors.ts b/apps/sim/tools/dagster/list_sensors.ts new file mode 100644 index 00000000000..34372a6bdad --- /dev/null +++ b/apps/sim/tools/dagster/list_sensors.ts @@ -0,0 +1,150 @@ +import type { DagsterListSensorsParams, DagsterListSensorsResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface DagsterSensorGraphql { + name: string + sensorType: string | null + description: string | null + sensorState?: { + id: string + status: string + } | null +} + +function buildListSensorsQuery(hasStatus: boolean) { + return ` + query ListSensors($repositorySelector: RepositorySelector!${hasStatus ? ', $sensorStatus: InstigationStatus' : ''}) { + sensorsOrError(repositorySelector: $repositorySelector${hasStatus ? ', sensorStatus: $sensorStatus' : ''}) { + ... on Sensors { + results { + name + sensorType + description + sensorState { + id + status + } + } + } + ... on RepositoryNotFoundError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } + ` +} + +export const listSensorsTool: ToolConfig = { + id: 'dagster_list_sensors', + name: 'Dagster List Sensors', + description: 'List all sensors in a Dagster repository, optionally filtered by status.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + sensorStatus: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter sensors by status: RUNNING or STOPPED (omit to return all)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const hasStatus = Boolean(params.sensorStatus) + const variables: Record = { + repositorySelector: { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + }, + } + if (hasStatus) variables.sensorStatus = params.sensorStatus + return { query: buildListSensorsQuery(hasStatus), variables } + }, + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ sensorsOrError?: unknown }>(response) + + const result = data.data?.sensorsOrError as + | { results?: DagsterSensorGraphql[]; message?: string } + | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (!Array.isArray(result.results)) { + throw new Error(dagsterUnionErrorMessage(result, 'List sensors failed')) + } + + const sensors = result.results.map((s) => ({ + name: s.name, + sensorType: s.sensorType ?? null, + status: s.sensorState?.status ?? 'UNKNOWN', + id: s.sensorState?.id ?? null, + description: s.description ?? null, + })) + + return { + success: true, + output: { sensors }, + } + }, + + outputs: { + sensors: { + type: 'json', + description: 'Array of sensors (name, sensorType, status, id, description)', + properties: { + name: { type: 'string', description: 'Sensor name' }, + sensorType: { + type: 'string', + description: + 'Sensor type (ASSET, AUTO_MATERIALIZE, FRESHNESS_POLICY, MULTI_ASSET, RUN_STATUS, STANDARD)', + }, + status: { type: 'string', description: 'Sensor status: RUNNING or STOPPED' }, + id: { + type: 'string', + description: 'Instigator state ID — use this to start or stop the sensor', + }, + description: { type: 'string', description: 'Human-readable sensor description' }, + }, + }, + }, +} diff --git a/apps/sim/tools/dagster/reexecute_run.ts b/apps/sim/tools/dagster/reexecute_run.ts new file mode 100644 index 00000000000..f3ce6d3a366 --- /dev/null +++ b/apps/sim/tools/dagster/reexecute_run.ts @@ -0,0 +1,158 @@ +import type { DagsterReexecuteRunParams, DagsterReexecuteRunResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface ReexecuteRunResult { + type: string + run?: { runId: string } + message?: string + /** Returned by InvalidStepError */ + invalidStepKey?: string + /** Returned by InvalidOutputError */ + invalidOutputName?: string + stepKey?: string + /** Returned by RunConfigValidationInvalid */ + errors?: Array<{ message: string }> +} + +const REEXECUTE_RUN_MUTATION = ` + mutation LaunchRunReexecution($parentRunId: String!, $strategy: ReexecutionStrategy!) { + launchRunReexecution( + reexecutionParams: { + parentRunId: $parentRunId + strategy: $strategy + } + ) { + type: __typename + ... on LaunchRunSuccess { + run { + runId + } + } + ... on InvalidStepError { + invalidStepKey + } + ... on InvalidOutputError { + stepKey + invalidOutputName + } + ... on RunConfigValidationInvalid { + errors { + message + } + } + ... on PipelineNotFoundError { + message + } + ... on RunConflict { + message + } + ... on UnauthorizedError { + message + } + ... on InvalidSubsetError { + message + } + ... on PresetNotFoundError { + message + } + ... on ConflictingExecutionParamsError { + message + } + ... on NoModeProvidedError { + message + } + ... on PythonError { + message + } + } + } +` + +export const reexecuteRunTool: ToolConfig = + { + id: 'dagster_reexecute_run', + name: 'Dagster Reexecute Run', + description: 'Reexecute an existing Dagster run, optionally resuming only from failed steps.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + parentRunId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to reexecute', + }, + strategy: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: + 'Reexecution strategy: ALL_STEPS reruns everything, FROM_FAILURE resumes from failed steps, FROM_ASSET_FAILURE resumes from failed assets', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: REEXECUTE_RUN_MUTATION, + variables: { + parentRunId: params.parentRunId, + strategy: params.strategy, + }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ launchRunReexecution?: unknown }>(response) + + const result = data.data?.launchRunReexecution as ReexecuteRunResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'LaunchRunSuccess' && result.run) { + return { + success: true, + output: { runId: result.run.runId }, + } + } + + let detail: string + if (result.type === 'InvalidStepError' && result.invalidStepKey) { + detail = `Invalid step key: ${result.invalidStepKey}` + } else if (result.type === 'InvalidOutputError' && result.invalidOutputName) { + detail = `Invalid output "${result.invalidOutputName}" on step "${result.stepKey}"` + } else if (result.type === 'RunConfigValidationInvalid' && result.errors?.length) { + detail = result.errors.map((e) => e.message).join('; ') + } else { + detail = dagsterUnionErrorMessage(result, 'Reexecute run failed') + } + + throw new Error(`${result.type}: ${detail}`) + }, + + outputs: { + runId: { + type: 'string', + description: 'The ID of the newly launched reexecution run', + }, + }, + } diff --git a/apps/sim/tools/dagster/start_schedule.ts b/apps/sim/tools/dagster/start_schedule.ts new file mode 100644 index 00000000000..83ea7fdd1e3 --- /dev/null +++ b/apps/sim/tools/dagster/start_schedule.ts @@ -0,0 +1,135 @@ +import type { + DagsterScheduleMutationResponse, + DagsterStartScheduleParams, +} from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface ScheduleMutationResult { + type: string + scheduleState?: { + id: string + status: string + } + message?: string +} + +const START_SCHEDULE_MUTATION = ` + mutation StartSchedule($scheduleSelector: ScheduleSelector!) { + startSchedule(scheduleSelector: $scheduleSelector) { + type: __typename + ... on ScheduleStateResult { + scheduleState { + id + status + } + } + ... on UnauthorizedError { + __typename + message + } + ... on ScheduleNotFoundError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } +` + +export const startScheduleTool: ToolConfig< + DagsterStartScheduleParams, + DagsterScheduleMutationResponse +> = { + id: 'dagster_start_schedule', + name: 'Dagster Start Schedule', + description: 'Enable (start) a schedule in a Dagster repository.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + scheduleName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Name of the schedule to start', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: START_SCHEDULE_MUTATION, + variables: { + scheduleSelector: { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + scheduleName: params.scheduleName, + }, + }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ startSchedule?: unknown }>(response) + + const result = data.data?.startSchedule as ScheduleMutationResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'ScheduleStateResult' && result.scheduleState) { + return { + success: true, + output: { + id: result.scheduleState.id, + status: result.scheduleState.status, + }, + } + } + + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Start schedule failed')}`) + }, + + outputs: { + id: { + type: 'string', + description: 'Instigator state ID of the schedule', + }, + status: { + type: 'string', + description: 'Updated schedule status (RUNNING or STOPPED)', + }, + }, +} diff --git a/apps/sim/tools/dagster/start_sensor.ts b/apps/sim/tools/dagster/start_sensor.ts new file mode 100644 index 00000000000..66a07541dbd --- /dev/null +++ b/apps/sim/tools/dagster/start_sensor.ts @@ -0,0 +1,130 @@ +import type { DagsterSensorMutationResponse, DagsterStartSensorParams } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface SensorMutationResult { + type: string + sensorState?: { + id: string + status: string + } + message?: string +} + +const START_SENSOR_MUTATION = ` + mutation StartSensor($sensorSelector: SensorSelector!) { + startSensor(sensorSelector: $sensorSelector) { + type: __typename + ... on Sensor { + sensorState { + id + status + } + } + ... on SensorNotFoundError { + __typename + message + } + ... on UnauthorizedError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } +` + +export const startSensorTool: ToolConfig = + { + id: 'dagster_start_sensor', + name: 'Dagster Start Sensor', + description: 'Enable (start) a sensor in a Dagster repository.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + sensorName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Name of the sensor to start', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: START_SENSOR_MUTATION, + variables: { + sensorSelector: { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + sensorName: params.sensorName, + }, + }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ startSensor?: unknown }>(response) + + const result = data.data?.startSensor as SensorMutationResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'Sensor' && result.sensorState) { + return { + success: true, + output: { + id: result.sensorState.id, + status: result.sensorState.status, + }, + } + } + + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Start sensor failed')}`) + }, + + outputs: { + id: { + type: 'string', + description: 'Instigator state ID of the sensor', + }, + status: { + type: 'string', + description: 'Updated sensor status (RUNNING or STOPPED)', + }, + }, + } diff --git a/apps/sim/tools/dagster/stop_schedule.ts b/apps/sim/tools/dagster/stop_schedule.ts new file mode 100644 index 00000000000..d6e89848ef2 --- /dev/null +++ b/apps/sim/tools/dagster/stop_schedule.ts @@ -0,0 +1,118 @@ +import type { + DagsterScheduleMutationResponse, + DagsterStopScheduleParams, +} from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface ScheduleMutationResult { + type: string + scheduleState?: { + id: string + status: string + } + message?: string +} + +const STOP_SCHEDULE_MUTATION = ` + mutation StopSchedule($id: String) { + stopRunningSchedule(id: $id) { + type: __typename + ... on ScheduleStateResult { + scheduleState { + id + status + } + } + ... on UnauthorizedError { + __typename + message + } + ... on ScheduleNotFoundError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } +` + +export const stopScheduleTool: ToolConfig< + DagsterStopScheduleParams, + DagsterScheduleMutationResponse +> = { + id: 'dagster_stop_schedule', + name: 'Dagster Stop Schedule', + description: 'Disable (stop) a running schedule in Dagster.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + instigationStateId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: + 'InstigationState ID of the schedule to stop — available from dagster_list_schedules output', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: STOP_SCHEDULE_MUTATION, + variables: { id: params.instigationStateId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ stopRunningSchedule?: unknown }>(response) + + const result = data.data?.stopRunningSchedule as ScheduleMutationResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'ScheduleStateResult' && result.scheduleState) { + return { + success: true, + output: { + id: result.scheduleState.id, + status: result.scheduleState.status, + }, + } + } + + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Stop schedule failed')}`) + }, + + outputs: { + id: { + type: 'string', + description: 'Instigator state ID of the schedule', + }, + status: { + type: 'string', + description: 'Updated schedule status (RUNNING or STOPPED)', + }, + }, +} diff --git a/apps/sim/tools/dagster/stop_sensor.ts b/apps/sim/tools/dagster/stop_sensor.ts new file mode 100644 index 00000000000..cf31c26ac1e --- /dev/null +++ b/apps/sim/tools/dagster/stop_sensor.ts @@ -0,0 +1,108 @@ +import type { DagsterSensorMutationResponse, DagsterStopSensorParams } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface StopSensorResult { + type: string + instigationState?: { + id: string + status: string + } + message?: string +} + +const STOP_SENSOR_MUTATION = ` + mutation StopSensor($id: String) { + stopSensor(id: $id) { + type: __typename + ... on StopSensorMutationResult { + instigationState { + id + status + } + } + ... on UnauthorizedError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } +` + +export const stopSensorTool: ToolConfig = { + id: 'dagster_stop_sensor', + name: 'Dagster Stop Sensor', + description: 'Disable (stop) a running sensor in Dagster.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + instigationStateId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: + 'InstigationState ID of the sensor to stop — available from dagster_list_sensors output', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: STOP_SENSOR_MUTATION, + variables: { id: params.instigationStateId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ stopSensor?: unknown }>(response) + + const result = data.data?.stopSensor as StopSensorResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'StopSensorMutationResult' && result.instigationState) { + return { + success: true, + output: { + id: result.instigationState.id, + status: result.instigationState.status, + }, + } + } + + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Stop sensor failed')}`) + }, + + outputs: { + id: { + type: 'string', + description: 'Instigator state ID of the sensor', + }, + status: { + type: 'string', + description: 'Updated sensor status (RUNNING or STOPPED)', + }, + }, +} diff --git a/apps/sim/tools/dagster/terminate_run.ts b/apps/sim/tools/dagster/terminate_run.ts new file mode 100644 index 00000000000..30f5ccb0473 --- /dev/null +++ b/apps/sim/tools/dagster/terminate_run.ts @@ -0,0 +1,126 @@ +import type { DagsterTerminateRunParams, DagsterTerminateRunResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +/** Fields returned from `terminateRun` for all union members. */ +interface DagsterTerminateRunPayload { + __typename?: string + run?: { runId: string } + message?: string +} + +const TERMINATE_RUN_MUTATION = ` + mutation TerminateRun($runId: String!) { + terminateRun(runId: $runId) { + __typename + ... on TerminateRunSuccess { + run { + runId + } + } + ... on TerminateRunFailure { + run { + runId + } + message + } + ... on RunNotFoundError { + message + } + ... on UnauthorizedError { + message + } + ... on PythonError { + message + } + } + } +` + +export const terminateRunTool: ToolConfig = + { + id: 'dagster_terminate_run', + name: 'Dagster Terminate Run', + description: 'Terminate an in-progress Dagster run.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + runId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to terminate', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: TERMINATE_RUN_MUTATION, + variables: { runId: params.runId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ terminateRun?: DagsterTerminateRunPayload }>( + response + ) + + const result = data.data?.terminateRun + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.__typename === 'TerminateRunSuccess' && result.run?.runId) { + return { + success: true, + output: { + success: true, + runId: result.run.runId, + message: null, + }, + } + } + + if (result.__typename === 'TerminateRunFailure') { + throw new Error( + `TerminateRunFailure: ${dagsterUnionErrorMessage(result, 'Terminate run failed')}` + ) + } + + throw new Error(dagsterUnionErrorMessage(result, 'Terminate run failed')) + }, + + outputs: { + success: { + type: 'boolean', + description: 'Whether the run was successfully terminated', + }, + runId: { + type: 'string', + description: 'The ID of the terminated run', + }, + message: { + type: 'string', + description: 'Error or status message if termination failed', + optional: true, + }, + }, + } diff --git a/apps/sim/tools/dagster/types.ts b/apps/sim/tools/dagster/types.ts new file mode 100644 index 00000000000..89fa8f6c2d5 --- /dev/null +++ b/apps/sim/tools/dagster/types.ts @@ -0,0 +1,204 @@ +import type { ToolResponse } from '@/tools/types' + +export interface DagsterBaseParams { + host: string + apiKey?: string +} + +export interface DagsterLaunchRunParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + jobName: string + runConfigJson?: string + tags?: string +} + +export interface DagsterLaunchRunResponse extends ToolResponse { + output: { + runId: string + } +} + +export interface DagsterGetRunParams extends DagsterBaseParams { + runId: string +} + +export interface DagsterGetRunResponse extends ToolResponse { + output: { + runId: string + jobName: string | null + status: string + startTime: number | null + endTime: number | null + runConfigYaml: string | null + tags: Array<{ key: string; value: string }> | null + } +} + +export interface DagsterListRunsParams extends DagsterBaseParams { + jobName?: string + statuses?: string + limit?: number +} + +export interface DagsterListRunsResponse extends ToolResponse { + output: { + runs: Array<{ + runId: string + jobName: string | null + status: string + tags: Array<{ key: string; value: string }> | null + startTime: number | null + endTime: number | null + }> + } +} + +export interface DagsterListJobsResponse extends ToolResponse { + output: { + jobs: Array<{ + name: string + repositoryName: string + }> + } +} + +export interface DagsterTerminateRunParams extends DagsterBaseParams { + runId: string +} + +export interface DagsterTerminateRunResponse extends ToolResponse { + output: { + success: boolean + runId: string + message: string | null + } +} + +export interface DagsterGetRunLogsParams extends DagsterBaseParams { + runId: string + afterCursor?: string + limit?: number +} + +export interface DagsterGetRunLogsResponse extends ToolResponse { + output: { + events: Array<{ + type: string + message: string + timestamp: string + level: string + stepKey: string | null + eventType: string | null + }> + cursor: string | null + hasMore: boolean + } +} + +export interface DagsterReexecuteRunParams extends DagsterBaseParams { + parentRunId: string + strategy: string +} + +export interface DagsterReexecuteRunResponse extends ToolResponse { + output: { + runId: string + } +} + +export interface DagsterDeleteRunParams extends DagsterBaseParams { + runId: string +} + +export interface DagsterDeleteRunResponse extends ToolResponse { + output: { + runId: string + } +} + +export interface DagsterListSchedulesParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + scheduleStatus?: string +} + +export interface DagsterListSchedulesResponse extends ToolResponse { + output: { + schedules: Array<{ + name: string + cronSchedule: string | null + jobName: string | null + status: string + id: string | null + description: string | null + executionTimezone: string | null + }> + } +} + +export interface DagsterStartScheduleParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + scheduleName: string +} + +export interface DagsterScheduleMutationResponse extends ToolResponse { + output: { + id: string + status: string + } +} + +export interface DagsterStopScheduleParams extends DagsterBaseParams { + instigationStateId: string +} + +export interface DagsterListSensorsParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + sensorStatus?: string +} + +export interface DagsterListSensorsResponse extends ToolResponse { + output: { + sensors: Array<{ + name: string + sensorType: string | null + status: string + id: string | null + description: string | null + }> + } +} + +export interface DagsterStartSensorParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + sensorName: string +} + +export interface DagsterSensorMutationResponse extends ToolResponse { + output: { + id: string + status: string + } +} + +export interface DagsterStopSensorParams extends DagsterBaseParams { + instigationStateId: string +} + +export type DagsterResponse = + | DagsterLaunchRunResponse + | DagsterGetRunResponse + | DagsterListRunsResponse + | DagsterListJobsResponse + | DagsterTerminateRunResponse + | DagsterGetRunLogsResponse + | DagsterReexecuteRunResponse + | DagsterDeleteRunResponse + | DagsterListSchedulesResponse + | DagsterScheduleMutationResponse + | DagsterListSensorsResponse + | DagsterSensorMutationResponse diff --git a/apps/sim/tools/dagster/utils.ts b/apps/sim/tools/dagster/utils.ts new file mode 100644 index 00000000000..b41fc135339 --- /dev/null +++ b/apps/sim/tools/dagster/utils.ts @@ -0,0 +1,41 @@ +/** + * Parses a Dagster GraphQL JSON body and throws if the HTTP status is not OK or the payload + * contains top-level GraphQL errors. + * + * Field errors should be requested with `... on Error { __typename message }` (or at least + * `message`) so union failures are not returned as empty objects. + */ +export async function parseDagsterGraphqlResponse>( + response: Response +): Promise<{ data?: TData }> { + let payload: { + data?: TData + errors?: ReadonlyArray<{ message?: string }> + } + try { + payload = (await response.json()) as { + data?: TData + errors?: ReadonlyArray<{ message?: string }> + } + } catch { + throw new Error('Invalid JSON response from Dagster') + } + if (!response.ok) { + throw new Error(payload.errors?.[0]?.message || 'Dagster GraphQL request failed') + } + if (payload.errors?.length) { + throw new Error(payload.errors[0]?.message ?? 'Dagster GraphQL request failed') + } + return { data: payload.data } +} + +/** + * Message from a field that includes `... on Error { message }`, or a fallback when the + * payload is not a GraphQL `Error` type with a string message. + */ +export function dagsterUnionErrorMessage( + result: { message?: string } | undefined, + fallback: string +): string { + return typeof result?.message === 'string' ? result.message : fallback +} diff --git a/apps/sim/tools/registry.ts b/apps/sim/tools/registry.ts index 22c04f0b797..85d08da46c5 100644 --- a/apps/sim/tools/registry.ts +++ b/apps/sim/tools/registry.ts @@ -361,6 +361,22 @@ import { cursorStopAgentTool, cursorStopAgentV2Tool, } from '@/tools/cursor' +import { + dagsterDeleteRunTool, + dagsterGetRunLogsTool, + dagsterGetRunTool, + dagsterLaunchRunTool, + dagsterListJobsTool, + dagsterListRunsTool, + dagsterListSchedulesTool, + dagsterListSensorsTool, + dagsterReexecuteRunTool, + dagsterStartScheduleTool, + dagsterStartSensorTool, + dagsterStopScheduleTool, + dagsterStopSensorTool, + dagsterTerminateRunTool, +} from '@/tools/dagster' import { databricksCancelRunTool, databricksExecuteSqlTool, @@ -3444,6 +3460,20 @@ export const tools: Record = { devin_get_session: devinGetSessionTool, devin_list_sessions: devinListSessionsTool, devin_send_message: devinSendMessageTool, + dagster_delete_run: dagsterDeleteRunTool, + dagster_get_run: dagsterGetRunTool, + dagster_get_run_logs: dagsterGetRunLogsTool, + dagster_launch_run: dagsterLaunchRunTool, + dagster_list_jobs: dagsterListJobsTool, + dagster_list_runs: dagsterListRunsTool, + dagster_list_schedules: dagsterListSchedulesTool, + dagster_list_sensors: dagsterListSensorsTool, + dagster_reexecute_run: dagsterReexecuteRunTool, + dagster_start_schedule: dagsterStartScheduleTool, + dagster_start_sensor: dagsterStartSensorTool, + dagster_stop_schedule: dagsterStopScheduleTool, + dagster_stop_sensor: dagsterStopSensorTool, + dagster_terminate_run: dagsterTerminateRunTool, databricks_cancel_run: databricksCancelRunTool, databricks_execute_sql: databricksExecuteSqlTool, databricks_get_run: databricksGetRunTool,