From e9c8a8797fa1853da9da4de616fef5e02ca81d23 Mon Sep 17 00:00:00 2001 From: Anton Nekipelov <226657+anton-107@users.noreply.github.com> Date: Wed, 17 Jun 2026 15:41:46 +0000 Subject: [PATCH] Add jobs/runs/submit handler to testserver The test server only handled jobs/create, run-now, and runs.get/get-output/list, so commands that call client.Jobs.Submit (the one-time run endpoint) could not be exercised in local acceptance tests. Add a JobsSubmit handler that records the submitted spec, returns a run ID, and stores a retrievable run. Submitted tasks are reported as RUNNING in both the V1 run state (so the `jobs submit` waiter is driven to TERMINATED by JobsGetRun) and the V2 per-task status, and are not executed locally. This lets acceptance tests assert the submitted environments spec (e.g. base_environment) via recorded requests. Includes unit tests for the handler and an acceptance test for `jobs submit`. Co-authored-by: Isaac --- .../workspace/jobs/runs-submit/out.test.toml | 3 + .../cmd/workspace/jobs/runs-submit/output.txt | 52 +++++++++ .../cmd/workspace/jobs/runs-submit/script | 25 +++++ .../cmd/workspace/jobs/runs-submit/test.toml | 7 ++ libs/testserver/handlers.go | 4 + libs/testserver/jobs.go | 58 ++++++++++ libs/testserver/jobs_test.go | 102 ++++++++++++++++++ 7 files changed, 251 insertions(+) create mode 100644 acceptance/cmd/workspace/jobs/runs-submit/out.test.toml create mode 100644 acceptance/cmd/workspace/jobs/runs-submit/output.txt create mode 100644 acceptance/cmd/workspace/jobs/runs-submit/script create mode 100644 acceptance/cmd/workspace/jobs/runs-submit/test.toml create mode 100644 libs/testserver/jobs_test.go diff --git a/acceptance/cmd/workspace/jobs/runs-submit/out.test.toml b/acceptance/cmd/workspace/jobs/runs-submit/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/cmd/workspace/jobs/runs-submit/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/cmd/workspace/jobs/runs-submit/output.txt b/acceptance/cmd/workspace/jobs/runs-submit/output.txt new file mode 100644 index 00000000000..ad78389be4c --- /dev/null +++ b/acceptance/cmd/workspace/jobs/runs-submit/output.txt @@ -0,0 +1,52 @@ + +>>> [CLI] jobs submit --no-wait --json { + "run_name": "ssh-tunnel", + "tasks": [ + { + "task_key": "main", + "environment_key": "default", + "notebook_task": { + "notebook_path": "/Workspace/notebook" + } + } + ], + "environments": [ + { + "environment_key": "default", + "spec": { + "base_environment": "workspace-base-environments/databricks_ai_v4", + "environment_version": "4" + } + } + ] +} +{ + "run_id": [NUMID] +} + +>>> print_requests.py //jobs/runs/submit +{ + "method": "POST", + "path": "/api/2.2/jobs/runs/submit", + "body": { + "environments": [ + { + "environment_key": "default", + "spec": { + "base_environment": "workspace-base-environments/databricks_ai_v4", + "environment_version": "4" + } + } + ], + "run_name": "ssh-tunnel", + "tasks": [ + { + "environment_key": "default", + "notebook_task": { + "notebook_path": "/Workspace/notebook" + }, + "task_key": "main" + } + ] + } +} diff --git a/acceptance/cmd/workspace/jobs/runs-submit/script b/acceptance/cmd/workspace/jobs/runs-submit/script new file mode 100644 index 00000000000..68c7a47b6fc --- /dev/null +++ b/acceptance/cmd/workspace/jobs/runs-submit/script @@ -0,0 +1,25 @@ +trace $CLI jobs submit --no-wait --json '{ + "run_name": "ssh-tunnel", + "tasks": [ + { + "task_key": "main", + "environment_key": "default", + "notebook_task": { + "notebook_path": "/Workspace/notebook" + } + } + ], + "environments": [ + { + "environment_key": "default", + "spec": { + "base_environment": "workspace-base-environments/databricks_ai_v4", + "environment_version": "4" + } + } + ] +}' + +# The test server records the submitted spec so tests can assert that +# base_environment is present in the environments block. +trace print_requests.py //jobs/runs/submit diff --git a/acceptance/cmd/workspace/jobs/runs-submit/test.toml b/acceptance/cmd/workspace/jobs/runs-submit/test.toml new file mode 100644 index 00000000000..9c561a4e32f --- /dev/null +++ b/acceptance/cmd/workspace/jobs/runs-submit/test.toml @@ -0,0 +1,7 @@ +Local = true +Cloud = false + +RecordRequests = true + +# This command does not touch the bundle engine; run it once. +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index 8f611a7c7c9..4a9e5698684 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -253,6 +253,10 @@ func AddDefaultHandlers(server *Server) { return req.Workspace.JobsRunNow(req) }) + server.Handle("POST", "/api/2.2/jobs/runs/submit", func(req Request) any { + return req.Workspace.JobsSubmit(req) + }) + server.Handle("GET", "/api/2.2/jobs/runs/get", func(req Request) any { return req.Workspace.JobsGetRun(req) }) diff --git a/libs/testserver/jobs.go b/libs/testserver/jobs.go index e9468f7f681..749d34ba231 100644 --- a/libs/testserver/jobs.go +++ b/libs/testserver/jobs.go @@ -336,6 +336,64 @@ func (s *FakeWorkspace) JobsRunNow(req Request) Response { return Response{Body: jobs.RunNowResponse{RunId: runId}} } +// JobsSubmit handles jobs/runs/submit, the one-time run endpoint used by +// `databricks ssh connect` (via client.Jobs.Submit) and the generic +// `databricks jobs submit` command. It records the submitted spec and returns a +// run ID so acceptance tests can assert the request body (e.g. the serverless +// environments / base_environment) and poll runs/get for the resulting run. +// +// Unlike JobsRunNow, the submitted tasks are not executed locally: the SSH +// bootstrap submits a notebook task that only exists in the workspace, and the +// value of this handler for tests is the recorded request, not task output. +func (s *FakeWorkspace) JobsSubmit(req Request) Response { + var request jobs.SubmitRun + if err := json.Unmarshal(req.Body, &request); err != nil { + return Response{ + StatusCode: 400, + Body: fmt.Sprintf("request parsing error: %s", err), + } + } + if response := validateJobGitSource(request.GitSource); response != nil { + return *response + } + + defer s.LockUnlock()() + + runId := nextID() + + // The default run name for one-time runs is "Untitled" (Jobs API behavior). + runName := cmp.Or(request.RunName, "Untitled") + + // Report each task as RUNNING in both the V1 (state) and V2 (status) shapes. + // The generic `jobs submit` waiter polls the V1 run-level state, which + // JobsGetRun drives to TERMINATED on the next poll, while `ssh connect`'s + // waitForJobToStart polls the V2 per-task status. + var tasks []jobs.RunTask + for _, t := range request.Tasks { + tasks = append(tasks, jobs.RunTask{ + RunId: nextID(), + TaskKey: t.TaskKey, + State: &jobs.RunState{ + LifeCycleState: jobs.RunLifeCycleStateRunning, + }, + Status: &jobs.RunStatus{ + State: jobs.RunLifecycleStateV2StateRunning, + }, + }) + } + + s.JobRuns[runId] = jobs.Run{ + RunId: runId, + State: &jobs.RunState{LifeCycleState: jobs.RunLifeCycleStateRunning}, + RunPageUrl: fmt.Sprintf("%s/?o=900800700600#job/run/%d", s.url, runId), + RunType: jobs.RunTypeSubmitRun, + RunName: runName, + Tasks: tasks, + } + + return Response{Body: jobs.SubmitRunResponse{RunId: runId}} +} + // executePythonWheelTask runs a python wheel task locally using uv. // For tasks using existing_cluster_id, the venv is cached per cluster to match // cloud behavior where libraries are cached on running clusters. diff --git a/libs/testserver/jobs_test.go b/libs/testserver/jobs_test.go new file mode 100644 index 00000000000..38e47b68b08 --- /dev/null +++ b/libs/testserver/jobs_test.go @@ -0,0 +1,102 @@ +package testserver + +import ( + "encoding/json" + "net/url" + "strconv" + "testing" + + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func submitRun(t *testing.T, workspace *FakeWorkspace, request jobs.SubmitRun) jobs.SubmitRunResponse { + t.Helper() + body, err := json.Marshal(request) + require.NoError(t, err) + + response := workspace.JobsSubmit(Request{Body: body}) + require.Equal(t, 0, response.StatusCode) + + submitResponse, ok := response.Body.(jobs.SubmitRunResponse) + require.True(t, ok) + return submitResponse +} + +func getRun(t *testing.T, workspace *FakeWorkspace, runID int64) jobs.Run { + t.Helper() + response := workspace.JobsGetRun(Request{ + URL: &url.URL{RawQuery: "run_id=" + strconv.FormatInt(runID, 10)}, + }) + require.Equal(t, 0, response.StatusCode) + + run, ok := response.Body.(jobs.Run) + require.True(t, ok) + return run +} + +func TestJobsSubmit_RecordsRunAndReportsRunningTasks(t *testing.T) { + workspace := NewFakeWorkspace("http://test", "dbapi123") + + submitResponse := submitRun(t, workspace, jobs.SubmitRun{ + RunName: "ssh-tunnel", + Tasks: []jobs.SubmitTask{ + {TaskKey: "main", EnvironmentKey: "default"}, + }, + }) + require.NotZero(t, submitResponse.RunId) + + run := getRun(t, workspace, submitResponse.RunId) + assert.Equal(t, "ssh-tunnel", run.RunName) + assert.Equal(t, jobs.RunTypeSubmitRun, run.RunType) + + require.Len(t, run.Tasks, 1) + task := run.Tasks[0] + assert.Equal(t, "main", task.TaskKey) + // ssh connect's waitForJobToStart polls the V2 per-task status. + require.NotNil(t, task.Status) + assert.Equal(t, jobs.RunLifecycleStateV2StateRunning, task.Status.State) +} + +func TestJobsSubmit_DefaultsRunNameToUntitled(t *testing.T) { + workspace := NewFakeWorkspace("http://test", "dbapi123") + + submitResponse := submitRun(t, workspace, jobs.SubmitRun{ + Tasks: []jobs.SubmitTask{{TaskKey: "main"}}, + }) + + run := getRun(t, workspace, submitResponse.RunId) + assert.Equal(t, "Untitled", run.RunName) +} + +func TestJobsSubmit_RunReachesTerminalStateOnPoll(t *testing.T) { + workspace := NewFakeWorkspace("http://test", "dbapi123") + + submitResponse := submitRun(t, workspace, jobs.SubmitRun{ + Tasks: []jobs.SubmitTask{{TaskKey: "main"}}, + }) + + // The generic `jobs submit` waiter polls the V1 run-level state: RUNNING first, + // then TERMINATED/SUCCESS. + first := getRun(t, workspace, submitResponse.RunId) + assert.Equal(t, jobs.RunLifeCycleStateRunning, first.State.LifeCycleState) + + second := getRun(t, workspace, submitResponse.RunId) + assert.Equal(t, jobs.RunLifeCycleStateTerminated, second.State.LifeCycleState) + assert.Equal(t, jobs.RunResultStateSuccess, second.State.ResultState) +} + +func TestJobsSubmit_RejectsInvalidGitProvider(t *testing.T) { + workspace := NewFakeWorkspace("http://test", "dbapi123") + + body, err := json.Marshal(jobs.SubmitRun{ + GitSource: &jobs.GitSource{GitUrl: "https://example.com/repo"}, + Tasks: []jobs.SubmitTask{{TaskKey: "main"}}, + }) + require.NoError(t, err) + + response := workspace.JobsSubmit(Request{Body: body}) + assert.Equal(t, 400, response.StatusCode) + assert.Equal(t, missingJobGitProviderMessage, response.Body.(map[string]string)["message"]) +}