Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions acceptance/cmd/workspace/jobs/runs-submit/out.test.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 52 additions & 0 deletions acceptance/cmd/workspace/jobs/runs-submit/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@

>>> [CLI] jobs submit --no-wait --json {
"run_name": "ssh-tunnel",
"tasks": [
{
"task_key": "main",
"environment_key": "default",
"notebook_task": {
"notebook_path": "/Workspace/notebook"
}
}
],
"environments": [
{
"environment_key": "default",
"spec": {
"base_environment": "workspace-base-environments/databricks_ai_v4",
"environment_version": "4"
}
}
]
}
{
"run_id": [NUMID]
}

>>> print_requests.py //jobs/runs/submit
{
"method": "POST",
"path": "/api/2.2/jobs/runs/submit",
"body": {
"environments": [
{
"environment_key": "default",
"spec": {
"base_environment": "workspace-base-environments/databricks_ai_v4",
"environment_version": "4"
}
}
],
"run_name": "ssh-tunnel",
"tasks": [
{
"environment_key": "default",
"notebook_task": {
"notebook_path": "/Workspace/notebook"
},
"task_key": "main"
}
]
}
}
25 changes: 25 additions & 0 deletions acceptance/cmd/workspace/jobs/runs-submit/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
trace $CLI jobs submit --no-wait --json '{
"run_name": "ssh-tunnel",
"tasks": [
{
"task_key": "main",
"environment_key": "default",
"notebook_task": {
"notebook_path": "/Workspace/notebook"
}
}
],
"environments": [
{
"environment_key": "default",
"spec": {
"base_environment": "workspace-base-environments/databricks_ai_v4",
"environment_version": "4"
}
}
]
}'

# The test server records the submitted spec so tests can assert that
# base_environment is present in the environments block.
trace print_requests.py //jobs/runs/submit
7 changes: 7 additions & 0 deletions acceptance/cmd/workspace/jobs/runs-submit/test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Local = true
Cloud = false

RecordRequests = true

# This command does not touch the bundle engine; run it once.
EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"]
4 changes: 4 additions & 0 deletions libs/testserver/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ func AddDefaultHandlers(server *Server) {
return req.Workspace.JobsRunNow(req)
})

server.Handle("POST", "/api/2.2/jobs/runs/submit", func(req Request) any {
return req.Workspace.JobsSubmit(req)
})

server.Handle("GET", "/api/2.2/jobs/runs/get", func(req Request) any {
return req.Workspace.JobsGetRun(req)
})
Expand Down
58 changes: 58 additions & 0 deletions libs/testserver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,64 @@ func (s *FakeWorkspace) JobsRunNow(req Request) Response {
return Response{Body: jobs.RunNowResponse{RunId: runId}}
}

// JobsSubmit handles jobs/runs/submit, the one-time run endpoint used by
// `databricks ssh connect` (via client.Jobs.Submit) and the generic
// `databricks jobs submit` command. It records the submitted spec and returns a
// run ID so acceptance tests can assert the request body (e.g. the serverless
// environments / base_environment) and poll runs/get for the resulting run.
//
// Unlike JobsRunNow, the submitted tasks are not executed locally: the SSH
// bootstrap submits a notebook task that only exists in the workspace, and the
// value of this handler for tests is the recorded request, not task output.
func (s *FakeWorkspace) JobsSubmit(req Request) Response {
var request jobs.SubmitRun
if err := json.Unmarshal(req.Body, &request); err != nil {
return Response{
StatusCode: 400,
Body: fmt.Sprintf("request parsing error: %s", err),
}
}
if response := validateJobGitSource(request.GitSource); response != nil {
return *response
}

defer s.LockUnlock()()

runId := nextID()

// The default run name for one-time runs is "Untitled" (Jobs API behavior).
runName := cmp.Or(request.RunName, "Untitled")

// Report each task as RUNNING in both the V1 (state) and V2 (status) shapes.
// The generic `jobs submit` waiter polls the V1 run-level state, which
// JobsGetRun drives to TERMINATED on the next poll, while `ssh connect`'s
// waitForJobToStart polls the V2 per-task status.
var tasks []jobs.RunTask
for _, t := range request.Tasks {
tasks = append(tasks, jobs.RunTask{
RunId: nextID(),
TaskKey: t.TaskKey,
State: &jobs.RunState{
LifeCycleState: jobs.RunLifeCycleStateRunning,
},
Status: &jobs.RunStatus{
State: jobs.RunLifecycleStateV2StateRunning,
},
})
}

s.JobRuns[runId] = jobs.Run{
RunId: runId,
State: &jobs.RunState{LifeCycleState: jobs.RunLifeCycleStateRunning},
RunPageUrl: fmt.Sprintf("%s/?o=900800700600#job/run/%d", s.url, runId),
RunType: jobs.RunTypeSubmitRun,
RunName: runName,
Tasks: tasks,
}

return Response{Body: jobs.SubmitRunResponse{RunId: runId}}
}

// executePythonWheelTask runs a python wheel task locally using uv.
// For tasks using existing_cluster_id, the venv is cached per cluster to match
// cloud behavior where libraries are cached on running clusters.
Expand Down
102 changes: 102 additions & 0 deletions libs/testserver/jobs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package testserver

import (
"encoding/json"
"net/url"
"strconv"
"testing"

"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func submitRun(t *testing.T, workspace *FakeWorkspace, request jobs.SubmitRun) jobs.SubmitRunResponse {
t.Helper()
body, err := json.Marshal(request)
require.NoError(t, err)

response := workspace.JobsSubmit(Request{Body: body})
require.Equal(t, 0, response.StatusCode)

submitResponse, ok := response.Body.(jobs.SubmitRunResponse)
require.True(t, ok)
return submitResponse
}

func getRun(t *testing.T, workspace *FakeWorkspace, runID int64) jobs.Run {
t.Helper()
response := workspace.JobsGetRun(Request{
URL: &url.URL{RawQuery: "run_id=" + strconv.FormatInt(runID, 10)},
})
require.Equal(t, 0, response.StatusCode)

run, ok := response.Body.(jobs.Run)
require.True(t, ok)
return run
}

func TestJobsSubmit_RecordsRunAndReportsRunningTasks(t *testing.T) {
workspace := NewFakeWorkspace("http://test", "dbapi123")

submitResponse := submitRun(t, workspace, jobs.SubmitRun{
RunName: "ssh-tunnel",
Tasks: []jobs.SubmitTask{
{TaskKey: "main", EnvironmentKey: "default"},
},
})
require.NotZero(t, submitResponse.RunId)

run := getRun(t, workspace, submitResponse.RunId)
assert.Equal(t, "ssh-tunnel", run.RunName)
assert.Equal(t, jobs.RunTypeSubmitRun, run.RunType)

require.Len(t, run.Tasks, 1)
task := run.Tasks[0]
assert.Equal(t, "main", task.TaskKey)
// ssh connect's waitForJobToStart polls the V2 per-task status.
require.NotNil(t, task.Status)
assert.Equal(t, jobs.RunLifecycleStateV2StateRunning, task.Status.State)
}

func TestJobsSubmit_DefaultsRunNameToUntitled(t *testing.T) {
workspace := NewFakeWorkspace("http://test", "dbapi123")

submitResponse := submitRun(t, workspace, jobs.SubmitRun{
Tasks: []jobs.SubmitTask{{TaskKey: "main"}},
})

run := getRun(t, workspace, submitResponse.RunId)
assert.Equal(t, "Untitled", run.RunName)
}

func TestJobsSubmit_RunReachesTerminalStateOnPoll(t *testing.T) {
workspace := NewFakeWorkspace("http://test", "dbapi123")

submitResponse := submitRun(t, workspace, jobs.SubmitRun{
Tasks: []jobs.SubmitTask{{TaskKey: "main"}},
})

// The generic `jobs submit` waiter polls the V1 run-level state: RUNNING first,
// then TERMINATED/SUCCESS.
first := getRun(t, workspace, submitResponse.RunId)
assert.Equal(t, jobs.RunLifeCycleStateRunning, first.State.LifeCycleState)

second := getRun(t, workspace, submitResponse.RunId)
assert.Equal(t, jobs.RunLifeCycleStateTerminated, second.State.LifeCycleState)
assert.Equal(t, jobs.RunResultStateSuccess, second.State.ResultState)
}

func TestJobsSubmit_RejectsInvalidGitProvider(t *testing.T) {
workspace := NewFakeWorkspace("http://test", "dbapi123")

body, err := json.Marshal(jobs.SubmitRun{
GitSource: &jobs.GitSource{GitUrl: "https://example.com/repo"},
Tasks: []jobs.SubmitTask{{TaskKey: "main"}},
})
require.NoError(t, err)

response := workspace.JobsSubmit(Request{Body: body})
assert.Equal(t, 400, response.StatusCode)
assert.Equal(t, missingJobGitProviderMessage, response.Body.(map[string]string)["message"])
}
Loading