From b42190814214b037bc2b273bf5228d69db687756 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Fri, 13 Feb 2026 09:51:09 -0500
Subject: [PATCH 1/3] Passed CometFuzzTestSuite and ParquetEncryptionITCase.

---
 native/core/src/execution/planner.rs          | 37 ++++----
 native/proto/src/proto/operator.proto         | 43 +++++-----
 .../serde/operator/CometNativeScan.scala      | 43 +++++-----
 .../spark/sql/comet/CometNativeScanExec.scala | 86 +++++++++++++++++--
 .../apache/spark/sql/comet/operators.scala    | 51 ++++++++++-
 .../ParquetReadFromFakeHadoopFsSuite.scala    |  8 +-
 6 files changed, 195 insertions(+), 73 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 2c3d00a23b..4565b3353a 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -965,20 +965,23 @@ impl PhysicalPlanner {
             ))
         }
         OpStruct::NativeScan(scan) => {
-            let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
+            // Access fields from common (shared across all partitions)
+            let common = scan.common.as_ref().unwrap();
+            let data_schema = convert_spark_types_to_arrow_schema(common.data_schema.as_slice());
             let required_schema: SchemaRef =
-                convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
+                convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
             let partition_schema: SchemaRef =
-                convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
-            let projection_vector: Vec<usize> = scan
+                convert_spark_types_to_arrow_schema(common.partition_schema.as_slice());
+            let projection_vector: Vec<usize> = common
                 .projection_vector
                 .iter()
                 .map(|offset| *offset as usize)
                 .collect();

-            // Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
-            let partition_files = &scan.file_partitions[self.partition as usize];
+            // Access the single file_partition directly (injected by PlanDataInjector)
+            let partition_files = scan.file_partition.as_ref().unwrap();
+
+            // Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
             if partition_files.partitioned_file.is_empty() {
                 let empty_exec = Arc::new(EmptyExec::new(required_schema));
                 return Ok((
@@ -988,19 +991,19 @@ impl PhysicalPlanner {
             }

             // Convert the Spark expressions to Physical expressions
-            let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
+            let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = common
                 .data_filters
                 .iter()
                 .map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
                 .collect();

-            let default_values: Option<HashMap<usize, ScalarValue>> = if !scan
+            let default_values: Option<HashMap<usize, ScalarValue>> = if !common
                 .default_values
                 .is_empty()
             {
                 // We have default values. Extract the two lists (same length) of values and
                 // indexes in the schema, and then create a HashMap to use in the SchemaMapper.
-                let default_values: Result<Vec<ScalarValue>, DataFusionError> = scan
+                let default_values: Result<Vec<ScalarValue>, DataFusionError> = common
                     .default_values
                     .iter()
                     .map(|expr| {
@@ -1015,7 +1018,7 @@ impl PhysicalPlanner {
                     })
                     .collect();
                 let default_values = default_values?;
-                let default_values_indexes: Vec<usize> = scan
+                let default_values_indexes: Vec<usize> = common
                     .default_values_indexes
                     .iter()
                     .map(|offset| *offset as usize)
                     .collect();
@@ -1037,7 +1040,7 @@ impl PhysicalPlanner {
                 .map(|f| f.file_path.clone())
                 .expect("partition should have files after empty check");

-            let object_store_options: HashMap<String, String> = scan
+            let object_store_options: HashMap<String, String> = common
                 .object_store_options
                 .iter()
                 .map(|(k, v)| (k.clone(), v.clone()))
@@ -1048,10 +1051,8 @@ impl PhysicalPlanner {
                 &object_store_options,
             )?;

-            // Comet serializes all partitions' PartitionedFiles, but we only want to read this
-            // Spark partition's PartitionedFiles
-            let files =
-                self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
+            // Get files for this partition (file_partition is singular, already partition-specific)
+            let files = self.get_partitioned_files(scan.file_partition.as_ref().unwrap())?;
             let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
             let partition_fields: Vec<Field> = partition_schema
                 .fields()
@@ -1070,10 +1071,10 @@ impl PhysicalPlanner {
                 Some(projection_vector),
                 Some(data_filters?),
                 default_values,
-                scan.session_timezone.as_str(),
-                scan.case_sensitive,
+                common.session_timezone.as_str(),
+                common.case_sensitive,
                 self.session_ctx(),
-                scan.encryption_enabled,
+                common.encryption_enabled,
             )?;
             Ok((
                 vec![],
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 78f118e6db..8b625e4093 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -85,30 +85,27 @@ message Scan {
   bool arrow_ffi_safe = 3;
 }

+// Common data shared by all partitions in NativeScan (sent once)
+message NativeScanCommon {
+  repeated SparkStructField required_schema = 1;
+  repeated SparkStructField data_schema = 2;
+  repeated SparkStructField partition_schema = 3;
+  repeated spark.spark_expression.Expr data_filters = 4;
+  repeated int64 projection_vector = 5;
+  string session_timezone = 6;
+  repeated spark.spark_expression.Expr default_values = 7;
+  repeated int64 default_values_indexes = 8;
+  bool case_sensitive = 9;
+  map<string, string> object_store_options = 10;
+  bool encryption_enabled = 11;
+  string source = 12;
+  repeated spark.spark_expression.DataType fields = 13;
+}
+
 message NativeScan {
-  repeated spark.spark_expression.DataType fields = 1;
-  // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
-  // is purely for informational purposes when viewing native query plans in
-  // debug mode.
-  string source = 2;
-  repeated SparkStructField required_schema = 3;
-  repeated SparkStructField data_schema = 4;
-  repeated SparkStructField partition_schema = 5;
-  repeated spark.spark_expression.Expr data_filters = 6;
-  repeated SparkFilePartition file_partitions = 7;
-  repeated int64 projection_vector = 8;
-  string session_timezone = 9;
-  repeated spark.spark_expression.Expr default_values = 10;
-  repeated int64 default_values_indexes = 11;
-  bool case_sensitive = 12;
-  // Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
-  // from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
-  // stores.
-  // The configuration values have hadoop. or spark.hadoop. prefix trimmed. For instance, the
-  // configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
-  // the map.
-  map<string, string> object_store_options = 13;
-  bool encryption_enabled = 14;
+  // Split mode fields only (no backward compatibility)
+  NativeScanCommon common = 1;
+  SparkFilePartition file_partition = 2; // Singular - just this partition's files
 }

 message CsvScan {
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index b7909b67cb..f6711239c9 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -97,14 +97,17 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
       builder: Operator.Builder,
       childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
     val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
-    nativeScanBuilder.setSource(scan.simpleStringWithNodeId())
+    val commonBuilder = OperatorOuterClass.NativeScanCommon.newBuilder()
+
+    // Set source in common (used as part of injection key)
+    commonBuilder.setSource(scan.simpleStringWithNodeId())

     val scanTypes = scan.output.flatten { attr =>
       serializeDataType(attr.dataType)
     }

     if (scanTypes.length == scan.output.length) {
-      nativeScanBuilder.addAllFields(scanTypes.asJava)
+      commonBuilder.addAllFields(scanTypes.asJava)

       // Sink operators don't have children
       builder.clearChildren()
@@ -120,7 +123,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
           logWarning(s"Unsupported data filter $filter")
         }
       }
-      nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
+      commonBuilder.addAllDataFilters(dataFilters.asJava)
     }

     val possibleDefaultValues = getExistenceDefaultValues(scan.requiredSchema)
@@ -136,20 +139,15 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
             (Literal(expr), index.toLong.asInstanceOf[java.lang.Long])
           }
           .unzip
-      nativeScanBuilder.addAllDefaultValues(
+      commonBuilder.addAllDefaultValues(
        defaultValues.flatMap(exprToProto(_, scan.output)).toIterable.asJava)
-      nativeScanBuilder.addAllDefaultValuesIndexes(indexes.toIterable.asJava)
+      commonBuilder.addAllDefaultValuesIndexes(indexes.toIterable.asJava)
     }

+    // Get first partition for object store options (we still need to peek at partitions)
     var firstPartition: Option[PartitionedFile] = None
     val filePartitions = scan.getFilePartitions()
-    val filePartitionsProto = filePartitions.map { partition =>
-      if (firstPartition.isEmpty) {
-        firstPartition = partition.files.headOption
-      }
-      partition2Proto(partition, scan.relation.partitionSchema)
-    }
-    nativeScanBuilder.addAllFilePartitions(filePartitionsProto.asJava)
+    firstPartition = filePartitions.flatMap(_.files.headOption).headOption

     val partitionSchema = schema2Proto(scan.relation.partitionSchema.fields)
     val requiredSchema = schema2Proto(scan.requiredSchema.fields)
@@ -166,31 +164,34 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
     val projectionVector = (dataSchemaIndexes ++ partitionSchemaIndexes).map(idx =>
       idx.toLong.asInstanceOf[java.lang.Long])

-    nativeScanBuilder.addAllProjectionVector(projectionVector.toIterable.asJava)
+    commonBuilder.addAllProjectionVector(projectionVector.toIterable.asJava)

     // In `CometScanRule`, we ensure partitionSchema is supported.
     assert(partitionSchema.length == scan.relation.partitionSchema.fields.length)

-    nativeScanBuilder.addAllDataSchema(dataSchema.toIterable.asJava)
-    nativeScanBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
-    nativeScanBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
-    nativeScanBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
-    nativeScanBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
+    commonBuilder.addAllDataSchema(dataSchema.toIterable.asJava)
+    commonBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
+    commonBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
+    commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
+    commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))

     // Collect S3/cloud storage configurations
     val hadoopConf = scan.relation.sparkSession.sessionState
       .newHadoopConfWithOptions(scan.relation.options)

-    nativeScanBuilder.setEncryptionEnabled(CometParquetUtils.encryptionEnabled(hadoopConf))
+    commonBuilder.setEncryptionEnabled(CometParquetUtils.encryptionEnabled(hadoopConf))

     firstPartition.foreach { partitionFile =>
       val objectStoreOptions =
         NativeConfig.extractObjectStoreOptions(hadoopConf, partitionFile.pathUri)
       objectStoreOptions.foreach { case (key, value) =>
-        nativeScanBuilder.putObjectStoreOptions(key, value)
+        commonBuilder.putObjectStoreOptions(key, value)
       }
     }

+    // Set common data in NativeScan (file_partition will be populated at execution time)
+    nativeScanBuilder.setCommon(commonBuilder.build())
+
     Some(builder.setNativeScan(nativeScanBuilder).build())

   } else {
@@ -204,6 +205,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
   }

   override def createExec(nativeOp: Operator, op: CometScanExec): CometNativeExec = {
-    CometNativeScanExec(nativeOp, op.wrapped, op.session)
+    CometNativeScanExec(nativeOp, op.wrapped, op.session, op)
   }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index 841bc21aa2..af8c7e1f50 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -32,12 +32,14 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
 import org.apache.spark.util.collection._

 import com.google.common.base.Objects

 import org.apache.comet.CometConf
-import org.apache.comet.parquet.CometParquetFileFormat
+import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetUtils}
 import org.apache.comet.serde.OperatorOuterClass.Operator

 /**
@@ -55,7 +57,9 @@ case class CometNativeScanExec(
     tableIdentifier: Option[TableIdentifier],
     disableBucketedScan: Boolean = false,
     originalPlan: FileSourceScanExec,
-    override val serializedPlanOpt: SerializedPlan)
+    override val serializedPlanOpt: SerializedPlan,
+    @transient scan: CometScanExec, // Need to access getFilePartitions()
+    sourceKey: String) // Unique injection key
   extends CometLeafExec
   with DataSourceScanExec
   with ShimStreamSourceAwareSparkPlan {
@@ -78,6 +82,61 @@ case class CometNativeScanExec(
   override lazy val outputOrdering: Seq[SortOrder] =
     originalPlan.outputOrdering

+  @transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = {
+    // Extract common data from nativeOp
+    val commonBytes = nativeOp.getNativeScan.getCommon.toByteArray
+
+    // Get file partitions from CometScanExec (handles bucketing, etc.)
+    val filePartitions = scan.getFilePartitions()
+
+    // Serialize each partition's files
+    import org.apache.comet.serde.operator.partition2Proto
+    val perPartitionBytes = filePartitions.map { filePartition =>
+      val partitionProto = partition2Proto(filePartition, relation.partitionSchema)
+      val partitionNativeScan = org.apache.comet.serde.OperatorOuterClass.NativeScan
+        .newBuilder()
+        .setFilePartition(partitionProto)
+        .build()
+
+      partitionNativeScan.toByteArray
+    }.toArray
+
+    (commonBytes, perPartitionBytes)
+  }
+
+  def commonData: Array[Byte] = serializedPartitionData._1
+  def perPartitionData: Array[Array[Byte]] = serializedPartitionData._2
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val nativeMetrics = CometMetricNode.fromCometPlan(this)
+    val serializedPlan = CometExec.serializeNativePlan(nativeOp)
+
+    // Handle encryption if needed
+    val hadoopConf = relation.sparkSession.sessionState
+      .newHadoopConfWithOptions(relation.options)
+    val encryptionEnabled = CometParquetUtils.encryptionEnabled(hadoopConf)
+    val (broadcastedHadoopConfForEncryption, encryptedFilePaths) = if (encryptionEnabled) {
+      val broadcastedConf = relation.sparkSession.sparkContext
+        .broadcast(new SerializableConfiguration(hadoopConf))
+      (Some(broadcastedConf), relation.inputFiles.toSeq)
+    } else {
+      (None, Seq.empty)
+    }
+
+    CometExecRDD(
+      sparkContext,
+      inputRDDs = Seq.empty,
+      commonByKey = Map(sourceKey -> commonData),
+      perPartitionByKey = Map(sourceKey -> perPartitionData),
+      serializedPlan = serializedPlan,
+      numPartitions = perPartitionData.length,
+      numOutputCols = output.length,
+      nativeMetrics = nativeMetrics,
+      subqueries = Seq.empty,
+      broadcastedHadoopConfForEncryption = broadcastedHadoopConfForEncryption,
+      encryptedFilePaths = encryptedFilePaths)
+  }
+
   override def doCanonicalize(): CometNativeScanExec = {
     CometNativeScanExec(
       nativeOp,
@@ -93,7 +152,10 @@ case class CometNativeScanExec(
       None,
       disableBucketedScan,
       originalPlan.doCanonicalize(),
-      SerializedPlan(None))
+      SerializedPlan(None),
+      null, // scan not needed for canonicalization
+      ""
+    ) // sourceKey not needed for canonicalization
   }

   override def stringArgs: Iterator[Any] = Iterator(output)
@@ -123,7 +185,8 @@ object CometNativeScanExec {
   def apply(
       nativeOp: Operator,
       scanExec: FileSourceScanExec,
-      session: SparkSession): CometNativeScanExec = {
+      session: SparkSession,
+      scan: CometScanExec): CometNativeScanExec = {
     // TreeNode.mapProductIterator is protected method.
     def mapProductIterator[B: ClassTag](product: Product, f: Any => B): Array[B] = {
       val arr = Array.ofDim[B](product.productArity)
@@ -135,6 +198,17 @@ object CometNativeScanExec {
       arr
     }

+    // Compute unique sourceKey for this scan
+    val source = nativeOp.getNativeScan.getCommon.getSource
+    val nativeScan = nativeOp.getNativeScan.getCommon
+    val keyComponents = Seq(
+      nativeScan.getRequiredSchemaList.toString,
+      nativeScan.getDataFiltersList.toString,
+      nativeScan.getProjectionVectorList.toString,
+      nativeScan.getFieldsList.toString)
+    val hashCode = keyComponents.mkString("|").hashCode
+    val sourceKey = s"${source}_${hashCode}"
+
     // Replacing the relation in FileSourceScanExec by `copy` seems causing some issues
     // on other Spark distributions if FileSourceScanExec constructor is changed.
     // Using `makeCopy` to avoid the issue.
@@ -161,7 +235,9 @@ object CometNativeScanExec {
       wrapped.tableIdentifier,
       wrapped.disableBucketedScan,
       wrapped,
-      SerializedPlan(None))
+      SerializedPlan(None),
+      scan,
+      sourceKey)
     scanExec.logicalLink.foreach(batchScanExec.setLogicalLink)
     batchScanExec
   }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index eba74c9e25..372371951a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -82,7 +82,8 @@ private[comet] object PlanDataInjector {

   // Registry of injectors for different operator types
   private val injectors: Seq[PlanDataInjector] = Seq(
-    IcebergPlanDataInjector
+    IcebergPlanDataInjector,
+    NativeScanPlanDataInjector
     // Future: DeltaPlanDataInjector, HudiPlanDataInjector, etc.
   )

@@ -112,7 +113,7 @@ private[comet] object PlanDataInjector {
           case _ =>
             throw new CometRuntimeException(s"Missing planning data for key: $key")
         }
-      case None => // No key, skip injection
+      case None =>
     }
   }

@@ -191,6 +192,46 @@ private[comet] object IcebergPlanDataInjector {
   }
 }

+/**
+ * Injector for NativeScan operators.
+ */
+private[comet] object NativeScanPlanDataInjector extends PlanDataInjector {
+
+  override def canInject(op: Operator): Boolean =
+    op.hasNativeScan &&
+      op.getNativeScan.hasCommon &&
+      !op.getNativeScan.hasFilePartition
+
+  override def getKey(op: Operator): Option[String] = {
+    // Reconstruct the same sourceKey that was used when storing the data
+    val common = op.getNativeScan.getCommon
+    val source = common.getSource
+    val keyComponents = Seq(
+      common.getRequiredSchemaList.toString,
+      common.getDataFiltersList.toString,
+      common.getProjectionVectorList.toString,
+      common.getFieldsList.toString)
+    val hashCode = keyComponents.mkString("|").hashCode
+    Some(s"${source}_${hashCode}")
+  }
+
+  override def inject(
+      op: Operator,
+      commonBytes: Array[Byte],
+      partitionBytes: Array[Byte]): Operator = {
+
+    val common = OperatorOuterClass.NativeScanCommon.parseFrom(commonBytes)
+    val partitionOnly = OperatorOuterClass.NativeScan.parseFrom(partitionBytes)
+
+    // Merge common data + this partition's files
+    val scanBuilder = OperatorOuterClass.NativeScan.newBuilder()
+    scanBuilder.setCommon(common)
+    scanBuilder.setFilePartition(partitionOnly.getFilePartition)
+
+    op.toBuilder.setNativeScan(scanBuilder).build()
+  }
+}
+
 /**
  * A Comet physical operator
  */
@@ -589,6 +630,12 @@ abstract class CometNativeExec extends CometExec {
             Map(iceberg.metadataLocation -> iceberg.commonData),
             Map(iceberg.metadataLocation -> iceberg.perPartitionData))

+        // Found a NativeScan with planning data
+        case nativeScan: CometNativeScanExec =>
+          (
+            Map(nativeScan.sourceKey -> nativeScan.commonData),
+            Map(nativeScan.sourceKey -> nativeScan.perPartitionData))
+
         // Broadcast stages are boundaries - don't collect per-partition data from inside them.
         // After DPP filtering, broadcast scans may have different partition counts than the
         // probe side, causing ArrayIndexOutOfBoundsException in CometExecRDD.getPartitions.
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
index f4a8b5ed82..b8db737a3c 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala
@@ -66,11 +66,11 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP
       p
     }
     assert(scans.size == 1)
+    // File partitions are now accessed from the scan field, not from the protobuf
+    val filePartitions = scans.head.scan.getFilePartitions()
+    assert(filePartitions.nonEmpty)
     assert(
-      scans.head.nativeOp.getNativeScan
-        .getFilePartitions(0)
-        .getPartitionedFile(0)
-        .getFilePath
+      filePartitions.head.files.head.filePath.toString
         .startsWith(FakeHDFSFileSystem.PREFIX))
   }

From a1575fa52fec7eca1ba3bd2201d097b2bcf4bffb Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Fri, 13 Feb 2026 10:20:48 -0500
Subject: [PATCH 2/3] Cleanup before PR submission.
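
The split-mode injection relies on the driver and the executors deriving the same
key independently: CometNativeScanExec.apply computes it when the plan is built, and
NativeScanPlanDataInjector.getKey recomputes it from the deserialized protobuf on the
executor. A sketch of that shared derivation (the helper name is illustrative; the
patch inlines this logic at both sites rather than sharing a function):

    // Hypothetical helper, not in this patch: both sides must agree on this value.
    def injectionKey(common: OperatorOuterClass.NativeScanCommon): String = {
      val keyComponents = Seq(
        common.getRequiredSchemaList.toString,
        common.getDataFiltersList.toString,
        common.getProjectionVectorList.toString,
        common.getFieldsList.toString)
      s"${common.getSource}_${keyComponents.mkString("|").hashCode}"
    }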
---
 native/core/src/execution/planner.rs              |  3 ++-
 .../serde/operator/CometNativeScan.scala          |  2 +-
 .../spark/sql/comet/CometNativeScanExec.scala     | 23 ++++++++++---------
 .../apache/spark/sql/comet/operators.scala        |  2 +-
 4 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 4565b3353a..4fd992fd77 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -967,7 +967,8 @@ impl PhysicalPlanner {
         OpStruct::NativeScan(scan) => {
             // Access fields from common (shared across all partitions)
             let common = scan.common.as_ref().unwrap();
-            let data_schema = convert_spark_types_to_arrow_schema(common.data_schema.as_slice());
+            let data_schema =
+                convert_spark_types_to_arrow_schema(common.data_schema.as_slice());
             let required_schema: SchemaRef =
                 convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
             let partition_schema: SchemaRef =
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index f6711239c9..d5d075760f 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -144,7 +144,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
       commonBuilder.addAllDefaultValuesIndexes(indexes.toIterable.asJava)
     }

-    // Get first partition for object store options (we still need to peek at partitions)
+    // Extract object store options from first file (S3 configs apply to all files in the scan)
     var firstPartition: Option[PartitionedFile] = None
     val filePartitions = scan.getFilePartitions()
     firstPartition = filePartitions.flatMap(_.files.headOption).headOption
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index af8c7e1f50..fde8bacbc5 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -58,8 +58,8 @@ case class CometNativeScanExec(
     disableBucketedScan: Boolean = false,
     originalPlan: FileSourceScanExec,
     override val serializedPlanOpt: SerializedPlan,
-    @transient scan: CometScanExec, // Need to access getFilePartitions()
-    sourceKey: String) // Unique injection key
+    @transient scan: CometScanExec, // Lazy access to file partitions without serializing with plan
+    sourceKey: String) // Key for PlanDataInjector to match common+partition data at runtime
   extends CometLeafExec
   with DataSourceScanExec
   with ShimStreamSourceAwareSparkPlan {
@@ -111,7 +111,7 @@ case class CometNativeScanExec(
     val nativeMetrics = CometMetricNode.fromCometPlan(this)
     val serializedPlan = CometExec.serializeNativePlan(nativeOp)

-    // Handle encryption if needed
+    // Encryption config must be passed to each executor task
     val hadoopConf = relation.sparkSession.sessionState
       .newHadoopConfWithOptions(relation.options)
     val encryptionEnabled = CometParquetUtils.encryptionEnabled(hadoopConf)
@@ -153,7 +153,7 @@ case class CometNativeScanExec(
       disableBucketedScan,
       originalPlan.doCanonicalize(),
       SerializedPlan(None),
-      null, // scan not needed for canonicalization
+      null, // Transient scan not needed for canonicalization
       ""
     ) // sourceKey not needed for canonicalization
   }
@@ -198,14 +198,15 @@ object CometNativeScanExec {
       arr
     }

-    // Compute unique sourceKey for this scan
-    val source = nativeOp.getNativeScan.getCommon.getSource
-    val nativeScan = nativeOp.getNativeScan.getCommon
+    // Generate unique key for this scan so PlanDataInjector can match common+partition data.
+    // Multiple scans of the same table with different projections/filters get different keys.
+    val common = nativeOp.getNativeScan.getCommon
+    val source = common.getSource
     val keyComponents = Seq(
-      nativeScan.getRequiredSchemaList.toString,
-      nativeScan.getDataFiltersList.toString,
-      nativeScan.getProjectionVectorList.toString,
-      nativeScan.getFieldsList.toString)
+      common.getRequiredSchemaList.toString,
+      common.getDataFiltersList.toString,
+      common.getProjectionVectorList.toString,
+      common.getFieldsList.toString)
     val hashCode = keyComponents.mkString("|").hashCode
     val sourceKey = s"${source}_${hashCode}"

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 372371951a..da2ae21a95 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -223,7 +223,7 @@ private[comet] object NativeScanPlanDataInjector extends PlanDataInjector {
     val common = OperatorOuterClass.NativeScanCommon.parseFrom(commonBytes)
     val partitionOnly = OperatorOuterClass.NativeScan.parseFrom(partitionBytes)

-    // Merge common data + this partition's files
+    // Build complete NativeScan with common fields + this partition's file list
     val scanBuilder = OperatorOuterClass.NativeScan.newBuilder()
     scanBuilder.setCommon(common)
     scanBuilder.setFilePartition(partitionOnly.getFilePartition)

From 8ada2fd32e12823069d820ff8cd60f6a04923cb8 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Fri, 13 Feb 2026 10:26:53 -0500
Subject: [PATCH 3/3] Cleanup before PR submission.
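
For reference, the executor-side merge that consumes these two payloads, condensed
from NativeScanPlanDataInjector.inject in patch 1/3 (`commonBytes` is the
NativeScanCommon serialized once per scan; `partitionBytes` is a NativeScan carrying
only this task's file_partition):

    // Condensed from NativeScanPlanDataInjector.inject; names as in patch 1/3.
    val common = OperatorOuterClass.NativeScanCommon.parseFrom(commonBytes)
    val partitionOnly = OperatorOuterClass.NativeScan.parseFrom(partitionBytes)
    val merged = OperatorOuterClass.NativeScan.newBuilder()
      .setCommon(common)
      .setFilePartition(partitionOnly.getFilePartition)
      .build()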
---
 native/core/src/execution/planner.rs          | 19 +++++++----
 native/proto/src/proto/operator.proto         |  8 +++--
 .../spark/sql/comet/CometNativeScanExec.scala | 33 ++++++++++++++++++-
 3 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 4fd992fd77..92a9cb23e2 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -965,8 +965,13 @@ impl PhysicalPlanner {
             ))
         }
         OpStruct::NativeScan(scan) => {
-            // Access fields from common (shared across all partitions)
-            let common = scan.common.as_ref().unwrap();
+            // Extract common data and single partition's file list
+            // Per-partition injection happens in Scala before sending to native
+            let common = scan
+                .common
+                .as_ref()
+                .ok_or_else(|| GeneralError("NativeScan missing common data".into()))?;
+
             let data_schema =
                 convert_spark_types_to_arrow_schema(common.data_schema.as_slice());
             let required_schema: SchemaRef =
@@ -979,8 +984,10 @@ impl PhysicalPlanner {
                 .map(|offset| *offset as usize)
                 .collect();

-            // Access the single file_partition directly (injected by PlanDataInjector)
-            let partition_files = scan.file_partition.as_ref().unwrap();
+            let partition_files = scan
+                .file_partition
+                .as_ref()
+                .ok_or_else(|| GeneralError("NativeScan missing file_partition".into()))?;

             // Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
             if partition_files.partitioned_file.is_empty() {
@@ -1052,8 +1059,8 @@ impl PhysicalPlanner {
                 &object_store_options,
             )?;

-            // Get files for this partition (file_partition is singular, already partition-specific)
-            let files = self.get_partitioned_files(scan.file_partition.as_ref().unwrap())?;
+            // Get files for this partition
+            let files = self.get_partitioned_files(partition_files)?;
             let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
             let partition_fields: Vec<Field> = partition_schema
                 .fields()
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index 8b625e4093..93872b462c 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -85,7 +85,7 @@ message Scan {
   bool arrow_ffi_safe = 3;
 }

-// Common data shared by all partitions in NativeScan (sent once)
+// Common data shared by all partitions in split mode (sent once at planning)
 message NativeScanCommon {
   repeated SparkStructField required_schema = 1;
   repeated SparkStructField data_schema = 2;
@@ -103,9 +103,11 @@ message NativeScanCommon {
 }

 message NativeScan {
-  // Split mode fields only (no backward compatibility)
+  // Common data shared across partitions (schemas, filters, projections, config)
   NativeScanCommon common = 1;
-  SparkFilePartition file_partition = 2; // Singular - just this partition's files
+
+  // Single partition's file list (injected at execution time)
+  SparkFilePartition file_partition = 2;
 }

 message CsvScan {
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
index fde8bacbc5..3f2748c3ea 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala
@@ -43,7 +43,17 @@ import org.apache.comet.parquet.{CometParquetFileFormat, CometParquetUtils}
 import org.apache.comet.serde.OperatorOuterClass.Operator

 /**
- * Comet fully native scan node for DataSource V1 that delegates to DataFusion's DataSourceExec.
+ * Native scan operator for DataSource V1 Parquet files using DataFusion's ParquetExec.
+ *
+ * Replaces Spark's FileSourceScanExec to enable native execution. File planning runs in Spark to
+ * produce FilePartitions (handling bucketing, partition pruning, etc.), which are serialized to
+ * protobuf for DataFusion to execute using its ParquetExec. This provides better performance than
+ * reading through Spark's FileFormat abstraction.
+ *
+ * Uses split-mode serialization introduced in PR #3349: common scan metadata (schemas, filters,
+ * projections) is serialized once at planning time, while per-partition file lists are lazily
+ * serialized at execution time. This reduces memory when scanning tables with many partitions, as
+ * each executor task receives only its partition's file list rather than all files.
  */
 case class CometNativeScanExec(
     override val nativeOp: Operator,
@@ -82,6 +92,27 @@ case class CometNativeScanExec(
   override lazy val outputOrdering: Seq[SortOrder] =
     originalPlan.outputOrdering

+  /**
+   * Lazy partition serialization - deferred until execution time to reduce driver memory.
+   *
+   * Split-mode serialization pattern:
+   * {{{
+   * Planning time:
+   *   - CometNativeScan.convert() serializes common data (schemas, filters, projections)
+   *   - commonData embedded in nativeOp protobuf
+   *   - File partitions NOT serialized yet
+   *
+   * Execution time:
+   *   - doExecuteColumnar() accesses commonData and perPartitionData
+   *   - Forces serializedPartitionData evaluation (here)
+   *   - Each partition's file list serialized separately
+   *   - CometExecRDD receives per-partition data and injects at runtime
+   * }}}
+   *
+   * This pattern reduces memory usage for tables with many partitions - instead of serializing
+   * all files for all partitions in the driver, we serialize only common metadata (once) and each
+   * partition's files (lazily, as tasks are scheduled).
+   */
   @transient private lazy val serializedPartitionData: (Array[Byte], Array[Array[Byte]]) = {
     // Extract common data from nativeOp
     val commonBytes = nativeOp.getNativeScan.getCommon.toByteArray
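
To make the scaladoc's memory claim concrete, a rough illustration (numbers invented
for the example, not taken from the patch): a scan with 1,000 partitions and roughly
1 KB of PartitionedFile metadata per partition previously embedded all ~1 MB of file
lists in the plan shipped to every task, about 1 GB of aggregate plan bytes across
the 1,000 tasks; with split mode, each task receives the common metadata plus only
its own ~1 KB file list.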