Skip to content

feat: add support of iceberg key encoder#308

Open
zuston wants to merge 1 commit intoapache:mainfrom
zuston:icebergkey
Open

feat: add support of iceberg key encoder#308
zuston wants to merge 1 commit intoapache:mainfrom
zuston:icebergkey

Conversation

@zuston
Copy link
Member

@zuston zuston commented Feb 13, 2026

Purpose

Linked issue: close #xxx

Brief change log

Tests

API and Format

Documentation

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds support for Iceberg key encoding by implementing the IcebergKeyEncoder struct that follows Fluss Java's encoding specifications. The implementation enables Fluss to encode row keys for the Iceberg data lake format.

Changes:

  • Added new IcebergKeyEncoder implementation with support for scalar types (Int, BigInt, Float, Double, Date, Time, Timestamp, Decimal, String, Char, Binary, Bytes)
  • Integrated IcebergKeyEncoder into the KeyEncoderFactory to handle Iceberg format
  • Added comprehensive test coverage for supported data types

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.

File Description
crates/fluss/src/row/encode/mod.rs Replaces the "not yet implemented" error for Iceberg format with the new IcebergKeyEncoder implementation
crates/fluss/src/row/encode/iceberg_key_encoder.rs Complete implementation of the Iceberg key encoder with type validation, encoding logic, and tests for supported types

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.


let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), bytes);
}
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for null key handling. The encoder has logic to reject null values at lines 117-121, but there is no test case that verifies this behavior. Consider adding a test that attempts to encode a row with a null key value and verifies that it returns an IllegalArgument error with the message "Cannot encode Iceberg key with null value".

Suggested change
}
}
#[test]
fn test_null_key_value_rejected() {
let row_type =
RowType::with_data_types_and_field_names(vec![DataTypes::int32()], vec!["id"]);
// Create a row with a null value for the key field.
let row = GenericRow::from_data(vec![Datum::Null]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["id".to_string()]).unwrap();
let result = encoder.encode_key(&row);
match result {
Err(IllegalArgument(msg)) => {
assert_eq!(msg, "Cannot encode Iceberg key with null value");
}
_ => panic!("Expected IllegalArgument error for null key value"),
}
}

Copilot uses AI. Check for mistakes.

let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), bytes);
}
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for Array and Map type validation. The validation function rejects Array and Map types at lines 96-105, but there are no test cases that verify these error messages are returned when attempting to create an encoder with these types.

Suggested change
}
}
#[test]
fn test_array_type_not_supported_for_key_encoding() {
// Construct a row type with a LIST/Array field, which should be rejected
let row_type = RowType::with_data_types_and_field_names(
vec![DataTypes::list(DataTypes::int32())],
vec!["arr"],
);
let result =
IcebergKeyEncoder::create_key_encoder(&row_type, &["arr".to_string()]);
let err = result.expect_err("expected Array/List type to be rejected");
match err {
crate::error::Error::IllegalArgument(msg) => {
// The message should clearly indicate that Array/List types are not supported
assert!(
msg.contains("Array")
|| msg.contains("LIST")
|| msg.contains("list"),
"unexpected error message for Array/List type: {}",
msg
);
}
other => panic!("expected IllegalArgument error, got {:?}", other),
}
}
#[test]
fn test_map_type_not_supported_for_key_encoding() {
// Construct a row type with a MAP field, which should be rejected
let row_type = RowType::with_data_types_and_field_names(
vec![DataTypes::map(DataTypes::string(), DataTypes::int32())],
vec!["map_field"],
);
let result =
IcebergKeyEncoder::create_key_encoder(&row_type, &["map_field".to_string()]);
let err = result.expect_err("expected Map type to be rejected");
match err {
crate::error::Error::IllegalArgument(msg) => {
// The message should clearly indicate that Map types are not supported
assert!(
msg.contains("Map") || msg.contains("MAP") || msg.contains("map"),
"unexpected error message for Map type: {}",
msg
);
}
other => panic!("expected IllegalArgument error, got {:?}", other),
}
}

Copilot uses AI. Check for mistakes.

let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), bytes);
}
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for Float and Double types. The encoder supports these types (lines 133-134), but there are no test cases that verify their encoding. Consider adding tests for both DataType::Float and DataType::Double to ensure they are correctly encoded as little-endian primitives.

Suggested change
}
}
#[test]
fn test_float_encoding() {
let row_type =
RowType::with_data_types_and_field_names(vec![DataTypes::float()], vec!["value"]);
let value: f32 = 3.1415927_f32;
let row = GenericRow::from_data(vec![Datum::Float(value)]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["value".to_string()]).unwrap();
let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), value.to_le_bytes().as_slice());
}
#[test]
fn test_double_encoding() {
let row_type =
RowType::with_data_types_and_field_names(vec![DataTypes::double()], vec!["value"]);
let value: f64 = 3.141592653589793_f64;
let row = GenericRow::from_data(vec![Datum::Double(value)]);
let mut encoder =
IcebergKeyEncoder::create_key_encoder(&row_type, &["value".to_string()]).unwrap();
let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), value.to_le_bytes().as_slice());
}

Copilot uses AI. Check for mistakes.

let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), bytes);
}
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing test coverage for Char type. The encoder supports Char type (line 144) which is encoded the same way as String, but there is no dedicated test case for it. Consider adding a test to verify that Char fields are correctly encoded as UTF-8 bytes.

Suggested change
}
}
#[test]
fn test_char_encoding() {
let row_type = RowType::with_data_types_and_field_names(
vec![DataTypes::char_type(5)],
vec!["char_field"],
);
let value = "hello";
let row = GenericRow::from_data(vec![Datum::from(value)]);
let mut encoder = IcebergKeyEncoder::create_key_encoder(
&row_type,
&["char_field".to_string()],
)
.unwrap();
let encoded = encoder.encode_key(&row).unwrap();
assert_eq!(encoded.as_ref(), value.as_bytes());
}

Copilot uses AI. Check for mistakes.
Comment on lines +136 to +139
(DataType::Timestamp(_), Datum::TimestampNtz(ts)) => {
let micros =
ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000);
micros.to_le_bytes().to_vec()
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential integer overflow in timestamp conversion. At line 138, ts.get_millisecond() * 1000 could overflow for very large timestamp values. The maximum i64 value is 9,223,372,036,854,775,807, so multiplying a timestamp in milliseconds by 1000 could overflow for timestamps beyond year 292,277,026. Consider using checked_mul() to detect overflow, or documenting this limitation if it's acceptable based on the expected range of timestamps in Iceberg format.

Copilot uses AI. Check for mistakes.
Comment on lines +81 to +110
fn validate_supported_type(field_type: &DataType) -> Result<()> {
match field_type {
DataType::Int(_)
| DataType::BigInt(_)
| DataType::Float(_)
| DataType::Double(_)
| DataType::Date(_)
| DataType::Time(_)
| DataType::Timestamp(_)
| DataType::Decimal(_)
| DataType::String(_)
| DataType::Char(_)
| DataType::Binary(_)
| DataType::Bytes(_) => Ok(()),

DataType::Array(_) => Err(IllegalArgument {
message:
"Array types cannot be used as bucket keys. Bucket keys must be scalar types."
.to_string(),
}),
DataType::Map(_) => Err(IllegalArgument {
message:
"Map types cannot be used as bucket keys. Bucket keys must be scalar types."
.to_string(),
}),
other => Err(IllegalArgument {
message: format!("Unsupported type for Iceberg key encoder: {other}"),
}),
}
}
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type validation is incomplete. Several data types are supported by the FieldGetter but are missing from this validation:

  • Boolean (DataType::Boolean) - which would be returned as Datum::Bool
  • TinyInt (DataType::TinyInt) - which would be returned as Datum::Int8
  • SmallInt (DataType::SmallInt) - which would be returned as Datum::Int16
  • TimestampLtz (DataType::TimestampLTz) - which would be returned as Datum::TimestampLtz

Without explicit handling for these types, if a user attempts to use them as key fields, they would get a generic "Unsupported type" error at validation time. However, if this is intentional and matches the Java implementation behavior, then these types should be listed explicitly in validation with appropriate error messages explaining they are not supported for Iceberg keys.

Copilot uses AI. Check for mistakes.
Comment on lines +123 to +156
let bytes: Vec<u8> = match (&self.field_type, value) {
(DataType::Int(_), Datum::Int32(v)) => (v as i64).to_le_bytes().to_vec(),
(DataType::Date(_), Datum::Date(v)) => (v.get_inner() as i64).to_le_bytes().to_vec(),

(DataType::Time(_), Datum::Time(v)) => {
let micros = v.get_inner() as i64 * 1000;
micros.to_le_bytes().to_vec()
}

(DataType::BigInt(_), Datum::Int64(v)) => v.to_le_bytes().to_vec(),
(DataType::Float(_), Datum::Float32(v)) => v.0.to_le_bytes().to_vec(),
(DataType::Double(_), Datum::Float64(v)) => v.0.to_le_bytes().to_vec(),

(DataType::Timestamp(_), Datum::TimestampNtz(ts)) => {
let micros =
ts.get_millisecond() * 1000 + (ts.get_nano_of_millisecond() as i64 / 1000);
micros.to_le_bytes().to_vec()
}

(DataType::Decimal(_), Datum::Decimal(d)) => d.to_unscaled_bytes(),
(DataType::String(_), Datum::String(s)) => s.as_bytes().to_vec(),
(DataType::Char(_), Datum::String(s)) => s.as_bytes().to_vec(),
(DataType::Binary(_), Datum::Blob(b)) => b.as_ref().to_vec(),
(DataType::Bytes(_), Datum::Blob(b)) => b.as_ref().to_vec(),

// FieldGetter uses Datum::String for CHAR, Datum::Blob for BINARY/BYTES.
(expected_type, actual) => {
return Err(IllegalArgument {
message: format!(
"IcebergKeyEncoder type mismatch: expected {expected_type}, got {actual:?}"
),
});
}
};
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The encode_key function is missing match arms for several data types that could potentially be returned by FieldGetter:

  • Boolean (DataType::Boolean) - would return Datum::Bool
  • TinyInt (DataType::TinyInt) - would return Datum::Int8
  • SmallInt (DataType::SmallInt) - would return Datum::Int16
  • TimestampLtz (DataType::TimestampLTz) - would return Datum::TimestampLtz

If these types pass validation (e.g., if validation is updated), the match statement would fail at runtime with an "IcebergKeyEncoder type mismatch" error at line 150-154. Consider adding explicit handling for these types in the match statement, even if only to encode them (for numeric types) or to return a more specific error message explaining they are unsupported for Iceberg keys.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant