diff --git a/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml b/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml index af50cb9b76b..475b179caed 100644 --- a/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml +++ b/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml @@ -1,4 +1,4 @@ -Local = false +Local = true Cloud = true CloudSlow = true RunsOnDbr = true diff --git a/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml b/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml index 927221d7156..3784a7961ab 100644 --- a/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml +++ b/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml @@ -1,4 +1,4 @@ -Local = false +Local = true CloudSlow = true RecordRequests = false RunsOnDbr = true diff --git a/acceptance/bundle/run/basic/output.txt b/acceptance/bundle/run/basic/output.txt index 6d0534d9c2b..77a6a923e1d 100644 --- a/acceptance/bundle/run/basic/output.txt +++ b/acceptance/bundle/run/basic/output.txt @@ -17,6 +17,7 @@ Run URL: [DATABRICKS_URL]/jobs/[NUMID]/runs/[NUMID]?o=[NUMID] [TIMESTAMP] "foo" RUNNING [TIMESTAMP] "foo" TERMINATED SUCCESS +1 === no resource key with -- >>> [CLI] bundle run -- @@ -30,6 +31,7 @@ Run URL: [DATABRICKS_URL]/jobs/[NUMID]/runs/[NUMID]?o=[NUMID] [TIMESTAMP] "foo" RUNNING [TIMESTAMP] "foo" TERMINATED SUCCESS +1 === inline script >>> [CLI] bundle run -- echo hello diff --git a/libs/testserver/jobs.go b/libs/testserver/jobs.go index e9468f7f681..9f42055f079 100644 --- a/libs/testserver/jobs.go +++ b/libs/testserver/jobs.go @@ -308,6 +308,8 @@ func (s *FakeWorkspace) JobsRunNow(req Request) Response { logs, err = s.executePythonWheelTask(job.Settings, taskToExecute) } else if t.NotebookTask != nil { logs, err = s.executeNotebookTask(t, request.NotebookParams) + } else if t.SparkPythonTask != nil { + logs, err = s.executeSparkPythonTask(t) } if err != nil { @@ -511,6 +513,52 @@ func (s *FakeWorkspace) executeNotebookTask(task jobs.Task, notebookParams map[s return strings.TrimRight(string(output), "\r\n") + "\n", nil } +// executeSparkPythonTask runs a spark_python_task locally by reading the +// python_file from the workspace and executing it in a uv-created venv. +// For tasks using existing_cluster_id, the venv is cached per cluster to match +// cloud behavior where libraries are cached on running clusters. +func (s *FakeWorkspace) executeSparkPythonTask(task jobs.Task) (string, error) { + if task.SparkPythonTask == nil { + return "", errors.New("task has no spark_python_task") + } + + // Read python file from workspace (lock already held by caller) + pythonPath := task.SparkPythonTask.PythonFile + if !strings.HasPrefix(pythonPath, "/") { + pythonPath = "/" + pythonPath + } + + pythonData := s.files[pythonPath].Data + if len(pythonData) == 0 { + return "", fmt.Errorf("python file not found in workspace: %s", pythonPath) + } + + env, cleanup, err := s.getOrCreateClusterEnv(task) + if err != nil { + return "", err + } + if cleanup != nil { + defer cleanup() + } + + // Write python file into the cluster env so it can be executed by the venv. + pythonFile := filepath.Join(env.dir, filepath.Base(pythonPath)) + if err := os.WriteFile(pythonFile, pythonData, 0o644); err != nil { + return "", fmt.Errorf("failed to write python file: %w", err) + } + + runArgs := []string{pythonFile} + runArgs = append(runArgs, task.SparkPythonTask.Parameters...) + + output, err := exec.Command(venvPython(env.venvDir), runArgs...).CombinedOutput() + if err != nil { + return string(output), fmt.Errorf("spark python task execution failed: %s\n%s", err, output) + } + + // Normalize trailing newlines to match cloud behavior (exactly one trailing newline) + return strings.TrimRight(string(output), "\r\n") + "\n", nil +} + // getOrCreateClusterEnv returns a cached venv for existing clusters or creates // a fresh one for new clusters. The cleanup function is non-nil only for new // clusters (whose venvs should be removed after use).