From 5542430b3a02e014daaaae1419a7b8a8840908f5 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 18 Jun 2026 15:41:10 +0200 Subject: [PATCH 1/3] acc: run spark_python_task cluster test locally via testserver Add an executeSparkPythonTask branch to the testserver's JobsRunNow so a job whose single task is a spark_python_task executes its python_file in a uv-created venv (with per-cluster venv caching, mirroring the wheel and notebook helpers) and captures stdout into JobRunOutputs. This lets the spark_python_task cluster run test run against the in-process fake instead of requiring real DBR, so it is flipped to Local = true while still running in cloud. Co-authored-by: Isaac --- .../run/spark_python_task/out.test.toml | 4 +- .../clusters/run/spark_python_task/test.toml | 5 +- libs/testserver/jobs.go | 48 +++++++++++++++++++ 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml b/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml index af50cb9b76b..bbc7fcfd1bd 100644 --- a/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml +++ b/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml @@ -1,5 +1,3 @@ -Local = false +Local = true Cloud = true -CloudSlow = true -RunsOnDbr = true EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml b/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml index 927221d7156..8793a9b7ca7 100644 --- a/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml +++ b/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml @@ -1,7 +1,6 @@ -Local = false -CloudSlow = true +Local = true +Cloud = true RecordRequests = false -RunsOnDbr = true Ignore = [ "databricks.yml", diff --git a/libs/testserver/jobs.go b/libs/testserver/jobs.go index e9468f7f681..9f42055f079 100644 --- a/libs/testserver/jobs.go +++ b/libs/testserver/jobs.go @@ -308,6 +308,8 @@ func (s *FakeWorkspace) JobsRunNow(req Request) Response { logs, err = s.executePythonWheelTask(job.Settings, taskToExecute) } else if t.NotebookTask != nil { logs, err = s.executeNotebookTask(t, request.NotebookParams) + } else if t.SparkPythonTask != nil { + logs, err = s.executeSparkPythonTask(t) } if err != nil { @@ -511,6 +513,52 @@ func (s *FakeWorkspace) executeNotebookTask(task jobs.Task, notebookParams map[s return strings.TrimRight(string(output), "\r\n") + "\n", nil } +// executeSparkPythonTask runs a spark_python_task locally by reading the +// python_file from the workspace and executing it in a uv-created venv. +// For tasks using existing_cluster_id, the venv is cached per cluster to match +// cloud behavior where libraries are cached on running clusters. +func (s *FakeWorkspace) executeSparkPythonTask(task jobs.Task) (string, error) { + if task.SparkPythonTask == nil { + return "", errors.New("task has no spark_python_task") + } + + // Read python file from workspace (lock already held by caller) + pythonPath := task.SparkPythonTask.PythonFile + if !strings.HasPrefix(pythonPath, "/") { + pythonPath = "/" + pythonPath + } + + pythonData := s.files[pythonPath].Data + if len(pythonData) == 0 { + return "", fmt.Errorf("python file not found in workspace: %s", pythonPath) + } + + env, cleanup, err := s.getOrCreateClusterEnv(task) + if err != nil { + return "", err + } + if cleanup != nil { + defer cleanup() + } + + // Write python file into the cluster env so it can be executed by the venv. + pythonFile := filepath.Join(env.dir, filepath.Base(pythonPath)) + if err := os.WriteFile(pythonFile, pythonData, 0o644); err != nil { + return "", fmt.Errorf("failed to write python file: %w", err) + } + + runArgs := []string{pythonFile} + runArgs = append(runArgs, task.SparkPythonTask.Parameters...) + + output, err := exec.Command(venvPython(env.venvDir), runArgs...).CombinedOutput() + if err != nil { + return string(output), fmt.Errorf("spark python task execution failed: %s\n%s", err, output) + } + + // Normalize trailing newlines to match cloud behavior (exactly one trailing newline) + return strings.TrimRight(string(output), "\r\n") + "\n", nil +} + // getOrCreateClusterEnv returns a cached venv for existing clusters or creates // a fresh one for new clusters. The cleanup function is non-nil only for new // clusters (whose venvs should be removed after use). From 81eae8d175d104634d7c116e5e535f278c6ca05e Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 18 Jun 2026 17:16:55 +0200 Subject: [PATCH 2/3] acc: keep spark_python_task in the cloud and DBR lanes Local execution is additive: retain CloudSlow and RunsOnDbr so the test still exercises real cluster execution on cloud and DBR, only adding Local = true. Co-authored-by: Isaac --- .../resources/clusters/run/spark_python_task/out.test.toml | 2 ++ .../bundle/resources/clusters/run/spark_python_task/test.toml | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml b/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml index bbc7fcfd1bd..475b179caed 100644 --- a/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml +++ b/acceptance/bundle/resources/clusters/run/spark_python_task/out.test.toml @@ -1,3 +1,5 @@ Local = true Cloud = true +CloudSlow = true +RunsOnDbr = true EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml b/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml index 8793a9b7ca7..3784a7961ab 100644 --- a/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml +++ b/acceptance/bundle/resources/clusters/run/spark_python_task/test.toml @@ -1,6 +1,7 @@ Local = true -Cloud = true +CloudSlow = true RecordRequests = false +RunsOnDbr = true Ignore = [ "databricks.yml", From d8d4a39f0b137cb36147505892a3396b51db55b1 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 18 Jun 2026 18:37:15 +0200 Subject: [PATCH 3/3] acc: update bundle/run/basic golden for spark_python_task execution bundle/run/basic's job is a spark_python_task; now that the testserver executes it, `bundle run` prints the script's stdout. Regenerate the golden to include it. Co-authored-by: Isaac --- acceptance/bundle/run/basic/output.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/acceptance/bundle/run/basic/output.txt b/acceptance/bundle/run/basic/output.txt index 6d0534d9c2b..77a6a923e1d 100644 --- a/acceptance/bundle/run/basic/output.txt +++ b/acceptance/bundle/run/basic/output.txt @@ -17,6 +17,7 @@ Run URL: [DATABRICKS_URL]/jobs/[NUMID]/runs/[NUMID]?o=[NUMID] [TIMESTAMP] "foo" RUNNING [TIMESTAMP] "foo" TERMINATED SUCCESS +1 === no resource key with -- >>> [CLI] bundle run -- @@ -30,6 +31,7 @@ Run URL: [DATABRICKS_URL]/jobs/[NUMID]/runs/[NUMID]?o=[NUMID] [TIMESTAMP] "foo" RUNNING [TIMESTAMP] "foo" TERMINATED SUCCESS +1 === inline script >>> [CLI] bundle run -- echo hello