diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index adea169e..7d9ca32f 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -23,9 +23,9 @@ on: env: OPERATOR_NAME: "spark-k8s-operator" - RUST_NIGHTLY_TOOLCHAIN_VERSION: "nightly-2025-10-23" - NIX_PKG_MANAGER_VERSION: "2.30.0" - RUST_TOOLCHAIN_VERSION: "1.89.0" + RUST_NIGHTLY_TOOLCHAIN_VERSION: "nightly-2026-02-24" + NIX_PKG_MANAGER_VERSION: "2.33.3" + RUST_TOOLCHAIN_VERSION: "1.93.0" HADOLINT_VERSION: "v2.14.0" PYTHON_VERSION: "3.14" CARGO_TERM_COLOR: always @@ -139,7 +139,7 @@ jobs: set -euo pipefail [ -n "$GITHUB_DEBUG" ] && set -x - CURRENT_VERSION=$(cargo metadata --format-version 1 --no-deps | jq -r '.packages[0].version') + CURRENT_VERSION=$(cargo metadata --format-version 1 --no-deps | jq -r '.packages[] | select(.name == "stackable-spark-k8s-operator") | .version') if [ "$GITHUB_EVENT_NAME" == 'pull_request' ]; then # Include a PR suffix if this workflow is triggered by a PR diff --git a/.github/workflows/pr_pre-commit.yaml b/.github/workflows/pr_pre-commit.yaml index acbc963e..b45e2daf 100644 --- a/.github/workflows/pr_pre-commit.yaml +++ b/.github/workflows/pr_pre-commit.yaml @@ -7,11 +7,11 @@ on: env: CARGO_TERM_COLOR: always - NIX_PKG_MANAGER_VERSION: "2.30.0" - RUST_TOOLCHAIN_VERSION: "nightly-2025-10-23" + NIX_PKG_MANAGER_VERSION: "2.33.3" + RUST_TOOLCHAIN_VERSION: "nightly-2026-02-24" HADOLINT_VERSION: "v2.14.0" PYTHON_VERSION: "3.14" - JINJA2_CLI_VERSION: "0.8.2" + JINJA2_CLI_VERSION: "1.0.0" jobs: pre-commit: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e338a62d..91fc091f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -105,7 +105,7 @@ repos: - id: cargo-rustfmt name: cargo-rustfmt language: system - entry: cargo +nightly-2025-10-23 fmt --all -- --check + entry: cargo +nightly-2026-02-24 fmt --all -- --check stages: [pre-commit, pre-merge-commit] pass_filenames: false files: \.rs$ diff --git 
a/.vscode/settings.json b/.vscode/settings.json index b3b3af8b..b59f2a53 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,7 +1,7 @@ { "rust-analyzer.rustfmt.overrideCommand": [ "rustfmt", - "+nightly-2025-10-23", + "+nightly-2026-02-24", "--edition", "2024", "--" diff --git a/CHANGELOG.md b/CHANGELOG.md index d5b99c68..f728092b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ All notable changes to this project will be documented in this file. Previously, Jobs were retried at most 6 times by default ([#647]). - Support for Spark `3.5.8` ([#650]). - First class support for S3 on Spark connect clusters ([#652]). +- Spark applications can now have templates that are merged into the application manifest before reconciliation. This allows users with many applications to source out common configuration in a central place and reduce duplication ([#660]). ### Fixed @@ -45,6 +46,7 @@ All notable changes to this project will be documented in this file. [#652]: https://github.com/stackabletech/spark-k8s-operator/pull/652 [#655]: https://github.com/stackabletech/spark-k8s-operator/pull/655 [#656]: https://github.com/stackabletech/spark-k8s-operator/pull/656 +[#660]: https://github.com/stackabletech/spark-k8s-operator/pull/660 ## [25.11.0] - 2025-11-07 diff --git a/Cargo.lock b/Cargo.lock index 4b791ba9..44c7b7de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2947,6 +2947,7 @@ dependencies = [ "futures 0.3.32", "indoc", "product-config", + "regex", "rstest", "semver", "serde", diff --git a/Cargo.nix b/Cargo.nix index 6e08c905..cdb5bfc1 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -1,5 +1,5 @@ -# This file was @generated by crate2nix 0.14.1 with the command: +# This file was @generated by crate2nix 0.15.0 with the command: # "generate" # See https://github.com/kolloch/crate2nix for more info. @@ -19,6 +19,11 @@ # (separated by `,`, prefixed with `+`). # Used for conditional compilation based on CPU feature detection. , targetFeatures ? 
[] + # Additional target attributes for conditional dependencies. + # Use this for custom cfg flags that are passed via rustcflags but need to + # be known at Nix evaluation time for dependency resolution. + # Example: { tracing_unstable = true; } for crates using cfg(tracing_unstable). +, extraTargetFlags ? {} # Whether to perform release builds: longer compile times, faster binaries. , release ? true # Additional crate2nix configuration if it exists. @@ -9842,6 +9847,10 @@ rec { name = "product-config"; packageId = "product-config"; } + { + name = "regex"; + packageId = "regex"; + } { name = "semver"; packageId = "semver"; @@ -14623,7 +14632,7 @@ rec { endian = if platform.parsed.cpu.significantByte.name == "littleEndian" then "little" else "big"; pointer_width = toString platform.parsed.cpu.bits; debug_assertions = false; - }; + } // extraTargetFlags; registryUrl = { registries @@ -14786,7 +14795,7 @@ rec { in pkgs.runCommand "${crate.name}-linked" { - inherit (crate) outputs crateName; + inherit (crate) outputs crateName meta; passthru = (crate.passthru or { }) // { inherit test; }; @@ -14998,7 +15007,7 @@ rec { crateConfig // { src = - crateConfig.src or (pkgs.fetchurl rec { + crateConfig.src or (fetchurl rec { name = "${crateConfig.crateName}-${crateConfig.version}.tar.gz"; # https://www.pietroalbini.org/blog/downloading-crates-io/ # Not rate-limited, CDN URL. 
diff --git a/Cargo.toml b/Cargo.toml index 0ddad0db..b347bcac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ tokio = { version = "1.40", features = ["full"] } tracing = "0.1" tracing-futures = { version = "0.2", features = ["futures-03"] } indoc = "2" +regex = "1" [patch."https://github.com/stackabletech/operator-rs.git"] # stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" } diff --git a/deploy/helm/spark-k8s-operator/templates/roles.yaml b/deploy/helm/spark-k8s-operator/templates/roles.yaml index c7737445..41fad95c 100644 --- a/deploy/helm/spark-k8s-operator/templates/roles.yaml +++ b/deploy/helm/spark-k8s-operator/templates/roles.yaml @@ -124,6 +124,7 @@ rules: - sparkapplications - sparkhistoryservers - sparkconnectservers + - sparkapptemplates verbs: - get - list diff --git a/docker/Dockerfile b/docker/Dockerfile index 9286552c..3ea0ac51 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -13,12 +13,12 @@ # We want to automatically use the latest. We also don't tag our images with a version. # hadolint ignore=DL3007 -FROM oci.stackable.tech/sdp/ubi9-rust-builder:latest AS builder +FROM oci.stackable.tech/sdp/ubi10-rust-builder:latest AS builder # We want to automatically use the latest. 
# hadolint ignore=DL3007 -FROM registry.access.redhat.com/ubi9/ubi-minimal:latest AS operator +FROM registry.access.redhat.com/ubi10/ubi-minimal:latest AS operator ARG VERSION # NOTE (@Techassi): This is required for OpenShift/Red Hat certification @@ -74,7 +74,7 @@ LABEL org.opencontainers.image.description="Deploy and manage Apache Spark-on-Ku # https://docs.openshift.com/container-platform/4.16/openshift_images/create-images.html#defining-image-metadata # https://github.com/projectatomic/ContainerApplicationGenericLabels/blob/master/vendor/redhat/labels.md -LABEL io.openshift.tags="ubi9,stackable,sdp,spark-k8s" +LABEL io.openshift.tags="ubi10,stackable,sdp,spark-k8s" LABEL io.k8s.description="Deploy and manage Apache Spark-on-Kubernetes clusters." LABEL io.k8s.display-name="Stackable Operator for Apache Spark-on-Kubernetes" diff --git a/docs/modules/spark-k8s/pages/usage-guide/app_templates.adoc b/docs/modules/spark-k8s/pages/usage-guide/app_templates.adoc new file mode 100644 index 00000000..46481cb4 --- /dev/null +++ b/docs/modules/spark-k8s/pages/usage-guide/app_templates.adoc @@ -0,0 +1,99 @@ += Spark Application Templates +:description: Learn how to configure application templates for Spark applications on the Stackable Data Platform. + +Spark application templates are used to define reusable configurations for Spark applications. +When you have many applications with similar configurations, templates can help you avoid duplication by grouping common settings together. +Application templates are available for the `v1alpha1` version of the SparkApplication custom resource and share the exact same structure as the SparkApplication resource, but with some differences in the way the operator handles them: + +1. Application templates are cluster wide resources, while Spark application resources are namespace-scoped. 
This means that application templates can be used across multiple namespaces, while Spark application resources are limited to the namespace they are created in. +2. Application templates are not reconciled by the operator, but must be referenced from a SparkApplication resource to be applied. This means that changes to an application template will not automatically trigger updates to SparkApplication resources that reference it. +3. An application can reference multiple application templates, and the settings from these templates will be merged together. The merging order of the templates is indicated by their index in the reference list. The application fields have the highest precedence and will override any conflicting settings from the templates. This allows you to have a base template with common settings and then override specific settings in the application resource as needed. +4. Application template references are immutable in the sense that once applied to an application they cannot be changed again. Currently templates are applied upon the creation of the application, and any changes to the template references after that will be ignored. + +== Examples + +Applications use `metadata.annotations` to reference application templates as shown below: + +[source,yaml] +---- +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: app + annotations: + spark-application.template.merge: "true" # <1> + spark-application.template.0.name: "app-template" # <2> +spec: # <3> + sparkImage: + productVersion: "4.1.1" + mode: cluster + mainClass: com.example.Main + mainApplicationFile: "/examples.jar" +---- +<1> Enable application template merging for this application. +<2> Name of the application template to reference. +<3> Application specification. 
The fields `sparkImage`, `mode`, `mainClass`, and `mainApplicationFile` are required for the application to be valid, but the rest of the fields are optional and can be defined in the application template. + +The application template referenced in the example above is defined as follows: + +[source,yaml] +---- +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplicationTemplate # <1> +metadata: + name: app-template # <2> +spec: + sparkImage: + productVersion: "4.1.1" + pullPolicy: IfNotPresent + mode: cluster + mainClass: com.example.Main + mainApplicationFile: "placeholder" # <3> + sparkConf: + spark.kubernetes.file.upload.path: "s3a://my-bucket" + s3connection: + reference: spark-history-s3-connection + logFileDirectory: + s3: + prefix: eventlogs/ + bucket: + reference: spark-history-s3-bucket + driver: + config: + logging: + enableVectorAgent: False + executor: + replicas: 1 + config: + logging: + enableVectorAgent: False +---- +<1> The kind of the resource is `SparkApplicationTemplate` to indicate that this is an application template. +<2> Name of the application template. +<3> The value of `mainApplicationFile` is set to a placeholder value, which will be overridden by the application resource. Similarly to the application, the fields `sparkImage`, `mode`, `mainClass`, and `mainApplicationFile` are required for the template to be valid. 
+ +An application can reference multiple application templates as shown below: + +[source,yaml] +---- +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: app + annotations: + spark-application.template.merge: "true" # <1> + spark-application.template.0.name: "app-template-0" # <2> + spark-application.template.1.name: "app-template-1" + spark-application.template.2.name: "app-template-2" +spec: # <3> + sparkImage: + productVersion: "4.1.1" + mode: cluster + mainClass: com.example.Main + mainApplicationFile: "/examples.jar" +---- +<1> Enable application template merging for this application. +<2> The name of the application templates to reference. The settings from these templates will be merged together in the order they are referenced, with `app-template-0` having the lowest precedence and `app-template-2` having the highest precedence. The application fields have the highest overall precedence and will override any conflicting settings from the templates. 
diff --git a/docs/modules/spark-k8s/partials/nav.adoc b/docs/modules/spark-k8s/partials/nav.adoc index e4ac8d3e..f28352ae 100644 --- a/docs/modules/spark-k8s/partials/nav.adoc +++ b/docs/modules/spark-k8s/partials/nav.adoc @@ -6,6 +6,7 @@ ** xref:spark-k8s:usage-guide/job-dependencies.adoc[] ** xref:spark-k8s:usage-guide/resources.adoc[] ** xref:spark-k8s:usage-guide/s3.adoc[] +** xref:spark-k8s:usage-guide/app_templates.adoc[] ** xref:spark-k8s:usage-guide/security.adoc[] ** xref:spark-k8s:usage-guide/logging.adoc[] ** xref:spark-k8s:usage-guide/history-server.adoc[] diff --git a/extra/crds.yaml b/extra/crds.yaml index 37f394e1..54dafd49 100644 --- a/extra/crds.yaml +++ b/extra/crds.yaml @@ -2139,6 +2139,18 @@ spec: properties: phase: type: string + resolvedTemplateRef: + items: + properties: + name: + type: string + uid: + nullable: true + type: string + required: + - name + type: object + type: array required: - phase type: object @@ -4408,3 +4420,2143 @@ spec: storage: true subresources: status: {} +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: sparkapptemplates.spark.stackable.tech +spec: + group: spark.stackable.tech + names: + categories: [] + kind: SparkApplicationTemplate + plural: sparkapptemplates + shortNames: + - sparkapptemplate + singular: sparkapplicationtemplate + scope: Cluster + versions: + - additionalPrinterColumns: [] + name: v1alpha1 + schema: + openAPIV3Schema: + description: Auto-generated derived type for SparkApplicationTemplateSpec via `CustomResource` + properties: + spec: + description: |- + A Spark application template. This resource is managed by the Stackable operator for Apache Spark. + Find more information on how to use it and the resources that the operator generates in the + [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/). + properties: + args: + default: [] + description: Arguments passed directly to the job artifact. 
+ items: + type: string + type: array + deps: + default: + excludePackages: [] + packages: [] + repositories: [] + requirements: [] + description: |- + Job dependencies: a list of python packages that will be installed via pip, a list of packages + or repositories that is passed directly to spark-submit, or a list of excluded packages + (also passed directly to spark-submit). + properties: + excludePackages: + default: [] + description: A list of excluded packages that is passed directly to `spark-submit`. + items: + type: string + type: array + packages: + default: [] + description: A list of packages that is passed directly to `spark-submit`. + items: + type: string + type: array + repositories: + default: [] + description: A list of repositories that is passed directly to `spark-submit`. + items: + type: string + type: array + requirements: + default: [] + description: |- + Under the `requirements` you can specify Python dependencies that will be installed with `pip`. + Example: `tabulate==0.8.9` + items: + type: string + type: array + type: object + driver: + description: |- + The driver role specifies the configuration that, together with the driver pod template, is used by + Spark to create driver pods. + nullable: true + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: |- + These configuration settings control + [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). 
+ properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + description: Log configuration per container. + properties: + job: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + requirements: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + spark: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + spark-submit: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + tls: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + vector: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + requestedSecretLifetime: + description: |- + Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. + This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. + nullable: true + type: string + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: |- + Resource usage is configured here, this includes CPU usage, memory usage and disk storage + usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: |- + The maximum amount of CPU cores that can be requested by Pods. + Equivalent to the `limit` for Pod resource configuration. + Cores are specified either as a decimal point number or as milli units. + For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + x-kubernetes-int-or-string: true + min: + description: |- + The minimal amount of CPU cores that Pods need to run. + Equivalent to the `request` for Pod resource configuration. + Cores are specified either as a decimal point number or as milli units. + For example:`1.5` will be 1.5 cores, also written as `1500m`. 
+ nullable: true + x-kubernetes-int-or-string: true + type: object + memory: + properties: + limit: + description: |- + The maximum amount of memory that should be available to the Pod. + Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), + which means these suffixes are supported: E, P, T, G, M, k. + You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. + For example, the following represent roughly the same value: + `128974848, 129e6, 129M, 128974848000m, 123Mi` + nullable: true + x-kubernetes-int-or-string: true + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + volumeMounts: + description: Volume mounts for the spark-submit, driver and executor pods. + items: + type: object + x-kubernetes-preserve-unknown-fields: true + type: array + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: |- + The `configOverrides` can be used to configure properties in product config files + that are not exposed in the CRD. Read the + [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) + and consult the operator specific usage guide documentation for details on the + available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: |- + `envOverrides` configure environment variables to be set in the Pods. + It is a map from strings to strings - environment variables and the value to set. + Read the + [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) + for more information and consult the operator specific usage guide to find out about + the product specific environment variables that are available. 
+ type: object + jvmArgumentOverrides: + default: + add: [] + remove: [] + removeRegex: [] + description: |- + Allows overriding JVM arguments. + Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) + for details on the usage. + properties: + add: + default: [] + description: JVM arguments to be added + items: + type: string + type: array + remove: + default: [] + description: JVM arguments to be removed by exact match + items: + type: string + type: array + removeRegex: + default: [] + description: JVM arguments matching any of this regexes will be removed + items: + type: string + type: array + type: object + podOverrides: + default: {} + description: |- + In the `podOverrides` property you can define a + [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#podtemplatespec-v1-core) + to override any property that can be set on a Kubernetes Pod. + Read the + [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) + for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + env: + default: [] + description: |- + A list of environment variables that will be set in the job pod and the driver and executor + pod templates. + items: + description: EnvVar represents an environment variable present in a Container. + properties: + name: + description: Name of the environment variable. May consist of any printable ASCII characters except '='. + type: string + value: + description: 'Variable references $(VAR_NAME) are expanded using the previously defined environment variables in the container and any service environment variables. If a variable cannot be resolved, the reference in the input string will be unchanged. Double $$ are reduced to a single $, which allows for escaping the $(VAR_NAME) syntax: i.e. 
"$$(VAR_NAME)" will produce the string literal "$(VAR_NAME)". Escaped references will never be expanded, regardless of whether the variable exists or not. Defaults to "".' + type: string + valueFrom: + description: Source for the environment variable's value. Cannot be used if value is not empty. + properties: + configMapKeyRef: + description: Selects a key of a ConfigMap. + properties: + key: + description: The key to select. + type: string + name: + description: 'Name of the referent. This field is effectively required, but due to backwards compatibility is allowed to be empty. Instances of this type with an empty value here are almost certainly wrong. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + optional: + description: Specify whether the ConfigMap or its key must be defined + type: boolean + required: + - key + - name + type: object + fieldRef: + description: 'Selects a field of the pod: supports metadata.name, metadata.namespace, `metadata.labels['''']`, `metadata.annotations['''']`, spec.nodeName, spec.serviceAccountName, status.hostIP, status.podIP, status.podIPs.' + properties: + apiVersion: + description: Version of the schema the FieldPath is written in terms of, defaults to "v1". + type: string + fieldPath: + description: Path of the field to select in the specified API version. + type: string + required: + - fieldPath + type: object + fileKeyRef: + description: FileKeyRef selects a key of the env file. Requires the EnvFiles feature gate to be enabled. + properties: + key: + description: The key within the env file. An invalid key will prevent the pod from starting. The keys defined within a source may consist of any printable ASCII characters except '='. During Alpha stage of the EnvFiles feature gate, the key size is limited to 128 characters. + type: string + optional: + description: |- + Specify whether the file or its key must be defined. 
If the file or key does not exist, then the env var is not published. If optional is set to true and the specified key does not exist, the environment variable will not be set in the Pod's containers. + + If optional is set to false and the specified key does not exist, an error will be returned during Pod creation. + type: boolean + path: + description: The path within the volume from which to select the file. Must be relative and may not contain the '..' path or start with '..'. + type: string + volumeName: + description: The name of the volume mount containing the env file. + type: string + required: + - key + - path + - volumeName + type: object + resourceFieldRef: + description: 'Selects a resource of the container: only resources limits and requests (limits.cpu, limits.memory, limits.ephemeral-storage, requests.cpu, requests.memory and requests.ephemeral-storage) are currently supported.' + properties: + containerName: + description: 'Container name: required for volumes, optional for env vars' + type: string + divisor: + description: Specifies the output format of the exposed resources, defaults to "1" + nullable: true + x-kubernetes-int-or-string: true + resource: + description: 'Required: resource to select' + type: string + required: + - resource + type: object + secretKeyRef: + description: Selects a key of a secret in the pod's namespace + properties: + key: + description: The key of the secret to select from. Must be a valid secret key. + type: string + name: + description: 'Name of the referent. This field is effectively required, but due to backwards compatibility is allowed to be empty. Instances of this type with an empty value here are almost certainly wrong. 
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + optional: + description: Specify whether the Secret or its key must be defined + type: boolean + required: + - key + - name + type: object + type: object + required: + - name + type: object + type: array + executor: + description: |- + The executor role specifies the configuration that, together with the driver pod template, is used by + Spark to create the executor pods. + This is RoleGroup instead of plain CommonConfiguration because it needs to allow for the number of replicas. + to be specified. + nullable: true + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: |- + These configuration settings control + [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). 
+ properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + description: Log configuration per container. + properties: + job: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + requirements: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + spark: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + spark-submit: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + tls: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + vector: + anyOf: + - required: + - custom + - {} + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: |- + The log level threshold. + Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + - null + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + requestedSecretLifetime: + description: |- + Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. + This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. + nullable: true + type: string + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: |- + Resource usage is configured here, this includes CPU usage, memory usage and disk storage + usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: |- + The maximum amount of CPU cores that can be requested by Pods. + Equivalent to the `limit` for Pod resource configuration. + Cores are specified either as a decimal point number or as milli units. + For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + x-kubernetes-int-or-string: true + min: + description: |- + The minimal amount of CPU cores that Pods need to run. + Equivalent to the `request` for Pod resource configuration. + Cores are specified either as a decimal point number or as milli units. + For example:`1.5` will be 1.5 cores, also written as `1500m`. 
+ nullable: true + x-kubernetes-int-or-string: true + type: object + memory: + properties: + limit: + description: |- + The maximum amount of memory that should be available to the Pod. + Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), + which means these suffixes are supported: E, P, T, G, M, k. + You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. + For example, the following represent roughly the same value: + `128974848, 129e6, 129M, 128974848000m, 123Mi` + nullable: true + x-kubernetes-int-or-string: true + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + volumeMounts: + description: Volume mounts for the spark-submit, driver and executor pods. + items: + type: object + x-kubernetes-preserve-unknown-fields: true + type: array + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: |- + The `configOverrides` can be used to configure properties in product config files + that are not exposed in the CRD. Read the + [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) + and consult the operator specific usage guide documentation for details on the + available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: |- + `envOverrides` configure environment variables to be set in the Pods. + It is a map from strings to strings - environment variables and the value to set. + Read the + [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) + for more information and consult the operator specific usage guide to find out about + the product specific environment variables that are available. 
+ type: object + jvmArgumentOverrides: + default: + add: [] + remove: [] + removeRegex: [] + description: |- + Allows overriding JVM arguments. + Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) + for details on the usage. + properties: + add: + default: [] + description: JVM arguments to be added + items: + type: string + type: array + remove: + default: [] + description: JVM arguments to be removed by exact match + items: + type: string + type: array + removeRegex: + default: [] + description: JVM arguments matching any of this regexes will be removed + items: + type: string + type: array + type: object + podOverrides: + default: {} + description: |- + In the `podOverrides` property you can define a + [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#podtemplatespec-v1-core) + to override any property that can be set on a Kubernetes Pod. + Read the + [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) + for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + replicas: + format: uint16 + maximum: 65535.0 + minimum: 0.0 + nullable: true + type: integer + type: object + image: + description: |- + User-supplied image containing spark-job dependencies that will be copied to the specified volume mount. + See the [examples](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/examples). + nullable: true + type: string + job: + description: |- + The job builds a spark-submit command, complete with arguments and referenced dependencies + such as templates, and passes it on to Spark. + The reason this property uses its own type (SubmitConfigFragment) is because logging is not + supported for spark-submit processes. 
+ nullable: true + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + requestedSecretLifetime: + description: |- + Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`. + This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate. + nullable: true + type: string + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: |- + Resource usage is configured here, this includes CPU usage, memory usage and disk storage + usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: |- + The maximum amount of CPU cores that can be requested by Pods. + Equivalent to the `limit` for Pod resource configuration. + Cores are specified either as a decimal point number or as milli units. + For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + x-kubernetes-int-or-string: true + min: + description: |- + The minimal amount of CPU cores that Pods need to run. + Equivalent to the `request` for Pod resource configuration. + Cores are specified either as a decimal point number or as milli units. + For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + x-kubernetes-int-or-string: true + type: object + memory: + properties: + limit: + description: |- + The maximum amount of memory that should be available to the Pod. + Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), + which means these suffixes are supported: E, P, T, G, M, k. + You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. 
+ For example, the following represent roughly the same value: + `128974848, 129e6, 129M, 128974848000m, 123Mi` + nullable: true + x-kubernetes-int-or-string: true + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + retryOnFailureCount: + description: Number of times to retry the submit job on failure. Default is `0` (no retries). + format: uint16 + maximum: 65535.0 + minimum: 0.0 + nullable: true + type: integer + volumeMounts: + description: Volume mounts for the spark-submit, driver and executor pods. + items: + type: object + x-kubernetes-preserve-unknown-fields: true + type: array + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: |- + The `configOverrides` can be used to configure properties in product config files + that are not exposed in the CRD. Read the + [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) + and consult the operator specific usage guide documentation for details on the + available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: |- + `envOverrides` configure environment variables to be set in the Pods. + It is a map from strings to strings - environment variables and the value to set. + Read the + [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) + for more information and consult the operator specific usage guide to find out about + the product specific environment variables that are available. + type: object + jvmArgumentOverrides: + default: + add: [] + remove: [] + removeRegex: [] + description: |- + Allows overriding JVM arguments. 
+ Please read on the [JVM argument overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#jvm-argument-overrides) + for details on the usage. + properties: + add: + default: [] + description: JVM arguments to be added + items: + type: string + type: array + remove: + default: [] + description: JVM arguments to be removed by exact match + items: + type: string + type: array + removeRegex: + default: [] + description: JVM arguments matching any of this regexes will be removed + items: + type: string + type: array + type: object + podOverrides: + default: {} + description: |- + In the `podOverrides` property you can define a + [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.34/#podtemplatespec-v1-core) + to override any property that can be set on a Kubernetes Pod. + Read the + [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) + for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + logFileDirectory: + description: The log file directory definition used by the Spark history server. + nullable: true + oneOf: + - required: + - s3 + - required: + - customLogDirectory + properties: + customLogDirectory: + description: A custom log directory + type: string + s3: + description: An S3 bucket storing the log events + properties: + bucket: + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: |- + S3 bucket specification containing the bucket name and an inlined or referenced connection specification. + Learn more on the [S3 concept documentation](https://docs.stackable.tech/home/nightly/concepts/s3). + properties: + bucketName: + description: The name of the S3 bucket. + type: string + connection: + description: The definition of an S3 connection, either inline or as a reference. 
+ oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: |- + S3 connection definition as a resource. + Learn more on the [S3 concept documentation](https://docs.stackable.tech/home/nightly/concepts/s3). + properties: + accessStyle: + default: VirtualHosted + description: |- + Which access style to use. + Defaults to virtual hosted-style as most of the data products out there. + Have a look at the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html). + enum: + - Path + - VirtualHosted + type: string + credentials: + description: |- + If the S3 uses authentication you have to specify you S3 credentials. + In the most cases a [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) + providing `accessKey` and `secretKey` is sufficient. + nullable: true + properties: + scope: + description: |- + [Scope](https://docs.stackable.tech/home/nightly/secret-operator/scope) of the + [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass). + nullable: true + properties: + listenerVolumes: + default: [] + description: |- + The listener volume scope allows Node and Service scopes to be inferred from the applicable listeners. + This must correspond to Volume names in the Pod that mount Listeners. + items: + type: string + type: array + node: + default: false + description: |- + The node scope is resolved to the name of the Kubernetes Node object that the Pod is running on. + This will typically be the DNS name of the node. + type: boolean + pod: + default: false + description: |- + The pod scope is resolved to the name of the Kubernetes Pod. + This allows the secret to differentiate between StatefulSet replicas. + type: boolean + services: + default: [] + description: |- + The service scope allows Pod objects to specify custom scopes. + This should typically correspond to Service objects that the Pod participates in. 
+ items: + type: string + type: array + type: object + secretClass: + description: '[SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) containing the LDAP bind credentials.' + type: string + required: + - secretClass + type: object + host: + description: 'Host of the S3 server without any protocol or port. For example: `west1.my-cloud.com`.' + type: string + port: + description: |- + Port the S3 server listens on. + If not specified the product will determine the port to use. + format: uint16 + maximum: 65535.0 + minimum: 0.0 + nullable: true + type: integer + region: + default: + name: us-east-1 + description: |- + Bucket region used for signing headers (sigv4). + + This defaults to `us-east-1` which is compatible with other implementations such as Minio. + + WARNING: Some products use the Hadoop S3 implementation which falls back to us-east-2. + properties: + name: + default: us-east-1 + type: string + type: object + tls: + description: Use a TLS connection. If not specified no TLS will be used. + nullable: true + properties: + verification: + description: The verification method used to verify the certificates of the server and/or the client. + oneOf: + - required: + - none + - required: + - server + properties: + none: + description: Use TLS but don't verify certificates. + type: object + server: + description: Use TLS and a CA certificate to verify the server. + properties: + caCert: + description: CA cert to verify the server. + oneOf: + - required: + - webPki + - required: + - secretClass + properties: + secretClass: + description: |- + Name of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) which will provide the CA certificate. + Note that a SecretClass does not need to have a key but can also work with just a CA certificate, + so if you got provided with a CA cert but don't have access to the key you can still use this method. 
+ type: string + webPki: + description: |- + Use TLS and the CA certificates trusted by the common web browsers to verify the server. + This can be useful when you e.g. use public AWS S3 or other public available services. + type: object + type: object + required: + - caCert + type: object + type: object + required: + - verification + type: object + required: + - host + type: object + reference: + type: string + type: object + required: + - bucketName + - connection + type: object + reference: + type: string + type: object + prefix: + type: string + required: + - bucket + - prefix + type: object + type: object + mainApplicationFile: + description: The actual application file that will be called by `spark-submit`. + type: string + mainClass: + description: The main class - i.e. entry point - for JVM artifacts. + nullable: true + type: string + mode: + description: 'Mode: cluster or client. Currently only cluster is supported.' + enum: + - cluster + - client + type: string + s3connection: + description: |- + Configure an S3 connection that the SparkApplication has access to. + Read more in the [Spark S3 usage guide](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/s3). + nullable: true + oneOf: + - required: + - inline + - required: + - reference + properties: + inline: + description: |- + S3 connection definition as a resource. + Learn more on the [S3 concept documentation](https://docs.stackable.tech/home/nightly/concepts/s3). + properties: + accessStyle: + default: VirtualHosted + description: |- + Which access style to use. + Defaults to virtual hosted-style as most of the data products out there. + Have a look at the [AWS documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html). + enum: + - Path + - VirtualHosted + type: string + credentials: + description: |- + If the S3 uses authentication you have to specify you S3 credentials. 
+ In the most cases a [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) + providing `accessKey` and `secretKey` is sufficient. + nullable: true + properties: + scope: + description: |- + [Scope](https://docs.stackable.tech/home/nightly/secret-operator/scope) of the + [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass). + nullable: true + properties: + listenerVolumes: + default: [] + description: |- + The listener volume scope allows Node and Service scopes to be inferred from the applicable listeners. + This must correspond to Volume names in the Pod that mount Listeners. + items: + type: string + type: array + node: + default: false + description: |- + The node scope is resolved to the name of the Kubernetes Node object that the Pod is running on. + This will typically be the DNS name of the node. + type: boolean + pod: + default: false + description: |- + The pod scope is resolved to the name of the Kubernetes Pod. + This allows the secret to differentiate between StatefulSet replicas. + type: boolean + services: + default: [] + description: |- + The service scope allows Pod objects to specify custom scopes. + This should typically correspond to Service objects that the Pod participates in. + items: + type: string + type: array + type: object + secretClass: + description: '[SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) containing the LDAP bind credentials.' + type: string + required: + - secretClass + type: object + host: + description: 'Host of the S3 server without any protocol or port. For example: `west1.my-cloud.com`.' + type: string + port: + description: |- + Port the S3 server listens on. + If not specified the product will determine the port to use. + format: uint16 + maximum: 65535.0 + minimum: 0.0 + nullable: true + type: integer + region: + default: + name: us-east-1 + description: |- + Bucket region used for signing headers (sigv4). 
+ + This defaults to `us-east-1` which is compatible with other implementations such as Minio. + + WARNING: Some products use the Hadoop S3 implementation which falls back to us-east-2. + properties: + name: + default: us-east-1 + type: string + type: object + tls: + description: Use a TLS connection. If not specified no TLS will be used. + nullable: true + properties: + verification: + description: The verification method used to verify the certificates of the server and/or the client. + oneOf: + - required: + - none + - required: + - server + properties: + none: + description: Use TLS but don't verify certificates. + type: object + server: + description: Use TLS and a CA certificate to verify the server. + properties: + caCert: + description: CA cert to verify the server. + oneOf: + - required: + - webPki + - required: + - secretClass + properties: + secretClass: + description: |- + Name of the [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass) which will provide the CA certificate. + Note that a SecretClass does not need to have a key but can also work with just a CA certificate, + so if you got provided with a CA cert but don't have access to the key you can still use this method. + type: string + webPki: + description: |- + Use TLS and the CA certificates trusted by the common web browsers to verify the server. + This can be useful when you e.g. use public AWS S3 or other public available services. + type: object + type: object + required: + - caCert + type: object + type: object + required: + - verification + type: object + required: + - host + type: object + reference: + type: string + type: object + sparkConf: + additionalProperties: + type: string + default: {} + description: A map of key/value strings that will be passed directly to spark-submit. 
+ type: object + sparkImage: + anyOf: + - required: + - custom + - productVersion + - required: + - productVersion + description: |- + Specify which image to use, the easiest way is to only configure the `productVersion`. + You can also configure a custom image registry to pull from, as well as completely custom + images. + + Consult the [Product image selection documentation](https://docs.stackable.tech/home/nightly/concepts/product_image_selection) + for details. + properties: + custom: + description: |- + Overwrite the docker image. + Specify the full docker image name, e.g. `oci.stackable.tech/sdp/superset:1.4.1-stackable2.1.0` + type: string + productVersion: + description: Version of the product, e.g. `1.4.1`. + type: string + pullPolicy: + default: Always + description: '[Pull policy](https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy) used when pulling the image.' + enum: + - IfNotPresent + - Always + - Never + type: string + pullSecrets: + description: '[Image pull secrets](https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod) to pull images from a private registry.' + items: + description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace. + properties: + name: + description: 'Name of the referent. This field is effectively required, but due to backwards compatibility is allowed to be empty. Instances of this type with an empty value here are almost certainly wrong. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + required: + - name + type: object + nullable: true + type: array + repo: + description: Name of the docker repo, e.g. `oci.stackable.tech/sdp` + nullable: true + type: string + stackableVersion: + description: |- + Stackable version of the product, e.g. `23.4`, `23.4.1` or `0.0.0-dev`. + If not specified, the operator will use its own version, e.g. `23.4.1`. 
+ When using a nightly operator or a pr version, it will use the nightly `0.0.0-dev` image. + nullable: true + type: string + type: object + vectorAggregatorConfigMapName: + description: |- + Name of the Vector aggregator [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery). + It must contain the key `ADDRESS` with the address of the Vector aggregator. + Follow the [logging tutorial](https://docs.stackable.tech/home/nightly/tutorials/logging-vector-aggregator) + to learn how to configure log aggregation with Vector. + nullable: true + type: string + volumes: + default: [] + description: A list of volumes that can be made available to the job, driver or executors via their volume mounts. + items: + type: object + x-kubernetes-preserve-unknown-fields: true + type: array + required: + - mainApplicationFile + - mode + - sparkImage + type: object + required: + - spec + title: SparkApplicationTemplate + type: object + served: true + storage: true + subresources: {} diff --git a/nix/sources.json b/nix/sources.json index f79c2cf0..10cce14f 100644 --- a/nix/sources.json +++ b/nix/sources.json @@ -17,10 +17,10 @@ "homepage": "", "owner": "kolloch", "repo": "crate2nix", - "rev": "be31feae9a82c225c0fd1bdf978565dc452a483a", - "sha256": "14d0ymlrwk7dynv35qcw4xn0dylfpwjmf6f8znflbk2l6fk23l12", + "rev": "26b698e804dd32dc5bb1995028fef00cc87d603a", + "sha256": "13jgy25yjd1m42xam6zri8vwx0n2qbwvpad2cmkhkrlx913n79ni", "type": "tarball", - "url": "https://github.com/kolloch/crate2nix/archive/be31feae9a82c225c0fd1bdf978565dc452a483a.tar.gz", + "url": "https://github.com/kolloch/crate2nix/archive/26b698e804dd32dc5bb1995028fef00cc87d603a.tar.gz", "url_template": "https://github.com///archive/.tar.gz" }, "nixpkgs": { @@ -29,10 +29,10 @@ "homepage": "", "owner": "NixOS", "repo": "nixpkgs", - "rev": "a7fc11be66bdfb5cdde611ee5ce381c183da8386", - "sha256": "0h3gvjbrlkvxhbxpy01n603ixv0pjy19n9kf73rdkchdvqcn70j2", + "rev": 
"26eaeac4e409d7b5a6bf6f90a2a2dc223c78d915", + "sha256": "1knl8dcr5ip70a2vbky3q844212crwrvybyw2nhfmgm1mvqry963", "type": "tarball", - "url": "https://github.com/NixOS/nixpkgs/archive/a7fc11be66bdfb5cdde611ee5ce381c183da8386.tar.gz", + "url": "https://github.com/NixOS/nixpkgs/archive/26eaeac4e409d7b5a6bf6f90a2a2dc223c78d915.tar.gz", "url_template": "https://github.com///archive/.tar.gz" } } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index d8582fd3..eecd346c 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ # DO NOT EDIT, this file is generated by operator-templating [toolchain] -channel = "1.89.0" +channel = "1.93.0" profile = "default" diff --git a/rust/operator-binary/Cargo.toml b/rust/operator-binary/Cargo.toml index 1c7e1073..608ac35b 100644 --- a/rust/operator-binary/Cargo.toml +++ b/rust/operator-binary/Cargo.toml @@ -25,6 +25,7 @@ tracing-futures.workspace = true clap.workspace = true futures.workspace = true tokio.workspace = true +regex.workspace = true indoc.workspace = true [dev-dependencies] diff --git a/rust/operator-binary/src/crd/job_dependencies.rs b/rust/operator-binary/src/crd/job_dependencies.rs new file mode 100644 index 00000000..5da1aa68 --- /dev/null +++ b/rust/operator-binary/src/crd/job_dependencies.rs @@ -0,0 +1,25 @@ +//! This module provides the JobDependencies definition for Spark applications. + +use serde::{Deserialize, Serialize}; +use stackable_operator::schemars::{self, JsonSchema}; + +#[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Eq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct JobDependencies { + /// Under the `requirements` you can specify Python dependencies that will be installed with `pip`. + /// Example: `tabulate==0.8.9` + #[serde(default)] + pub requirements: Vec, + + /// A list of packages that is passed directly to `spark-submit`. + #[serde(default)] + pub packages: Vec, + + /// A list of repositories that is passed directly to `spark-submit`. 
+ #[serde(default)] + pub repositories: Vec, + + /// A list of excluded packages that is passed directly to `spark-submit`. + #[serde(default)] + pub exclude_packages: Vec, +} diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index 993fad7c..386fcae8 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -46,18 +46,24 @@ use stackable_operator::{ use crate::{ config::jvm::construct_extra_java_options, - crd::roles::{ - RoleConfig, RoleConfigFragment, SparkApplicationRole, SparkContainer, SparkMode, - SubmitConfig, SubmitConfigFragment, VolumeMounts, + crd::{ + job_dependencies::JobDependencies, + roles::{ + RoleConfig, RoleConfigFragment, SparkApplicationRole, SparkContainer, SparkMode, + SubmitConfig, SubmitConfigFragment, VolumeMounts, + }, }, }; pub mod affinity; pub mod constants; pub mod history; +pub mod job_dependencies; pub mod listener_ext; pub mod logdir; pub mod roles; +pub mod template_merger; +pub mod template_spec; pub mod tlscerts; #[derive(Snafu, Debug)] @@ -112,7 +118,7 @@ pub enum Error { #[snafu(display("failed to configure S3 bucket"))] ConfigureS3Bucket { source: s3::v1alpha1::BucketError }, - #[snafu(display("failed to configure S3 copnnection"))] + #[snafu(display("failed to configure S3 connection"))] ConfigureS3Connection { source: s3::v1alpha1::ConnectionError, }, @@ -136,11 +142,20 @@ pub enum Error { )] pub mod versioned { + #[derive(Clone, Debug, Deserialize, PartialEq, Serialize, JsonSchema)] + #[serde(rename_all = "camelCase")] + pub(crate) struct ResolvedSparkApplicationTemplate { + pub name: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub uid: Option, + } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize, JsonSchema)] #[allow(clippy::derive_partial_eq_without_eq)] #[serde(rename_all = "camelCase")] pub struct SparkApplicationStatus { pub phase: String, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub 
resolved_template_ref: Vec, } /// A Spark cluster stacklet. This resource is managed by the Stackable operator for Apache Spark. @@ -238,27 +253,6 @@ pub mod versioned { #[serde(default, skip_serializing_if = "Option::is_none")] pub log_file_directory: Option, } - - #[derive(Clone, Debug, Default, Deserialize, JsonSchema, PartialEq, Eq, Serialize)] - #[serde(rename_all = "camelCase")] - pub struct JobDependencies { - /// Under the `requirements` you can specify Python dependencies that will be installed with `pip`. - /// Example: `tabulate==0.8.9` - #[serde(default)] - pub requirements: Vec, - - /// A list of packages that is passed directly to `spark-submit`. - #[serde(default)] - pub packages: Vec, - - /// A list of repositories that is passed directly to `spark-submit`. - #[serde(default)] - pub repositories: Vec, - - /// A list of excluded packages that is passed directly to `spark-submit`. - #[serde(default)] - pub exclude_packages: Vec, - } } impl v1alpha1::SparkApplication { diff --git a/rust/operator-binary/src/crd/template_merger.rs b/rust/operator-binary/src/crd/template_merger.rs new file mode 100644 index 00000000..bd5d2991 --- /dev/null +++ b/rust/operator-binary/src/crd/template_merger.rs @@ -0,0 +1,512 @@ +//! This module provides functionality for merging SparkApplication instances. + +use std::collections::HashMap; + +use stackable_operator::{ + config::merge::Merge, k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta, +}; + +use super::v1alpha1::SparkApplication; + +/// Deep merge two SparkApplication instances. +/// +/// This function merges all fields from the `overlay` SparkApplication into the `base` SparkApplication, +/// creating a new SparkApplication. 
The merge strategy is: +/// - Scalar fields: `overlay` value takes precedence if present +/// - Option fields: `overlay` value is used if `Some`, otherwise `base` value is kept +/// - Collections (HashMap, Vec): Items from both are combined +/// - Nested configurations: Deeply merged using the Merge trait +/// - Metadata: Combined with `overlay` metadata taking precedence for most fields +/// +/// # Arguments +/// +/// * `base` - The base SparkApplication to merge into +/// * `overlay` - The SparkApplication whose values will override/augment the base +/// +/// # Returns +/// +/// A new SparkApplication containing the merged result +pub fn deep_merge(base: &SparkApplication, overlay: &SparkApplication) -> SparkApplication { + let metadata = merge_metadata(&base.metadata, &overlay.metadata); + + // Merge spec fields + let spec = super::v1alpha1::SparkApplicationSpec { + // Scalar fields: overlay takes precedence + mode: overlay.spec.mode.clone(), + main_application_file: overlay.spec.main_application_file.clone(), + + // Option fields: overlay if Some, otherwise base + main_class: overlay + .spec + .main_class + .clone() + .or_else(|| base.spec.main_class.clone()), + image: overlay + .spec + .image + .clone() + .or_else(|| base.spec.image.clone()), + vector_aggregator_config_map_name: overlay + .spec + .vector_aggregator_config_map_name + .clone() + .or_else(|| base.spec.vector_aggregator_config_map_name.clone()), + s3connection: overlay + .spec + .s3connection + .clone() + .or_else(|| base.spec.s3connection.clone()), + log_file_directory: overlay + .spec + .log_file_directory + .clone() + .or_else(|| base.spec.log_file_directory.clone()), + + // Product image: overlay takes precedence + spark_image: overlay.spec.spark_image.clone(), + + // Merge job configuration + job: merge_common_config(base.spec.job.as_ref(), overlay.spec.job.as_ref()), + + // Merge driver configuration + driver: merge_common_config(base.spec.driver.as_ref(), overlay.spec.driver.as_ref()), + + 
// Merge executor configuration (RoleGroup) + executor: merge_role_group(base.spec.executor.as_ref(), overlay.spec.executor.as_ref()), + + // Merge collections + spark_conf: merge_hashmap(&base.spec.spark_conf, &overlay.spec.spark_conf), + args: merge_vec(&base.spec.args, &overlay.spec.args), + volumes: merge_vec(&base.spec.volumes, &overlay.spec.volumes), + env: merge_vec(&base.spec.env, &overlay.spec.env), + + // Merge deps + deps: merge_deps(&base.spec.deps, &overlay.spec.deps), + }; + + // Status: use overlay if present, otherwise base + let status = overlay.status.clone().or_else(|| base.status.clone()); + + SparkApplication { + metadata, + spec, + status, + } +} + +/// Merge ObjectMeta, with overlay taking precedence for most fields +// TODO: figure out if it is the right thing to copy all metadata from templates into apps +// or if this might also be made configurable via annotations +fn merge_metadata(base: &ObjectMeta, overlay: &ObjectMeta) -> ObjectMeta { + ObjectMeta { + name: overlay.name.clone().or_else(|| base.name.clone()), + namespace: overlay.namespace.clone().or_else(|| base.namespace.clone()), + labels: merge_option_hashmap(&base.labels, &overlay.labels), + annotations: merge_option_hashmap(&base.annotations, &overlay.annotations), + owner_references: overlay + .owner_references + .clone() + .or_else(|| base.owner_references.clone()), + finalizers: overlay + .finalizers + .clone() + .or_else(|| base.finalizers.clone()), + uid: overlay.uid.clone().or_else(|| base.uid.clone()), + ..Default::default() + } +} + +/// Merge two Option> +fn merge_option_hashmap( + base: &Option>, + overlay: &Option>, +) -> Option> { + match (base, overlay) { + (None, None) => None, + (Some(b), None) => Some(b.clone()), + (None, Some(o)) => Some(o.clone()), + (Some(b), Some(o)) => { + let mut merged = b.clone(); + merged.extend(o.clone()); + Some(merged) + } + } +} + +/// Merge two HashMaps, with overlay values taking precedence +fn merge_hashmap(base: &HashMap, overlay: 
&HashMap) -> HashMap +where + K: Eq + std::hash::Hash + Clone, + V: Clone, +{ + let mut merged = base.clone(); + merged.extend(overlay.clone()); + merged +} + +/// Merge two Vecs by concatenating them +fn merge_vec(base: &[T], overlay: &[T]) -> Vec { + let mut merged = base.to_vec(); + merged.extend_from_slice(overlay); + merged +} + +/// Merge CommonConfiguration using the Merge trait +fn merge_common_config( + base: Option<&stackable_operator::role_utils::CommonConfiguration>, + overlay: Option<&stackable_operator::role_utils::CommonConfiguration>, +) -> Option> +where + C: Clone + Merge, + R: Clone, +{ + match (base, overlay) { + (None, None) => None, + (Some(b), None) => Some(b.clone()), + (None, Some(o)) => Some(o.clone()), + (Some(b), Some(o)) => { + // Clone the base and merge the overlay config into it + let mut merged = b.clone(); + merged.config.merge(&o.config); + Some(merged) + } + } +} + +/// Merge RoleGroup +fn merge_role_group( + base: Option<&stackable_operator::role_utils::RoleGroup>, + overlay: Option<&stackable_operator::role_utils::RoleGroup>, +) -> Option> +where + C: Clone + Merge, + R: Clone, +{ + match (base, overlay) { + (None, None) => None, + (Some(b), None) => Some(b.clone()), + (None, Some(o)) => Some(o.clone()), + (Some(b), Some(o)) => { + // Clone the base and merge overlay + let mut merged = b.clone(); + merged.config.config.merge(&o.config.config); + // Use overlay replicas if present + if o.replicas.is_some() { + merged.replicas = o.replicas; + } + Some(merged) + } + } +} + +/// Merge JobDependencies +fn merge_deps( + base: &super::job_dependencies::JobDependencies, + overlay: &super::job_dependencies::JobDependencies, +) -> super::job_dependencies::JobDependencies { + super::job_dependencies::JobDependencies { + requirements: merge_vec(&base.requirements, &overlay.requirements), + packages: merge_vec(&base.packages, &overlay.packages), + repositories: merge_vec(&base.repositories, &overlay.repositories), + exclude_packages: 
merge_vec(&base.exclude_packages, &overlay.exclude_packages), + } +} + +#[cfg(test)] +mod tests { + use indoc::indoc; + + use super::*; + + #[test] + fn test_deep_merge_basic() { + let base = serde_yaml::from_str::(indoc! {r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: base-app + namespace: default + spec: + mode: cluster + mainApplicationFile: base.py + mainClass: BaseClass + sparkImage: + productVersion: "3.5.0" + args: + - arg1 + "#}) + .unwrap(); + + let overlay = serde_yaml::from_str::(indoc! {r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: overlay-app + uid: e8990dae-3f4c-417c-8b08-b2770e347d07 + spec: + mode: cluster + mainApplicationFile: overlay.py + sparkImage: + productVersion: "3.5.1" + args: + - arg2 + "#}) + .unwrap(); + + let merged = deep_merge(&base, &overlay); + + // overlay name should win + assert_eq!(merged.metadata.name, Some("overlay-app".to_string())); + // namespace from base should be preserved + assert_eq!(merged.metadata.namespace, Some("default".to_string())); + // metadata uid should be preserved + assert_eq!( + merged.metadata.uid, + Some("e8990dae-3f4c-417c-8b08-b2770e347d07".to_string()) + ); + // overlay main_application_file should win + assert_eq!(merged.spec.main_application_file, "overlay.py"); + // base main_class should be preserved since overlay is None + assert_eq!(merged.spec.main_class, Some("BaseClass".to_string())); + // overlay image should be used + assert_eq!(merged.spec.spark_image.product_version(), "3.5.1"); + // args should be concatenated + assert_eq!(merged.spec.args, vec!["arg1", "arg2"]); + } + + #[test] + fn test_deep_merge_spark_conf() { + let base = serde_yaml::from_str::(indoc! 
{r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: base-app + spec: + mode: cluster + sparkImage: + productVersion: "3.5.8" + mainApplicationFile: base.py + sparkConf: + "spark.executor.memory": "4g" + "spark.executor.cores": "2" + "#}) + .unwrap(); + + let overlay = serde_yaml::from_str::(indoc! {r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: overlay-app + spec: + mode: cluster + sparkImage: + productVersion: "4.1.0" + mainApplicationFile: overlay.py + sparkConf: + "spark.executor.cores": "4" + "spark.executor.instances": "3" + "#}) + .unwrap(); + + let merged = deep_merge(&base, &overlay); + + // spark_conf should be merged with overlay taking precedence for conflicting keys + assert_eq!( + merged.spec.spark_conf.get("spark.executor.memory"), + Some(&"4g".to_string()) + ); + assert_eq!( + merged.spec.spark_conf.get("spark.executor.cores"), + Some(&"4".to_string()) + ); + assert_eq!( + merged.spec.spark_conf.get("spark.executor.instances"), + Some(&"3".to_string()) + ); + } + + #[test] + fn test_deep_merge_job() { + let base = serde_yaml::from_str::(indoc! {r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: base-app + spec: + mode: cluster + mainApplicationFile: base.py + sparkImage: + productVersion: "3.5.8" + "#}) + .unwrap(); + + let overlay = serde_yaml::from_str::(indoc! 
{r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: overlay-app + spec: + mode: cluster + mainApplicationFile: overlay.py + sparkImage: + productVersion: "4.1.0" + job: + config: + retryOnFailureCount: 2 + "#}) + .unwrap(); + + let merged = deep_merge(&base, &overlay); + + // job config should be merged with overlay taking precedence for conflicting keys + assert_eq!( + merged + .spec + .job + .as_ref() + .unwrap() + .config + .retry_on_failure_count, + Some(2u16) + ); + } + + #[test] + fn test_merge_two_templates_into_spark_application() { + let template_a = serde_yaml::from_str::< + crate::crd::template_spec::v1alpha1::SparkApplicationTemplate, + >(indoc! {r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplicationTemplate + metadata: + name: template-a + spec: + mode: cluster + mainApplicationFile: local:///placeholder.jar + mainClass: com.example.Job + sparkImage: + productVersion: "3.5.8" + sparkConf: + "spark.executor.memory": "4g" + args: + - "--verbose" + deps: + requirements: + - "numpy==1.21" + "#}) + .unwrap(); + + let template_b = serde_yaml::from_str::< + crate::crd::template_spec::v1alpha1::SparkApplicationTemplate, + >(indoc! {r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplicationTemplate + metadata: + name: template-b + spec: + mode: cluster + mainApplicationFile: local:///placeholder.jar + sparkImage: + productVersion: "3.5.8" + sparkConf: + "spark.executor.memory": "8g" + "spark.executor.cores": "4" + args: + - "--output" + - "/tmp/out" + deps: + requirements: + - "pandas==2.0" + "#}) + .unwrap(); + + let spark_app = serde_yaml::from_str::(indoc! 
{r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: my-spark-app + namespace: default + spec: + mode: cluster + mainApplicationFile: local:///app.jar + sparkImage: + productVersion: "3.5.8" + sparkConf: + "spark.driver.memory": "2g" + args: + - "--app-arg" + deps: + requirements: + - "scipy==1.9" + "#}) + .unwrap(); + + // Convert templates to SparkApplication and merge left-to-right: + // template-a has the lowest priority, the spark application has the highest. + let app_from_a = crate::crd::v1alpha1::SparkApplication::from(&template_a); + let app_from_b = crate::crd::v1alpha1::SparkApplication::from(&template_b); + // This how `merge_application_templates()` does it + let template_apps = [app_from_a, app_from_b, spark_app]; + let merged = template_apps + .into_iter() + .reduce(|merge_app, app| deep_merge(&merge_app, &app)) + .expect("bad dev"); + + // mainClass set only in template-a; neither template-b nor the app overrides it + assert_eq!( + merged.spec.main_class, + Some("com.example.Job".to_string()), + "mainClass should come from template-a" + ); + + // The app's mainApplicationFile wins + assert_eq!( + merged.spec.main_application_file, "local:///app.jar", + "mainApplicationFile should come from the spark application" + ); + + // template-b overrides template-a's executor memory value + assert_eq!( + merged.spec.spark_conf.get("spark.executor.memory"), + Some(&"8g".to_string()), + "spark.executor.memory should be overridden by template-b" + ); + + // template-b contributes executor cores (not present in template-a or the app) + assert_eq!( + merged.spec.spark_conf.get("spark.executor.cores"), + Some(&"4".to_string()), + "spark.executor.cores should come from template-b" + ); + + // driver memory is set only by the spark application + assert_eq!( + merged.spec.spark_conf.get("spark.driver.memory"), + Some(&"2g".to_string()), + "spark.driver.memory should come from the spark application" + ); + + // args are 
concatenated in priority order + assert_eq!( + merged.spec.args, + vec!["--verbose", "--output", "/tmp/out", "--app-arg"], + "args should be concatenated from all sources in order" + ); + + // deps.requirements are concatenated in priority order + assert_eq!( + merged.spec.deps.requirements, + vec!["numpy==1.21", "pandas==2.0", "scipy==1.9"], + "deps.requirements should be concatenated from all sources in order" + ); + } +} diff --git a/rust/operator-binary/src/crd/template_spec.rs b/rust/operator-binary/src/crd/template_spec.rs new file mode 100644 index 00000000..bc53ea47 --- /dev/null +++ b/rust/operator-binary/src/crd/template_spec.rs @@ -0,0 +1,539 @@ +//! This module provides the SparkApplicationTemplateSpec CRD definition. + +use std::{collections::HashMap, num::ParseIntError, str::ParseBoolError}; + +use regex::Regex; +use serde::{Deserialize, Serialize}; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + commons::product_image_selection::ProductImage, + crd::s3, + k8s_openapi::api::core::v1::{EnvVar, Volume}, + kube::{Api, CustomResource, ResourceExt, api::ListParams}, + role_utils::{CommonConfiguration, JavaCommonConfig, RoleGroup}, + schemars::{self, JsonSchema}, + utils::crds::raw_object_list_schema, + versioned::versioned, +}; +use strum::{EnumDiscriminants, IntoStaticStr}; + +use super::{ + history::LogFileDirectorySpec, + job_dependencies::JobDependencies, + roles::{RoleConfigFragment, SparkMode, SubmitConfigFragment}, +}; +use crate::crd::template_merger::deep_merge; + +#[derive(Snafu, Debug, EnumDiscriminants)] +#[strum_discriminants(derive(IntoStaticStr))] +#[allow(clippy::enum_variant_names)] +pub enum Error { + #[snafu(display("failed to build template merge options from application annotations"))] + BuildMergeTemplateOptions, + + #[snafu(display( + "invalid index value [{value}] for template names. 
value must be non negative integer" + ))] + InvalidAnnotationTemplateIndex { + source: ParseIntError, + value: String, + }, + + #[snafu(display("invalid regex for template names annotation"))] + InvalidAnnotationTemplateNameRx { source: regex::Error }, + + #[snafu(display("invalid value [{value}] for annotation [{name}]"))] + InvalidAnnotationBooleanValue { + source: ParseBoolError, + name: String, + value: String, + }, + + #[snafu(display("failed to list SparkApplicationTemplate named [{template_name}]"))] + ListSparkApplicationTemplates { + template_name: String, + source: stackable_operator::kube::Error, + }, +} + +#[versioned( + version(name = "v1alpha1"), + crates( + kube_core = "stackable_operator::kube::core", + kube_client = "stackable_operator::kube::client", + k8s_openapi = "stackable_operator::k8s_openapi", + schemars = "stackable_operator::schemars", + versioned = "stackable_operator::versioned" + ) +)] +pub mod versioned { + + /// A Spark application template. This resource is managed by the Stackable operator for Apache Spark. + /// Find more information on how to use it and the resources that the operator generates in the + /// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/). + #[versioned(crd( + group = "spark.stackable.tech", + plural = "sparkapptemplates", + shortname = "sparkapptemplate", + ))] + #[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)] + #[serde(rename_all = "camelCase")] + pub struct SparkApplicationTemplateSpec { + /// Mode: cluster or client. Currently only cluster is supported. + pub mode: SparkMode, + + /// The main class - i.e. entry point - for JVM artifacts. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub main_class: Option, + + /// The actual application file that will be called by `spark-submit`. + pub main_application_file: String, + + /// User-supplied image containing spark-job dependencies that will be copied to the specified volume mount. 
+ /// See the [examples](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/examples). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub image: Option, + + // no doc - docs in ProductImage struct. + pub spark_image: ProductImage, + + /// Name of the Vector aggregator [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery). + /// It must contain the key `ADDRESS` with the address of the Vector aggregator. + /// Follow the [logging tutorial](DOCS_BASE_URL_PLACEHOLDER/tutorials/logging-vector-aggregator) + /// to learn how to configure log aggregation with Vector. + #[serde(skip_serializing_if = "Option::is_none")] + pub vector_aggregator_config_map_name: Option, + + /// The job builds a spark-submit command, complete with arguments and referenced dependencies + /// such as templates, and passes it on to Spark. + /// The reason this property uses its own type (SubmitConfigFragment) is because logging is not + /// supported for spark-submit processes. + // + // IMPORTANT: Please note that the jvmArgumentOverrides have no effect here! + // However, due to product-config things I wasn't able to remove them. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub job: Option>, + + /// The driver role specifies the configuration that, together with the driver pod template, is used by + /// Spark to create driver pods. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub driver: Option>, + + /// The executor role specifies the configuration that, together with the executor pod template, is used by + /// Spark to create the executor pods. + /// This is RoleGroup instead of plain CommonConfiguration because it needs to allow for the number of replicas + /// to be specified. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub executor: Option>, + + /// A map of key/value strings that will be passed directly to spark-submit. 
+ #[serde(default)] + pub spark_conf: HashMap, + + /// Job dependencies: a list of python packages that will be installed via pip, a list of packages + /// or repositories that is passed directly to spark-submit, or a list of excluded packages + /// (also passed directly to spark-submit). + #[serde(default)] + pub deps: JobDependencies, + + /// Configure an S3 connection that the SparkApplication has access to. + /// Read more in the [Spark S3 usage guide](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/s3). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub s3connection: Option, + + /// Arguments passed directly to the job artifact. + #[serde(default)] + pub args: Vec, + + /// A list of volumes that can be made available to the job, driver or executors via their volume mounts. + #[serde(default)] + #[schemars(schema_with = "raw_object_list_schema")] + pub volumes: Vec, + + /// A list of environment variables that will be set in the job pod and the driver and executor + /// pod templates. + #[serde(default)] + pub env: Vec, + + /// The log file directory definition used by the Spark history server. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub log_file_directory: Option, + } +} + +impl From<&v1alpha1::SparkApplicationTemplate> + for super::v1alpha1::ResolvedSparkApplicationTemplate +{ + fn from(value: &v1alpha1::SparkApplicationTemplate) -> Self { + Self { + name: value.name_any(), + uid: value.metadata.uid.clone(), + } + } +} + +impl From<&v1alpha1::SparkApplicationTemplate> for super::v1alpha1::SparkApplication { + fn from(template: &v1alpha1::SparkApplicationTemplate) -> super::v1alpha1::SparkApplication { + let spec = super::v1alpha1::SparkApplicationSpec { + mode: template.spec.mode.clone(), + main_class: template.spec.main_class.clone(), + main_application_file: template.spec.main_application_file.clone(), + image: template.spec.image.clone(), + spark_image: template.spec.spark_image.clone(), + vector_aggregator_config_map_name: template + .spec + .vector_aggregator_config_map_name + .clone(), + job: template.spec.job.clone(), + driver: template.spec.driver.clone(), + executor: template.spec.executor.clone(), + spark_conf: template.spec.spark_conf.clone(), + deps: template.spec.deps.clone(), + s3connection: template.spec.s3connection.clone(), + args: template.spec.args.clone(), + volumes: template.spec.volumes.clone(), + env: template.spec.env.clone(), + log_file_directory: template.spec.log_file_directory.clone(), + }; + + super::v1alpha1::SparkApplication { + metadata: template.metadata.clone(), + spec, + status: None, + } + } +} + +// Values of this type are built from the metadata.annotations of the spark application objects. 
+#[derive(Default)] +struct MergeTemplateOptions { + merge: bool, + template_names: Vec, + update_strategy: TemplateUpdateStrategy, + apply_strategy: TemplateApplyStrategy, +} + +#[derive(Default, Debug, PartialEq, strum::EnumString)] +#[strum(serialize_all = "camelCase")] +enum TemplateUpdateStrategy { + #[default] + OnCreate, +} + +#[derive(Default, Debug, PartialEq, strum::EnumString)] +#[strum(serialize_all = "camelCase")] +enum TemplateApplyStrategy { + #[default] + Enforce, +} + +// This annotation regex selects the template names to apply. +// The value determines the merge order. +const ANNO_TEMPLATE_NAME_RX: &str = "^spark-application\\.template\\.(?P\\d+)\\.name$"; +// A boolean that enable/disables template merging. +const ANNO_TEMPLATE_MERGE: &str = "spark-application.template.merge"; +// This annotation instructs the operator when to update patched applications. +// Currently templates are merged only once in the lifetime of a spark application, namely at creation time. +const ANNO_TEMPLATE_UPDATE_STRATEGY: &str = "spark-application.template.updateStrategy"; +// This tells the operator how to handle merging errors. +// Currently only "enforce" is supported, meaning: fail in case of errors. +const ANNO_TEMPLATE_APPLY_STRATEGY: &str = "spark-application.template.applyStrategy"; + +impl TryFrom<&super::v1alpha1::SparkApplication> for MergeTemplateOptions { + type Error = Error; + + // Build a `MergeTemplateOptions` value from the metadata annotations of a `SparkApplication`. + // The `template_names` are sorted in the order in which they are applied as specified by the `index`. + fn try_from(app: &super::v1alpha1::SparkApplication) -> Result { + if let Some(annos) = app.metadata.annotations.as_ref() { + let merge: bool = match annos.get(ANNO_TEMPLATE_MERGE) { + Some(v) => { + v.parse::() + .with_context(|_| InvalidAnnotationBooleanValueSnafu { + name: ANNO_TEMPLATE_MERGE.to_string(), + value: v.to_string(), + })? 
+                }
+                _ => false,
+            };
+            let update_strategy = match annos.get(ANNO_TEMPLATE_UPDATE_STRATEGY) {
+                Some(v) => v.parse::<TemplateUpdateStrategy>().unwrap_or_default(),
+                _ => TemplateUpdateStrategy::default(),
+            };
+            let apply_strategy = match annos.get(ANNO_TEMPLATE_APPLY_STRATEGY) {
+                Some(v) => v.parse::<TemplateApplyStrategy>().unwrap_or_default(),
+                _ => TemplateApplyStrategy::default(),
+            };
+
+            // Extract template indexes and names.
+            // Sort by indexes and discard them.
+            let template_name_rx =
+                Regex::new(ANNO_TEMPLATE_NAME_RX).context(InvalidAnnotationTemplateNameRxSnafu)?;
+
+            let mut template_index_name = vec![];
+            for (k, v) in annos.iter() {
+                if let Some(caps) = template_name_rx.captures(k) {
+                    let index = caps["index"].parse::<usize>().context(
+                        InvalidAnnotationTemplateIndexSnafu {
+                            value: caps["index"].to_string(),
+                        },
+                    )?;
+                    template_index_name.push((index, v));
+                }
+            }
+            template_index_name.sort_by_key(|(index, _)| *index);
+            let template_names: Vec<String> = template_index_name
+                .iter()
+                .map(|(_, v)| (*v).clone())
+                .collect();
+
+            Ok(MergeTemplateOptions {
+                merge,
+                update_strategy,
+                apply_strategy,
+                template_names,
+            })
+        } else {
+            Ok(MergeTemplateOptions::default())
+        }
+    }
+}
+
+pub(crate) struct MergeTemplateResult {
+    pub app: Option<super::v1alpha1::SparkApplication>,
+    pub resolved_template_ref: Vec<super::v1alpha1::ResolvedSparkApplicationTemplate>,
+}
+
+// Merges one or more [`SparkApplicationTemplate`](v1alpha1::SparkApplicationTemplate) resources
+// into the given [`SparkApplication`](super::v1alpha1::SparkApplication).
+//
+// Template merging is controlled by annotations on the `SparkApplication`.
+//
+// The function returns an empty [`MergeTemplateResult`] immediately if:
+//
+// 1. `spark-application.template.merge` annotation is `"false"`.
+// 2. `spark-application.template.merge` annotation is `"true"`
+//    and `spark-application.template.updateStrategy` annotation is `"onCreate"`
+//    and `spark_application.status.resolved_template_ref` is not empty.
+//
+// When merging is enabled, the function:
+//
+// 1.
Parses the merge options (template names, update strategy, apply strategy) from the
+//    application's annotations.
+// 2. Checks whether templates have already been applied and, if the update strategy is
+//    [`TemplateUpdateStrategy::OnCreate`], skips re-applying them.
+// 3. Resolves the named templates from the Kubernetes API in the order defined by their index
+//    annotations.
+// 4. Performs a deep merge of all resolved templates followed by the `SparkApplication` itself
+//    (left-to-right, with the application having the highest priority).
+//
+// # Returns
+//
+// - `Ok(MergeTemplateResult { app: Some(...), resolved_template_ref: [...] })` when at least one
+//   template was found and merged.
+// - `Ok(MergeTemplateResult { app: None, resolved_template_ref: [] })` when merging is disabled,
+//   the update strategy prevents re-applying, or no templates were resolved.
+// - `Err(Error)` if annotation parsing fails or a Kubernetes API call returns an error.
+pub(crate) async fn merge_application_templates(
+    client: &stackable_operator::client::Client,
+    spark_application: &super::v1alpha1::SparkApplication,
+) -> Result<MergeTemplateResult, Error> {
+    let app_name = spark_application.name_any();
+
+    tracing::info!("app [{app_name}] : begin template merging");
+
+    let default_result = MergeTemplateResult {
+        app: None,
+        resolved_template_ref: vec![],
+    };
+
+    let merge_template_options = MergeTemplateOptions::try_from(spark_application)?;
+
+    if merge_template_options.merge {
+        let have_resolved_template_refs = spark_application
+            .status
+            .as_ref()
+            .map(|s| !s.resolved_template_ref.is_empty())
+            .unwrap_or(false);
+        if have_resolved_template_refs
+            && merge_template_options.update_strategy == TemplateUpdateStrategy::OnCreate
+        {
+            tracing::info!("app [{app_name}] : templates already merged.");
+            // Templates have already been applied and the update strategy (OnCreate) only allows
+            // to apply them once (on creation).
+            return Ok(default_result);
+        }
+
+        // Retrieve the template objects from the kube api.
+        // In the future if we support additional strategies in addition to "enforce",
+        // this list might not be identical to the one in `merge_template_options`
+        // because some objects might be missing.
+        let templates = resolve(
+            client,
+            &merge_template_options.template_names,
+            merge_template_options.apply_strategy,
+        )
+        .await?;
+
+        if !templates.is_empty() {
+            // The list of apps from templates in the correct order.
+            // The final element is the actual Spark application being reconciled
+            // which has the highest priority during merging.
+            let mut template_apps: Vec<super::v1alpha1::SparkApplication> = templates
+                .iter()
+                .map(super::v1alpha1::SparkApplication::from)
+                .collect::<Vec<_>>();
+            template_apps.push(spark_application.clone());
+
+            // Deep merge app templates from left to right
+            let merged_app = template_apps
+                .into_iter()
+                .reduce(|merge_app, app| deep_merge(&merge_app, &app));
+
+            // In the future, when different apply strategies are supported, the effective template
+            // list might differ from what is in the app annotations so make sure we return the correct one.
+            let effective_template_list = templates
+                .iter()
+                .map(super::v1alpha1::ResolvedSparkApplicationTemplate::from)
+                .collect::<Vec<_>>();
+
+            tracing::info!(
+                "app [{app_name}] : successfully merged templates [{tnames}]",
+                tnames = effective_template_list
+                    .iter()
+                    .map(|rsat| rsat.name.clone())
+                    .collect::<Vec<_>>()
+                    .join(",")
+            );
+            return Ok(MergeTemplateResult {
+                app: merged_app,
+                resolved_template_ref: effective_template_list,
+            });
+        } else {
+            tracing::warn!("app [{app_name}]: template merging enabled but no templates resolved")
+        }
+    } else {
+        tracing::info!("app [{app_name}]: no templates to merge")
+    }
+
+    tracing::info!("app [{app_name}] : done template merging");
+
+    Ok(default_result)
+}
+
+async fn resolve(
+    client: &stackable_operator::client::Client,
+    template_names: &[String],
+    apply_strategy: TemplateApplyStrategy,
+) -> Result<Vec<v1alpha1::SparkApplicationTemplate>, Error> {
+    if template_names.is_empty() {
+        return Ok(vec![]);
+    }
+
+    let templates_api = Api::<v1alpha1::SparkApplicationTemplate>::all(client.as_kube_client());
+    let mut resolved_templates = Vec::new();
+    for template_name in template_names {
+        let template_res = templates_api
+            .list(&ListParams::default().fields(&format!("metadata.name={template_name}")))
+            .await
+            .with_context(|_| ListSparkApplicationTemplatesSnafu { template_name })
+            .map(|object_list| object_list.items.into_iter().next());
+
+        let template = if apply_strategy == TemplateApplyStrategy::Enforce {
+            // When the apply strategy is "enforce" we immediately raise an error,
+            // otherwise we ignore it and skip to the next template in the list.
+            template_res?
+        } else {
+            template_res.ok().flatten()
+        };
+
+        if let Some(template) = template {
+            resolved_templates.push(template);
+        }
+    }
+
+    Ok(resolved_templates)
+}
+
+#[cfg(test)]
+mod tests {
+    use indoc::indoc;
+
+    use super::*;
+
+    #[test]
+    fn try_from_parses_annotations_and_sorts_template_names() {
+        let spark_application =
+            serde_yaml::from_str::<crate::crd::v1alpha1::SparkApplication>(indoc!
{r#"
+                ---
+                apiVersion: spark.stackable.tech/v1alpha1
+                kind: SparkApplication
+                metadata:
+                  name: app-with-templates
+                  annotations:
+                    spark-application.template.merge: "true"
+                    spark-application.template.updateStrategy: "onCreate"
+                    spark-application.template.applyStrategy: "enforce"
+                    spark-application.template.2.name: "template-c"
+                    spark-application.template.0.name: "template-a"
+                    spark-application.template.1.name: "template-b"
+                spec:
+                  mode: cluster
+                  mainApplicationFile: local:///app.py
+                  sparkImage:
+                    productVersion: "3.5.8"
+            "#})
+            .unwrap();
+
+        let options = MergeTemplateOptions::try_from(&spark_application).unwrap();
+
+        assert!(options.merge);
+        assert!(matches!(
+            options.update_strategy,
+            TemplateUpdateStrategy::OnCreate
+        ));
+        assert!(matches!(
+            options.apply_strategy,
+            TemplateApplyStrategy::Enforce
+        ));
+        assert_eq!(
+            options.template_names,
+            vec![
+                "template-a".to_string(),
+                "template-b".to_string(),
+                "template-c".to_string()
+            ]
+        );
+    }
+
+    #[test]
+    fn try_from_without_annotations_returns_default_options() {
+        let spark_application =
+            serde_yaml::from_str::<crate::crd::v1alpha1::SparkApplication>(indoc!
{r#" + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: app-without-annotations + spec: + mode: cluster + mainApplicationFile: local:///app.py + sparkImage: + productVersion: "3.5.8" + "#}) + .unwrap(); + + let options = MergeTemplateOptions::try_from(&spark_application).unwrap(); + + assert!(!options.merge); + assert!(matches!( + options.update_strategy, + TemplateUpdateStrategy::OnCreate + )); + assert!(matches!( + options.apply_strategy, + TemplateApplyStrategy::Enforce + )); + assert!(options.template_names.is_empty()); + } +} diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 83aafd4c..ded09475 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -42,6 +42,7 @@ use crate::{ SPARK_CONTROLLER_NAME, SPARK_FULL_CONTROLLER_NAME, }, history::SparkHistoryServer, + template_spec::{SparkApplicationTemplate, SparkApplicationTemplateVersion}, }, webhooks::conversion::create_webhook_server, }; @@ -86,6 +87,8 @@ async fn main() -> anyhow::Result<()> { .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?; SparkConnectServer::merged_crd(SparkConnectServerVersion::V1Alpha1)? .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?; + SparkApplicationTemplate::merged_crd(SparkApplicationTemplateVersion::V1Alpha1)? 
+ .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?; } Command::Run(RunArguments { operator_environment, diff --git a/rust/operator-binary/src/pod_driver_controller.rs b/rust/operator-binary/src/pod_driver_controller.rs index ad90d38e..2587ed86 100644 --- a/rust/operator-binary/src/pod_driver_controller.rs +++ b/rust/operator-binary/src/pod_driver_controller.rs @@ -113,6 +113,11 @@ pub async fn reconcile(pod: Arc>, client: Arc) -> &app, &v1alpha1::SparkApplicationStatus { phase: phase.clone(), + resolved_template_ref: app + .status + .as_ref() + .map(|s| s.resolved_template_ref.clone()) + .unwrap_or_default(), }, ) .await diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index c06a321f..6a50f86e 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -70,6 +70,11 @@ use crate::{ #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { + #[snafu(display("failed to merge application templates"))] + MergeApplicationTemplates { + source: crate::crd::template_spec::Error, + }, + #[snafu(display("missing secret lifetime"))] MissingSecretLifetime, @@ -240,6 +245,21 @@ pub async fn reconcile( return Ok(Action::await_change()); } + // It is important to do this at the top of the reconciliation function to ensure + // all referenced resources and configuration are merged before any of them are created. + let merged_template_result = + &crate::crd::template_spec::merge_application_templates(client, spark_application) + .await + .context(MergeApplicationTemplatesSnafu)?; + let spark_application = match &merged_template_result.app { + Some(app) => app, + None => spark_application, + }; + + // This is the final version of the spark app to reconcile. + // No more mutating operations after this point (except for status). 
+ tracing::debug!("reconciling spark application [{spark_application:?}]"); + let opt_s3conn = match spark_application.spec.s3connection.as_ref() { Some(s3bd) => Some( s3bd.clone() @@ -415,6 +435,7 @@ pub async fn reconcile( spark_application, &v1alpha1::SparkApplicationStatus { phase: "Unknown".to_string(), + resolved_template_ref: merged_template_result.resolved_template_ref.clone(), }, ) .await diff --git a/rust/operator-binary/src/webhooks/conversion.rs b/rust/operator-binary/src/webhooks/conversion.rs index 9ad30049..f32c6fdd 100644 --- a/rust/operator-binary/src/webhooks/conversion.rs +++ b/rust/operator-binary/src/webhooks/conversion.rs @@ -14,6 +14,7 @@ use crate::{ SparkApplication, SparkApplicationVersion, constants::FIELD_MANAGER, history::{SparkHistoryServer, SparkHistoryServerVersion}, + template_spec::{SparkApplicationTemplate, SparkApplicationTemplateVersion}, }, }; @@ -50,6 +51,11 @@ pub async fn create_webhook_server( .context(MergeCrdSnafu)?, SparkApplication::try_convert as fn(_) -> _, ), + ( + SparkApplicationTemplate::merged_crd(SparkApplicationTemplateVersion::V1Alpha1) + .context(MergeCrdSnafu)?, + SparkApplicationTemplate::try_convert as fn(_) -> _, + ), ]; let conversion_webhook_options = ConversionWebhookOptions { diff --git a/tests/templates/kuttl/spark-history-server/08-deploy-spark-app-template.yaml.j2 b/tests/templates/kuttl/spark-history-server/08-deploy-spark-app-template.yaml.j2 new file mode 100644 index 00000000..c7271d30 --- /dev/null +++ b/tests/templates/kuttl/spark-history-server/08-deploy-spark-app-template.yaml.j2 @@ -0,0 +1,38 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplicationTemplate +metadata: + name: template-pi-s3 +spec: +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + sparkImage: +{% if test_scenario['values']['spark'].find(",") > 0 %} + custom: "{{ test_scenario['values']['spark'].split(',')[1] }}" + productVersion: "{{ 
test_scenario['values']['spark'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['spark'] }}" +{% endif %} + pullPolicy: IfNotPresent + mode: cluster + mainClass: org.apache.spark.examples.SparkPi + mainApplicationFile: "/stackable/spark/examples/jars/spark-examples.jar" + sparkConf: + spark.kubernetes.file.upload.path: "s3a://my-bucket" + s3connection: + reference: spark-history-s3-connection + logFileDirectory: + s3: + prefix: eventlogs/ + bucket: + reference: spark-history-s3-bucket + driver: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + executor: + replicas: 1 + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 index 314abcea..b1c93d44 100644 --- a/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-history-server/10-deploy-spark-app.yaml.j2 @@ -3,10 +3,15 @@ apiVersion: spark.stackable.tech/v1alpha1 kind: SparkApplication metadata: name: spark-pi-s3-1 + annotations: + spark-application.template.merge: "true" + spark-application.template.0.name: "template-pi-s3" spec: -{% if lookup('env', 'VECTOR_AGGREGATOR') %} - vectorAggregatorConfigMapName: vector-aggregator-discovery -{% endif %} + # + # The fields below are mandatory in the v1alpha1 version of SparkApplicationSpec + # and therefore must be present. + # They are also final, meaning they cannot be overridden by templates. 
+ # sparkImage: {% if test_scenario['values']['spark'].find(",") > 0 %} custom: "{{ test_scenario['values']['spark'].split(',')[1] }}" @@ -14,25 +19,6 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['spark'] }}" {% endif %} - pullPolicy: IfNotPresent mode: cluster mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: "/stackable/spark/examples/jars/spark-examples.jar" - sparkConf: - spark.kubernetes.file.upload.path: "s3a://my-bucket" - s3connection: - reference: spark-history-s3-connection - logFileDirectory: - s3: - prefix: eventlogs/ - bucket: - reference: spark-history-s3-bucket - driver: - config: - logging: - enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} - executor: - replicas: 1 - config: - logging: - enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} diff --git a/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 index a3bf7f1a..2ea61a61 100644 --- a/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 +++ b/tests/templates/kuttl/spark-history-server/12-deploy-spark-app.yaml.j2 @@ -3,10 +3,15 @@ apiVersion: spark.stackable.tech/v1alpha1 kind: SparkApplication metadata: name: spark-pi-s3-2 + annotations: + spark-application.template.merge: "true" + spark-application.template.0.name: "template-pi-s3" spec: -{% if lookup('env', 'VECTOR_AGGREGATOR') %} - vectorAggregatorConfigMapName: vector-aggregator-discovery -{% endif %} + # + # The fields below are mandatory in the v1alpha1 version of SparkApplicationSpec + # and therefore must be present. + # They are also final, meaning they cannot be overridden by templates. 
+ # sparkImage: {% if test_scenario['values']['spark'].find(",") > 0 %} custom: "{{ test_scenario['values']['spark'].split(',')[1] }}" @@ -14,25 +19,6 @@ spec: {% else %} productVersion: "{{ test_scenario['values']['spark'] }}" {% endif %} - pullPolicy: IfNotPresent mode: cluster mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: "/stackable/spark/examples/jars/spark-examples.jar" - sparkConf: - spark.kubernetes.file.upload.path: "s3a://my-bucket" - s3connection: - reference: spark-history-s3-connection - logFileDirectory: - s3: - prefix: eventlogs/ - bucket: - reference: spark-history-s3-bucket - driver: - config: - logging: - enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} - executor: - replicas: 1 - config: - logging: - enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}