From 9f00f379edbefb7ce6102ec74a032126b7976775 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 18 Mar 2026 17:17:16 +0100 Subject: [PATCH 1/8] Iceberg type 'time' support --- src/Processors/Formats/Impl/AvroRowInputFormat.cpp | 14 +++++++++++++- .../DataLakes/Iceberg/IcebergWrites.cpp | 4 ++++ .../DataLakes/Iceberg/SchemaProcessor.cpp | 2 +- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 1 + 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index f3b98e73e60f..91c4e73e31ca 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -156,6 +156,9 @@ static void insertNumber(IColumn & column, WhichDataType type, T value) case TypeIndex::DateTime64: assert_cast &>(column).insertValue(static_cast(value)); break; + case TypeIndex::Time64: + assert_cast &>(column).insertValue(static_cast(value)); + break; case TypeIndex::IPv4: assert_cast(column).insertValue(IPv4(static_cast(value))); break; @@ -304,6 +307,8 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro return createDecimalDeserializeFn(root_node, target_type, false); if (target.isDateTime64()) return createDecimalDeserializeFn(root_node, target_type, false); + if (target.isTime64()) + return createDecimalDeserializeFn(root_node, target_type, false); break; case avro::AVRO_INT: if (target_type->isValueRepresentedByNumber()) @@ -1283,8 +1288,11 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node) { case avro::Type::AVRO_INT: { - if (node->logicalType().type() == avro::LogicalType::DATE) + auto logical_type = node->logicalType(); + if (logical_type.type() == avro::LogicalType::DATE) return {std::make_shared()}; + if (logical_type.type() == avro::LogicalType::TIME_MILLIS) + return {std::make_shared(3)}; return {std::make_shared()}; } @@ -1295,6 +1303,10 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node) return {std::make_shared(3)}; if (logical_type.type() == avro::LogicalType::TIMESTAMP_MICROS) return {std::make_shared(6)}; + if (logical_type.type() == avro::LogicalType::TIME_MILLIS) + return {std::make_shared(3)}; + if (logical_type.type() == avro::LogicalType::TIME_MICROS) + return {std::make_shared(6)}; return std::make_shared(); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index c2d5a0c0c5bc..070a96d2b6c6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -129,6 +129,8 @@ bool canDumpIcebergStats(const Field & field, DataTypePtr type) case TypeIndex::Date32: case TypeIndex::Int64: case TypeIndex::DateTime64: + case TypeIndex::Time: + case TypeIndex::Time64: case TypeIndex::String: return true; default: @@ -157,6 +159,8 @@ std::vector dumpFieldToBytes(const Field & field, DataTypePtr type) case TypeIndex::Int64: return dumpValue(field.safeGet()); case TypeIndex::DateTime64: + case TypeIndex::Time: + case TypeIndex::Time64: return dumpValue(field.safeGet().getValue().value); case TypeIndex::String: { diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp index 4efa9b46615a..dda2050c9024 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp @@ -243,7 +243,7 @@ DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name, Cont if (type_name == f_date) return std::make_shared(); if (type_name == f_time) - return std::make_shared(); + return std::make_shared(6); if (type_name == f_timestamp) return std::make_shared(6); if (type_name == f_timestamptz) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 050d4f70b7c1..f785e1ca40af 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -402,6 +402,7 @@ std::pair getIcebergType(DataTypePtr type, Int32 & ite case TypeIndex::DateTime64: return {"timestamp", true}; case TypeIndex::Time: + case TypeIndex::Time64: return {"time", true}; case TypeIndex::String: return {"string", true}; From bb896c3fca79f564c20850b526ed60f9024f3e46 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 19 Mar 2026 17:03:56 +0100 Subject: [PATCH 2/8] Fix tests --- .../integration/test_database_iceberg/test.py | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 701b3bc6e8d8..3fcfc22d43d5 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -25,6 +25,7 @@ NestedField, StringType, StructType, + TimeType, TimestampType, TimestamptzType ) @@ -1015,3 +1016,95 @@ def _test_cluster_joins(started_cluster): ) assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n" + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_partitioning_by_time(started_cluster, storage_type): + node = started_cluster.instances["node1"] + + test_ref = f"test_partitioning_by_time_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + root_namespace = f"{test_ref}_namespace" + + namespace = f"{root_namespace}.A" + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(namespace) + + schema = Schema( + NestedField( + field_id=1, + name="key", + field_type=TimeType(), + required=False + ), + NestedField( + field_id=2, + name="value", + field_type=StringType(), + required=False, + ), + ) + + partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key" + ) + ) + + table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec) + data = [{"key": dtime(12,0,0), "value": "test"}] + df = pa.Table.from_pylist(data) + table.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "12:00:00.000000\ttest\n" + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_partitioning_by_string(started_cluster, storage_type): + node = started_cluster.instances["node1"] + + test_ref = f"test_partitioning_by_string_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + root_namespace = f"{test_ref}_namespace" + + namespace = f"{root_namespace}.A" + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(namespace) + + schema = Schema( + NestedField( + field_id=1, + name="key", + field_type=StringType(), + required=False + ), + NestedField( + field_id=2, + name="value", + field_type=StringType(), + required=False, + ), + NestedField( + field_id=3, + name="time_value", + field_type=TimeType(), + required=False, + ), + ) + + partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key" + ) + ) + + table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec) + data = [{"key": "a:b,c[d=e/f%g?h", "value": "test", "time_value": dtime(12,0,0)}] + df = pa.Table.from_pylist(data) + table.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "a:b,c[d=e/f%g?h\ttest\t12:00:00.000000\n" From 28ba40b84d145f9264afd3bbc2f4b5478cfb4306 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 19 Mar 2026 17:26:54 +0100 Subject: [PATCH 3/8] Fix --- .../ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 070a96d2b6c6..f92216bc9ca7 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -157,10 +157,10 @@ std::vector dumpFieldToBytes(const Field & field, DataTypePtr type) case TypeIndex::Date32: return dumpValue(field.safeGet()); case TypeIndex::Int64: - return dumpValue(field.safeGet()); - case TypeIndex::DateTime64: case TypeIndex::Time: + return dumpValue(field.safeGet()); case TypeIndex::Time64: + case TypeIndex::DateTime64: return dumpValue(field.safeGet().getValue().value); case TypeIndex::String: { From f0e1038e8324ad6a6b915138a3e6c697431f972b Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 2 Apr 2026 13:28:07 +0200 Subject: [PATCH 4/8] Fix test --- tests/integration/test_database_iceberg/test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 3fcfc22d43d5..9e5451215510 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -5,7 +5,7 @@ import random import time import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, time as dtime import pyarrow as pa import pytest @@ -25,9 +25,9 @@ NestedField, StringType, StructType, - TimeType, TimestampType, - TimestamptzType + TimestamptzType, + TimeType, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER From c2b8ac07b5eb1d1d9c6962f7a1f16e72bc94dbaa Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Wed, 8 Apr 2026 11:17:00 +0200 Subject: [PATCH 5/8] Write time to Iceberg --- .../Formats/Impl/Parquet/PrepareForWrite.cpp | 43 +++++++++++++++++++ src/Processors/Formats/Impl/Parquet/Write.cpp | 24 ++++++++--- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 1 + .../integration/test_database_iceberg/test.py | 10 ++++- .../test_write_time.py | 27 ++++++++++++ 5 files changed, 98 insertions(+), 7 deletions(-) create mode 100644 tests/integration/test_storage_iceberg_no_spark/test_write_time.py diff --git a/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp b/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp index 4a0511deded7..95a8551df4b5 100644 --- a/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp +++ b/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp @@ -475,6 +475,49 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin case TypeIndex::Decimal128: decimal(16, getDecimalPrecision(*type), getDecimalScale(*type)); break; case TypeIndex::Decimal256: decimal(32, getDecimalPrecision(*type), getDecimalScale(*type)); break; + case TypeIndex::Time: types(T::INT32, C::UINT_32, int_type(32, false)); break; + + case TypeIndex::Time64: + { + std::optional converted; + parq::TimeUnit unit; + const auto & dt = assert_cast(*type); + UInt32 scale = dt.getScale(); + UInt32 converted_scale; + if (scale <= 3) + { + converted = parq::ConvertedType::TIME_MILLIS; + unit.__set_MILLIS({}); + converted_scale = 3; + } + else if (scale <= 6) + { + converted = parq::ConvertedType::TIME_MICROS; + unit.__set_MICROS({}); + converted_scale = 6; + } + else if (scale <= 9) + { + unit.__set_NANOS({}); + converted_scale = 9; + } + else + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected Time64 scale: {}", scale); + } + + parq::TimeType tt; + /// (Shouldn't we check the Time64's timezone parameter here? No, the actual number + /// in Time64 column is always in UTC, regardless of the timezone parameter.) + tt.__set_isAdjustedToUTC(true); + tt.__set_unit(unit); + parq::LogicalType t; + t.__set_TIME(tt); + types(T::INT64, converted, t); + state.datetime_multiplier = DataTypeTime64::getScaleMultiplier(converted_scale - scale); + break; + } + default: throw Exception(ErrorCodes::UNKNOWN_TYPE, "Internal type '{}' of column '{}' is not supported for conversion into Parquet data format.", type->getFamilyName(), name); } diff --git a/src/Processors/Formats/Impl/Parquet/Write.cpp b/src/Processors/Formats/Impl/Parquet/Write.cpp index 42af8e997f0a..0c8fc11a750d 100644 --- a/src/Processors/Formats/Impl/Parquet/Write.cpp +++ b/src/Processors/Formats/Impl/Parquet/Write.cpp @@ -358,28 +358,32 @@ struct ConverterNumeric } }; -struct ConverterDateTime64WithMultiplier +template +struct ConverterTimeType64WithMultiplierImpl { using Statistics = StatisticsNumeric; - using Col = ColumnDecimal; + using Col = ColumnDecimal; const Col & column; Int64 multiplier; PODArray buf; - ConverterDateTime64WithMultiplier(const ColumnPtr & c, Int64 multiplier_) : column(assert_cast(*c)), multiplier(multiplier_) {} + ConverterTimeType64WithMultiplierImpl(const ColumnPtr & c, Int64 multiplier_) : column(assert_cast(*c)), multiplier(multiplier_) {} const Int64 * getBatch(size_t offset, size_t count) { buf.resize(count); for (size_t i = 0; i < count; ++i) - /// Not checking overflow because DateTime64 values should already be in the range where + /// Not checking overflow because Time64/DateTime64 values should already be in the range where /// they fit in Int64 at any allowed scale (i.e. up to nanoseconds). buf[i] = column.getData()[offset + i].value * multiplier; return buf.data(); } }; +using ConverterDateTime64WithMultiplier = ConverterTimeType64WithMultiplierImpl; +using ConverterTime64WithMultiplier = ConverterTimeType64WithMultiplierImpl; + /// Multiply DateTime by 1000 to get milliseconds (because Parquet doesn't support seconds). struct ConverterDateTime { @@ -1177,7 +1181,6 @@ void writeColumnChunkBody( writeColumnImpl(s, options, out, ConverterDateTime(s.primitive_column)); } break; - #undef N case TypeIndex::Float32: @@ -1246,6 +1249,17 @@ void writeColumnChunkBody( case TypeIndex::Decimal256: D(Decimal256); break; #undef D + case TypeIndex::Time64: + if (s.datetime_multiplier == 1) + writeColumnImpl( + s, options, out, ConverterNumeric, Int64, Int64>( + s.primitive_column)); + else + writeColumnImpl( + s, options, out, ConverterTime64WithMultiplier( + s.primitive_column, s.datetime_multiplier)); + break; + default: throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected column type: {}", s.primitive_column->getFamilyName()); } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index f785e1ca40af..8563e3c67ce6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -483,6 +483,7 @@ Poco::Dynamic::Var getAvroType(DataTypePtr type) case TypeIndex::Int64: case TypeIndex::DateTime: case TypeIndex::DateTime64: + case TypeIndex::Time64: return "long"; case TypeIndex::Float32: return "float"; diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 9e5451215510..6788494a968e 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -1052,13 +1052,19 @@ def test_partitioning_by_time(started_cluster, storage_type): ) table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec) - data = [{"key": dtime(12,0,0), "value": "test"}] + data = [{"key": dtime(12,0,0), "value": "test1"}, + {"key": dtime(13,0,0), "value": "test2"}, + {"key": dtime(14,0,0), "value": "test3"}, + ] df = pa.Table.from_pylist(data) table.append(df) create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) - assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "12:00:00.000000\ttest\n" + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}` ORDER BY key") == "12:00:00.000000\ttest1\n13:00:00.000000\ttest2\n14:00:00.000000\ttest3\n" + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}` WHERE key = '13:00:00.000000' ORDER BY key") == "13:00:00.000000\ttest2\n" + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}` WHERE key >= '13:00:00.000000' ORDER BY key") == "13:00:00.000000\ttest2\n14:00:00.000000\ttest3\n" + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}` WHERE key <= '13:00:00.000000' ORDER BY key") == "12:00:00.000000\ttest1\n13:00:00.000000\ttest2\n" @pytest.mark.parametrize("storage_type", ["s3"]) diff --git a/tests/integration/test_storage_iceberg_no_spark/test_write_time.py b/tests/integration/test_storage_iceberg_no_spark/test_write_time.py new file mode 100644 index 000000000000..d45a1b1a41e2 --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/test_write_time.py @@ -0,0 +1,27 @@ +import pytest +from helpers.config_cluster import minio_secret_key, minio_access_key +from helpers.iceberg_utils import get_uuid_str + + +@pytest.mark.parametrize("time_type", ["Time", "Time64"]) +def test_write_time(started_cluster_iceberg_no_spark, time_type): + node = started_cluster_iceberg_no_spark.instances["node1"] + + if time_type == "Time64": + time_type = "Time64(6)" + + TABLE_NAME = "test_partitioning_by_time_" + get_uuid_str() + + node.query( + f"CREATE TABLE `{TABLE_NAME}` (key {time_type}, value String) ENGINE = IcebergLocal('/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}', 'Parquet') PARTITION BY key", + settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1} + ) + node.query( + f"INSERT INTO `{TABLE_NAME}` VALUES ('12:00:00', 'test1'), ('13:00:00', 'test2'), ('14:00:00', 'test3');", + settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1} + ) + + assert node.query(f"SELECT * FROM `{TABLE_NAME}` ORDER BY key") == "12:00:00.000000\ttest1\n13:00:00.000000\ttest2\n14:00:00.000000\ttest3\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key = '13:00:00.000000' ORDER BY key") == "13:00:00.000000\ttest2\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key >= '13:00:00.000000' ORDER BY key") == "13:00:00.000000\ttest2\n14:00:00.000000\ttest3\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key <= '13:00:00.000000' ORDER BY key") == "12:00:00.000000\ttest1\n13:00:00.000000\ttest2\n" From ced1375f8e50ebb66ac03593221f150cf9a305ce Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Fri, 10 Apr 2026 08:22:20 +0200 Subject: [PATCH 6/8] Proper write metadata to Avro --- .../DataLakes/Iceberg/Constant.h | 1 + .../DataLakes/Iceberg/IcebergWrites.cpp | 41 +++++++++++++++++-- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 22 ++++++++++ .../ObjectStorage/DataLakes/Iceberg/Utils.h | 1 + .../test_write_time.py | 17 +++++--- 5 files changed, 72 insertions(+), 10 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h index a09125cfbeb2..2c4b877324c6 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h @@ -40,6 +40,7 @@ DEFINE_ICEBERG_FIELD(time); DEFINE_ICEBERG_FIELD(timestamp); DEFINE_ICEBERG_FIELD(timestamptz); DEFINE_ICEBERG_FIELD(type) +DEFINE_ICEBERG_FIELD(logicalType); /// this field has a camelCase name DEFINE_ICEBERG_FIELD(transform); DEFINE_ICEBERG_FIELD(direction); diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index f92216bc9ca7..9f1406fe34ca 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -223,7 +223,16 @@ void extendSchemaForPartitions( Poco::JSON::Object::Ptr field = new Poco::JSON::Object; field->set(Iceberg::f_field_id, 1000 + i); field->set(Iceberg::f_name, partition_columns[i]); - field->set(Iceberg::f_type, getAvroType(partition_types[i])); + auto logical_type = getAvroLogicalType(partition_types[i]); + if (!logical_type.isEmpty()) + { + Poco::JSON::Object::Ptr type_field = new Poco::JSON::Object; + type_field->set(Iceberg::f_type, getAvroType(partition_types[i])); + type_field->set(Iceberg::f_logicalType, logical_type); + field->set(Iceberg::f_type, type_field); + } + else + field->set(Iceberg::f_type, getAvroType(partition_types[i])); partition_fields->add(field); } @@ -413,10 +422,34 @@ void generateManifestFile( break; case Field::Types::Decimal64: - partition_record.field(partition_columns[i]) = - avro::GenericDatum(partition_values[i].safeGet().getValue()); + { + auto type_id = partition_types[i]->getTypeId(); + if (type_id == TypeIndex::Time64 || type_id == TypeIndex::Time) + { /// Need to write logical type into Avro + auto scale = getDecimalScale(*partition_types[i]); + if (scale == 0) + partition_record.field(partition_columns[i]) = + avro::GenericDatum(partition_values[i].safeGet().getValue()); + else if (scale == 3 || scale == 6) + { + avro::NodePtr node = std::make_shared(avro::AVRO_LONG); + node->setLogicalType(avro::LogicalType(scale == 3 ? avro::LogicalType::TIME_MILLIS : avro::LogicalType::TIME_MICROS)); + int64_t value = partition_values[i].safeGet().getValue(); + auto datum = avro::GenericDatum(node, value); + partition_record.field(partition_columns[i]) = datum; + } + else + { + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Avro file supports only seconds, milliseconds and microsecods for time, partition precision: {}", scale); + } + } + else + partition_record.field(partition_columns[i]) = + avro::GenericDatum(partition_values[i].safeGet().getValue()); break; - + } case Field::Types::Null: break; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 8563e3c67ce6..815228b7295e 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -502,6 +503,27 @@ Poco::Dynamic::Var getAvroType(DataTypePtr type) } } +Poco::Dynamic::Var getAvroLogicalType(DataTypePtr type) +{ + auto type_id = type->getTypeId(); + if (type_id == TypeIndex::Time || type_id == TypeIndex::Time64) + { + auto scale = getDecimalScale(*type); + switch (scale) + { /// Apache avro has millis and micros time precisions, https://avro.apache.org/docs/1.11.0/spec.html + case 0: + return Poco::Dynamic::Var(); + case 3: + return "time-millis"; + case 6: + return "time-micros"; + default: + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported time precision for avro {}({})", type->getName(), scale); + } + } + return Poco::Dynamic::Var(); +} + Poco::JSON::Object::Ptr getPartitionField( ASTPtr partition_by_element, const std::unordered_map & column_name_to_source_id, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h index 6aa8013a4214..5cea603be3c2 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h @@ -87,6 +87,7 @@ Poco::JSON::Object::Ptr getMetadataJSONObject( std::pair getIcebergType(DataTypePtr type, Int32 & iter); Poco::Dynamic::Var getAvroType(DataTypePtr type); +Poco::Dynamic::Var getAvroLogicalType(DataTypePtr type); /// Spec: https://iceberg.apache.org/spec/?h=metadata.json#table-metadata-fields std::pair createEmptyMetadataFile( diff --git a/tests/integration/test_storage_iceberg_no_spark/test_write_time.py b/tests/integration/test_storage_iceberg_no_spark/test_write_time.py index d45a1b1a41e2..9a7b2164c8de 100644 --- a/tests/integration/test_storage_iceberg_no_spark/test_write_time.py +++ b/tests/integration/test_storage_iceberg_no_spark/test_write_time.py @@ -13,15 +13,20 @@ def test_write_time(started_cluster_iceberg_no_spark, time_type): TABLE_NAME = "test_partitioning_by_time_" + get_uuid_str() node.query( - f"CREATE TABLE `{TABLE_NAME}` (key {time_type}, value String) ENGINE = IcebergLocal('/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}', 'Parquet') PARTITION BY key", + f"CREATE TABLE `{TABLE_NAME}` (key {time_type}, value {time_type}, comment String) ENGINE = IcebergLocal('/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}', 'Parquet') PARTITION BY key", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1} ) node.query( - f"INSERT INTO `{TABLE_NAME}` VALUES ('12:00:00', 'test1'), ('13:00:00', 'test2'), ('14:00:00', 'test3');", + f"INSERT INTO `{TABLE_NAME}` VALUES ('12:00:00', '12:10:00', 'test1'), ('13:00:00', '13:10:00', 'test2'), ('14:00:00', '14:10:00', 'test3');", settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1} ) - assert node.query(f"SELECT * FROM `{TABLE_NAME}` ORDER BY key") == "12:00:00.000000\ttest1\n13:00:00.000000\ttest2\n14:00:00.000000\ttest3\n" - assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key = '13:00:00.000000' ORDER BY key") == "13:00:00.000000\ttest2\n" - assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key >= '13:00:00.000000' ORDER BY key") == "13:00:00.000000\ttest2\n14:00:00.000000\ttest3\n" - assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key <= '13:00:00.000000' ORDER BY key") == "12:00:00.000000\ttest1\n13:00:00.000000\ttest2\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` ORDER BY key") == "12:00:00.000000\t12:10:00.000000\ttest1\n13:00:00.000000\t13:10:00.000000\ttest2\n14:00:00.000000\t14:10:00.000000\ttest3\n" + # Partition pruning + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key = '13:00:00.000000' ORDER BY key") == "13:00:00.000000\t13:10:00.000000\ttest2\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key >= '13:00:00.000000' ORDER BY key") == "13:00:00.000000\t13:10:00.000000\ttest2\n14:00:00.000000\t14:10:00.000000\ttest3\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key <= '13:00:00.000000' ORDER BY key") == "12:00:00.000000\t12:10:00.000000\ttest1\n13:00:00.000000\t13:10:00.000000\ttest2\n" + # Min/Max pruning + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE value = '13:10:00.000000' ORDER BY key") == "13:00:00.000000\t13:10:00.000000\ttest2\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE value >= '13:10:00.000000' ORDER BY key") == "13:00:00.000000\t13:10:00.000000\ttest2\n14:00:00.000000\t14:10:00.000000\ttest3\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE value <= '13:10:00.000000' ORDER BY key") == "12:00:00.000000\t12:10:00.000000\ttest1\n13:00:00.000000\t13:10:00.000000\ttest2\n" From 40b252b85539ec91de91c9508afe20206634645d Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 5 May 2026 15:33:43 +0200 Subject: [PATCH 7/8] Small fixes --- .../DataLakes/Iceberg/IcebergWrites.cpp | 8 +++---- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 21 ++++++++++++++++--- .../integration/test_database_iceberg/test.py | 6 ++---- .../test_write_time.py | 5 +---- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 9f1406fe34ca..37fa66041209 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -155,9 +155,9 @@ std::vector dumpFieldToBytes(const Field & field, DataTypePtr type) case TypeIndex::Int32: case TypeIndex::Date: case TypeIndex::Date32: + case TypeIndex::Time: return dumpValue(field.safeGet()); case TypeIndex::Int64: - case TypeIndex::Time: return dumpValue(field.safeGet()); case TypeIndex::Time64: case TypeIndex::DateTime64: @@ -423,8 +423,8 @@ void generateManifestFile( case Field::Types::Decimal64: { - auto type_id = partition_types[i]->getTypeId(); - if (type_id == TypeIndex::Time64 || type_id == TypeIndex::Time) + const WhichDataType which(*partition_types[i]); + if (which.isTime64()) { /// Need to write logical type into Avro auto scale = getDecimalScale(*partition_types[i]); if (scale == 0) @@ -442,7 +442,7 @@ void generateManifestFile( { throw Exception( ErrorCodes::BAD_ARGUMENTS, - "Avro file supports only seconds, milliseconds and microsecods for time, partition precision: {}", scale); + "Avro file supports only seconds, milliseconds and microseconds for time, partition precision: {}", scale); } } else diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index 815228b7295e..e1b9409f6fd8 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -484,8 +484,17 @@ Poco::Dynamic::Var getAvroType(DataTypePtr type) case TypeIndex::Int64: case TypeIndex::DateTime: case TypeIndex::DateTime64: - case TypeIndex::Time64: return "long"; + case TypeIndex::Time64: + { + auto scale = getDecimalScale(*type); + if (scale == 0 || scale == 3) + return "int"; + else if (scale == 6) + return "long"; + else + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported type for iceberg {}", type->getName()); + } case TypeIndex::Float32: return "float"; case TypeIndex::Float64: @@ -505,8 +514,14 @@ Poco::Dynamic::Var getAvroType(DataTypePtr type) Poco::Dynamic::Var getAvroLogicalType(DataTypePtr type) { - auto type_id = type->getTypeId(); - if (type_id == TypeIndex::Time || type_id == TypeIndex::Time64) + if (type->isNullable()) + { + auto type_nullable = std::static_pointer_cast(type); + return getAvroLogicalType(type_nullable->getNestedType()); + } + + const WhichDataType which(type); + if (which.isTime64()) { auto scale = getDecimalScale(*type); switch (scale) diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py index 6788494a968e..ec68d174d4ce 100644 --- a/tests/integration/test_database_iceberg/test.py +++ b/tests/integration/test_database_iceberg/test.py @@ -1018,8 +1018,7 @@ def _test_cluster_joins(started_cluster): assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n" -@pytest.mark.parametrize("storage_type", ["s3"]) -def test_partitioning_by_time(started_cluster, storage_type): +def test_partitioning_by_time(started_cluster): node = started_cluster.instances["node1"] test_ref = f"test_partitioning_by_time_{uuid.uuid4()}" @@ -1067,8 +1066,7 @@ def test_partitioning_by_time(started_cluster, storage_type): assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}` WHERE key <= '13:00:00.000000' ORDER BY key") == "12:00:00.000000\ttest1\n13:00:00.000000\ttest2\n" -@pytest.mark.parametrize("storage_type", ["s3"]) -def test_partitioning_by_string(started_cluster, storage_type): +def test_partitioning_by_string(started_cluster): node = started_cluster.instances["node1"] test_ref = f"test_partitioning_by_string_{uuid.uuid4()}" diff --git a/tests/integration/test_storage_iceberg_no_spark/test_write_time.py b/tests/integration/test_storage_iceberg_no_spark/test_write_time.py index 9a7b2164c8de..03e32a0600d1 100644 --- a/tests/integration/test_storage_iceberg_no_spark/test_write_time.py +++ b/tests/integration/test_storage_iceberg_no_spark/test_write_time.py @@ -3,13 +3,10 @@ from helpers.iceberg_utils import get_uuid_str -@pytest.mark.parametrize("time_type", ["Time", "Time64"]) +@pytest.mark.parametrize("time_type", ["Time", "Time64(0)", "Time64(3)", "Time64(6)"]) def test_write_time(started_cluster_iceberg_no_spark, time_type): node = started_cluster_iceberg_no_spark.instances["node1"] - if time_type == "Time64": - time_type = "Time64(6)" - TABLE_NAME = "test_partitioning_by_time_" + get_uuid_str() node.query( From 0ef39ae4c29fb4073658b5bca4b9352d6d8f49dd Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Tue, 5 May 2026 19:35:14 +0200 Subject: [PATCH 8/8] Fixes, AI assist (Cursor + ChatGPT 5.5) --- .../Formats/Impl/Parquet/PrepareForWrite.cpp | 29 ++++--- src/Processors/Formats/Impl/Parquet/Write.cpp | 27 +++++- .../DataLakes/Iceberg/IcebergWrites.cpp | 82 ++++++++++++------- .../ObjectStorage/DataLakes/Iceberg/Utils.cpp | 27 +++--- .../test_write_time.py | 14 +++- 5 files changed, 122 insertions(+), 57 deletions(-) diff --git a/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp b/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp index 95a8551df4b5..803c8a2fb55c 100644 --- a/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp +++ b/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include /// This file deals with schema conversion and with repetition and definition levels. @@ -475,7 +476,21 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin case TypeIndex::Decimal128: decimal(16, getDecimalPrecision(*type), getDecimalScale(*type)); break; case TypeIndex::Decimal256: decimal(32, getDecimalPrecision(*type), getDecimalScale(*type)); break; - case TypeIndex::Time: types(T::INT32, C::UINT_32, int_type(32, false)); break; + case TypeIndex::Time: + { + parq::TimeUnit unit; + unit.__set_MICROS({}); + + parq::TimeType tt; + tt.__set_isAdjustedToUTC(false); + tt.__set_unit(unit); + + parq::LogicalType t; + t.__set_TIME(tt); + types(T::INT64, parq::ConvertedType::TIME_MICROS, t); + state.datetime_multiplier = 1'000'000; + break; + } case TypeIndex::Time64: { @@ -484,13 +499,7 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin const auto & dt = assert_cast(*type); UInt32 scale = dt.getScale(); UInt32 converted_scale; - if (scale <= 3) - { - converted = parq::ConvertedType::TIME_MILLIS; - unit.__set_MILLIS({}); - converted_scale = 3; - } - else if (scale <= 6) + if (scale <= 6) { converted = parq::ConvertedType::TIME_MICROS; unit.__set_MICROS({}); @@ -507,9 +516,7 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin } parq::TimeType tt; - /// (Shouldn't we check the Time64's timezone parameter here? No, the actual number - /// in Time64 column is always in UTC, regardless of the timezone parameter.) - tt.__set_isAdjustedToUTC(true); + tt.__set_isAdjustedToUTC(false); tt.__set_unit(unit); parq::LogicalType t; t.__set_TIME(tt); diff --git a/src/Processors/Formats/Impl/Parquet/Write.cpp b/src/Processors/Formats/Impl/Parquet/Write.cpp index 0c8fc11a750d..93870e8af4a6 100644 --- a/src/Processors/Formats/Impl/Parquet/Write.cpp +++ b/src/Processors/Formats/Impl/Parquet/Write.cpp @@ -404,6 +404,26 @@ struct ConverterDateTime } }; +struct ConverterTime +{ + using Statistics = StatisticsNumeric; + + using Col = ColumnVector; + const Col & column; + PODArray buf; + Int64 multiplier; + + ConverterTime(const ColumnPtr & c, Int64 multiplier_) : column(assert_cast(*c)), multiplier(multiplier_) {} + + const Int64 * getBatch(size_t offset, size_t count) + { + buf.resize(count); + for (size_t i = 0; i < count; ++i) + buf[i] = static_cast(column.getData()[offset + i]) * multiplier; + return buf.data(); + } +}; + struct ConverterString { using Statistics = StatisticsStringRef; @@ -1168,7 +1188,12 @@ void writeColumnChunkBody( N(Int16, Int32Type); break; } - case TypeIndex::Int32 : N(Int32, Int32Type); break; + case TypeIndex::Int32: + if (s.type->getTypeId() == TypeIndex::Time) + writeColumnImpl(s, options, out, ConverterTime(s.primitive_column, s.datetime_multiplier)); + else + N(Int32, Int32Type); + break; case TypeIndex::Int64 : N(Int64, Int64Type); break; case TypeIndex::UInt32: diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 37fa66041209..c84dbd68b240 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -146,6 +147,46 @@ std::vector dumpValue(T value) return bytes; } +DataTypePtr getTimeTypeOrNull(DataTypePtr type) +{ + if (type->isNullable()) + return getTimeTypeOrNull(assert_cast(type.get())->getNestedType()); + + const WhichDataType which(type); + if (which.isTime() || which.isTime64()) + return type; + + return nullptr; +} + +Int64 getTimeValueInMicroseconds(const Field & field, DataTypePtr type) +{ + if (type->isNullable()) + return getTimeValueInMicroseconds(field, assert_cast(type.get())->getNestedType()); + + const WhichDataType which(type); + if (which.isTime()) + { + if (field.getType() == Field::Types::Int64) + return field.safeGet() * 1'000'000; + if (field.getType() == Field::Types::UInt64) + return static_cast(field.safeGet()) * 1'000'000; + return static_cast(field.safeGet()) * 1'000'000; + } + + if (which.isTime64()) + { + const auto scale = getDecimalScale(*type); + if (scale > 6) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported type for iceberg {}", type->getName()); + + const auto value = field.safeGet().getValue().value; + return value * DataTypeTime64::getScaleMultiplier(6 - scale).value; + } + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected Time or Time64, got {}", type->getName()); +} + std::vector dumpFieldToBytes(const Field & field, DataTypePtr type) { switch (type->getTypeId()) @@ -155,11 +196,13 @@ std::vector dumpFieldToBytes(const Field & field, DataTypePtr type) case TypeIndex::Int32: case TypeIndex::Date: case TypeIndex::Date32: - case TypeIndex::Time: return dumpValue(field.safeGet()); + case TypeIndex::Time: + return dumpValue(getTimeValueInMicroseconds(field, type)); case TypeIndex::Int64: return dumpValue(field.safeGet()); case TypeIndex::Time64: + return dumpValue(getTimeValueInMicroseconds(field, type)); case TypeIndex::DateTime64: return dumpValue(field.safeGet().getValue().value); case TypeIndex::String: @@ -398,6 +441,14 @@ void generateManifestFile( avro::GenericRecord & partition_record = data_file.field("partition").value(); for (size_t i = 0; i < partition_columns.size(); ++i) { + auto partition_time_type = getTimeTypeOrNull(partition_types[i]); + if (!partition_values[i].isNull() && partition_time_type) + { + partition_record.field(partition_columns[i]) = + avro::GenericDatum(getTimeValueInMicroseconds(partition_values[i], partition_types[i])); + continue; + } + switch (partition_values[i].getType()) { case Field::Types::Int64: @@ -422,34 +473,9 @@ void generateManifestFile( break; case Field::Types::Decimal64: - { - const WhichDataType which(*partition_types[i]); - if (which.isTime64()) - { /// Need to write logical type into Avro - auto scale = getDecimalScale(*partition_types[i]); - if (scale == 0) - partition_record.field(partition_columns[i]) = - avro::GenericDatum(partition_values[i].safeGet().getValue()); - else if (scale == 3 || scale == 6) - { - avro::NodePtr node = std::make_shared(avro::AVRO_LONG); - node->setLogicalType(avro::LogicalType(scale == 3 ? avro::LogicalType::TIME_MILLIS : avro::LogicalType::TIME_MICROS)); - int64_t value = partition_values[i].safeGet().getValue(); - auto datum = avro::GenericDatum(node, value); - partition_record.field(partition_columns[i]) = datum; - } - else - { - throw Exception( - ErrorCodes::BAD_ARGUMENTS, - "Avro file supports only seconds, milliseconds and microseconds for time, partition precision: {}", scale); - } - } - else - partition_record.field(partition_columns[i]) = - avro::GenericDatum(partition_values[i].safeGet().getValue()); + partition_record.field(partition_columns[i]) = + avro::GenericDatum(partition_values[i].safeGet().getValue()); break; - } case Field::Types::Null: break; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp index e1b9409f6fd8..c8df8a4ef825 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp @@ -403,8 +403,11 @@ std::pair getIcebergType(DataTypePtr type, Int32 & ite case TypeIndex::DateTime64: return {"timestamp", true}; case TypeIndex::Time: - case TypeIndex::Time64: return {"time", true}; + case TypeIndex::Time64: + if (getDecimalScale(*type) <= 6) + return {"time", true}; + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported type for iceberg {}", type->getName()); case TypeIndex::String: return {"string", true}; case TypeIndex::UUID: @@ -478,19 +481,17 @@ Poco::Dynamic::Var getAvroType(DataTypePtr type) case TypeIndex::Int32: case TypeIndex::Date: case TypeIndex::Date32: - case TypeIndex::Time: return "int"; case TypeIndex::UInt64: case TypeIndex::Int64: case TypeIndex::DateTime: case TypeIndex::DateTime64: + case TypeIndex::Time: return "long"; case TypeIndex::Time64: { auto scale = getDecimalScale(*type); - if (scale == 0 || scale == 3) - return "int"; - else if (scale == 6) + if (scale <= 6) return "long"; else throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported type for iceberg {}", type->getName()); @@ -521,20 +522,14 @@ Poco::Dynamic::Var getAvroLogicalType(DataTypePtr type) } const WhichDataType which(type); + if (which.isTime()) + return "time-micros"; if (which.isTime64()) { auto scale = getDecimalScale(*type); - switch (scale) - { /// Apache avro has millis and micros time precisions, https://avro.apache.org/docs/1.11.0/spec.html - case 0: - return Poco::Dynamic::Var(); - case 3: - return "time-millis"; - case 6: - return "time-micros"; - default: - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported time precision for avro {}({})", type->getName(), scale); - } + if (scale <= 6) + return "time-micros"; + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported time precision for avro {}({})", type->getName(), scale); } return Poco::Dynamic::Var(); } diff --git a/tests/integration/test_storage_iceberg_no_spark/test_write_time.py b/tests/integration/test_storage_iceberg_no_spark/test_write_time.py index 03e32a0600d1..c4562fede71e 100644 --- a/tests/integration/test_storage_iceberg_no_spark/test_write_time.py +++ b/tests/integration/test_storage_iceberg_no_spark/test_write_time.py @@ -3,7 +3,19 @@ from helpers.iceberg_utils import get_uuid_str -@pytest.mark.parametrize("time_type", ["Time", "Time64(0)", "Time64(3)", "Time64(6)"]) +@pytest.mark.parametrize( + "time_type", + [ + "Time", + "Time64(0)", + "Time64(3)", + "Time64(6)", + "Nullable(Time)", + "Nullable(Time64(0))", + "Nullable(Time64(3))", + "Nullable(Time64(6))", + ], +) def test_write_time(started_cluster_iceberg_no_spark, time_type): node = started_cluster_iceberg_no_spark.instances["node1"]