From 721d9df5c268c26b746b6371507832580754b436 Mon Sep 17 00:00:00 2001 From: abhinavDhulipala Date: Fri, 3 Apr 2026 16:18:49 -0700 Subject: [PATCH 01/12] feat(blocks): add dagster block --- apps/docs/components/icons.tsx | 15 ++ apps/docs/components/ui/icon-mapping.ts | 2 + apps/docs/content/docs/en/tools/dagster.mdx | 141 +++++++++++++ apps/docs/content/docs/en/tools/meta.json | 1 + .../integrations/data/icon-mapping.ts | 2 + .../integrations/data/integrations.json | 39 ++++ apps/sim/blocks/blocks/dagster.ts | 196 ++++++++++++++++++ apps/sim/blocks/registry.ts | 2 + apps/sim/components/icons.tsx | 71 +++++++ apps/sim/tools/dagster/get_run.ts | 135 ++++++++++++ apps/sim/tools/dagster/index.ts | 11 + apps/sim/tools/dagster/launch_run.ts | 174 ++++++++++++++++ apps/sim/tools/dagster/list_jobs.ts | 100 +++++++++ apps/sim/tools/dagster/list_runs.ts | 147 +++++++++++++ apps/sim/tools/dagster/terminate_run.ts | 125 +++++++++++ apps/sim/tools/dagster/types.ts | 92 ++++++++ apps/sim/tools/registry.ts | 12 ++ 17 files changed, 1265 insertions(+) create mode 100644 apps/docs/content/docs/en/tools/dagster.mdx create mode 100644 apps/sim/blocks/blocks/dagster.ts create mode 100644 apps/sim/tools/dagster/get_run.ts create mode 100644 apps/sim/tools/dagster/index.ts create mode 100644 apps/sim/tools/dagster/launch_run.ts create mode 100644 apps/sim/tools/dagster/list_jobs.ts create mode 100644 apps/sim/tools/dagster/list_runs.ts create mode 100644 apps/sim/tools/dagster/terminate_run.ts create mode 100644 apps/sim/tools/dagster/types.ts diff --git a/apps/docs/components/icons.tsx b/apps/docs/components/icons.tsx index cbc3b5edd85..2ed99b081de 100644 --- a/apps/docs/components/icons.tsx +++ b/apps/docs/components/icons.tsx @@ -4929,6 +4929,21 @@ export function SSHIcon(props: SVGProps) { ) } +export function DagsterIcon(props: SVGProps) { + return ( + + + + + ) +} + export function DatabricksIcon(props: SVGProps) { return ( diff --git a/apps/docs/components/ui/icon-mapping.ts b/apps/docs/components/ui/icon-mapping.ts index c65d55ed9d4..237e5941316 100644 --- a/apps/docs/components/ui/icon-mapping.ts +++ b/apps/docs/components/ui/icon-mapping.ts @@ -32,6 +32,7 @@ import { CloudWatchIcon, ConfluenceIcon, CursorIcon, + DagsterIcon, DatabricksIcon, DatadogIcon, DevinIcon, @@ -218,6 +219,7 @@ export const blockTypeToIconMap: Record = { cloudwatch: CloudWatchIcon, confluence_v2: ConfluenceIcon, cursor_v2: CursorIcon, + dagster: DagsterIcon, databricks: DatabricksIcon, datadog: DatadogIcon, devin: DevinIcon, diff --git a/apps/docs/content/docs/en/tools/dagster.mdx b/apps/docs/content/docs/en/tools/dagster.mdx new file mode 100644 index 00000000000..8c4123eb6a0 --- /dev/null +++ b/apps/docs/content/docs/en/tools/dagster.mdx @@ -0,0 +1,141 @@ +--- +title: Dagster +description: Orchestrate data pipelines and manage job runs on Dagster+ +--- + +import { BlockInfoCard } from "@/components/ui/block-info-card" + + + +## Usage Instructions + +Connect to Dagster+ to launch job runs, monitor run status, list available jobs across repositories, and terminate in-progress runs. Requires a Dagster Cloud API token. + + + +## Tools + +### `dagster_launch_run` + +Launch a Dagster job run in your Dagster+ deployment. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `organizationName` | string | Yes | Dagster+ organization name \(subdomain, e.g., "myorg"\) | +| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | +| `apiKey` | string | Yes | Dagster Cloud API token | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `jobName` | string | Yes | Name of the job to launch | +| `runConfigJson` | string | No | Run configuration as a JSON object \(optional\) | +| `tags` | string | No | Tags as a JSON array of \{key, value\} objects \(optional\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runId` | string | The globally unique ID of the launched run | + +### `dagster_get_run` + +Get the status and details of a Dagster run by its ID. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | +| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | +| `apiKey` | string | Yes | Dagster Cloud API token | +| `runId` | string | Yes | The ID of the run to retrieve | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runId` | string | Run ID | +| `jobName` | string | Name of the job this run belongs to | +| `status` | string | Run status \(QUEUED, NOT_STARTED, STARTING, MANAGED, STARTED, SUCCESS, FAILURE, CANCELING, CANCELED\) | +| `startTime` | number | Run start time as Unix timestamp | +| `endTime` | number | Run end time as Unix timestamp | +| `runConfigYaml` | string | Run configuration as YAML | +| `tags` | json | Run tags as array of \{key, value\} objects | + +### `dagster_list_runs` + +List recent Dagster runs, optionally filtered by job name. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | +| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | +| `apiKey` | string | Yes | Dagster Cloud API token | +| `jobName` | string | No | Filter runs by job name \(optional\) | +| `limit` | number | No | Maximum number of runs to return \(default 20\) | +| `cursor` | string | No | Pagination cursor from a previous list_runs response | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runs` | json | Array of runs with runId, jobName, status, startTime, endTime | +| ↳ `runId` | string | Run ID | +| ↳ `jobName` | string | Job name | +| ↳ `status` | string | Run status | +| ↳ `startTime` | number | Start time as Unix timestamp | +| ↳ `endTime` | number | End time as Unix timestamp | +| `cursor` | string | Pagination cursor to retrieve the next page | +| `hasMore` | boolean | Whether more runs are available | + +### `dagster_list_jobs` + +List all jobs across repositories in a Dagster+ deployment. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | +| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | +| `apiKey` | string | Yes | Dagster Cloud API token | +| `repositoryLocationName` | string | No | Filter by repository location name \(optional\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `jobs` | json | Array of jobs with name, repositoryName, repositoryLocationName, and description | +| ↳ `name` | string | Job name | +| ↳ `repositoryName` | string | Repository name | +| ↳ `repositoryLocationName` | string | Repository location name | +| ↳ `description` | string | Job description | + +### `dagster_terminate_run` + +Terminate an in-progress Dagster run. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | +| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | +| `apiKey` | string | Yes | Dagster Cloud API token | +| `runId` | string | Yes | The ID of the run to terminate | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `success` | boolean | Whether the run was successfully terminated | +| `runId` | string | The ID of the terminated run | +| `message` | string | Error or status message if termination failed | + + diff --git a/apps/docs/content/docs/en/tools/meta.json b/apps/docs/content/docs/en/tools/meta.json index a0f99bf6616..e413b867f71 100644 --- a/apps/docs/content/docs/en/tools/meta.json +++ b/apps/docs/content/docs/en/tools/meta.json @@ -27,6 +27,7 @@ "cloudwatch", "confluence", "cursor", + "dagster", "databricks", "datadog", "devin", diff --git a/apps/sim/app/(landing)/integrations/data/icon-mapping.ts b/apps/sim/app/(landing)/integrations/data/icon-mapping.ts index 503242d8c1e..898ee4287ee 100644 --- a/apps/sim/app/(landing)/integrations/data/icon-mapping.ts +++ b/apps/sim/app/(landing)/integrations/data/icon-mapping.ts @@ -32,6 +32,7 @@ import { CloudWatchIcon, ConfluenceIcon, CursorIcon, + DagsterIcon, DatabricksIcon, DatadogIcon, DevinIcon, @@ -218,6 +219,7 @@ export const blockTypeToIconMap: Record = { cloudwatch: CloudWatchIcon, confluence_v2: ConfluenceIcon, cursor_v2: CursorIcon, + dagster: DagsterIcon, databricks: DatabricksIcon, datadog: DatadogIcon, devin: DevinIcon, diff --git a/apps/sim/app/(landing)/integrations/data/integrations.json b/apps/sim/app/(landing)/integrations/data/integrations.json index 1aedf79474f..2f30fed8d05 100644 --- a/apps/sim/app/(landing)/integrations/data/integrations.json +++ b/apps/sim/app/(landing)/integrations/data/integrations.json @@ -2345,6 +2345,45 @@ "integrationType": "developer-tools", "tags": ["agentic", "automation"] }, + { + "type": "dagster", + "slug": "dagster", + "name": "Dagster", + "description": "Orchestrate data pipelines and manage job runs on Dagster+", + "longDescription": "Connect to Dagster+ to launch job runs, monitor run status, list available jobs across repositories, and terminate in-progress runs. Requires a Dagster Cloud API token.", + "bgColor": "#191A23", + "iconName": "DagsterIcon", + "docsUrl": "https://docs.sim.ai/tools/dagster", + "operations": [ + { + "name": "Launch Run", + "description": "Launch a Dagster job run in your Dagster+ deployment." + }, + { + "name": "Get Run", + "description": "Get the status and details of a Dagster run by its ID." + }, + { + "name": "List Runs", + "description": "List recent Dagster runs, optionally filtered by job name." + }, + { + "name": "List Jobs", + "description": "List all jobs across repositories in a Dagster+ deployment." + }, + { + "name": "Terminate Run", + "description": "Terminate an in-progress Dagster run." + } + ], + "operationCount": 5, + "triggers": [], + "triggerCount": 0, + "authType": "api-key", + "category": "tools", + "integrationType": "automation", + "tags": ["data-analytics", "cloud", "automation"] + }, { "type": "databricks", "slug": "databricks", diff --git a/apps/sim/blocks/blocks/dagster.ts b/apps/sim/blocks/blocks/dagster.ts new file mode 100644 index 00000000000..08b1be6f5e0 --- /dev/null +++ b/apps/sim/blocks/blocks/dagster.ts @@ -0,0 +1,196 @@ +import { DagsterIcon } from '@/components/icons' +import type { BlockConfig } from '@/blocks/types' +import { IntegrationType } from '@/blocks/types' +import type { DagsterResponse } from '@/tools/dagster/types' + +export const DagsterBlock: BlockConfig = { + type: 'dagster', + name: 'Dagster', + description: 'Orchestrate data pipelines and manage job runs with Dagster', + longDescription: + 'Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, and terminate in-progress runs. API token only required for Dagster+.', + docsLink: 'https://docs.sim.ai/tools/dagster', + category: 'tools', + integrationType: IntegrationType.Automation, + tags: ['data-analytics', 'automation'], + bgColor: '#191A23', + icon: DagsterIcon, + + subBlocks: [ + { + id: 'operation', + title: 'Operation', + type: 'dropdown', + options: [ + { label: 'Launch Run', id: 'launch_run' }, + { label: 'Get Run', id: 'get_run' }, + { label: 'List Runs', id: 'list_runs' }, + { label: 'List Jobs', id: 'list_jobs' }, + { label: 'Terminate Run', id: 'terminate_run' }, + ], + value: () => 'launch_run', + }, + + // ── Launch Run ── + { + id: 'repositoryLocationName', + title: 'Repository Location', + type: 'short-input', + placeholder: 'e.g., my_code_location', + condition: { field: 'operation', value: 'launch_run' }, + required: { field: 'operation', value: 'launch_run' }, + }, + { + id: 'repositoryName', + title: 'Repository Name', + type: 'short-input', + placeholder: 'e.g., __repository__', + condition: { field: 'operation', value: 'launch_run' }, + required: { field: 'operation', value: 'launch_run' }, + }, + { + id: 'jobName', + title: 'Job Name', + type: 'short-input', + placeholder: 'e.g., my_pipeline_job', + condition: { field: 'operation', value: 'launch_run' }, + required: { field: 'operation', value: 'launch_run' }, + }, + { + id: 'runConfigJson', + title: 'Run Config', + type: 'code', + placeholder: '{"ops": {"my_op": {"config": {"key": "value"}}}}', + condition: { field: 'operation', value: 'launch_run' }, + mode: 'advanced', + wandConfig: { + enabled: true, + prompt: `Generate a Dagster run config JSON object based on the user's description. + +Examples: +- "set partition date to 2024-01-15" -> {"ops": {"load_partition": {"config": {"partition_date": "2024-01-15"}}}} +- "run with debug logging" -> {"execution": {"multiprocess": {"config": {"max_concurrent": 1}}}} + +Return ONLY a valid JSON object - no explanations, no extra text.`, + placeholder: 'Describe the run configuration...', + generationType: 'json-object', + }, + }, + { + id: 'tags', + title: 'Tags', + type: 'code', + placeholder: '[{"key": "env", "value": "prod"}]', + condition: { field: 'operation', value: 'launch_run' }, + mode: 'advanced', + }, + + // ── Get Run / Terminate Run ── + { + id: 'runId', + title: 'Run ID', + type: 'short-input', + placeholder: 'e.g., abc123def456', + condition: { field: 'operation', value: ['get_run', 'terminate_run'] }, + required: { field: 'operation', value: ['get_run', 'terminate_run'] }, + }, + + // ── List Runs ── + { + id: 'listRunsJobName', + title: 'Job Name Filter', + type: 'short-input', + placeholder: 'Filter by job name (optional)', + condition: { field: 'operation', value: 'list_runs' }, + }, + { + id: 'statuses', + title: 'Status Filter', + type: 'short-input', + placeholder: 'e.g. SUCCESS,FAILURE (optional)', + condition: { field: 'operation', value: 'list_runs' }, + mode: 'advanced', + }, + { + id: 'limit', + title: 'Limit', + type: 'short-input', + placeholder: '20', + condition: { field: 'operation', value: 'list_runs' }, + mode: 'advanced', + }, + + // ── Connection (common) ── + { + id: 'host', + title: 'Host', + type: 'short-input', + placeholder: 'http://localhost:3001 or https://myorg.dagster.cloud/prod', + required: true, + }, + { + id: 'apiKey', + title: 'API Token', + type: 'short-input', + placeholder: 'Dagster+ API token (leave blank for OSS / self-hosted)', + password: true, + }, + ], + + tools: { + access: [ + 'dagster_launch_run', + 'dagster_get_run', + 'dagster_list_runs', + 'dagster_list_jobs', + 'dagster_terminate_run', + ], + config: { + tool: (params) => `dagster_${params.operation}`, + params: (params) => { + const result: Record = {} + if (params.limit) result.limit = Number(params.limit) + // Map list_runs job name filter to the correct param + if (params.listRunsJobName) result.jobName = params.listRunsJobName + return result + }, + }, + }, + + inputs: { + operation: { type: 'string', description: 'Operation to perform' }, + host: { type: 'string', description: 'Dagster host URL' }, + apiKey: { + type: 'string', + description: 'Dagster Cloud API token (optional for self-hosted instances)', + }, + repositoryLocationName: { type: 'string', description: 'Repository location name' }, + repositoryName: { type: 'string', description: 'Repository name' }, + jobName: { type: 'string', description: 'Job name to launch' }, + runConfigJson: { type: 'string', description: 'Run configuration as JSON' }, + tags: { type: 'string', description: 'Tags as JSON array of {key, value} objects' }, + runId: { type: 'string', description: 'Run ID' }, + listRunsJobName: { type: 'string', description: 'Filter list_runs by job name' }, + statuses: { type: 'string', description: 'Comma-separated run statuses to filter by' }, + limit: { type: 'number', description: 'Maximum results to return' }, + }, + + outputs: { + // Launch Run + runId: { type: 'string', description: 'Launched or queried run ID' }, + // Get Run + jobName: { type: 'string', description: 'Job name the run belongs to' }, + status: { type: 'string', description: 'Run status' }, + startTime: { type: 'number', description: 'Run start time (Unix timestamp)' }, + endTime: { type: 'number', description: 'Run end time (Unix timestamp)' }, + runConfigYaml: { type: 'string', description: 'Run configuration as YAML' }, + tags: { type: 'json', description: 'Run tags as array of {key, value} objects' }, + // List Runs + runs: { type: 'json', description: 'List of runs' }, + // List Jobs + jobs: { type: 'json', description: 'List of jobs across all repositories' }, + // Terminate Run + success: { type: 'boolean', description: 'Whether termination succeeded' }, + message: { type: 'string', description: 'Termination status or error message' }, + }, +} diff --git a/apps/sim/blocks/registry.ts b/apps/sim/blocks/registry.ts index e78578b176b..d37f90edfcf 100644 --- a/apps/sim/blocks/registry.ts +++ b/apps/sim/blocks/registry.ts @@ -30,6 +30,7 @@ import { ConditionBlock } from '@/blocks/blocks/condition' import { ConfluenceBlock, ConfluenceV2Block } from '@/blocks/blocks/confluence' import { CredentialBlock } from '@/blocks/blocks/credential' import { CursorBlock, CursorV2Block } from '@/blocks/blocks/cursor' +import { DagsterBlock } from '@/blocks/blocks/dagster' import { DatabricksBlock } from '@/blocks/blocks/databricks' import { DatadogBlock } from '@/blocks/blocks/datadog' import { DevinBlock } from '@/blocks/blocks/devin' @@ -254,6 +255,7 @@ export const registry: Record = { confluence_v2: ConfluenceV2Block, cursor: CursorBlock, cursor_v2: CursorV2Block, + dagster: DagsterBlock, databricks: DatabricksBlock, datadog: DatadogBlock, devin: DevinBlock, diff --git a/apps/sim/components/icons.tsx b/apps/sim/components/icons.tsx index cbc3b5edd85..850aeaa7116 100644 --- a/apps/sim/components/icons.tsx +++ b/apps/sim/components/icons.tsx @@ -4929,6 +4929,77 @@ export function SSHIcon(props: SVGProps) { ) } +export function DagsterIcon(props: SVGProps) { + return ( + + + + + + + + + + + + + + + + + + + ) +} + export function DatabricksIcon(props: SVGProps) { return ( diff --git a/apps/sim/tools/dagster/get_run.ts b/apps/sim/tools/dagster/get_run.ts new file mode 100644 index 00000000000..fe2a0ea36ec --- /dev/null +++ b/apps/sim/tools/dagster/get_run.ts @@ -0,0 +1,135 @@ +import type { DagsterGetRunParams, DagsterGetRunResponse } from '@/tools/dagster/types' +import type { ToolConfig } from '@/tools/types' + +const GET_RUN_QUERY = ` + query GetRun($runId: ID!) { + runOrError(runId: $runId) { + ... on Run { + runId + jobName + status + startTime + endTime + runConfigYaml + tags { + key + value + } + } + ... on RunNotFoundError { + message + } + } + } +` + +export const getRunTool: ToolConfig = { + id: 'dagster_get_run', + name: 'Dagster Get Run', + description: 'Get the status and details of a Dagster run by its ID.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3000)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + runId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to retrieve', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: GET_RUN_QUERY, + variables: { runId: params.runId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await response.json() + + if (!response.ok) { + throw new Error(data.errors?.[0]?.message || 'Dagster GraphQL request failed') + } + + if (data.errors?.length) { + throw new Error(data.errors[0].message) + } + + const result = data.data?.runOrError + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.message && !result.runId) { + throw new Error(result.message) + } + + return { + success: true, + output: { + runId: result.runId, + jobName: result.jobName ?? null, + status: result.status, + startTime: result.startTime ?? null, + endTime: result.endTime ?? null, + runConfigYaml: result.runConfigYaml ?? null, + tags: result.tags ?? null, + }, + } + }, + + outputs: { + runId: { + type: 'string', + description: 'Run ID', + }, + jobName: { + type: 'string', + description: 'Name of the job this run belongs to', + }, + status: { + type: 'string', + description: + 'Run status (QUEUED, NOT_STARTED, STARTING, MANAGED, STARTED, SUCCESS, FAILURE, CANCELING, CANCELED)', + }, + startTime: { + type: 'number', + description: 'Run start time as Unix timestamp', + optional: true, + }, + endTime: { + type: 'number', + description: 'Run end time as Unix timestamp', + optional: true, + }, + runConfigYaml: { + type: 'string', + description: 'Run configuration as YAML', + optional: true, + }, + tags: { + type: 'json', + description: 'Run tags as array of {key, value} objects', + optional: true, + }, + }, +} diff --git a/apps/sim/tools/dagster/index.ts b/apps/sim/tools/dagster/index.ts new file mode 100644 index 00000000000..d65455af2a5 --- /dev/null +++ b/apps/sim/tools/dagster/index.ts @@ -0,0 +1,11 @@ +import { getRunTool } from '@/tools/dagster/get_run' +import { launchRunTool } from '@/tools/dagster/launch_run' +import { listJobsTool } from '@/tools/dagster/list_jobs' +import { listRunsTool } from '@/tools/dagster/list_runs' +import { terminateRunTool } from '@/tools/dagster/terminate_run' + +export const dagsterLaunchRunTool = launchRunTool +export const dagsterGetRunTool = getRunTool +export const dagsterListRunsTool = listRunsTool +export const dagsterListJobsTool = listJobsTool +export const dagsterTerminateRunTool = terminateRunTool diff --git a/apps/sim/tools/dagster/launch_run.ts b/apps/sim/tools/dagster/launch_run.ts new file mode 100644 index 00000000000..bd1b9269f26 --- /dev/null +++ b/apps/sim/tools/dagster/launch_run.ts @@ -0,0 +1,174 @@ +import type { DagsterLaunchRunParams, DagsterLaunchRunResponse } from '@/tools/dagster/types' +import type { ToolConfig } from '@/tools/types' + +interface LaunchRunResult { + type: string + run?: { runId: string } + message?: string +} + +function buildLaunchRunMutation(hasConfig: boolean, hasTags: boolean) { + const varDefs = [ + '$repositoryLocationName: String!', + '$repositoryName: String!', + '$jobName: String!', + ] + if (hasConfig) varDefs.push('$runConfigData: RunConfigData') + if (hasTags) varDefs.push('$tags: [ExecutionTag!]') + + const execParams = [ + `selector: { + repositoryLocationName: $repositoryLocationName + repositoryName: $repositoryName + jobName: $jobName + }`, + ] + if (hasConfig) execParams.push('runConfigData: $runConfigData') + if (hasTags) execParams.push('executionMetadata: { tags: $tags }') + + return ` + mutation LaunchRun(${varDefs.join(', ')}) { + launchRun( + executionParams: { + ${execParams.join('\n ')} + } + ) { + type: __typename + ... on LaunchRunSuccess { + run { + runId + } + } + ... on Error { + message + } + } + } + ` +} + +export const launchRunTool: ToolConfig = { + id: 'dagster_launch_run', + name: 'Dagster Launch Run', + description: 'Launch a job run on a Dagster instance.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3000)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + jobName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Name of the job to launch', + }, + runConfigJson: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Run configuration as a JSON object (optional)', + }, + tags: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Tags as a JSON array of {key, value} objects (optional)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const variables: Record = { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + jobName: params.jobName, + } + + let hasConfig = false + if (params.runConfigJson) { + try { + variables.runConfigData = JSON.parse(params.runConfigJson) + hasConfig = true + } catch { + throw new Error('Invalid JSON in runConfigJson') + } + } + + let hasTags = false + if (params.tags) { + try { + variables.tags = JSON.parse(params.tags) + hasTags = true + } catch { + throw new Error('Invalid JSON in tags') + } + } + + return { + query: buildLaunchRunMutation(hasConfig, hasTags), + variables, + } + }, + }, + + transformResponse: async (response: Response) => { + const data = await response.json() + + if (!response.ok) { + throw new Error(data.errors?.[0]?.message || 'Dagster GraphQL request failed') + } + + if (data.errors?.length) { + throw new Error(data.errors[0].message) + } + + const result = data.data?.launchRun as LaunchRunResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'LaunchRunSuccess' && result.run) { + return { + success: true, + output: { runId: result.run.runId }, + } + } + + throw new Error(`${result.type}: ${result.message ?? 'Launch run failed'}`) + }, + + outputs: { + runId: { + type: 'string', + description: 'The globally unique ID of the launched run', + }, + }, +} diff --git a/apps/sim/tools/dagster/list_jobs.ts b/apps/sim/tools/dagster/list_jobs.ts new file mode 100644 index 00000000000..292f42f0259 --- /dev/null +++ b/apps/sim/tools/dagster/list_jobs.ts @@ -0,0 +1,100 @@ +import type { DagsterBaseParams, DagsterListJobsResponse } from '@/tools/dagster/types' +import type { ToolConfig } from '@/tools/types' + +const LIST_JOBS_QUERY = ` + query ListJobNames { + repositoriesOrError { + ... on RepositoryConnection { + nodes { + name + jobs { + name + } + } + } + } + } +` + +export const listJobsTool: ToolConfig = { + id: 'dagster_list_jobs', + name: 'Dagster List Jobs', + description: 'List all jobs across repositories in a Dagster instance.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: () => ({ + query: LIST_JOBS_QUERY, + variables: {}, + }), + }, + + transformResponse: async (response: Response) => { + const data = await response.json() + + if (!response.ok) { + throw new Error(data.errors?.[0]?.message || 'Dagster GraphQL request failed') + } + + if (data.errors?.length) { + throw new Error(data.errors[0].message) + } + + const result = data.data?.repositoriesOrError + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.nodes != null) { + const jobs: Array<{ name: string; repositoryName: string }> = [] + + for (const repo of result.nodes ?? []) { + for (const job of repo.jobs ?? []) { + jobs.push({ + name: job.name, + repositoryName: repo.name, + }) + } + } + + return { + success: true, + output: { jobs }, + } + } + + throw new Error(result.message || 'List jobs failed') + }, + + outputs: { + jobs: { + type: 'json', + description: 'Array of jobs with name and repositoryName', + properties: { + name: { type: 'string', description: 'Job name' }, + repositoryName: { type: 'string', description: 'Repository name' }, + }, + }, + }, +} diff --git a/apps/sim/tools/dagster/list_runs.ts b/apps/sim/tools/dagster/list_runs.ts new file mode 100644 index 00000000000..83ca22934f0 --- /dev/null +++ b/apps/sim/tools/dagster/list_runs.ts @@ -0,0 +1,147 @@ +import type { DagsterListRunsParams, DagsterListRunsResponse } from '@/tools/dagster/types' +import type { ToolConfig } from '@/tools/types' + +function buildListRunsQuery(hasFilter: boolean) { + return ` + query ListRuns($limit: Int${hasFilter ? ', $filter: RunsFilter' : ''}) { + runsOrError(limit: $limit${hasFilter ? ', filter: $filter' : ''}) { + ... on Runs { + results { + runId + jobName + status + tags { + key + value + } + startTime + endTime + } + } + } + } + ` +} + +export const listRunsTool: ToolConfig = { + id: 'dagster_list_runs', + name: 'Dagster List Runs', + description: 'List recent Dagster runs, optionally filtered by job name.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + jobName: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter runs by job name (optional)', + }, + statuses: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Comma-separated run statuses to filter by, e.g. "SUCCESS,FAILURE" (optional)', + }, + limit: { + type: 'number', + required: false, + visibility: 'user-or-llm', + description: 'Maximum number of runs to return (default 20)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const filter: Record = {} + if (params.jobName) filter.pipelineName = params.jobName + if (params.statuses) { + filter.statuses = params.statuses + .split(',') + .map((s: string) => s.trim()) + .filter(Boolean) + } + + const hasFilter = Object.keys(filter).length > 0 + const variables: Record = { limit: params.limit ?? 20 } + if (hasFilter) variables.filter = filter + + return { + query: buildListRunsQuery(hasFilter), + variables, + } + }, + }, + + transformResponse: async (response: Response) => { + const data = await response.json() + + if (!response.ok) { + throw new Error(data.errors?.[0]?.message || 'Dagster GraphQL request failed') + } + + if (data.errors?.length) { + throw new Error(data.errors[0].message) + } + + const result = data.data?.runsOrError + if (!result) throw new Error('Unexpected response from Dagster') + + const runs = (result.results ?? []).map( + (r: { + runId: string + jobName: string | null + status: string + tags: Array<{ key: string; value: string }> | null + startTime: number | null + endTime: number | null + }) => ({ + runId: r.runId, + jobName: r.jobName ?? null, + status: r.status, + tags: r.tags ?? null, + startTime: r.startTime ?? null, + endTime: r.endTime ?? null, + }) + ) + + return { + success: true, + output: { runs }, + } + }, + + outputs: { + runs: { + type: 'json', + description: 'Array of runs', + properties: { + runId: { type: 'string', description: 'Run ID' }, + jobName: { type: 'string', description: 'Job name' }, + status: { type: 'string', description: 'Run status' }, + tags: { type: 'json', description: 'Run tags as array of {key, value} objects' }, + startTime: { type: 'number', description: 'Start time as Unix timestamp' }, + endTime: { type: 'number', description: 'End time as Unix timestamp' }, + }, + }, + }, +} diff --git a/apps/sim/tools/dagster/terminate_run.ts b/apps/sim/tools/dagster/terminate_run.ts new file mode 100644 index 00000000000..ce5e3b75737 --- /dev/null +++ b/apps/sim/tools/dagster/terminate_run.ts @@ -0,0 +1,125 @@ +import type { DagsterTerminateRunParams, DagsterTerminateRunResponse } from '@/tools/dagster/types' +import type { ToolConfig } from '@/tools/types' + +const TERMINATE_RUN_MUTATION = ` + mutation TerminateRun($runId: String!) { + terminateRun(runId: $runId) { + ... on TerminateRunSuccess { + run { + runId + } + } + ... on TerminateRunFailure { + run { + runId + } + message + } + ... on RunNotFoundError { + message + } + } + } +` + +export const terminateRunTool: ToolConfig = + { + id: 'dagster_terminate_run', + name: 'Dagster Terminate Run', + description: 'Terminate an in-progress Dagster run.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + runId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to terminate', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: TERMINATE_RUN_MUTATION, + variables: { runId: params.runId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await response.json() + + if (!response.ok) { + throw new Error(data.errors?.[0]?.message || 'Dagster GraphQL request failed') + } + + if (data.errors?.length) { + throw new Error(data.errors[0].message) + } + + const result = data.data?.terminateRun + if (!result) throw new Error('Unexpected response from Dagster') + + // TerminateRunSuccess: has run.runId, no message + if (result.run?.runId && !result.message) { + return { + success: true, + output: { + success: true, + runId: result.run.runId, + message: null, + }, + } + } + + // TerminateRunFailure: has run.runId and message + if (result.run?.runId && result.message) { + return { + success: true, + output: { + success: false, + runId: result.run.runId, + message: result.message, + }, + } + } + + // RunNotFoundError: only has message + throw new Error(result.message || 'Terminate run failed') + }, + + outputs: { + success: { + type: 'boolean', + description: 'Whether the run was successfully terminated', + }, + runId: { + type: 'string', + description: 'The ID of the terminated run', + }, + message: { + type: 'string', + description: 'Error or status message if termination failed', + optional: true, + }, + }, + } diff --git a/apps/sim/tools/dagster/types.ts b/apps/sim/tools/dagster/types.ts new file mode 100644 index 00000000000..2cefac6f17d --- /dev/null +++ b/apps/sim/tools/dagster/types.ts @@ -0,0 +1,92 @@ +import type { ToolResponse } from '@/tools/types' + +/** Base parameters shared by all Dagster tools */ +export interface DagsterBaseParams { + /** Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001) */ + host: string + /** Dagster+ API token */ + apiKey: string +} + +/** Launch Run */ +export interface DagsterLaunchRunParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + jobName: string + runConfigJson?: string + tags?: string +} + +export interface DagsterLaunchRunResponse extends ToolResponse { + output: { + runId: string + } +} + +/** Get Run */ +export interface DagsterGetRunParams extends DagsterBaseParams { + runId: string +} + +export interface DagsterGetRunResponse extends ToolResponse { + output: { + runId: string + jobName: string + status: string + startTime: number | null + endTime: number | null + runConfigYaml: string | null + tags: Array<{ key: string; value: string }> | null + } +} + +/** List Runs */ +export interface DagsterListRunsParams extends DagsterBaseParams { + jobName?: string + statuses?: string + limit?: number +} + +export interface DagsterListRunsResponse extends ToolResponse { + output: { + runs: Array<{ + runId: string + jobName: string | null + status: string + tags: Array<{ key: string; value: string }> | null + startTime: number | null + endTime: number | null + }> + } +} + +/** List Jobs */ +export interface DagsterListJobsResponse extends ToolResponse { + output: { + jobs: Array<{ + name: string + repositoryName: string + }> + } +} + +/** Terminate Run */ +export interface DagsterTerminateRunParams extends DagsterBaseParams { + runId: string +} + +export interface DagsterTerminateRunResponse extends ToolResponse { + output: { + success: boolean + runId: string + message: string | null + } +} + +/** Union type for all Dagster responses */ +export type DagsterResponse = + | DagsterLaunchRunResponse + | DagsterGetRunResponse + | DagsterListRunsResponse + | DagsterListJobsResponse + | DagsterTerminateRunResponse diff --git a/apps/sim/tools/registry.ts b/apps/sim/tools/registry.ts index 22c04f0b797..b7277e1c967 100644 --- a/apps/sim/tools/registry.ts +++ b/apps/sim/tools/registry.ts @@ -361,6 +361,13 @@ import { cursorStopAgentTool, cursorStopAgentV2Tool, } from '@/tools/cursor' +import { + dagsterGetRunTool, + dagsterLaunchRunTool, + dagsterListJobsTool, + dagsterListRunsTool, + dagsterTerminateRunTool, +} from '@/tools/dagster' import { databricksCancelRunTool, databricksExecuteSqlTool, @@ -3444,6 +3451,11 @@ export const tools: Record = { devin_get_session: devinGetSessionTool, devin_list_sessions: devinListSessionsTool, devin_send_message: devinSendMessageTool, + dagster_get_run: dagsterGetRunTool, + dagster_launch_run: dagsterLaunchRunTool, + dagster_list_jobs: dagsterListJobsTool, + dagster_list_runs: dagsterListRunsTool, + dagster_terminate_run: dagsterTerminateRunTool, databricks_cancel_run: databricksCancelRunTool, databricks_execute_sql: databricksExecuteSqlTool, databricks_get_run: databricksGetRunTool, From 3ca1cb5bb036e8273f71cbb527d3ea90eb64bb8d Mon Sep 17 00:00:00 2001 From: abhinavDhulipala <46908860+abhinavDhulipala@users.noreply.github.com> Date: Fri, 3 Apr 2026 17:29:27 -0700 Subject: [PATCH 02/12] type safety improvements Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> --- apps/sim/blocks/blocks/dagster.ts | 2 +- apps/sim/tools/dagster/types.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/sim/blocks/blocks/dagster.ts b/apps/sim/blocks/blocks/dagster.ts index 08b1be6f5e0..fbf7541148f 100644 --- a/apps/sim/blocks/blocks/dagster.ts +++ b/apps/sim/blocks/blocks/dagster.ts @@ -149,7 +149,7 @@ Return ONLY a valid JSON object - no explanations, no extra text.`, tool: (params) => `dagster_${params.operation}`, params: (params) => { const result: Record = {} - if (params.limit) result.limit = Number(params.limit) + if (params.limit != null && params.limit !== '') result.limit = Number(params.limit) // Map list_runs job name filter to the correct param if (params.listRunsJobName) result.jobName = params.listRunsJobName return result diff --git a/apps/sim/tools/dagster/types.ts b/apps/sim/tools/dagster/types.ts index 2cefac6f17d..e705dc618c7 100644 --- a/apps/sim/tools/dagster/types.ts +++ b/apps/sim/tools/dagster/types.ts @@ -5,7 +5,7 @@ export interface DagsterBaseParams { /** Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001) */ host: string /** Dagster+ API token */ - apiKey: string + apiKey?: string } /** Launch Run */ From 2dee111dad26b8b0250ebb9bf2d7f853ea1f2d09 Mon Sep 17 00:00:00 2001 From: abhinavDhulipala Date: Sat, 4 Apr 2026 09:40:37 -0700 Subject: [PATCH 03/12] unify error handeling across dg tool --- apps/sim/tools/dagster/get_run.ts | 50 +++++++++++-------- apps/sim/tools/dagster/graphql.ts | 41 +++++++++++++++ apps/sim/tools/dagster/launch_run.ts | 16 +++--- apps/sim/tools/dagster/list_jobs.ts | 47 +++++++++--------- apps/sim/tools/dagster/list_runs.ts | 56 +++++++++++---------- apps/sim/tools/dagster/terminate_run.ts | 66 ++++++++++++++----------- apps/sim/tools/dagster/types.ts | 2 +- apps/sim/tools/utils.server.ts | 2 +- apps/sim/tools/utils.test.ts | 2 + apps/sim/tools/utils.ts | 4 +- 10 files changed, 173 insertions(+), 113 deletions(-) create mode 100644 apps/sim/tools/dagster/graphql.ts diff --git a/apps/sim/tools/dagster/get_run.ts b/apps/sim/tools/dagster/get_run.ts index fe2a0ea36ec..b515d839a63 100644 --- a/apps/sim/tools/dagster/get_run.ts +++ b/apps/sim/tools/dagster/get_run.ts @@ -1,6 +1,18 @@ +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' import type { DagsterGetRunParams, DagsterGetRunResponse } from '@/tools/dagster/types' import type { ToolConfig } from '@/tools/types' +/** Fields selected on `runOrError` when the union resolves to `Run`. */ +interface DagsterGetRunGraphqlRun { + runId: string + jobName: string | null + status: string + startTime: number | null + endTime: number | null + runConfigYaml: string | null + tags: Array<{ key: string; value: string }> | null +} + const GET_RUN_QUERY = ` query GetRun($runId: ID!) { runOrError(runId: $runId) { @@ -16,7 +28,8 @@ const GET_RUN_QUERY = ` value } } - ... on RunNotFoundError { + ... on Error { + __typename message } } @@ -66,33 +79,29 @@ export const getRunTool: ToolConfig }, transformResponse: async (response: Response) => { - const data = await response.json() + const data = await parseDagsterGraphqlResponse<{ runOrError?: unknown }>(response) - if (!response.ok) { - throw new Error(data.errors?.[0]?.message || 'Dagster GraphQL request failed') - } + const raw = data.data?.runOrError + if (!raw || typeof raw !== 'object') throw new Error('Unexpected response from Dagster') - if (data.errors?.length) { - throw new Error(data.errors[0].message) + if (!('runId' in raw) || typeof (raw as { runId: unknown }).runId !== 'string') { + throw new Error( + dagsterUnionErrorMessage(raw as { message?: string }, 'Run not found or Dagster error') + ) } - const result = data.data?.runOrError - if (!result) throw new Error('Unexpected response from Dagster') - - if (result.message && !result.runId) { - throw new Error(result.message) - } + const run = raw as DagsterGetRunGraphqlRun return { success: true, output: { - runId: result.runId, - jobName: result.jobName ?? null, - status: result.status, - startTime: result.startTime ?? null, - endTime: result.endTime ?? null, - runConfigYaml: result.runConfigYaml ?? null, - tags: result.tags ?? null, + runId: run.runId, + jobName: run.jobName ?? null, + status: run.status, + startTime: run.startTime ?? null, + endTime: run.endTime ?? null, + runConfigYaml: run.runConfigYaml ?? null, + tags: run.tags ?? null, }, } }, @@ -105,6 +114,7 @@ export const getRunTool: ToolConfig jobName: { type: 'string', description: 'Name of the job this run belongs to', + optional: true, }, status: { type: 'string', diff --git a/apps/sim/tools/dagster/graphql.ts b/apps/sim/tools/dagster/graphql.ts new file mode 100644 index 00000000000..b41fc135339 --- /dev/null +++ b/apps/sim/tools/dagster/graphql.ts @@ -0,0 +1,41 @@ +/** + * Parses a Dagster GraphQL JSON body and throws if the HTTP status is not OK or the payload + * contains top-level GraphQL errors. + * + * Field errors should be requested with `... on Error { __typename message }` (or at least + * `message`) so union failures are not returned as empty objects. + */ +export async function parseDagsterGraphqlResponse>( + response: Response +): Promise<{ data?: TData }> { + let payload: { + data?: TData + errors?: ReadonlyArray<{ message?: string }> + } + try { + payload = (await response.json()) as { + data?: TData + errors?: ReadonlyArray<{ message?: string }> + } + } catch { + throw new Error('Invalid JSON response from Dagster') + } + if (!response.ok) { + throw new Error(payload.errors?.[0]?.message || 'Dagster GraphQL request failed') + } + if (payload.errors?.length) { + throw new Error(payload.errors[0]?.message ?? 'Dagster GraphQL request failed') + } + return { data: payload.data } +} + +/** + * Message from a field that includes `... on Error { message }`, or a fallback when the + * payload is not a GraphQL `Error` type with a string message. + */ +export function dagsterUnionErrorMessage( + result: { message?: string } | undefined, + fallback: string +): string { + return typeof result?.message === 'string' ? result.message : fallback +} diff --git a/apps/sim/tools/dagster/launch_run.ts b/apps/sim/tools/dagster/launch_run.ts index bd1b9269f26..9ed534e3988 100644 --- a/apps/sim/tools/dagster/launch_run.ts +++ b/apps/sim/tools/dagster/launch_run.ts @@ -1,3 +1,4 @@ +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' import type { DagsterLaunchRunParams, DagsterLaunchRunResponse } from '@/tools/dagster/types' import type { ToolConfig } from '@/tools/types' @@ -40,6 +41,7 @@ function buildLaunchRunMutation(hasConfig: boolean, hasTags: boolean) { } } ... on Error { + __typename message } } @@ -142,15 +144,7 @@ export const launchRunTool: ToolConfig { - const data = await response.json() - - if (!response.ok) { - throw new Error(data.errors?.[0]?.message || 'Dagster GraphQL request failed') - } - - if (data.errors?.length) { - throw new Error(data.errors[0].message) - } + const data = await parseDagsterGraphqlResponse<{ launchRun?: unknown }>(response) const result = data.data?.launchRun as LaunchRunResult | undefined if (!result) throw new Error('Unexpected response from Dagster') @@ -162,7 +156,9 @@ export const launchRunTool: ToolConfig { - const data = await response.json() - - if (!response.ok) { - throw new Error(data.errors?.[0]?.message || 'Dagster GraphQL request failed') - } - - if (data.errors?.length) { - throw new Error(data.errors[0].message) - } + const data = await parseDagsterGraphqlResponse<{ repositoriesOrError?: unknown }>(response) - const result = data.data?.repositoriesOrError + const result = data.data?.repositoriesOrError as + | { nodes?: Array<{ name: string; jobs?: Array<{ name: string }> }>; message?: string } + | undefined if (!result) throw new Error('Unexpected response from Dagster') - if (result.nodes != null) { - const jobs: Array<{ name: string; repositoryName: string }> = [] + if (!Array.isArray(result.nodes)) { + throw new Error(dagsterUnionErrorMessage(result, 'List jobs failed')) + } - for (const repo of result.nodes ?? []) { - for (const job of repo.jobs ?? []) { - jobs.push({ - name: job.name, - repositoryName: repo.name, - }) - } - } + const jobs: Array<{ name: string; repositoryName: string }> = [] - return { - success: true, - output: { jobs }, + for (const repo of result.nodes) { + for (const job of repo.jobs ?? []) { + jobs.push({ + name: job.name, + repositoryName: repo.name, + }) } } - throw new Error(result.message || 'List jobs failed') + return { + success: true, + output: { jobs }, + } }, outputs: { diff --git a/apps/sim/tools/dagster/list_runs.ts b/apps/sim/tools/dagster/list_runs.ts index 83ca22934f0..646e6beb6b3 100644 --- a/apps/sim/tools/dagster/list_runs.ts +++ b/apps/sim/tools/dagster/list_runs.ts @@ -1,6 +1,17 @@ +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' import type { DagsterListRunsParams, DagsterListRunsResponse } from '@/tools/dagster/types' import type { ToolConfig } from '@/tools/types' +/** Shape of each run in the `runsOrError` → `Runs.results` GraphQL selection set. */ +interface DagsterListRunsGraphqlRow { + runId: string + jobName: string | null + status: string + tags: Array<{ key: string; value: string }> | null + startTime: number | null + endTime: number | null +} + function buildListRunsQuery(hasFilter: boolean) { return ` query ListRuns($limit: Int${hasFilter ? ', $filter: RunsFilter' : ''}) { @@ -18,6 +29,10 @@ function buildListRunsQuery(hasFilter: boolean) { endTime } } + ... on Error { + __typename + message + } } } ` @@ -93,36 +108,25 @@ export const listRunsTool: ToolConfig { - const data = await response.json() + const data = await parseDagsterGraphqlResponse<{ runsOrError?: unknown }>(response) - if (!response.ok) { - throw new Error(data.errors?.[0]?.message || 'Dagster GraphQL request failed') - } + const result = data.data?.runsOrError as + | { results?: DagsterListRunsGraphqlRow[]; message?: string } + | undefined + if (!result) throw new Error('Unexpected response from Dagster') - if (data.errors?.length) { - throw new Error(data.errors[0].message) + if (!Array.isArray(result.results)) { + throw new Error(dagsterUnionErrorMessage(result, 'Dagster returned an error listing runs')) } - const result = data.data?.runsOrError - if (!result) throw new Error('Unexpected response from Dagster') - - const runs = (result.results ?? []).map( - (r: { - runId: string - jobName: string | null - status: string - tags: Array<{ key: string; value: string }> | null - startTime: number | null - endTime: number | null - }) => ({ - runId: r.runId, - jobName: r.jobName ?? null, - status: r.status, - tags: r.tags ?? null, - startTime: r.startTime ?? null, - endTime: r.endTime ?? null, - }) - ) + const runs = result.results.map((r: DagsterListRunsGraphqlRow) => ({ + runId: r.runId, + jobName: r.jobName ?? null, + status: r.status, + tags: r.tags ?? null, + startTime: r.startTime ?? null, + endTime: r.endTime ?? null, + })) return { success: true, diff --git a/apps/sim/tools/dagster/terminate_run.ts b/apps/sim/tools/dagster/terminate_run.ts index ce5e3b75737..c79d6ccfb45 100644 --- a/apps/sim/tools/dagster/terminate_run.ts +++ b/apps/sim/tools/dagster/terminate_run.ts @@ -1,6 +1,14 @@ +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' import type { DagsterTerminateRunParams, DagsterTerminateRunResponse } from '@/tools/dagster/types' import type { ToolConfig } from '@/tools/types' +/** Fields returned from `terminateRun` for success and `Error` union members. */ +interface DagsterTerminateRunPayload { + __typename?: string + run?: { runId: string } + message?: string +} + const TERMINATE_RUN_MUTATION = ` mutation TerminateRun($runId: String!) { terminateRun(runId: $runId) { @@ -9,13 +17,8 @@ const TERMINATE_RUN_MUTATION = ` runId } } - ... on TerminateRunFailure { - run { - runId - } - message - } - ... on RunNotFoundError { + ... on Error { + __typename message } } @@ -65,22 +68,15 @@ export const terminateRunTool: ToolConfig { - const data = await response.json() - - if (!response.ok) { - throw new Error(data.errors?.[0]?.message || 'Dagster GraphQL request failed') - } - - if (data.errors?.length) { - throw new Error(data.errors[0].message) - } + transformResponse: async (response: Response, params?: DagsterTerminateRunParams) => { + const data = await parseDagsterGraphqlResponse<{ terminateRun?: DagsterTerminateRunPayload }>( + response + ) const result = data.data?.terminateRun if (!result) throw new Error('Unexpected response from Dagster') - // TerminateRunSuccess: has run.runId, no message - if (result.run?.runId && !result.message) { + if (result.run?.runId) { return { success: true, output: { @@ -91,20 +87,30 @@ export const terminateRunTool: ToolConfig { method: 'GET', headers: { 'Content-Type': 'application/json' }, body: undefined, // No body for GET + toolParams: params, }) expect(mockTool.request.headers).toHaveBeenCalledWith(params) @@ -165,6 +166,7 @@ describe('formatRequestParams', () => { method: 'GET', headers: { 'Content-Type': 'application/json' }, body: undefined, + toolParams: params, }) }) diff --git a/apps/sim/tools/utils.ts b/apps/sim/tools/utils.ts index 397309bbe2f..c8985179c14 100644 --- a/apps/sim/tools/utils.ts +++ b/apps/sim/tools/utils.ts @@ -76,6 +76,8 @@ export interface RequestParams { headers: Record body?: string timeout?: number + /** Original tool invocation params; forwarded to `transformResponse` when using `executeRequest`. */ + toolParams?: Record } /** @@ -131,7 +133,7 @@ export function formatRequestParams(tool: ToolConfig, params: Record Date: Sat, 4 Apr 2026 11:26:36 -0700 Subject: [PATCH 04/12] update icon to daggy --- apps/docs/components/icons.tsx | 57 ++++++++++++++----------- apps/sim/components/icons.tsx | 52 ++++++---------------- apps/sim/tools/dagster/terminate_run.ts | 25 +---------- apps/sim/tools/utils.server.ts | 2 +- apps/sim/tools/utils.test.ts | 2 - apps/sim/tools/utils.ts | 4 +- 6 files changed, 46 insertions(+), 96 deletions(-) diff --git a/apps/docs/components/icons.tsx b/apps/docs/components/icons.tsx index 2ed99b081de..502f749d71e 100644 --- a/apps/docs/components/icons.tsx +++ b/apps/docs/components/icons.tsx @@ -124,29 +124,6 @@ export function ConditionalIcon(props: SVGProps) { ) } -export function CredentialIcon(props: SVGProps) { - return ( - - - - - - - ) -} - export function NoteIcon(props: SVGProps) { return ( ) { export function DagsterIcon(props: SVGProps) { return ( - + + + + + + + + ) } diff --git a/apps/sim/components/icons.tsx b/apps/sim/components/icons.tsx index 850aeaa7116..c4b927b2f7b 100644 --- a/apps/sim/components/icons.tsx +++ b/apps/sim/components/icons.tsx @@ -4931,70 +4931,42 @@ export function SSHIcon(props: SVGProps) { export function DagsterIcon(props: SVGProps) { return ( - + - - - - - - - ) diff --git a/apps/sim/tools/dagster/terminate_run.ts b/apps/sim/tools/dagster/terminate_run.ts index c79d6ccfb45..543b9195c4f 100644 --- a/apps/sim/tools/dagster/terminate_run.ts +++ b/apps/sim/tools/dagster/terminate_run.ts @@ -68,7 +68,7 @@ export const terminateRunTool: ToolConfig { + transformResponse: async (response: Response) => { const data = await parseDagsterGraphqlResponse<{ terminateRun?: DagsterTerminateRunPayload }>( response ) @@ -87,29 +87,6 @@ export const terminateRunTool: ToolConfig { method: 'GET', headers: { 'Content-Type': 'application/json' }, body: undefined, // No body for GET - toolParams: params, }) expect(mockTool.request.headers).toHaveBeenCalledWith(params) @@ -166,7 +165,6 @@ describe('formatRequestParams', () => { method: 'GET', headers: { 'Content-Type': 'application/json' }, body: undefined, - toolParams: params, }) }) diff --git a/apps/sim/tools/utils.ts b/apps/sim/tools/utils.ts index c8985179c14..397309bbe2f 100644 --- a/apps/sim/tools/utils.ts +++ b/apps/sim/tools/utils.ts @@ -76,8 +76,6 @@ export interface RequestParams { headers: Record body?: string timeout?: number - /** Original tool invocation params; forwarded to `transformResponse` when using `executeRequest`. */ - toolParams?: Record } /** @@ -133,7 +131,7 @@ export function formatRequestParams(tool: ToolConfig, params: Record Date: Sat, 4 Apr 2026 11:26:36 -0700 Subject: [PATCH 05/12] update icon to daggy --- apps/sim/tools/dagster/launch_run.ts | 4 +--- apps/sim/tools/dagster/list_runs.ts | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/apps/sim/tools/dagster/launch_run.ts b/apps/sim/tools/dagster/launch_run.ts index 9ed534e3988..847cad7344b 100644 --- a/apps/sim/tools/dagster/launch_run.ts +++ b/apps/sim/tools/dagster/launch_run.ts @@ -156,9 +156,7 @@ export const launchRunTool: ToolConfig 0 - const variables: Record = { limit: params.limit ?? 20 } + const variables: Record = { limit: params.limit || 20 } if (hasFilter) variables.filter = filter return { From 3f7839ccd0f19598d9680849a86b575dd6099fb6 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 7 Apr 2026 09:25:55 -0700 Subject: [PATCH 06/12] feat(dagster): expand integration with 9 new tools and full GraphQL validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add 9 new tools: delete_run, get_run_logs, reexecute_run, list_schedules, start_schedule, stop_schedule, list_sensors, start_sensor, stop_sensor - Fix GraphQL union type handling across all tools (replace invalid `... on Error` with concrete union member fragments per Dagster schema) - Fix TerminateRunFailure, InvalidStepError, InvalidOutputError handling in existing tools - Rename graphql.ts → utils.ts for clarity - Wire all 14 operations into the Dagster block with proper conditions and param remapping - Update icon to dagster logo SVG and set bgColor to white - Add block wiring guidance to the add-tools skill --- .agents/skills/add-tools/SKILL.md | 132 ++++++++- apps/docs/components/icons.tsx | 25 +- apps/docs/content/docs/en/tools/dagster.mdx | 246 ++++++++++++++-- .../integrations/data/integrations.json | 50 +++- apps/sim/blocks/blocks/dagster.ts | 267 ++++++++++++++++-- apps/sim/components/icons.tsx | 2 +- apps/sim/tools/dagster/delete_run.ts | 89 ++++++ apps/sim/tools/dagster/get_run.ts | 2 +- apps/sim/tools/dagster/get_run_logs.ts | 162 +++++++++++ apps/sim/tools/dagster/index.ts | 20 ++ apps/sim/tools/dagster/launch_run.ts | 26 +- apps/sim/tools/dagster/list_jobs.ts | 2 +- apps/sim/tools/dagster/list_runs.ts | 2 +- apps/sim/tools/dagster/list_schedules.ts | 161 +++++++++++ apps/sim/tools/dagster/list_sensors.ts | 146 ++++++++++ apps/sim/tools/dagster/reexecute_run.ts | 137 +++++++++ apps/sim/tools/dagster/start_schedule.ts | 135 +++++++++ apps/sim/tools/dagster/start_sensor.ts | 122 ++++++++ apps/sim/tools/dagster/stop_schedule.ts | 118 ++++++++ apps/sim/tools/dagster/stop_sensor.ts | 104 +++++++ apps/sim/tools/dagster/terminate_run.ts | 20 +- apps/sim/tools/dagster/types.ts | 130 ++++++++- .../tools/dagster/{graphql.ts => utils.ts} | 0 apps/sim/tools/registry.ts | 18 ++ 24 files changed, 2038 insertions(+), 78 deletions(-) create mode 100644 apps/sim/tools/dagster/delete_run.ts create mode 100644 apps/sim/tools/dagster/get_run_logs.ts create mode 100644 apps/sim/tools/dagster/list_schedules.ts create mode 100644 apps/sim/tools/dagster/list_sensors.ts create mode 100644 apps/sim/tools/dagster/reexecute_run.ts create mode 100644 apps/sim/tools/dagster/start_schedule.ts create mode 100644 apps/sim/tools/dagster/start_sensor.ts create mode 100644 apps/sim/tools/dagster/stop_schedule.ts create mode 100644 apps/sim/tools/dagster/stop_sensor.ts rename apps/sim/tools/dagster/{graphql.ts => utils.ts} (100%) diff --git a/.agents/skills/add-tools/SKILL.md b/.agents/skills/add-tools/SKILL.md index 66f6f88d047..03dbd68456f 100644 --- a/.agents/skills/add-tools/SKILL.md +++ b/.agents/skills/add-tools/SKILL.md @@ -266,9 +266,9 @@ export * from './types' ## Registering Tools -After creating tools, remind the user to: +After creating tools: 1. Import tools in `apps/sim/tools/registry.ts` -2. Add to the `tools` object with snake_case keys: +2. Add to the `tools` object with snake_case keys (alphabetically): ```typescript import { serviceActionTool } from '@/tools/{service}' @@ -278,6 +278,130 @@ export const tools = { } ``` +## Wiring Tools into the Block (Required) + +After registering in `tools/registry.ts`, you MUST also update the block definition at `apps/sim/blocks/blocks/{service}.ts`. This is not optional — tools are only usable from the UI if they are wired into the block. + +### 1. Add to `tools.access` + +```typescript +tools: { + access: [ + // existing tools... + 'service_new_action', // Add every new tool ID here + ], + config: { ... } +} +``` + +### 2. Add operation dropdown options + +If the block uses an operation dropdown, add an option for each new tool: + +```typescript +{ + id: 'operation', + type: 'dropdown', + options: [ + // existing options... + { label: 'New Action', id: 'new_action' }, // id maps to what tools.config.tool returns + ], +} +``` + +### 3. Add subBlocks for new tool params + +For each new tool, add subBlocks covering all its required params (and optional ones where useful). Apply `condition` to show them only for the right operation, and mark required params with `required`: + +```typescript +// Required param for new_action +{ + id: 'someParam', + title: 'Some Param', + type: 'short-input', + placeholder: 'e.g., value', + condition: { field: 'operation', value: 'new_action' }, + required: { field: 'operation', value: 'new_action' }, +}, +// Optional param — put in advanced mode +{ + id: 'optionalParam', + title: 'Optional Param', + type: 'short-input', + condition: { field: 'operation', value: 'new_action' }, + mode: 'advanced', +}, +``` + +### 4. Update `tools.config.tool` + +Ensure the tool selector returns the correct tool ID for every new operation. The simplest pattern: + +```typescript +tool: (params) => `service_${params.operation}`, +// If operation dropdown IDs already match tool IDs, this requires no change. +``` + +If the dropdown IDs differ from tool IDs, add explicit mappings: + +```typescript +tool: (params) => { + const map: Record = { + new_action: 'service_new_action', + // ... + } + return map[params.operation] ?? `service_${params.operation}` +}, +``` + +### 5. Update `tools.config.params` + +Add any type coercions needed for new params (runs at execution time, after variable resolution): + +```typescript +params: (params) => { + const result: Record = {} + if (params.limit != null && params.limit !== '') result.limit = Number(params.limit) + if (params.newParamName) result.toolParamName = params.newParamName // rename if IDs differ + return result +}, +``` + +### 6. Add new outputs + +Add any new fields returned by the new tools to the block `outputs`: + +```typescript +outputs: { + // existing outputs... + newField: { type: 'string', description: 'Description of new field' }, +} +``` + +### 7. Add new inputs + +Add new subBlock param IDs to the block `inputs` section: + +```typescript +inputs: { + // existing inputs... + someParam: { type: 'string', description: 'Param description' }, + optionalParam: { type: 'string', description: 'Optional param description' }, +} +``` + +### Block wiring checklist + +- [ ] New tool IDs added to `tools.access` +- [ ] Operation dropdown has an option for each new tool +- [ ] SubBlocks cover all required params for each new tool +- [ ] SubBlocks have correct `condition` (only show for the right operation) +- [ ] Optional/rarely-used params set to `mode: 'advanced'` +- [ ] `tools.config.tool` returns correct ID for every new operation +- [ ] `tools.config.params` handles any ID remapping or type coercions +- [ ] New outputs added to block `outputs` +- [ ] New params added to block `inputs` + ## V2 Tool Pattern If creating V2 tools (API-aligned outputs), use `_v2` suffix: @@ -299,7 +423,9 @@ All tool IDs MUST use `snake_case`: `{service}_{action}` (e.g., `x_create_tweet` - [ ] All optional outputs have `optional: true` - [ ] No raw JSON dumps in outputs - [ ] Types file has all interfaces -- [ ] Index.ts exports all tools +- [ ] Index.ts exports all tools and re-exports types (`export * from './types'`) +- [ ] Tools registered in `tools/registry.ts` +- [ ] Block wired: `tools.access`, dropdown options, subBlocks, `tools.config`, outputs, inputs ## Final Validation (Required) diff --git a/apps/docs/components/icons.tsx b/apps/docs/components/icons.tsx index 502f749d71e..8b0ed3aabb1 100644 --- a/apps/docs/components/icons.tsx +++ b/apps/docs/components/icons.tsx @@ -124,6 +124,29 @@ export function ConditionalIcon(props: SVGProps) { ) } +export function CredentialIcon(props: SVGProps) { + return ( + + + + + + + ) +} + export function NoteIcon(props: SVGProps) { return ( ) { export function DagsterIcon(props: SVGProps) { return ( - + ## Usage Instructions -Connect to Dagster+ to launch job runs, monitor run status, list available jobs across repositories, and terminate in-progress runs. Requires a Dagster Cloud API token. +Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, terminate or delete runs, reexecute failed runs, fetch run logs, and manage schedules and sensors. API token only required for Dagster+. @@ -20,15 +20,14 @@ Connect to Dagster+ to launch job runs, monitor run status, list available jobs ### `dagster_launch_run` -Launch a Dagster job run in your Dagster+ deployment. +Launch a job run on a Dagster instance. #### Input | Parameter | Type | Required | Description | | --------- | ---- | -------- | ----------- | -| `organizationName` | string | Yes | Dagster+ organization name \(subdomain, e.g., "myorg"\) | -| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | -| `apiKey` | string | Yes | Dagster Cloud API token | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3000\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | | `repositoryLocationName` | string | Yes | Repository location \(code location\) name | | `repositoryName` | string | Yes | Repository name within the code location | | `jobName` | string | Yes | Name of the job to launch | @@ -49,9 +48,8 @@ Get the status and details of a Dagster run by its ID. | Parameter | Type | Required | Description | | --------- | ---- | -------- | ----------- | -| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | -| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | -| `apiKey` | string | Yes | Dagster Cloud API token | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3000\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | | `runId` | string | Yes | The ID of the run to retrieve | #### Output @@ -66,6 +64,34 @@ Get the status and details of a Dagster run by its ID. | `runConfigYaml` | string | Run configuration as YAML | | `tags` | json | Run tags as array of \{key, value\} objects | +### `dagster_get_run_logs` + +Fetch execution event logs for a Dagster run. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `runId` | string | Yes | The ID of the run to fetch logs for | +| `afterCursor` | string | No | Cursor for paginating through log events \(from a previous response\) | +| `limit` | number | No | Maximum number of log events to return | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `events` | json | Array of log events \(type, message, timestamp, level, stepKey, eventType\) | +| ↳ `type` | string | GraphQL typename of the event | +| ↳ `message` | string | Human-readable log message | +| ↳ `timestamp` | string | Event timestamp as a Unix epoch string | +| ↳ `level` | string | Log level \(DEBUG, INFO, WARNING, ERROR, CRITICAL\) | +| ↳ `stepKey` | string | Step key, if the event is step-scoped | +| ↳ `eventType` | string | Dagster event type enum value | +| `cursor` | string | Cursor for fetching the next page of log events | +| `hasMore` | boolean | Whether more log events are available beyond this page | + ### `dagster_list_runs` List recent Dagster runs, optionally filtered by job name. @@ -74,48 +100,61 @@ List recent Dagster runs, optionally filtered by job name. | Parameter | Type | Required | Description | | --------- | ---- | -------- | ----------- | -| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | -| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | -| `apiKey` | string | Yes | Dagster Cloud API token | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | | `jobName` | string | No | Filter runs by job name \(optional\) | +| `statuses` | string | No | Comma-separated run statuses to filter by, e.g. "SUCCESS,FAILURE" \(optional\) | | `limit` | number | No | Maximum number of runs to return \(default 20\) | -| `cursor` | string | No | Pagination cursor from a previous list_runs response | #### Output | Parameter | Type | Description | | --------- | ---- | ----------- | -| `runs` | json | Array of runs with runId, jobName, status, startTime, endTime | +| `runs` | json | Array of runs | | ↳ `runId` | string | Run ID | | ↳ `jobName` | string | Job name | | ↳ `status` | string | Run status | +| ↳ `tags` | json | Run tags as array of \{key, value\} objects | | ↳ `startTime` | number | Start time as Unix timestamp | | ↳ `endTime` | number | End time as Unix timestamp | -| `cursor` | string | Pagination cursor to retrieve the next page | -| `hasMore` | boolean | Whether more runs are available | ### `dagster_list_jobs` -List all jobs across repositories in a Dagster+ deployment. +List all jobs across repositories in a Dagster instance. #### Input | Parameter | Type | Required | Description | | --------- | ---- | -------- | ----------- | -| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | -| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | -| `apiKey` | string | Yes | Dagster Cloud API token | -| `repositoryLocationName` | string | No | Filter by repository location name \(optional\) | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | #### Output | Parameter | Type | Description | | --------- | ---- | ----------- | -| `jobs` | json | Array of jobs with name, repositoryName, repositoryLocationName, and description | +| `jobs` | json | Array of jobs with name and repositoryName | | ↳ `name` | string | Job name | | ↳ `repositoryName` | string | Repository name | -| ↳ `repositoryLocationName` | string | Repository location name | -| ↳ `description` | string | Job description | + +### `dagster_reexecute_run` + +Reexecute an existing Dagster run, optionally resuming only from failed steps. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `parentRunId` | string | Yes | The ID of the run to reexecute | +| `strategy` | string | Yes | Reexecution strategy: ALL_STEPS reruns everything, FROM_FAILURE resumes from failed steps, FROM_ASSET_FAILURE resumes from failed assets | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runId` | string | The ID of the newly launched reexecution run | ### `dagster_terminate_run` @@ -125,9 +164,8 @@ Terminate an in-progress Dagster run. | Parameter | Type | Required | Description | | --------- | ---- | -------- | ----------- | -| `organizationName` | string | Yes | Dagster+ organization name \(subdomain\) | -| `deploymentName` | string | Yes | Dagster+ deployment name \(e.g., "prod"\) | -| `apiKey` | string | Yes | Dagster Cloud API token | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | | `runId` | string | Yes | The ID of the run to terminate | #### Output @@ -138,4 +176,154 @@ Terminate an in-progress Dagster run. | `runId` | string | The ID of the terminated run | | `message` | string | Error or status message if termination failed | +### `dagster_delete_run` + +Permanently delete a Dagster run record. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `runId` | string | Yes | The ID of the run to delete | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `runId` | string | The ID of the deleted run | + +### `dagster_list_schedules` + +List all schedules in a Dagster repository, optionally filtered by status. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `scheduleStatus` | string | No | Filter schedules by status: RUNNING or STOPPED \(omit to return all\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `schedules` | json | Array of schedules \(name, cronSchedule, jobName, status, id, description, executionTimezone\) | +| ↳ `name` | string | Schedule name | +| ↳ `cronSchedule` | string | Cron expression for the schedule | +| ↳ `jobName` | string | Job the schedule targets | +| ↳ `status` | string | Schedule status: RUNNING or STOPPED | +| ↳ `id` | string | Instigator state ID — use this to start or stop the schedule | +| ↳ `description` | string | Human-readable schedule description | +| ↳ `executionTimezone` | string | Timezone for cron evaluation | + +### `dagster_start_schedule` + +Enable (start) a schedule in a Dagster repository. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `scheduleName` | string | Yes | Name of the schedule to start | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `id` | string | Instigator state ID of the schedule | +| `status` | string | Updated schedule status \(RUNNING or STOPPED\) | + +### `dagster_stop_schedule` + +Disable (stop) a running schedule in Dagster. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `instigationStateId` | string | Yes | InstigationState ID of the schedule to stop — available from dagster_list_schedules output | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `id` | string | Instigator state ID of the schedule | +| `status` | string | Updated schedule status \(RUNNING or STOPPED\) | + +### `dagster_list_sensors` + +List all sensors in a Dagster repository, optionally filtered by status. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `sensorStatus` | string | No | Filter sensors by status: RUNNING or STOPPED \(omit to return all\) | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `sensors` | json | Array of sensors \(name, sensorType, status, id, description\) | +| ↳ `name` | string | Sensor name | +| ↳ `sensorType` | string | Sensor type \(ASSET, AUTO_MATERIALIZE, FRESHNESS_POLICY, MULTI_ASSET, RUN_STATUS, STANDARD\) | +| ↳ `status` | string | Sensor status: RUNNING or STOPPED | +| ↳ `id` | string | Instigator state ID — use this to start or stop the sensor | +| ↳ `description` | string | Human-readable sensor description | + +### `dagster_start_sensor` + +Enable (start) a sensor in a Dagster repository. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `repositoryLocationName` | string | Yes | Repository location \(code location\) name | +| `repositoryName` | string | Yes | Repository name within the code location | +| `sensorName` | string | Yes | Name of the sensor to start | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `id` | string | Instigator state ID of the sensor | +| `status` | string | Updated sensor status \(RUNNING or STOPPED\) | + +### `dagster_stop_sensor` + +Disable (stop) a running sensor in Dagster. + +#### Input + +| Parameter | Type | Required | Description | +| --------- | ---- | -------- | ----------- | +| `host` | string | Yes | Dagster host URL \(e.g., https://myorg.dagster.cloud/prod or http://localhost:3001\) | +| `apiKey` | string | No | Dagster+ API token \(leave blank for OSS / self-hosted\) | +| `instigationStateId` | string | Yes | InstigationState ID of the sensor to stop — available from dagster_list_sensors output | + +#### Output + +| Parameter | Type | Description | +| --------- | ---- | ----------- | +| `id` | string | Instigator state ID of the sensor | +| `status` | string | Updated sensor status \(RUNNING or STOPPED\) | + diff --git a/apps/sim/app/(landing)/integrations/data/integrations.json b/apps/sim/app/(landing)/integrations/data/integrations.json index 2f30fed8d05..7108dacc096 100644 --- a/apps/sim/app/(landing)/integrations/data/integrations.json +++ b/apps/sim/app/(landing)/integrations/data/integrations.json @@ -2349,40 +2349,76 @@ "type": "dagster", "slug": "dagster", "name": "Dagster", - "description": "Orchestrate data pipelines and manage job runs on Dagster+", - "longDescription": "Connect to Dagster+ to launch job runs, monitor run status, list available jobs across repositories, and terminate in-progress runs. Requires a Dagster Cloud API token.", - "bgColor": "#191A23", + "description": "Orchestrate data pipelines and manage job runs with Dagster", + "longDescription": "Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, terminate or delete runs, reexecute failed runs, fetch run logs, and manage schedules and sensors. API token only required for Dagster+.", + "bgColor": "#ffffff", "iconName": "DagsterIcon", "docsUrl": "https://docs.sim.ai/tools/dagster", "operations": [ { "name": "Launch Run", - "description": "Launch a Dagster job run in your Dagster+ deployment." + "description": "Launch a job run on a Dagster instance." }, { "name": "Get Run", "description": "Get the status and details of a Dagster run by its ID." }, + { + "name": "Get Run Logs", + "description": "Fetch execution event logs for a Dagster run." + }, { "name": "List Runs", "description": "List recent Dagster runs, optionally filtered by job name." }, { "name": "List Jobs", - "description": "List all jobs across repositories in a Dagster+ deployment." + "description": "List all jobs across repositories in a Dagster instance." + }, + { + "name": "Reexecute Run", + "description": "Reexecute an existing Dagster run, optionally resuming only from failed steps." }, { "name": "Terminate Run", "description": "Terminate an in-progress Dagster run." + }, + { + "name": "Delete Run", + "description": "Permanently delete a Dagster run record." + }, + { + "name": "List Schedules", + "description": "List all schedules in a Dagster repository, optionally filtered by status." + }, + { + "name": "Start Schedule", + "description": "Enable (start) a schedule in a Dagster repository." + }, + { + "name": "Stop Schedule", + "description": "Disable (stop) a running schedule in Dagster." + }, + { + "name": "List Sensors", + "description": "List all sensors in a Dagster repository, optionally filtered by status." + }, + { + "name": "Start Sensor", + "description": "Enable (start) a sensor in a Dagster repository." + }, + { + "name": "Stop Sensor", + "description": "Disable (stop) a running sensor in Dagster." } ], - "operationCount": 5, + "operationCount": 14, "triggers": [], "triggerCount": 0, "authType": "api-key", "category": "tools", "integrationType": "automation", - "tags": ["data-analytics", "cloud", "automation"] + "tags": ["data-analytics", "automation"] }, { "type": "databricks", diff --git a/apps/sim/blocks/blocks/dagster.ts b/apps/sim/blocks/blocks/dagster.ts index fbf7541148f..11c68bdb145 100644 --- a/apps/sim/blocks/blocks/dagster.ts +++ b/apps/sim/blocks/blocks/dagster.ts @@ -8,15 +8,16 @@ export const DagsterBlock: BlockConfig = { name: 'Dagster', description: 'Orchestrate data pipelines and manage job runs with Dagster', longDescription: - 'Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, and terminate in-progress runs. API token only required for Dagster+.', + 'Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, terminate or delete runs, reexecute failed runs, fetch run logs, and manage schedules and sensors. API token only required for Dagster+.', docsLink: 'https://docs.sim.ai/tools/dagster', category: 'tools', integrationType: IntegrationType.Automation, tags: ['data-analytics', 'automation'], - bgColor: '#191A23', + bgColor: '#ffffff', icon: DagsterIcon, subBlocks: [ + // ── Operation selector ───────────────────────────────────────────────────── { id: 'operation', title: 'Operation', @@ -24,30 +25,53 @@ export const DagsterBlock: BlockConfig = { options: [ { label: 'Launch Run', id: 'launch_run' }, { label: 'Get Run', id: 'get_run' }, + { label: 'Get Run Logs', id: 'get_run_logs' }, { label: 'List Runs', id: 'list_runs' }, { label: 'List Jobs', id: 'list_jobs' }, + { label: 'Reexecute Run', id: 'reexecute_run' }, { label: 'Terminate Run', id: 'terminate_run' }, + { label: 'Delete Run', id: 'delete_run' }, + { label: 'List Schedules', id: 'list_schedules' }, + { label: 'Start Schedule', id: 'start_schedule' }, + { label: 'Stop Schedule', id: 'stop_schedule' }, + { label: 'List Sensors', id: 'list_sensors' }, + { label: 'Start Sensor', id: 'start_sensor' }, + { label: 'Stop Sensor', id: 'stop_sensor' }, ], value: () => 'launch_run', }, - // ── Launch Run ── + // ── Repository selectors (launch_run + schedule/sensor operations) ───────── { id: 'repositoryLocationName', title: 'Repository Location', type: 'short-input', placeholder: 'e.g., my_code_location', - condition: { field: 'operation', value: 'launch_run' }, - required: { field: 'operation', value: 'launch_run' }, + condition: { + field: 'operation', + value: ['launch_run', 'list_schedules', 'start_schedule', 'list_sensors', 'start_sensor'], + }, + required: { + field: 'operation', + value: ['launch_run', 'list_schedules', 'start_schedule', 'list_sensors', 'start_sensor'], + }, }, { id: 'repositoryName', title: 'Repository Name', type: 'short-input', placeholder: 'e.g., __repository__', - condition: { field: 'operation', value: 'launch_run' }, - required: { field: 'operation', value: 'launch_run' }, + condition: { + field: 'operation', + value: ['launch_run', 'list_schedules', 'start_schedule', 'list_sensors', 'start_sensor'], + }, + required: { + field: 'operation', + value: ['launch_run', 'list_schedules', 'start_schedule', 'list_sensors', 'start_sensor'], + }, }, + + // ── Launch Run ───────────────────────────────────────────────────────────── { id: 'jobName', title: 'Job Name', @@ -83,19 +107,72 @@ Return ONLY a valid JSON object - no explanations, no extra text.`, placeholder: '[{"key": "env", "value": "prod"}]', condition: { field: 'operation', value: 'launch_run' }, mode: 'advanced', + wandConfig: { + enabled: true, + prompt: `Generate a Dagster execution tags JSON array based on the user's description. + +Format: [{"key": "string", "value": "string"}, ...] + +Examples: +- "tag env as prod" -> [{"key": "env", "value": "prod"}] +- "mark as nightly run owned by data team" -> [{"key": "schedule", "value": "nightly"}, {"key": "owner", "value": "data-team"}] + +Return ONLY a valid JSON array - no explanations, no extra text.`, + placeholder: 'Describe the tags to attach to this run...', + generationType: 'json-object', + }, }, - // ── Get Run / Terminate Run ── + // ── Run ID (shared: get_run, get_run_logs, terminate_run, delete_run, reexecute_run) ── { id: 'runId', title: 'Run ID', type: 'short-input', placeholder: 'e.g., abc123def456', - condition: { field: 'operation', value: ['get_run', 'terminate_run'] }, - required: { field: 'operation', value: ['get_run', 'terminate_run'] }, + condition: { + field: 'operation', + value: ['get_run', 'get_run_logs', 'terminate_run', 'delete_run', 'reexecute_run'], + }, + required: { + field: 'operation', + value: ['get_run', 'get_run_logs', 'terminate_run', 'delete_run', 'reexecute_run'], + }, }, - // ── List Runs ── + // ── Reexecute Run ────────────────────────────────────────────────────────── + { + id: 'strategy', + title: 'Reexecution Strategy', + type: 'dropdown', + options: [ + { label: 'All Steps', id: 'ALL_STEPS' }, + { label: 'From Failure', id: 'FROM_FAILURE' }, + { label: 'From Asset Failure', id: 'FROM_ASSET_FAILURE' }, + ], + value: () => 'ALL_STEPS', + condition: { field: 'operation', value: 'reexecute_run' }, + required: { field: 'operation', value: 'reexecute_run' }, + }, + + // ── Get Run Logs ─────────────────────────────────────────────────────────── + { + id: 'afterCursor', + title: 'After Cursor', + type: 'short-input', + placeholder: 'Cursor from a previous get_run_logs response (for pagination)', + condition: { field: 'operation', value: 'get_run_logs' }, + mode: 'advanced', + }, + { + id: 'logsLimit', + title: 'Limit', + type: 'short-input', + placeholder: '100', + condition: { field: 'operation', value: 'get_run_logs' }, + mode: 'advanced', + }, + + // ── List Runs ────────────────────────────────────────────────────────────── { id: 'listRunsJobName', title: 'Job Name Filter', @@ -110,6 +187,20 @@ Return ONLY a valid JSON object - no explanations, no extra text.`, placeholder: 'e.g. SUCCESS,FAILURE (optional)', condition: { field: 'operation', value: 'list_runs' }, mode: 'advanced', + wandConfig: { + enabled: true, + prompt: `Generate a comma-separated list of Dagster run statuses to filter by. + +Valid statuses: QUEUED, NOT_STARTED, STARTING, MANAGED, STARTED, SUCCESS, FAILURE, CANCELING, CANCELED + +Examples: +- "only failed runs" -> FAILURE +- "completed runs (success or failure)" -> SUCCESS,FAILURE +- "runs in progress" -> QUEUED,NOT_STARTED,STARTING,STARTED + +Return ONLY the comma-separated status values - no explanations, no extra text.`, + placeholder: 'Describe which run statuses to include...', + }, }, { id: 'limit', @@ -120,7 +211,63 @@ Return ONLY a valid JSON object - no explanations, no extra text.`, mode: 'advanced', }, - // ── Connection (common) ── + // ── Schedule operations ──────────────────────────────────────────────────── + { + id: 'scheduleName', + title: 'Schedule Name', + type: 'short-input', + placeholder: 'e.g., my_daily_schedule', + condition: { field: 'operation', value: 'start_schedule' }, + required: { field: 'operation', value: 'start_schedule' }, + }, + { + id: 'scheduleStatus', + title: 'Status Filter', + type: 'dropdown', + options: [ + { label: 'All', id: '' }, + { label: 'Running', id: 'RUNNING' }, + { label: 'Stopped', id: 'STOPPED' }, + ], + value: () => '', + condition: { field: 'operation', value: 'list_schedules' }, + mode: 'advanced', + }, + + // ── Sensor operations ────────────────────────────────────────────────────── + { + id: 'sensorName', + title: 'Sensor Name', + type: 'short-input', + placeholder: 'e.g., my_asset_sensor', + condition: { field: 'operation', value: 'start_sensor' }, + required: { field: 'operation', value: 'start_sensor' }, + }, + { + id: 'sensorStatus', + title: 'Status Filter', + type: 'dropdown', + options: [ + { label: 'All', id: '' }, + { label: 'Running', id: 'RUNNING' }, + { label: 'Stopped', id: 'STOPPED' }, + ], + value: () => '', + condition: { field: 'operation', value: 'list_sensors' }, + mode: 'advanced', + }, + + // ── Stop schedule / sensor (shared) ──────────────────────────────────────── + { + id: 'instigationStateId', + title: 'Instigator State ID', + type: 'short-input', + placeholder: 'ID from list_schedules or list_sensors output', + condition: { field: 'operation', value: ['stop_schedule', 'stop_sensor'] }, + required: { field: 'operation', value: ['stop_schedule', 'stop_sensor'] }, + }, + + // ── Connection (common to all operations) ────────────────────────────────── { id: 'host', title: 'Host', @@ -141,17 +288,49 @@ Return ONLY a valid JSON object - no explanations, no extra text.`, access: [ 'dagster_launch_run', 'dagster_get_run', + 'dagster_get_run_logs', 'dagster_list_runs', 'dagster_list_jobs', + 'dagster_reexecute_run', 'dagster_terminate_run', + 'dagster_delete_run', + 'dagster_list_schedules', + 'dagster_start_schedule', + 'dagster_stop_schedule', + 'dagster_list_sensors', + 'dagster_start_sensor', + 'dagster_stop_sensor', ], config: { tool: (params) => `dagster_${params.operation}`, params: (params) => { const result: Record = {} - if (params.limit != null && params.limit !== '') result.limit = Number(params.limit) - // Map list_runs job name filter to the correct param - if (params.listRunsJobName) result.jobName = params.listRunsJobName + + // list_runs: type-coerce limit and remap job name filter + if (params.operation === 'list_runs') { + if (params.limit != null && params.limit !== '') result.limit = Number(params.limit) + if (params.listRunsJobName) result.jobName = params.listRunsJobName + } + + // get_run_logs: remap logsLimit → limit + if (params.operation === 'get_run_logs') { + if (params.logsLimit != null && params.logsLimit !== '') + result.limit = Number(params.logsLimit) + } + + // reexecute_run: remap runId → parentRunId + if (params.operation === 'reexecute_run') { + if (params.runId) result.parentRunId = params.runId + } + + // list_schedules / list_sensors: drop empty status filter + if (params.operation === 'list_schedules' && !params.scheduleStatus) { + result.scheduleStatus = undefined + } + if (params.operation === 'list_sensors' && !params.sensorStatus) { + result.sensorStatus = undefined + } + return result }, }, @@ -164,33 +343,81 @@ Return ONLY a valid JSON object - no explanations, no extra text.`, type: 'string', description: 'Dagster Cloud API token (optional for self-hosted instances)', }, + // Launch Run repositoryLocationName: { type: 'string', description: 'Repository location name' }, repositoryName: { type: 'string', description: 'Repository name' }, jobName: { type: 'string', description: 'Job name to launch' }, runConfigJson: { type: 'string', description: 'Run configuration as JSON' }, tags: { type: 'string', description: 'Tags as JSON array of {key, value} objects' }, + // Run ID operations runId: { type: 'string', description: 'Run ID' }, + // Reexecute Run + strategy: { + type: 'string', + description: 'Reexecution strategy (ALL_STEPS, FROM_FAILURE, FROM_ASSET_FAILURE)', + }, + // Get Run Logs + afterCursor: { type: 'string', description: 'Pagination cursor for run logs' }, + logsLimit: { type: 'number', description: 'Maximum log events to return' }, + // List Runs listRunsJobName: { type: 'string', description: 'Filter list_runs by job name' }, statuses: { type: 'string', description: 'Comma-separated run statuses to filter by' }, limit: { type: 'number', description: 'Maximum results to return' }, + // Schedules + scheduleName: { type: 'string', description: 'Schedule name' }, + scheduleStatus: { + type: 'string', + description: 'Filter schedules by status (RUNNING or STOPPED)', + }, + // Sensors + sensorName: { type: 'string', description: 'Sensor name' }, + sensorStatus: { type: 'string', description: 'Filter sensors by status (RUNNING or STOPPED)' }, + // Stop schedule / sensor + instigationStateId: { type: 'string', description: 'InstigationState ID for stop operations' }, }, outputs: { - // Launch Run - runId: { type: 'string', description: 'Launched or queried run ID' }, + // Launch Run / Reexecute Run / Delete Run / Get Run + runId: { type: 'string', description: 'Run ID' }, // Get Run jobName: { type: 'string', description: 'Job name the run belongs to' }, - status: { type: 'string', description: 'Run status' }, + status: { type: 'string', description: 'Run or schedule/sensor status' }, startTime: { type: 'number', description: 'Run start time (Unix timestamp)' }, endTime: { type: 'number', description: 'Run end time (Unix timestamp)' }, runConfigYaml: { type: 'string', description: 'Run configuration as YAML' }, tags: { type: 'json', description: 'Run tags as array of {key, value} objects' }, // List Runs - runs: { type: 'json', description: 'List of runs' }, + runs: { + type: 'json', + description: 'List of runs (runId, jobName, status, tags, startTime, endTime)', + }, // List Jobs - jobs: { type: 'json', description: 'List of jobs across all repositories' }, + jobs: { type: 'json', description: 'List of jobs (name, repositoryName)' }, // Terminate Run success: { type: 'boolean', description: 'Whether termination succeeded' }, message: { type: 'string', description: 'Termination status or error message' }, + // Get Run Logs + events: { + type: 'json', + description: 'Log events (type, message, timestamp, level, stepKey, eventType)', + }, + cursor: { type: 'string', description: 'Pagination cursor for the next page of logs' }, + hasMore: { + type: 'boolean', + description: 'Whether more log events are available beyond this page', + }, + // List Schedules + schedules: { + type: 'json', + description: + 'List of schedules (name, cronSchedule, jobName, status, id, description, executionTimezone)', + }, + // List Sensors + sensors: { + type: 'json', + description: 'List of sensors (name, sensorType, status, id, description)', + }, + // Start/Stop schedule or sensor + id: { type: 'string', description: 'Instigator state ID of the schedule or sensor' }, }, } diff --git a/apps/sim/components/icons.tsx b/apps/sim/components/icons.tsx index c4b927b2f7b..8b0ed3aabb1 100644 --- a/apps/sim/components/icons.tsx +++ b/apps/sim/components/icons.tsx @@ -4931,7 +4931,7 @@ export function SSHIcon(props: SVGProps) { export function DagsterIcon(props: SVGProps) { return ( - + = { + id: 'dagster_delete_run', + name: 'Dagster Delete Run', + description: 'Permanently delete a Dagster run record.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + runId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to delete', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: DELETE_RUN_MUTATION, + variables: { runId: params.runId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ deleteRun?: unknown }>(response) + + const result = data.data?.deleteRun as DeleteRunResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'DeletePipelineRunSuccess' && result.runId) { + return { + success: true, + output: { runId: result.runId }, + } + } + + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Delete run failed')}`) + }, + + outputs: { + runId: { + type: 'string', + description: 'The ID of the deleted run', + }, + }, +} diff --git a/apps/sim/tools/dagster/get_run.ts b/apps/sim/tools/dagster/get_run.ts index b515d839a63..7fb2dd13859 100644 --- a/apps/sim/tools/dagster/get_run.ts +++ b/apps/sim/tools/dagster/get_run.ts @@ -1,5 +1,5 @@ -import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' import type { DagsterGetRunParams, DagsterGetRunResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' import type { ToolConfig } from '@/tools/types' /** Fields selected on `runOrError` when the union resolves to `Run`. */ diff --git a/apps/sim/tools/dagster/get_run_logs.ts b/apps/sim/tools/dagster/get_run_logs.ts new file mode 100644 index 00000000000..82270fac419 --- /dev/null +++ b/apps/sim/tools/dagster/get_run_logs.ts @@ -0,0 +1,162 @@ +import type { DagsterGetRunLogsParams, DagsterGetRunLogsResponse } from '@/tools/dagster/types' +import { parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface DagsterRunEvent { + __typename?: string + message?: string + timestamp?: string + level?: string + stepKey?: string | null + eventType?: string | null +} + +interface DagsterEventConnection { + events?: DagsterRunEvent[] + cursor?: string + hasMore?: boolean +} + +const GET_RUN_LOGS_QUERY = ` + query GetRunLogs($runId: ID!, $afterCursor: String, $limit: Int) { + logsForRun(runId: $runId, afterCursor: $afterCursor, limit: $limit) { + ... on EventConnection { + events { + __typename + ... on MessageEvent { + message + timestamp + level + stepKey + eventType + } + } + cursor + hasMore + } + ... on Error { + __typename + message + } + } + } +` + +export const getRunLogsTool: ToolConfig = { + id: 'dagster_get_run_logs', + name: 'Dagster Get Run Logs', + description: 'Fetch execution event logs for a Dagster run.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + runId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to fetch logs for', + }, + afterCursor: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Cursor for paginating through log events (from a previous response)', + }, + limit: { + type: 'number', + required: false, + visibility: 'user-or-llm', + description: 'Maximum number of log events to return', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const variables: Record = { runId: params.runId } + if (params.afterCursor) variables.afterCursor = params.afterCursor + if (params.limit != null) variables.limit = params.limit + return { query: GET_RUN_LOGS_QUERY, variables } + }, + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ logsForRun?: unknown }>(response) + + const result = data.data?.logsForRun as + | DagsterEventConnection + | { message?: string } + | undefined + if (!result || typeof result !== 'object') throw new Error('Unexpected response from Dagster') + + if (!('events' in result)) { + const errResult = result as { message?: string } + throw new Error(errResult.message ?? 'Failed to fetch run logs') + } + + const conn = result as DagsterEventConnection + const events = (conn.events ?? []).map((e) => ({ + type: e.__typename ?? 'Unknown', + message: e.message ?? '', + timestamp: e.timestamp ?? '', + level: e.level ?? 'INFO', + stepKey: e.stepKey ?? null, + eventType: e.eventType ?? null, + })) + + return { + success: true, + output: { + events, + cursor: conn.cursor ?? null, + hasMore: conn.hasMore ?? false, + }, + } + }, + + outputs: { + events: { + type: 'json', + description: 'Array of log events (type, message, timestamp, level, stepKey, eventType)', + properties: { + type: { type: 'string', description: 'GraphQL typename of the event' }, + message: { type: 'string', description: 'Human-readable log message' }, + timestamp: { type: 'string', description: 'Event timestamp as a Unix epoch string' }, + level: { type: 'string', description: 'Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)' }, + stepKey: { + type: 'string', + description: 'Step key, if the event is step-scoped', + optional: true, + }, + eventType: { type: 'string', description: 'Dagster event type enum value', optional: true }, + }, + }, + cursor: { + type: 'string', + description: 'Cursor for fetching the next page of log events', + optional: true, + }, + hasMore: { + type: 'boolean', + description: 'Whether more log events are available beyond this page', + }, + }, +} diff --git a/apps/sim/tools/dagster/index.ts b/apps/sim/tools/dagster/index.ts index d65455af2a5..d189f4a4ae0 100644 --- a/apps/sim/tools/dagster/index.ts +++ b/apps/sim/tools/dagster/index.ts @@ -1,7 +1,16 @@ +import { deleteRunTool } from '@/tools/dagster/delete_run' import { getRunTool } from '@/tools/dagster/get_run' +import { getRunLogsTool } from '@/tools/dagster/get_run_logs' import { launchRunTool } from '@/tools/dagster/launch_run' import { listJobsTool } from '@/tools/dagster/list_jobs' import { listRunsTool } from '@/tools/dagster/list_runs' +import { listSchedulesTool } from '@/tools/dagster/list_schedules' +import { listSensorsTool } from '@/tools/dagster/list_sensors' +import { reexecuteRunTool } from '@/tools/dagster/reexecute_run' +import { startScheduleTool } from '@/tools/dagster/start_schedule' +import { startSensorTool } from '@/tools/dagster/start_sensor' +import { stopScheduleTool } from '@/tools/dagster/stop_schedule' +import { stopSensorTool } from '@/tools/dagster/stop_sensor' import { terminateRunTool } from '@/tools/dagster/terminate_run' export const dagsterLaunchRunTool = launchRunTool @@ -9,3 +18,14 @@ export const dagsterGetRunTool = getRunTool export const dagsterListRunsTool = listRunsTool export const dagsterListJobsTool = listJobsTool export const dagsterTerminateRunTool = terminateRunTool +export const dagsterGetRunLogsTool = getRunLogsTool +export const dagsterReexecuteRunTool = reexecuteRunTool +export const dagsterDeleteRunTool = deleteRunTool +export const dagsterListSchedulesTool = listSchedulesTool +export const dagsterStartScheduleTool = startScheduleTool +export const dagsterStopScheduleTool = stopScheduleTool +export const dagsterListSensorsTool = listSensorsTool +export const dagsterStartSensorTool = startSensorTool +export const dagsterStopSensorTool = stopSensorTool + +export * from './types' diff --git a/apps/sim/tools/dagster/launch_run.ts b/apps/sim/tools/dagster/launch_run.ts index 847cad7344b..11f9f4f35fe 100644 --- a/apps/sim/tools/dagster/launch_run.ts +++ b/apps/sim/tools/dagster/launch_run.ts @@ -1,11 +1,16 @@ -import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' import type { DagsterLaunchRunParams, DagsterLaunchRunResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' import type { ToolConfig } from '@/tools/types' interface LaunchRunResult { type: string run?: { runId: string } message?: string + /** Present when type === 'InvalidStepError' */ + invalidStepKey?: string + /** Present when type === 'InvalidOutputError' */ + stepKey?: string + invalidOutputName?: string } function buildLaunchRunMutation(hasConfig: boolean, hasTags: boolean) { @@ -44,6 +49,15 @@ function buildLaunchRunMutation(hasConfig: boolean, hasTags: boolean) { __typename message } + ... on InvalidStepError { + __typename + invalidStepKey + } + ... on InvalidOutputError { + __typename + stepKey + invalidOutputName + } } } ` @@ -156,6 +170,16 @@ export const launchRunTool: ToolConfig = { + id: 'dagster_list_schedules', + name: 'Dagster List Schedules', + description: 'List all schedules in a Dagster repository, optionally filtered by status.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + scheduleStatus: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter schedules by status: RUNNING or STOPPED (omit to return all)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const hasStatus = Boolean(params.scheduleStatus) + const variables: Record = { + repositorySelector: { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + }, + } + if (hasStatus) variables.scheduleStatus = params.scheduleStatus + return { query: buildListSchedulesQuery(hasStatus), variables } + }, + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ schedulesOrError?: unknown }>(response) + + const result = data.data?.schedulesOrError as + | { results?: DagsterScheduleGraphql[]; message?: string } + | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (!Array.isArray(result.results)) { + throw new Error(dagsterUnionErrorMessage(result, 'List schedules failed')) + } + + const schedules = result.results.map((s) => ({ + name: s.name, + cronSchedule: s.cronSchedule ?? null, + jobName: s.pipelineName ?? null, + status: s.scheduleState?.status ?? 'UNKNOWN', + id: s.scheduleState?.id ?? null, + description: s.description ?? null, + executionTimezone: s.executionTimezone ?? null, + })) + + return { + success: true, + output: { schedules }, + } + }, + + outputs: { + schedules: { + type: 'json', + description: + 'Array of schedules (name, cronSchedule, jobName, status, id, description, executionTimezone)', + properties: { + name: { type: 'string', description: 'Schedule name' }, + cronSchedule: { type: 'string', description: 'Cron expression for the schedule' }, + jobName: { type: 'string', description: 'Job the schedule targets' }, + status: { type: 'string', description: 'Schedule status: RUNNING or STOPPED' }, + id: { + type: 'string', + description: 'Instigator state ID — use this to start or stop the schedule', + }, + description: { type: 'string', description: 'Human-readable schedule description' }, + executionTimezone: { type: 'string', description: 'Timezone for cron evaluation' }, + }, + }, + }, +} diff --git a/apps/sim/tools/dagster/list_sensors.ts b/apps/sim/tools/dagster/list_sensors.ts new file mode 100644 index 00000000000..737dcaaef60 --- /dev/null +++ b/apps/sim/tools/dagster/list_sensors.ts @@ -0,0 +1,146 @@ +import type { DagsterListSensorsParams, DagsterListSensorsResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface DagsterSensorGraphql { + name: string + sensorType: string | null + description: string | null + sensorState?: { + id: string + status: string + } | null +} + +function buildListSensorsQuery(hasStatus: boolean) { + return ` + query ListSensors($repositorySelector: RepositorySelector!${hasStatus ? ', $sensorStatus: InstigationStatus' : ''}) { + sensorsOrError(repositorySelector: $repositorySelector${hasStatus ? ', sensorStatus: $sensorStatus' : ''}) { + ... on Sensors { + results { + name + sensorType + description + sensorState { + id + status + } + } + } + ... on Error { + __typename + message + } + } + } + ` +} + +export const listSensorsTool: ToolConfig = { + id: 'dagster_list_sensors', + name: 'Dagster List Sensors', + description: 'List all sensors in a Dagster repository, optionally filtered by status.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + sensorStatus: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: 'Filter sensors by status: RUNNING or STOPPED (omit to return all)', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => { + const hasStatus = Boolean(params.sensorStatus) + const variables: Record = { + repositorySelector: { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + }, + } + if (hasStatus) variables.sensorStatus = params.sensorStatus + return { query: buildListSensorsQuery(hasStatus), variables } + }, + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ sensorsOrError?: unknown }>(response) + + const result = data.data?.sensorsOrError as + | { results?: DagsterSensorGraphql[]; message?: string } + | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (!Array.isArray(result.results)) { + throw new Error(dagsterUnionErrorMessage(result, 'List sensors failed')) + } + + const sensors = result.results.map((s) => ({ + name: s.name, + sensorType: s.sensorType ?? null, + status: s.sensorState?.status ?? 'UNKNOWN', + id: s.sensorState?.id ?? null, + description: s.description ?? null, + })) + + return { + success: true, + output: { sensors }, + } + }, + + outputs: { + sensors: { + type: 'json', + description: 'Array of sensors (name, sensorType, status, id, description)', + properties: { + name: { type: 'string', description: 'Sensor name' }, + sensorType: { + type: 'string', + description: + 'Sensor type (ASSET, AUTO_MATERIALIZE, FRESHNESS_POLICY, MULTI_ASSET, RUN_STATUS, STANDARD)', + }, + status: { type: 'string', description: 'Sensor status: RUNNING or STOPPED' }, + id: { + type: 'string', + description: 'Instigator state ID — use this to start or stop the sensor', + }, + description: { type: 'string', description: 'Human-readable sensor description' }, + }, + }, + }, +} diff --git a/apps/sim/tools/dagster/reexecute_run.ts b/apps/sim/tools/dagster/reexecute_run.ts new file mode 100644 index 00000000000..140751171d8 --- /dev/null +++ b/apps/sim/tools/dagster/reexecute_run.ts @@ -0,0 +1,137 @@ +import type { DagsterReexecuteRunParams, DagsterReexecuteRunResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface ReexecuteRunResult { + type: string + run?: { runId: string } + message?: string + /** Returned by InvalidStepError */ + invalidStepKey?: string + /** Returned by InvalidOutputError */ + invalidOutputName?: string + stepKey?: string + /** Returned by RunConfigValidationInvalid */ + errors?: Array<{ message: string }> +} + +const REEXECUTE_RUN_MUTATION = ` + mutation LaunchRunReexecution($parentRunId: String!, $strategy: ReexecutionStrategy!) { + launchRunReexecution( + reexecutionParams: { + parentRunId: $parentRunId + strategy: $strategy + } + ) { + type: __typename + ... on LaunchRunSuccess { + run { + runId + } + } + ... on InvalidStepError { + invalidStepKey + } + ... on InvalidOutputError { + stepKey + invalidOutputName + } + ... on RunConfigValidationInvalid { + errors { + message + } + } + ... on Error { + message + } + } + } +` + +export const reexecuteRunTool: ToolConfig = + { + id: 'dagster_reexecute_run', + name: 'Dagster Reexecute Run', + description: 'Reexecute an existing Dagster run, optionally resuming only from failed steps.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + parentRunId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'The ID of the run to reexecute', + }, + strategy: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: + 'Reexecution strategy: ALL_STEPS reruns everything, FROM_FAILURE resumes from failed steps, FROM_ASSET_FAILURE resumes from failed assets', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: REEXECUTE_RUN_MUTATION, + variables: { + parentRunId: params.parentRunId, + strategy: params.strategy, + }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ launchRunReexecution?: unknown }>(response) + + const result = data.data?.launchRunReexecution as ReexecuteRunResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'LaunchRunSuccess' && result.run) { + return { + success: true, + output: { runId: result.run.runId }, + } + } + + let detail: string + if (result.type === 'InvalidStepError' && result.invalidStepKey) { + detail = `Invalid step key: ${result.invalidStepKey}` + } else if (result.type === 'InvalidOutputError' && result.invalidOutputName) { + detail = `Invalid output "${result.invalidOutputName}" on step "${result.stepKey}"` + } else if (result.type === 'RunConfigValidationInvalid' && result.errors?.length) { + detail = result.errors.map((e) => e.message).join('; ') + } else { + detail = dagsterUnionErrorMessage(result, 'Reexecute run failed') + } + + throw new Error(`${result.type}: ${detail}`) + }, + + outputs: { + runId: { + type: 'string', + description: 'The ID of the newly launched reexecution run', + }, + }, + } diff --git a/apps/sim/tools/dagster/start_schedule.ts b/apps/sim/tools/dagster/start_schedule.ts new file mode 100644 index 00000000000..83ea7fdd1e3 --- /dev/null +++ b/apps/sim/tools/dagster/start_schedule.ts @@ -0,0 +1,135 @@ +import type { + DagsterScheduleMutationResponse, + DagsterStartScheduleParams, +} from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface ScheduleMutationResult { + type: string + scheduleState?: { + id: string + status: string + } + message?: string +} + +const START_SCHEDULE_MUTATION = ` + mutation StartSchedule($scheduleSelector: ScheduleSelector!) { + startSchedule(scheduleSelector: $scheduleSelector) { + type: __typename + ... on ScheduleStateResult { + scheduleState { + id + status + } + } + ... on UnauthorizedError { + __typename + message + } + ... on ScheduleNotFoundError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } +` + +export const startScheduleTool: ToolConfig< + DagsterStartScheduleParams, + DagsterScheduleMutationResponse +> = { + id: 'dagster_start_schedule', + name: 'Dagster Start Schedule', + description: 'Enable (start) a schedule in a Dagster repository.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + scheduleName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Name of the schedule to start', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: START_SCHEDULE_MUTATION, + variables: { + scheduleSelector: { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + scheduleName: params.scheduleName, + }, + }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ startSchedule?: unknown }>(response) + + const result = data.data?.startSchedule as ScheduleMutationResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'ScheduleStateResult' && result.scheduleState) { + return { + success: true, + output: { + id: result.scheduleState.id, + status: result.scheduleState.status, + }, + } + } + + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Start schedule failed')}`) + }, + + outputs: { + id: { + type: 'string', + description: 'Instigator state ID of the schedule', + }, + status: { + type: 'string', + description: 'Updated schedule status (RUNNING or STOPPED)', + }, + }, +} diff --git a/apps/sim/tools/dagster/start_sensor.ts b/apps/sim/tools/dagster/start_sensor.ts new file mode 100644 index 00000000000..7173d52a794 --- /dev/null +++ b/apps/sim/tools/dagster/start_sensor.ts @@ -0,0 +1,122 @@ +import type { DagsterSensorMutationResponse, DagsterStartSensorParams } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface SensorMutationResult { + type: string + sensorState?: { + id: string + status: string + } + message?: string +} + +const START_SENSOR_MUTATION = ` + mutation StartSensor($sensorSelector: SensorSelector!) { + startSensor(sensorSelector: $sensorSelector) { + type: __typename + ... on Sensor { + sensorState { + id + status + } + } + ... on Error { + __typename + message + } + } + } +` + +export const startSensorTool: ToolConfig = + { + id: 'dagster_start_sensor', + name: 'Dagster Start Sensor', + description: 'Enable (start) a sensor in a Dagster repository.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + repositoryLocationName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository location (code location) name', + }, + repositoryName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Repository name within the code location', + }, + sensorName: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Name of the sensor to start', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: START_SENSOR_MUTATION, + variables: { + sensorSelector: { + repositoryLocationName: params.repositoryLocationName, + repositoryName: params.repositoryName, + sensorName: params.sensorName, + }, + }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ startSensor?: unknown }>(response) + + const result = data.data?.startSensor as SensorMutationResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'Sensor' && result.sensorState) { + return { + success: true, + output: { + id: result.sensorState.id, + status: result.sensorState.status, + }, + } + } + + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Start sensor failed')}`) + }, + + outputs: { + id: { + type: 'string', + description: 'Instigator state ID of the sensor', + }, + status: { + type: 'string', + description: 'Updated sensor status (RUNNING or STOPPED)', + }, + }, + } diff --git a/apps/sim/tools/dagster/stop_schedule.ts b/apps/sim/tools/dagster/stop_schedule.ts new file mode 100644 index 00000000000..19a691876ac --- /dev/null +++ b/apps/sim/tools/dagster/stop_schedule.ts @@ -0,0 +1,118 @@ +import type { + DagsterScheduleMutationResponse, + DagsterStopScheduleParams, +} from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface ScheduleMutationResult { + type: string + scheduleState?: { + id: string + status: string + } + message?: string +} + +const STOP_SCHEDULE_MUTATION = ` + mutation StopSchedule($id: String!) { + stopRunningSchedule(id: $id) { + type: __typename + ... on ScheduleStateResult { + scheduleState { + id + status + } + } + ... on UnauthorizedError { + __typename + message + } + ... on ScheduleNotFoundError { + __typename + message + } + ... on PythonError { + __typename + message + } + } + } +` + +export const stopScheduleTool: ToolConfig< + DagsterStopScheduleParams, + DagsterScheduleMutationResponse +> = { + id: 'dagster_stop_schedule', + name: 'Dagster Stop Schedule', + description: 'Disable (stop) a running schedule in Dagster.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + instigationStateId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: + 'InstigationState ID of the schedule to stop — available from dagster_list_schedules output', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: STOP_SCHEDULE_MUTATION, + variables: { id: params.instigationStateId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ stopRunningSchedule?: unknown }>(response) + + const result = data.data?.stopRunningSchedule as ScheduleMutationResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'ScheduleStateResult' && result.scheduleState) { + return { + success: true, + output: { + id: result.scheduleState.id, + status: result.scheduleState.status, + }, + } + } + + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Stop schedule failed')}`) + }, + + outputs: { + id: { + type: 'string', + description: 'Instigator state ID of the schedule', + }, + status: { + type: 'string', + description: 'Updated schedule status (RUNNING or STOPPED)', + }, + }, +} diff --git a/apps/sim/tools/dagster/stop_sensor.ts b/apps/sim/tools/dagster/stop_sensor.ts new file mode 100644 index 00000000000..3b0857fb817 --- /dev/null +++ b/apps/sim/tools/dagster/stop_sensor.ts @@ -0,0 +1,104 @@ +import type { DagsterSensorMutationResponse, DagsterStopSensorParams } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' +import type { ToolConfig } from '@/tools/types' + +interface StopSensorResult { + type: string + instigationState?: { + id: string + status: string + } + message?: string +} + +const STOP_SENSOR_MUTATION = ` + mutation StopSensor($id: String) { + stopSensor(id: $id) { + type: __typename + ... on StopSensorMutationResult { + instigationState { + id + status + } + } + ... on Error { + __typename + message + } + } + } +` + +export const stopSensorTool: ToolConfig = { + id: 'dagster_stop_sensor', + name: 'Dagster Stop Sensor', + description: 'Disable (stop) a running sensor in Dagster.', + version: '1.0.0', + + params: { + host: { + type: 'string', + required: true, + visibility: 'user-only', + description: + 'Dagster host URL (e.g., https://myorg.dagster.cloud/prod or http://localhost:3001)', + }, + apiKey: { + type: 'string', + required: false, + visibility: 'user-only', + description: 'Dagster+ API token (leave blank for OSS / self-hosted)', + }, + instigationStateId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: + 'InstigationState ID of the sensor to stop — available from dagster_list_sensors output', + }, + }, + + request: { + url: (params) => `${params.host.replace(/\/$/, '')}/graphql`, + method: 'POST', + headers: (params) => { + const headers: Record = { 'Content-Type': 'application/json' } + if (params.apiKey) headers['Dagster-Cloud-Api-Token'] = params.apiKey + return headers + }, + body: (params) => ({ + query: STOP_SENSOR_MUTATION, + variables: { id: params.instigationStateId }, + }), + }, + + transformResponse: async (response: Response) => { + const data = await parseDagsterGraphqlResponse<{ stopSensor?: unknown }>(response) + + const result = data.data?.stopSensor as StopSensorResult | undefined + if (!result) throw new Error('Unexpected response from Dagster') + + if (result.type === 'StopSensorMutationResult' && result.instigationState) { + return { + success: true, + output: { + id: result.instigationState.id, + status: result.instigationState.status, + }, + } + } + + throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Stop sensor failed')}`) + }, + + outputs: { + id: { + type: 'string', + description: 'Instigator state ID of the sensor', + }, + status: { + type: 'string', + description: 'Updated sensor status (RUNNING or STOPPED)', + }, + }, +} diff --git a/apps/sim/tools/dagster/terminate_run.ts b/apps/sim/tools/dagster/terminate_run.ts index 543b9195c4f..cab15651c66 100644 --- a/apps/sim/tools/dagster/terminate_run.ts +++ b/apps/sim/tools/dagster/terminate_run.ts @@ -1,8 +1,8 @@ -import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/graphql' import type { DagsterTerminateRunParams, DagsterTerminateRunResponse } from '@/tools/dagster/types' +import { dagsterUnionErrorMessage, parseDagsterGraphqlResponse } from '@/tools/dagster/utils' import type { ToolConfig } from '@/tools/types' -/** Fields returned from `terminateRun` for success and `Error` union members. */ +/** Fields returned from `terminateRun` for all union members. */ interface DagsterTerminateRunPayload { __typename?: string run?: { runId: string } @@ -12,13 +12,19 @@ interface DagsterTerminateRunPayload { const TERMINATE_RUN_MUTATION = ` mutation TerminateRun($runId: String!) { terminateRun(runId: $runId) { + __typename ... on TerminateRunSuccess { run { runId } } + ... on TerminateRunFailure { + run { + runId + } + message + } ... on Error { - __typename message } } @@ -76,7 +82,7 @@ export const terminateRunTool: ToolConfig + cursor: string | null + hasMore: boolean + } +} + +export interface DagsterReexecuteRunParams extends DagsterBaseParams { + parentRunId: string + strategy: string +} + +export interface DagsterReexecuteRunResponse extends ToolResponse { + output: { + runId: string + } +} + +export interface DagsterDeleteRunParams extends DagsterBaseParams { + runId: string +} + +export interface DagsterDeleteRunResponse extends ToolResponse { + output: { + runId: string + } +} + +export interface DagsterListSchedulesParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + scheduleStatus?: string +} + +export interface DagsterListSchedulesResponse extends ToolResponse { + output: { + schedules: Array<{ + name: string + cronSchedule: string | null + jobName: string | null + status: string + id: string | null + description: string | null + executionTimezone: string | null + }> + } +} + +export interface DagsterStartScheduleParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + scheduleName: string +} + +export interface DagsterScheduleMutationResponse extends ToolResponse { + output: { + id: string + status: string + } +} + +export interface DagsterStopScheduleParams extends DagsterBaseParams { + instigationStateId: string +} + +export interface DagsterListSensorsParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + sensorStatus?: string +} + +export interface DagsterListSensorsResponse extends ToolResponse { + output: { + sensors: Array<{ + name: string + sensorType: string | null + status: string + id: string | null + description: string | null + }> + } +} + +export interface DagsterStartSensorParams extends DagsterBaseParams { + repositoryLocationName: string + repositoryName: string + sensorName: string +} + +export interface DagsterSensorMutationResponse extends ToolResponse { + output: { + id: string + status: string + } +} + +export interface DagsterStopSensorParams extends DagsterBaseParams { + instigationStateId: string +} + export type DagsterResponse = | DagsterLaunchRunResponse | DagsterGetRunResponse | DagsterListRunsResponse | DagsterListJobsResponse | DagsterTerminateRunResponse + | DagsterGetRunLogsResponse + | DagsterReexecuteRunResponse + | DagsterDeleteRunResponse + | DagsterListSchedulesResponse + | DagsterScheduleMutationResponse + | DagsterListSensorsResponse + | DagsterSensorMutationResponse diff --git a/apps/sim/tools/dagster/graphql.ts b/apps/sim/tools/dagster/utils.ts similarity index 100% rename from apps/sim/tools/dagster/graphql.ts rename to apps/sim/tools/dagster/utils.ts diff --git a/apps/sim/tools/registry.ts b/apps/sim/tools/registry.ts index b7277e1c967..85d08da46c5 100644 --- a/apps/sim/tools/registry.ts +++ b/apps/sim/tools/registry.ts @@ -362,10 +362,19 @@ import { cursorStopAgentV2Tool, } from '@/tools/cursor' import { + dagsterDeleteRunTool, + dagsterGetRunLogsTool, dagsterGetRunTool, dagsterLaunchRunTool, dagsterListJobsTool, dagsterListRunsTool, + dagsterListSchedulesTool, + dagsterListSensorsTool, + dagsterReexecuteRunTool, + dagsterStartScheduleTool, + dagsterStartSensorTool, + dagsterStopScheduleTool, + dagsterStopSensorTool, dagsterTerminateRunTool, } from '@/tools/dagster' import { @@ -3451,10 +3460,19 @@ export const tools: Record = { devin_get_session: devinGetSessionTool, devin_list_sessions: devinListSessionsTool, devin_send_message: devinSendMessageTool, + dagster_delete_run: dagsterDeleteRunTool, dagster_get_run: dagsterGetRunTool, + dagster_get_run_logs: dagsterGetRunLogsTool, dagster_launch_run: dagsterLaunchRunTool, dagster_list_jobs: dagsterListJobsTool, dagster_list_runs: dagsterListRunsTool, + dagster_list_schedules: dagsterListSchedulesTool, + dagster_list_sensors: dagsterListSensorsTool, + dagster_reexecute_run: dagsterReexecuteRunTool, + dagster_start_schedule: dagsterStartScheduleTool, + dagster_start_sensor: dagsterStartSensorTool, + dagster_stop_schedule: dagsterStopScheduleTool, + dagster_stop_sensor: dagsterStopSensorTool, dagster_terminate_run: dagsterTerminateRunTool, databricks_cancel_run: databricksCancelRunTool, databricks_execute_sql: databricksExecuteSqlTool, From adf50c6f1c0fb604eb3006e322a954639f542c9c Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 7 Apr 2026 09:32:30 -0700 Subject: [PATCH 07/12] fix(dagster): replace invalid `... on Error` interface spreads with concrete union members MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - list_runs: InvalidPipelineRunsFilterError + PythonError - list_jobs: RepositoryNotFoundError + PythonError - reexecute_run: PipelineNotFoundError, RunConflict, UnauthorizedError, PythonError - terminate_run: RunNotFoundError, UnauthorizedError, PythonError - delete_run: RunNotFoundError, UnauthorizedError, PythonError - list_sensors: RepositoryNotFoundError + PythonError - start_sensor: SensorNotFoundError, UnauthorizedError, PythonError - stop_sensor: UnauthorizedError + PythonError - stop_schedule: fix $id variable type String! → String (matches nullable schema arg) - dagster.mdx: add manual intro description section --- apps/docs/content/docs/en/tools/dagster.mdx | 13 +++++++++++++ apps/sim/tools/dagster/delete_run.ts | 8 +++++++- apps/sim/tools/dagster/list_jobs.ts | 6 +++++- apps/sim/tools/dagster/list_runs.ts | 6 +++++- apps/sim/tools/dagster/list_sensors.ts | 6 +++++- apps/sim/tools/dagster/reexecute_run.ts | 11 ++++++++++- apps/sim/tools/dagster/start_sensor.ts | 10 +++++++++- apps/sim/tools/dagster/stop_schedule.ts | 2 +- apps/sim/tools/dagster/stop_sensor.ts | 6 +++++- apps/sim/tools/dagster/terminate_run.ts | 8 +++++++- 10 files changed, 67 insertions(+), 9 deletions(-) diff --git a/apps/docs/content/docs/en/tools/dagster.mdx b/apps/docs/content/docs/en/tools/dagster.mdx index b3f711b86d3..b3e18c3622d 100644 --- a/apps/docs/content/docs/en/tools/dagster.mdx +++ b/apps/docs/content/docs/en/tools/dagster.mdx @@ -10,6 +10,19 @@ import { BlockInfoCard } from "@/components/ui/block-info-card" color="#ffffff" /> +{/* MANUAL-CONTENT-START:intro */} +[Dagster](https://dagster.io/) is an open-source data orchestration platform designed for building, testing, and monitoring data pipelines. It provides a unified model for defining data assets, scheduling jobs, and observing pipeline execution — whether running locally or deployed to Dagster+. + +With Dagster, you can: + +- **Orchestrate data pipelines**: Define and run jobs composed of ops and assets with full dependency tracking +- **Monitor executions**: Track run status, inspect logs, and debug failures step by step +- **Manage schedules and sensors**: Automate pipeline triggers on a cron schedule or in response to external events +- **Reexecute selectively**: Resume failed pipelines from the point of failure without rerunning successful steps + +In Sim, the Dagster integration enables your agents to interact with a Dagster instance programmatically. Agents can launch and monitor job runs, retrieve execution logs, reexecute failed runs, and manage schedules and sensors — all as part of a larger automated workflow. Use Dagster as an orchestration layer your agents can control and observe, enabling data-driven automation that responds dynamically to pipeline outcomes. +{/* MANUAL-CONTENT-END */} + ## Usage Instructions Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, terminate or delete runs, reexecute failed runs, fetch run logs, and manage schedules and sensors. API token only required for Dagster+. diff --git a/apps/sim/tools/dagster/delete_run.ts b/apps/sim/tools/dagster/delete_run.ts index 2a66549f4fa..202bebecff4 100644 --- a/apps/sim/tools/dagster/delete_run.ts +++ b/apps/sim/tools/dagster/delete_run.ts @@ -15,7 +15,13 @@ const DELETE_RUN_MUTATION = ` ... on DeletePipelineRunSuccess { runId } - ... on Error { + ... on RunNotFoundError { + message + } + ... on UnauthorizedError { + message + } + ... on PythonError { message } } diff --git a/apps/sim/tools/dagster/list_jobs.ts b/apps/sim/tools/dagster/list_jobs.ts index 9b7122bc3d0..87df1cdae47 100644 --- a/apps/sim/tools/dagster/list_jobs.ts +++ b/apps/sim/tools/dagster/list_jobs.ts @@ -13,7 +13,11 @@ const LIST_JOBS_QUERY = ` } } } - ... on Error { + ... on RepositoryNotFoundError { + __typename + message + } + ... on PythonError { __typename message } diff --git a/apps/sim/tools/dagster/list_runs.ts b/apps/sim/tools/dagster/list_runs.ts index d11ae9c6f90..49bb06f927c 100644 --- a/apps/sim/tools/dagster/list_runs.ts +++ b/apps/sim/tools/dagster/list_runs.ts @@ -29,7 +29,11 @@ function buildListRunsQuery(hasFilter: boolean) { endTime } } - ... on Error { + ... on InvalidPipelineRunsFilterError { + __typename + message + } + ... on PythonError { __typename message } diff --git a/apps/sim/tools/dagster/list_sensors.ts b/apps/sim/tools/dagster/list_sensors.ts index 737dcaaef60..34372a6bdad 100644 --- a/apps/sim/tools/dagster/list_sensors.ts +++ b/apps/sim/tools/dagster/list_sensors.ts @@ -27,7 +27,11 @@ function buildListSensorsQuery(hasStatus: boolean) { } } } - ... on Error { + ... on RepositoryNotFoundError { + __typename + message + } + ... on PythonError { __typename message } diff --git a/apps/sim/tools/dagster/reexecute_run.ts b/apps/sim/tools/dagster/reexecute_run.ts index 140751171d8..a85ab1adc2e 100644 --- a/apps/sim/tools/dagster/reexecute_run.ts +++ b/apps/sim/tools/dagster/reexecute_run.ts @@ -41,7 +41,16 @@ const REEXECUTE_RUN_MUTATION = ` message } } - ... on Error { + ... on PipelineNotFoundError { + message + } + ... on RunConflict { + message + } + ... on UnauthorizedError { + message + } + ... on PythonError { message } } diff --git a/apps/sim/tools/dagster/start_sensor.ts b/apps/sim/tools/dagster/start_sensor.ts index 7173d52a794..66a07541dbd 100644 --- a/apps/sim/tools/dagster/start_sensor.ts +++ b/apps/sim/tools/dagster/start_sensor.ts @@ -21,7 +21,15 @@ const START_SENSOR_MUTATION = ` status } } - ... on Error { + ... on SensorNotFoundError { + __typename + message + } + ... on UnauthorizedError { + __typename + message + } + ... on PythonError { __typename message } diff --git a/apps/sim/tools/dagster/stop_schedule.ts b/apps/sim/tools/dagster/stop_schedule.ts index 19a691876ac..d6e89848ef2 100644 --- a/apps/sim/tools/dagster/stop_schedule.ts +++ b/apps/sim/tools/dagster/stop_schedule.ts @@ -15,7 +15,7 @@ interface ScheduleMutationResult { } const STOP_SCHEDULE_MUTATION = ` - mutation StopSchedule($id: String!) { + mutation StopSchedule($id: String) { stopRunningSchedule(id: $id) { type: __typename ... on ScheduleStateResult { diff --git a/apps/sim/tools/dagster/stop_sensor.ts b/apps/sim/tools/dagster/stop_sensor.ts index 3b0857fb817..cf31c26ac1e 100644 --- a/apps/sim/tools/dagster/stop_sensor.ts +++ b/apps/sim/tools/dagster/stop_sensor.ts @@ -21,7 +21,11 @@ const STOP_SENSOR_MUTATION = ` status } } - ... on Error { + ... on UnauthorizedError { + __typename + message + } + ... on PythonError { __typename message } diff --git a/apps/sim/tools/dagster/terminate_run.ts b/apps/sim/tools/dagster/terminate_run.ts index cab15651c66..30f5ccb0473 100644 --- a/apps/sim/tools/dagster/terminate_run.ts +++ b/apps/sim/tools/dagster/terminate_run.ts @@ -24,7 +24,13 @@ const TERMINATE_RUN_MUTATION = ` } message } - ... on Error { + ... on RunNotFoundError { + message + } + ... on UnauthorizedError { + message + } + ... on PythonError { message } } From 60fde9fc2ff52c02dd66ac8727a1c1e8854bd1bb Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 7 Apr 2026 09:32:55 -0700 Subject: [PATCH 08/12] docs --- apps/docs/content/docs/en/tools/dagster.mdx | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/docs/content/docs/en/tools/dagster.mdx b/apps/docs/content/docs/en/tools/dagster.mdx index b3e18c3622d..b82c1a7f4ab 100644 --- a/apps/docs/content/docs/en/tools/dagster.mdx +++ b/apps/docs/content/docs/en/tools/dagster.mdx @@ -23,6 +23,7 @@ With Dagster, you can: In Sim, the Dagster integration enables your agents to interact with a Dagster instance programmatically. Agents can launch and monitor job runs, retrieve execution logs, reexecute failed runs, and manage schedules and sensors — all as part of a larger automated workflow. Use Dagster as an orchestration layer your agents can control and observe, enabling data-driven automation that responds dynamically to pipeline outcomes. {/* MANUAL-CONTENT-END */} + ## Usage Instructions Connect to a Dagster instance to launch job runs, monitor run status, list available jobs across repositories, terminate or delete runs, reexecute failed runs, fetch run logs, and manage schedules and sensors. API token only required for Dagster+. From b736f8ba49e966e09ed1acc0c587f0a7a00eaa54 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 7 Apr 2026 10:34:49 -0700 Subject: [PATCH 09/12] fix(dagster): add RunConfigValidationInvalid handling to launch_run and use concrete error types --- apps/sim/tools/dagster/launch_run.ts | 30 ++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/apps/sim/tools/dagster/launch_run.ts b/apps/sim/tools/dagster/launch_run.ts index 11f9f4f35fe..2f2266483ab 100644 --- a/apps/sim/tools/dagster/launch_run.ts +++ b/apps/sim/tools/dagster/launch_run.ts @@ -11,6 +11,8 @@ interface LaunchRunResult { /** Present when type === 'InvalidOutputError' */ stepKey?: string invalidOutputName?: string + /** Present when type === 'RunConfigValidationInvalid' */ + errors?: Array<{ message: string }> } function buildLaunchRunMutation(hasConfig: boolean, hasTags: boolean) { @@ -45,10 +47,6 @@ function buildLaunchRunMutation(hasConfig: boolean, hasTags: boolean) { runId } } - ... on Error { - __typename - message - } ... on InvalidStepError { __typename invalidStepKey @@ -58,6 +56,23 @@ function buildLaunchRunMutation(hasConfig: boolean, hasTags: boolean) { stepKey invalidOutputName } + ... on RunConfigValidationInvalid { + errors { + message + } + } + ... on PipelineNotFoundError { + message + } + ... on RunConflict { + message + } + ... on UnauthorizedError { + message + } + ... on PythonError { + message + } } } ` @@ -170,8 +185,6 @@ export const launchRunTool: ToolConfig e.message).join('; ')}` + ) + } throw new Error(`${result.type}: ${dagsterUnionErrorMessage(result, 'Launch run failed')}`) }, From 30f34831961aaf5e97793a1d30f95a8103bf99ad Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 7 Apr 2026 10:45:11 -0700 Subject: [PATCH 10/12] fix(dagster): replace ... on Error with concrete RunNotFoundError + PythonError in get_run and get_run_logs --- apps/sim/tools/dagster/get_run.ts | 6 +++++- apps/sim/tools/dagster/get_run_logs.ts | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/apps/sim/tools/dagster/get_run.ts b/apps/sim/tools/dagster/get_run.ts index 7fb2dd13859..ca22ae2dfba 100644 --- a/apps/sim/tools/dagster/get_run.ts +++ b/apps/sim/tools/dagster/get_run.ts @@ -28,7 +28,11 @@ const GET_RUN_QUERY = ` value } } - ... on Error { + ... on RunNotFoundError { + __typename + message + } + ... on PythonError { __typename message } diff --git a/apps/sim/tools/dagster/get_run_logs.ts b/apps/sim/tools/dagster/get_run_logs.ts index 82270fac419..accbc24038f 100644 --- a/apps/sim/tools/dagster/get_run_logs.ts +++ b/apps/sim/tools/dagster/get_run_logs.ts @@ -34,7 +34,11 @@ const GET_RUN_LOGS_QUERY = ` cursor hasMore } - ... on Error { + ... on RunNotFoundError { + __typename + message + } + ... on PythonError { __typename message } From bc2b92a6e8776188a2de357eac2a6c85b76e54dc Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 7 Apr 2026 10:56:40 -0700 Subject: [PATCH 11/12] fix(dagster): add missing LaunchRunResult union members (InvalidSubsetError, PresetNotFoundError, ConflictingExecutionParamsError, NoModeProvidedError) --- apps/sim/tools/dagster/launch_run.ts | 12 ++++++++++++ apps/sim/tools/dagster/reexecute_run.ts | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/apps/sim/tools/dagster/launch_run.ts b/apps/sim/tools/dagster/launch_run.ts index 2f2266483ab..c71cb17d1d3 100644 --- a/apps/sim/tools/dagster/launch_run.ts +++ b/apps/sim/tools/dagster/launch_run.ts @@ -70,6 +70,18 @@ function buildLaunchRunMutation(hasConfig: boolean, hasTags: boolean) { ... on UnauthorizedError { message } + ... on InvalidSubsetError { + message + } + ... on PresetNotFoundError { + message + } + ... on ConflictingExecutionParamsError { + message + } + ... on NoModeProvidedError { + message + } ... on PythonError { message } diff --git a/apps/sim/tools/dagster/reexecute_run.ts b/apps/sim/tools/dagster/reexecute_run.ts index a85ab1adc2e..f3ce6d3a366 100644 --- a/apps/sim/tools/dagster/reexecute_run.ts +++ b/apps/sim/tools/dagster/reexecute_run.ts @@ -50,6 +50,18 @@ const REEXECUTE_RUN_MUTATION = ` ... on UnauthorizedError { message } + ... on InvalidSubsetError { + message + } + ... on PresetNotFoundError { + message + } + ... on ConflictingExecutionParamsError { + message + } + ... on NoModeProvidedError { + message + } ... on PythonError { message } From 7e26b13edb426170c5f6dfd8dbbc01b267ba0533 Mon Sep 17 00:00:00 2001 From: waleed Date: Tue, 7 Apr 2026 11:18:13 -0700 Subject: [PATCH 12/12] fix(dagster): always override jobName in list_runs params to prevent stale launch_run value leaking --- apps/sim/blocks/blocks/dagster.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/sim/blocks/blocks/dagster.ts b/apps/sim/blocks/blocks/dagster.ts index 11c68bdb145..4121950b2e9 100644 --- a/apps/sim/blocks/blocks/dagster.ts +++ b/apps/sim/blocks/blocks/dagster.ts @@ -309,7 +309,7 @@ Return ONLY the comma-separated status values - no explanations, no extra text.` // list_runs: type-coerce limit and remap job name filter if (params.operation === 'list_runs') { if (params.limit != null && params.limit !== '') result.limit = Number(params.limit) - if (params.listRunsJobName) result.jobName = params.listRunsJobName + result.jobName = params.listRunsJobName || undefined } // get_run_logs: remap logsLimit → limit