Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Local = false
Local = true
CloudSlow = true
RecordRequests = false
RunsOnDbr = true
Expand Down
2 changes: 2 additions & 0 deletions acceptance/bundle/run/basic/output.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Run URL: [DATABRICKS_URL]/jobs/[NUMID]/runs/[NUMID]?o=[NUMID]

[TIMESTAMP] "foo" RUNNING
[TIMESTAMP] "foo" TERMINATED SUCCESS
1

=== no resource key with --
>>> [CLI] bundle run --
Expand All @@ -30,6 +31,7 @@ Run URL: [DATABRICKS_URL]/jobs/[NUMID]/runs/[NUMID]?o=[NUMID]

[TIMESTAMP] "foo" RUNNING
[TIMESTAMP] "foo" TERMINATED SUCCESS
1

=== inline script
>>> [CLI] bundle run -- echo hello
Expand Down
48 changes: 48 additions & 0 deletions libs/testserver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ func (s *FakeWorkspace) JobsRunNow(req Request) Response {
logs, err = s.executePythonWheelTask(job.Settings, taskToExecute)
} else if t.NotebookTask != nil {
logs, err = s.executeNotebookTask(t, request.NotebookParams)
} else if t.SparkPythonTask != nil {
logs, err = s.executeSparkPythonTask(t)
}

if err != nil {
Expand Down Expand Up @@ -511,6 +513,52 @@ func (s *FakeWorkspace) executeNotebookTask(task jobs.Task, notebookParams map[s
return strings.TrimRight(string(output), "\r\n") + "\n", nil
}

// executeSparkPythonTask runs a spark_python_task locally by reading the
// python_file from the workspace and executing it in a uv-created venv.
// For tasks using existing_cluster_id, the venv is cached per cluster to match
// cloud behavior where libraries are cached on running clusters.
func (s *FakeWorkspace) executeSparkPythonTask(task jobs.Task) (string, error) {
if task.SparkPythonTask == nil {
return "", errors.New("task has no spark_python_task")
}

// Read python file from workspace (lock already held by caller)
pythonPath := task.SparkPythonTask.PythonFile
if !strings.HasPrefix(pythonPath, "/") {
pythonPath = "/" + pythonPath
}

pythonData := s.files[pythonPath].Data
if len(pythonData) == 0 {
return "", fmt.Errorf("python file not found in workspace: %s", pythonPath)
}

env, cleanup, err := s.getOrCreateClusterEnv(task)
if err != nil {
return "", err
}
if cleanup != nil {
defer cleanup()
}

// Write python file into the cluster env so it can be executed by the venv.
pythonFile := filepath.Join(env.dir, filepath.Base(pythonPath))
if err := os.WriteFile(pythonFile, pythonData, 0o644); err != nil {
return "", fmt.Errorf("failed to write python file: %w", err)
}

runArgs := []string{pythonFile}
runArgs = append(runArgs, task.SparkPythonTask.Parameters...)

output, err := exec.Command(venvPython(env.venvDir), runArgs...).CombinedOutput()
if err != nil {
return string(output), fmt.Errorf("spark python task execution failed: %s\n%s", err, output)
}

// Normalize trailing newlines to match cloud behavior (exactly one trailing newline)
return strings.TrimRight(string(output), "\r\n") + "\n", nil
}

// getOrCreateClusterEnv returns a cached venv for existing clusters or creates
// a fresh one for new clusters. The cleanup function is non-nil only for new
// clusters (whose venvs should be removed after use).
Expand Down
Loading