Conversation
Pull request overview
This PR adds support for Iceberg key encoding by implementing the IcebergKeyEncoder struct that follows Fluss Java's encoding specifications. The implementation enables Fluss to encode row keys for the Iceberg data lake format.
Changes:
- Added a new `IcebergKeyEncoder` implementation with support for scalar types (Int, BigInt, Float, Double, Date, Time, Timestamp, Decimal, String, Char, Binary, Bytes)
- Integrated `IcebergKeyEncoder` into the `KeyEncoderFactory` to handle the Iceberg format
- Added comprehensive test coverage for supported data types
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| crates/fluss/src/row/encode/mod.rs | Replaces the "not yet implemented" error for Iceberg format with the new IcebergKeyEncoder implementation |
| crates/fluss/src/row/encode/iceberg_key_encoder.rs | Complete implementation of the Iceberg key encoder with type validation, encoding logic, and tests for supported types |
```rust
    let encoded = encoder.encode_key(&row).unwrap();
    assert_eq!(encoded.as_ref(), bytes);
}
```
Missing test coverage for null key handling. The encoder has logic to reject null values at lines 117-121, but there is no test case that verifies this behavior. Consider adding a test that attempts to encode a row with a null key value and verifies that it returns an IllegalArgument error with the message "Cannot encode Iceberg key with null value".
```rust
#[test]
fn test_null_key_value_rejected() {
    let row_type =
        RowType::with_data_types_and_field_names(vec![DataTypes::int32()], vec!["id"]);
    // Create a row with a null value for the key field.
    let row = GenericRow::from_data(vec![Datum::Null]);
    let mut encoder =
        IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap();
    let result = encoder.encode_key(&row);
    match result {
        // IllegalArgument is a struct variant (see the validation code below),
        // so the pattern binds its message field.
        Err(IllegalArgument { message }) => {
            assert_eq!(message, "Cannot encode Iceberg key with null value");
        }
        _ => panic!("Expected IllegalArgument error for null key value"),
    }
}
```
```rust
    let encoded = encoder.encode_key(&row).unwrap();
    assert_eq!(encoded.as_ref(), bytes);
}
```
Missing test coverage for Array and Map type validation. The validation function rejects Array and Map types at lines 96-105, but there are no test cases that verify these error messages are returned when attempting to create an encoder with these types.
```rust
#[test]
fn test_array_type_not_supported_for_key_encoding() {
    // Construct a row type with a LIST/Array field, which should be rejected.
    let row_type = RowType::with_data_types_and_field_names(
        vec![DataTypes::list(DataTypes::int32())],
        vec!["arr"],
    );
    let result = IcebergKeyEncoder::create_key_encoder(&row_type, &["arr".to_string()]);
    let err = result.expect_err("expected Array/List type to be rejected");
    match err {
        crate::error::Error::IllegalArgument { message } => {
            // The message should clearly indicate that Array/List types are not supported.
            assert!(
                message.contains("Array") || message.contains("LIST") || message.contains("list"),
                "unexpected error message for Array/List type: {}",
                message
            );
        }
        other => panic!("expected IllegalArgument error, got {:?}", other),
    }
}

#[test]
fn test_map_type_not_supported_for_key_encoding() {
    // Construct a row type with a MAP field, which should be rejected.
    let row_type = RowType::with_data_types_and_field_names(
        vec![DataTypes::map(DataTypes::string(), DataTypes::int32())],
        vec!["map_field"],
    );
    let result = IcebergKeyEncoder::create_key_encoder(&row_type, &["map_field".to_string()]);
    let err = result.expect_err("expected Map type to be rejected");
    match err {
        crate::error::Error::IllegalArgument { message } => {
            // The message should clearly indicate that Map types are not supported.
            assert!(
                message.contains("Map") || message.contains("MAP") || message.contains("map"),
                "unexpected error message for Map type: {}",
                message
            );
        }
        other => panic!("expected IllegalArgument error, got {:?}", other),
    }
}
```
```rust
    let encoded = encoder.encode_key(&row).unwrap();
    assert_eq!(encoded.as_ref(), bytes);
}
```
Missing test coverage for Float and Double types. The encoder supports these types (lines 133-134), but there are no test cases that verify their encoding. Consider adding tests for both DataType::Float and DataType::Double to ensure they are correctly encoded as little-endian primitives.
```rust
#[test]
fn test_float_encoding() {
    let row_type =
        RowType::with_data_types_and_field_names(vec![DataTypes::float()], vec!["value"]);
    let value: f32 = 3.1415927_f32;
    // The Datum variant is Float32 (wrapping an ordered-float value), matching
    // the encoder's (DataType::Float(_), Datum::Float32(v)) arm.
    let row = GenericRow::from_data(vec![Datum::Float32(value.into())]);
    let mut encoder =
        IcebergKeyEncoder::create_key_encoder(&row_type, &["value".to_string()]).unwrap();
    let encoded = encoder.encode_key(&row).unwrap();
    assert_eq!(encoded.as_ref(), value.to_le_bytes().as_slice());
}

#[test]
fn test_double_encoding() {
    let row_type =
        RowType::with_data_types_and_field_names(vec![DataTypes::double()], vec!["value"]);
    let value: f64 = 3.141592653589793_f64;
    let row = GenericRow::from_data(vec![Datum::Float64(value.into())]);
    let mut encoder =
        IcebergKeyEncoder::create_key_encoder(&row_type, &["value".to_string()]).unwrap();
    let encoded = encoder.encode_key(&row).unwrap();
    assert_eq!(encoded.as_ref(), value.to_le_bytes().as_slice());
}
```
```rust
    let encoded = encoder.encode_key(&row).unwrap();
    assert_eq!(encoded.as_ref(), bytes);
}
```
Missing test coverage for Char type. The encoder supports Char type (line 144) which is encoded the same way as String, but there is no dedicated test case for it. Consider adding a test to verify that Char fields are correctly encoded as UTF-8 bytes.
```rust
#[test]
fn test_char_encoding() {
    let row_type = RowType::with_data_types_and_field_names(
        vec![DataTypes::char_type(5)],
        vec!["char_field"],
    );
    let value = "hello";
    let row = GenericRow::from_data(vec![Datum::from(value)]);
    let mut encoder =
        IcebergKeyEncoder::create_key_encoder(&row_type, &["char_field".to_string()]).unwrap();
    let encoded = encoder.encode_key(&row).unwrap();
    assert_eq!(encoded.as_ref(), value.as_bytes());
}
```
```rust
(DataType::Timestamp(_), Datum::TimestampNtz(ts)) => {
    let micros =
        ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000);
    micros.to_le_bytes().to_vec()
}
```
Potential integer overflow in timestamp conversion. At line 138, `ts.get_millisecond() * 1000` could overflow: an i64 millisecond value can represent dates up to roughly the year 292,277,026, but after multiplying by 1000 the result must fit in the i64 microsecond range, which ends at roughly the year 294,247. Consider using `checked_mul()` to detect the overflow, or documenting this limitation if that range is acceptable for timestamps in the Iceberg format.
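A minimal sketch of the `checked_mul()` approach, reusing the `IllegalArgument { message }` variant from this file (the error wording is illustrative, not prescribed):

```rust
(DataType::Timestamp(_), Datum::TimestampNtz(ts)) => {
    // Detect overflow instead of silently wrapping: checked_mul/checked_add
    // return None when the i64 microsecond range is exceeded.
    let micros = ts
        .get_millisecond()
        .checked_mul(1000)
        .and_then(|ms| ms.checked_add(ts.get_nano_of_millisecond() as i64 / 1000))
        .ok_or_else(|| IllegalArgument {
            message: format!(
                "Timestamp {} ms overflows the i64 microsecond range",
                ts.get_millisecond()
            ),
        })?;
    micros.to_le_bytes().to_vec()
}
```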
```rust
fn validate_supported_type(field_type: &DataType) -> Result<()> {
    match field_type {
        DataType::Int(_)
        | DataType::BigInt(_)
        | DataType::Float(_)
        | DataType::Double(_)
        | DataType::Date(_)
        | DataType::Time(_)
        | DataType::Timestamp(_)
        | DataType::Decimal(_)
        | DataType::String(_)
        | DataType::Char(_)
        | DataType::Binary(_)
        | DataType::Bytes(_) => Ok(()),
        DataType::Array(_) => Err(IllegalArgument {
            message:
                "Array types cannot be used as bucket keys. Bucket keys must be scalar types."
                    .to_string(),
        }),
        DataType::Map(_) => Err(IllegalArgument {
            message:
                "Map types cannot be used as bucket keys. Bucket keys must be scalar types."
                    .to_string(),
        }),
        other => Err(IllegalArgument {
            message: format!("Unsupported type for Iceberg key encoder: {other}"),
        }),
    }
}
```
The type validation is incomplete. Several data types are supported by the FieldGetter but are missing from this validation:
- Boolean (DataType::Boolean) - which would be returned as Datum::Bool
- TinyInt (DataType::TinyInt) - which would be returned as Datum::Int8
- SmallInt (DataType::SmallInt) - which would be returned as Datum::Int16
- TimestampLtz (DataType::TimestampLTz) - which would be returned as Datum::TimestampLtz
Without explicit handling, a user who tries to use one of these types as a key field only gets the generic "Unsupported type" error at validation time. If rejecting them is intentional and matches the Java implementation's behavior, they should still be listed explicitly in the validation, with error messages explaining that they are not supported for Iceberg keys, as sketched below.
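A hedged sketch of what those explicit arms could look like; the variant names `Boolean`, `TinyInt`, `SmallInt`, and `TimestampLTz` are taken from this comment and are not verified against the actual `DataType` enum:

```rust
// Hypothetical extra arms for validate_supported_type; variant names are
// assumed from the review comment, so adjust them to the real enum.
DataType::Boolean(_) | DataType::TinyInt(_) | DataType::SmallInt(_) => {
    Err(IllegalArgument {
        message: format!(
            "{field_type} is not supported as an Iceberg key; bucket keys must be \
             one of the scalar types listed above."
        ),
    })
}
DataType::TimestampLTz(_) => Err(IllegalArgument {
    message: "TimestampLtz is not supported as an Iceberg key.".to_string(),
}),
```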
```rust
let bytes: Vec<u8> = match (&self.field_type, value) {
    (DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(),
    (DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(),
    (DataType::Time(_), Datum::Time(v)) => {
        let micros = v.get_inner() as i64 * 1000;
        micros.to_le_bytes().to_vec()
    }
    (DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(),
    (DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(),
    (DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(),
    (DataType::Timestamp(_), Datum::TimestampNtz(ts)) => {
        let micros =
            ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000);
        micros.to_le_bytes().to_vec()
    }
    (DataType::Decimal(_), Datum::Decimal(d)) => d.to_unscaled_bytes(),
    (DataType::String(_), Datum::String(s)) => s.as_bytes().to_vec(),
    (DataType::Char(_), Datum::String(s)) => s.as_bytes().to_vec(),
    (DataType::Binary(_), Datum::Blob(b)) => b.as_ref().to_vec(),
    (DataType::Bytes(_), Datum::Blob(b)) => b.as_ref().to_vec(),
    // FieldGetter uses Datum::String for CHAR, Datum::Blob for BINARY/BYTES.
    (expected_type, actual) => {
        return Err(IllegalArgument {
            message: format!(
                "IcebergKeyEncoder type mismatch: expected {expected_type}, got {actual:?}"
            ),
        });
    }
};
```
The encode_key function is missing match arms for several data types that could potentially be returned by FieldGetter:
- Boolean (DataType::Boolean) - would return Datum::Bool
- TinyInt (DataType::TinyInt) - would return Datum::Int8
- SmallInt (DataType::SmallInt) - would return Datum::Int16
- TimestampLtz (DataType::TimestampLTz) - would return Datum::TimestampLtz
If these types ever pass validation (e.g., if the validation is updated), the match statement would fail at runtime with the "IcebergKeyEncoder type mismatch" error at lines 150-154. Consider adding explicit arms for them, either encoding the numeric types or returning a more specific error explaining that they are unsupported for Iceberg keys; a sketch follows.
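If the numeric types were to be encoded, the extra arms might look like the following sketch; the `Datum` variant names (`Bool`, `Int8`, `Int16`, `TimestampLtz`) are taken from this comment and are assumptions, not confirmed API:

```rust
// Hypothetical extra arms for the encode_key match; Datum variant names are
// assumed from the review comment, not verified against the Datum enum.
(DataType::Boolean(_), Datum::Bool(v)) => vec![v as u8],
// Widen the smaller integer types to i64, mirroring the existing Int arm.
(DataType::TinyInt(_), Datum::Int8(v)) => (v as i64).to_le_bytes().to_vec(),
(DataType::SmallInt(_), Datum::Int16(v)) => (v as i64).to_le_bytes().to_vec(),
(DataType::TimestampLTz(_), Datum::TimestampLtz(ts)) => {
    // Same millisecond-to-microsecond conversion as the TimestampNtz arm.
    let micros =
        ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000);
    micros.to_le_bytes().to_vec()
}
```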
Purpose
Linked issue: close #xxx
Brief change log
Tests
API and Format
Documentation