Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,20 +965,31 @@ impl PhysicalPlanner {
))
}
OpStruct::NativeScan(scan) => {
let data_schema = convert_spark_types_to_arrow_schema(scan.data_schema.as_slice());
// Extract common data and single partition's file list
// Per-partition injection happens in Scala before sending to native
let common = scan
.common
.as_ref()
.ok_or_else(|| GeneralError("NativeScan missing common data".into()))?;

let data_schema =
convert_spark_types_to_arrow_schema(common.data_schema.as_slice());
let required_schema: SchemaRef =
convert_spark_types_to_arrow_schema(scan.required_schema.as_slice());
convert_spark_types_to_arrow_schema(common.required_schema.as_slice());
let partition_schema: SchemaRef =
convert_spark_types_to_arrow_schema(scan.partition_schema.as_slice());
let projection_vector: Vec<usize> = scan
convert_spark_types_to_arrow_schema(common.partition_schema.as_slice());
let projection_vector: Vec<usize> = common
.projection_vector
.iter()
.map(|offset| *offset as usize)
.collect();

// Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
let partition_files = &scan.file_partitions[self.partition as usize];
let partition_files = scan
.file_partition
.as_ref()
.ok_or_else(|| GeneralError("NativeScan missing file_partition".into()))?;

// Check if this partition has any files (bucketed scan with bucket pruning may have empty partitions)
if partition_files.partitioned_file.is_empty() {
let empty_exec = Arc::new(EmptyExec::new(required_schema));
return Ok((
Expand All @@ -988,19 +999,19 @@ impl PhysicalPlanner {
}

// Convert the Spark expressions to Physical expressions
let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = common
.data_filters
.iter()
.map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
.collect();

let default_values: Option<HashMap<usize, ScalarValue>> = if !scan
let default_values: Option<HashMap<usize, ScalarValue>> = if !common
.default_values
.is_empty()
{
// We have default values. Extract the two lists (same length) of values and
// indexes in the schema, and then create a HashMap to use in the SchemaMapper.
let default_values: Result<Vec<ScalarValue>, DataFusionError> = scan
let default_values: Result<Vec<ScalarValue>, DataFusionError> = common
.default_values
.iter()
.map(|expr| {
Expand All @@ -1015,7 +1026,7 @@ impl PhysicalPlanner {
})
.collect();
let default_values = default_values?;
let default_values_indexes: Vec<usize> = scan
let default_values_indexes: Vec<usize> = common
.default_values_indexes
.iter()
.map(|offset| *offset as usize)
Expand All @@ -1037,7 +1048,7 @@ impl PhysicalPlanner {
.map(|f| f.file_path.clone())
.expect("partition should have files after empty check");

let object_store_options: HashMap<String, String> = scan
let object_store_options: HashMap<String, String> = common
.object_store_options
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
Expand All @@ -1048,10 +1059,8 @@ impl PhysicalPlanner {
&object_store_options,
)?;

// Comet serializes all partitions' PartitionedFiles, but we only want to read this
// Spark partition's PartitionedFiles
let files =
self.get_partitioned_files(&scan.file_partitions[self.partition as usize])?;
// Get files for this partition
let files = self.get_partitioned_files(partition_files)?;
let file_groups: Vec<Vec<PartitionedFile>> = vec![files];
let partition_fields: Vec<Field> = partition_schema
.fields()
Expand All @@ -1070,10 +1079,10 @@ impl PhysicalPlanner {
Some(projection_vector),
Some(data_filters?),
default_values,
scan.session_timezone.as_str(),
scan.case_sensitive,
common.session_timezone.as_str(),
common.case_sensitive,
self.session_ctx(),
scan.encryption_enabled,
common.encryption_enabled,
)?;
Ok((
vec![],
Expand Down
45 changes: 22 additions & 23 deletions native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,30 +85,29 @@ message Scan {
bool arrow_ffi_safe = 3;
}

// Common data shared by all partitions in split mode (sent once at planning).
// Holds the schemas, filters, projection and configuration of a native scan; the
// per-partition file list is carried separately in NativeScan.file_partition.
message NativeScanCommon {
// Built from the Spark scan's required schema. The native planner converts it to an
// Arrow schema and resolves data_filters against it.
repeated SparkStructField required_schema = 1;
// Converted to an Arrow schema by the native planner.
repeated SparkStructField data_schema = 2;
// Built from the relation's partition schema; converted to an Arrow schema by the
// native planner.
repeated SparkStructField partition_schema = 3;
// Serialized Spark filter expressions; the native planner turns each one into a
// physical expression.
repeated spark.spark_expression.Expr data_filters = 4;
// Column offsets: data-schema indexes followed by partition-schema indexes.
repeated int64 projection_vector = 5;
// Value of the "spark.sql.session.timeZone" configuration.
string session_timezone = 6;
// default_values and default_values_indexes are two lists of the same length: the
// default value expressions and the schema indexes they apply to.
repeated spark.spark_expression.Expr default_values = 7;
repeated int64 default_values_indexes = 8;
// Value of Spark's SQLConf.CASE_SENSITIVE setting.
bool case_sensitive = 9;
// Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
// from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
// stores.
// The configuration values have hadoop. or spark.hadoop. prefix trimmed. For instance, the
// configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
// the map.
map<string, string> object_store_options = 10;
// Set from CometParquetUtils.encryptionEnabled on the scan's Hadoop configuration.
bool encryption_enabled = 11;
// The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc), shown when
// viewing native query plans in debug mode. The Scala serializer also notes that it is
// used as part of the injection key.
string source = 12;
// Serialized data types of the scan's output attributes.
repeated spark.spark_expression.DataType fields = 13;
}

message NativeScan {
repeated spark.spark_expression.DataType fields = 1;
// The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
// is purely for informational purposes when viewing native query plans in
// debug mode.
string source = 2;
repeated SparkStructField required_schema = 3;
repeated SparkStructField data_schema = 4;
repeated SparkStructField partition_schema = 5;
repeated spark.spark_expression.Expr data_filters = 6;
repeated SparkFilePartition file_partitions = 7;
repeated int64 projection_vector = 8;
string session_timezone = 9;
repeated spark.spark_expression.Expr default_values = 10;
repeated int64 default_values_indexes = 11;
bool case_sensitive = 12;
// Options for configuring object stores such as AWS S3, GCS, etc. The key-value pairs are taken
// from Hadoop configuration for compatibility with Hadoop FileSystem implementations of object
// stores.
// The configuration values have hadoop. or spark.hadoop. prefix trimmed. For instance, the
// configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in
// the map.
map<string, string> object_store_options = 13;
bool encryption_enabled = 14;
// Common data shared across partitions (schemas, filters, projections, config)
NativeScanCommon common = 1;

// Single partition's file list (injected at execution time)
SparkFilePartition file_partition = 2;
}

message CsvScan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,17 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
builder: Operator.Builder,
childOp: OperatorOuterClass.Operator*): Option[OperatorOuterClass.Operator] = {
val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
nativeScanBuilder.setSource(scan.simpleStringWithNodeId())
val commonBuilder = OperatorOuterClass.NativeScanCommon.newBuilder()

// Set source in common (used as part of injection key)
commonBuilder.setSource(scan.simpleStringWithNodeId())

val scanTypes = scan.output.flatten { attr =>
serializeDataType(attr.dataType)
}

if (scanTypes.length == scan.output.length) {
nativeScanBuilder.addAllFields(scanTypes.asJava)
commonBuilder.addAllFields(scanTypes.asJava)

// Sink operators don't have children
builder.clearChildren()
Expand All @@ -120,7 +123,7 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
logWarning(s"Unsupported data filter $filter")
}
}
nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
commonBuilder.addAllDataFilters(dataFilters.asJava)
}

val possibleDefaultValues = getExistenceDefaultValues(scan.requiredSchema)
Expand All @@ -136,20 +139,15 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
(Literal(expr), index.toLong.asInstanceOf[java.lang.Long])
}
.unzip
nativeScanBuilder.addAllDefaultValues(
commonBuilder.addAllDefaultValues(
defaultValues.flatMap(exprToProto(_, scan.output)).toIterable.asJava)
nativeScanBuilder.addAllDefaultValuesIndexes(indexes.toIterable.asJava)
commonBuilder.addAllDefaultValuesIndexes(indexes.toIterable.asJava)
}

// Extract object store options from first file (S3 configs apply to all files in scan)
var firstPartition: Option[PartitionedFile] = None
val filePartitions = scan.getFilePartitions()
val filePartitionsProto = filePartitions.map { partition =>
if (firstPartition.isEmpty) {
firstPartition = partition.files.headOption
}
partition2Proto(partition, scan.relation.partitionSchema)
}
nativeScanBuilder.addAllFilePartitions(filePartitionsProto.asJava)
firstPartition = filePartitions.flatMap(_.files.headOption).headOption

val partitionSchema = schema2Proto(scan.relation.partitionSchema.fields)
val requiredSchema = schema2Proto(scan.requiredSchema.fields)
Expand All @@ -166,31 +164,34 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
val projectionVector = (dataSchemaIndexes ++ partitionSchemaIndexes).map(idx =>
idx.toLong.asInstanceOf[java.lang.Long])

nativeScanBuilder.addAllProjectionVector(projectionVector.toIterable.asJava)
commonBuilder.addAllProjectionVector(projectionVector.toIterable.asJava)

// In `CometScanRule`, we ensure partitionSchema is supported.
assert(partitionSchema.length == scan.relation.partitionSchema.fields.length)

nativeScanBuilder.addAllDataSchema(dataSchema.toIterable.asJava)
nativeScanBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
nativeScanBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
nativeScanBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
nativeScanBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))
commonBuilder.addAllDataSchema(dataSchema.toIterable.asJava)
commonBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
commonBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
commonBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone"))
commonBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE))

// Collect S3/cloud storage configurations
val hadoopConf = scan.relation.sparkSession.sessionState
.newHadoopConfWithOptions(scan.relation.options)

nativeScanBuilder.setEncryptionEnabled(CometParquetUtils.encryptionEnabled(hadoopConf))
commonBuilder.setEncryptionEnabled(CometParquetUtils.encryptionEnabled(hadoopConf))

firstPartition.foreach { partitionFile =>
val objectStoreOptions =
NativeConfig.extractObjectStoreOptions(hadoopConf, partitionFile.pathUri)
objectStoreOptions.foreach { case (key, value) =>
nativeScanBuilder.putObjectStoreOptions(key, value)
commonBuilder.putObjectStoreOptions(key, value)
}
}

// Set common data in NativeScan (file_partition will be populated at execution time)
nativeScanBuilder.setCommon(commonBuilder.build())

Some(builder.setNativeScan(nativeScanBuilder).build())

} else {
Expand All @@ -204,6 +205,6 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
}

override def createExec(nativeOp: Operator, op: CometScanExec): CometNativeExec = {
CometNativeScanExec(nativeOp, op.wrapped, op.session)
CometNativeScanExec(nativeOp, op.wrapped, op.session, op)
}
}
Loading
Loading