diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
index f3b98e73e60f..91c4e73e31ca 100644
--- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
@@ -156,6 +156,9 @@ static void insertNumber(IColumn & column, WhichDataType type, T value)
         case TypeIndex::DateTime64:
             assert_cast<ColumnDecimal<DateTime64> &>(column).insertValue(static_cast<Int64>(value));
             break;
+        case TypeIndex::Time64:
+            assert_cast<ColumnDecimal<Time64> &>(column).insertValue(static_cast<Int64>(value));
+            break;
         case TypeIndex::IPv4:
             assert_cast<ColumnIPv4 &>(column).insertValue(IPv4(static_cast<UInt32>(value)));
             break;
@@ -304,6 +307,8 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(const avro
                 return createDecimalDeserializeFn(root_node, target_type, false);
             if (target.isDateTime64())
                 return createDecimalDeserializeFn<DataTypeDateTime64>(root_node, target_type, false);
+            if (target.isTime64())
+                return createDecimalDeserializeFn<DataTypeTime64>(root_node, target_type, false);
             break;
         case avro::AVRO_INT:
             if (target_type->isValueRepresentedByNumber())
@@ -1283,8 +1288,11 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
     {
         case avro::Type::AVRO_INT:
         {
-            if (node->logicalType().type() == avro::LogicalType::DATE)
+            auto logical_type = node->logicalType();
+            if (logical_type.type() == avro::LogicalType::DATE)
                 return {std::make_shared<DataTypeDate32>()};
+            if (logical_type.type() == avro::LogicalType::TIME_MILLIS)
+                return {std::make_shared<DataTypeTime64>(3)};
             return {std::make_shared<DataTypeInt32>()};
         }
@@ -1295,6 +1303,10 @@ DataTypePtr AvroSchemaReader::avroNodeToDataType(avro::NodePtr node)
             return {std::make_shared<DataTypeDateTime64>(3)};
         if (logical_type.type() == avro::LogicalType::TIMESTAMP_MICROS)
             return {std::make_shared<DataTypeDateTime64>(6)};
+        if (logical_type.type() == avro::LogicalType::TIME_MILLIS)
+            return {std::make_shared<DataTypeTime64>(3)};
+        if (logical_type.type() == avro::LogicalType::TIME_MICROS)
+            return {std::make_shared<DataTypeTime64>(6)};
         return std::make_shared<DataTypeInt64>();
     }
diff --git a/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp b/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp
index 4a0511deded7..803c8a2fb55c 100644
--- a/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp
+++ b/src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp
@@ -21,6 +21,7 @@
 #include
 #include
 #include
+#include <DataTypes/DataTypeTime64.h>
 #include

 /// This file deals with schema conversion and with repetition and definition levels.
@@ -475,6 +476,55 @@ void preparePrimitiveColumn(ColumnPtr column, DataTypePtr type, const std::strin
         case TypeIndex::Decimal128: decimal(16, getDecimalPrecision(*type), getDecimalScale(*type)); break;
         case TypeIndex::Decimal256: decimal(32, getDecimalPrecision(*type), getDecimalScale(*type)); break;

+        case TypeIndex::Time:
+        {
+            parq::TimeUnit unit;
+            unit.__set_MICROS({});
+
+            parq::TimeType tt;
+            tt.__set_isAdjustedToUTC(false);
+            tt.__set_unit(unit);
+
+            parq::LogicalType t;
+            t.__set_TIME(tt);
+            types(T::INT64, parq::ConvertedType::TIME_MICROS, t);
+            state.datetime_multiplier = 1'000'000;
+            break;
+        }
+
+        case TypeIndex::Time64:
+        {
+            std::optional<parq::ConvertedType::type> converted;
+            parq::TimeUnit unit;
+            const auto & dt = assert_cast<const DataTypeTime64 &>(*type);
+            UInt32 scale = dt.getScale();
+            UInt32 converted_scale;
+            if (scale <= 6)
+            {
+                converted = parq::ConvertedType::TIME_MICROS;
+                unit.__set_MICROS({});
+                converted_scale = 6;
+            }
+            else if (scale <= 9)
+            {
+                unit.__set_NANOS({});
+                converted_scale = 9;
+            }
+            else
+            {
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected Time64 scale: {}", scale);
+            }
+
+            parq::TimeType tt;
+            tt.__set_isAdjustedToUTC(false);
+            tt.__set_unit(unit);
+            parq::LogicalType t;
+            t.__set_TIME(tt);
+            types(T::INT64, converted, t);
+            state.datetime_multiplier = DataTypeTime64::getScaleMultiplier(converted_scale - scale);
+            break;
+        }
+
         default:
             throw Exception(ErrorCodes::UNKNOWN_TYPE, "Internal type '{}' of column '{}' is not supported for conversion into Parquet data format.", type->getFamilyName(), name);
     }
diff --git a/src/Processors/Formats/Impl/Parquet/Write.cpp b/src/Processors/Formats/Impl/Parquet/Write.cpp
index 42af8e997f0a..93870e8af4a6 100644
--- a/src/Processors/Formats/Impl/Parquet/Write.cpp
+++ b/src/Processors/Formats/Impl/Parquet/Write.cpp
@@ -358,28 +358,32 @@ struct ConverterNumeric
     }
 };

-struct ConverterDateTime64WithMultiplier
+template <typename T>
+struct ConverterTimeType64WithMultiplierImpl
 {
     using Statistics = StatisticsNumeric<Int64, Int64>;

-    using Col = ColumnDecimal<DateTime64>;
+    using Col = ColumnDecimal<T>;
     const Col & column;
     Int64 multiplier;
     PODArray<Int64> buf;

-    ConverterDateTime64WithMultiplier(const ColumnPtr & c, Int64 multiplier_) : column(assert_cast<const Col &>(*c)), multiplier(multiplier_) {}
+    ConverterTimeType64WithMultiplierImpl(const ColumnPtr & c, Int64 multiplier_) : column(assert_cast<const Col &>(*c)), multiplier(multiplier_) {}

     const Int64 * getBatch(size_t offset, size_t count)
     {
         buf.resize(count);
         for (size_t i = 0; i < count; ++i)
-            /// Not checking overflow because DateTime64 values should already be in the range where
+            /// Not checking overflow because Time64/DateTime64 values should already be in the range where
             /// they fit in Int64 at any allowed scale (i.e. up to nanoseconds).
             buf[i] = column.getData()[offset + i].value * multiplier;
         return buf.data();
     }
 };

+using ConverterDateTime64WithMultiplier = ConverterTimeType64WithMultiplierImpl<DateTime64>;
+using ConverterTime64WithMultiplier = ConverterTimeType64WithMultiplierImpl<Time64>;
+
 /// Multiply DateTime by 1000 to get milliseconds (because Parquet doesn't support seconds).
 struct ConverterDateTime
 {
@@ -400,6 +404,26 @@ struct ConverterDateTime
     }
 };

+struct ConverterTime
+{
+    using Statistics = StatisticsNumeric<Int64, Int64>;
+
+    using Col = ColumnVector<Int32>;
+    const Col & column;
+    PODArray<Int64> buf;
+    Int64 multiplier;
+
+    ConverterTime(const ColumnPtr & c, Int64 multiplier_) : column(assert_cast<const Col &>(*c)), multiplier(multiplier_) {}
+
+    const Int64 * getBatch(size_t offset, size_t count)
+    {
+        buf.resize(count);
+        for (size_t i = 0; i < count; ++i)
+            buf[i] = static_cast<Int64>(column.getData()[offset + i]) * multiplier;
+        return buf.data();
+    }
+};
+
 struct ConverterString
 {
     using Statistics = StatisticsStringRef;
@@ -1164,7 +1188,12 @@ void writeColumnChunkBody(
             N(Int16, Int32Type); break;
         }
-        case TypeIndex::Int32 : N(Int32, Int32Type); break;
+        case TypeIndex::Int32:
+            if (s.type->getTypeId() == TypeIndex::Time)
+                writeColumnImpl<parquet::Int64Type>(s, options, out, ConverterTime(s.primitive_column, s.datetime_multiplier));
+            else
+                N(Int32, Int32Type);
+            break;
         case TypeIndex::Int64 : N(Int64, Int64Type); break;

         case TypeIndex::UInt32:
@@ -1177,7 +1206,6 @@ void writeColumnChunkBody(
                 writeColumnImpl<parquet::Int64Type>(s, options, out, ConverterDateTime(s.primitive_column));
             }
             break;
-
 #undef N

         case TypeIndex::Float32:
@@ -1246,6 +1274,17 @@ void writeColumnChunkBody(
         case TypeIndex::Decimal256: D(Decimal256); break;
 #undef D

+        case TypeIndex::Time64:
+            if (s.datetime_multiplier == 1)
+                writeColumnImpl<parquet::Int64Type>(
+                    s, options, out, ConverterNumeric<ColumnDecimal<Time64>, Int64, Int64>(
+                        s.primitive_column));
+            else
+                writeColumnImpl<parquet::Int64Type>(
+                    s, options, out, ConverterTime64WithMultiplier(
+                        s.primitive_column, s.datetime_multiplier));
+            break;
+
         default:
             throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected column type: {}", s.primitive_column->getFamilyName());
     }
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
index a09125cfbeb2..2c4b877324c6 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
@@ -40,6 +40,7 @@ DEFINE_ICEBERG_FIELD(time);
 DEFINE_ICEBERG_FIELD(timestamp);
 DEFINE_ICEBERG_FIELD(timestamptz);
 DEFINE_ICEBERG_FIELD(type)
+DEFINE_ICEBERG_FIELD(logicalType); /// this field has a camelCase name
 DEFINE_ICEBERG_FIELD(transform);
 DEFINE_ICEBERG_FIELD(direction);
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
index c2d5a0c0c5bc..c84dbd68b240 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
@@ -12,6 +12,7 @@
 #include
 #include
 #include
+#include <DataTypes/DataTypeTime64.h>
 #include
 #include
 #include
@@ -129,6 +130,8 @@ bool canDumpIcebergStats(const Field & field, DataTypePtr type)
         case TypeIndex::Date32:
         case TypeIndex::Int64:
         case TypeIndex::DateTime64:
+        case TypeIndex::Time:
+        case TypeIndex::Time64:
         case TypeIndex::String:
             return true;
         default:
@@ -144,6 +147,46 @@ std::vector<uint8_t> dumpValue(T value)
     return bytes;
 }

+DataTypePtr getTimeTypeOrNull(DataTypePtr type)
+{
+    if (type->isNullable())
+        return getTimeTypeOrNull(assert_cast<const DataTypeNullable *>(type.get())->getNestedType());
+
+    const WhichDataType which(type);
+    if (which.isTime() || which.isTime64())
+        return type;
+
+    return nullptr;
+}
+
+Int64 getTimeValueInMicroseconds(const Field & field, DataTypePtr type)
+{
+    if (type->isNullable())
+        return getTimeValueInMicroseconds(field, assert_cast<const DataTypeNullable *>(type.get())->getNestedType());
+
+    const WhichDataType which(type);
+    if (which.isTime())
+    {
+        if (field.getType() == Field::Types::Int64)
+            return field.safeGet<Int64>() * 1'000'000;
+        if (field.getType() == Field::Types::UInt64)
+            return static_cast<Int64>(field.safeGet<UInt64>()) * 1'000'000;
+        return static_cast<Int64>(field.safeGet<Float64>()) * 1'000'000;
+    }
+
+    if (which.isTime64())
+    {
+        const auto scale = getDecimalScale(*type);
+        if (scale > 6)
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported type for iceberg {}", type->getName());
+
+        const auto value = field.safeGet<DecimalField<Decimal64>>().getValue().value;
+        return value * DataTypeTime64::getScaleMultiplier(6 - scale).value;
+    }
+
+    throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected Time or Time64, got {}", type->getName());
+}
+
 std::vector<uint8_t> dumpFieldToBytes(const Field & field, DataTypePtr type)
 {
     switch (type->getTypeId())
@@ -154,8 +197,12 @@ std::vector<uint8_t> dumpFieldToBytes(const Field & field, DataTypePtr type)
         case TypeIndex::Date:
         case TypeIndex::Date32:
             return dumpValue(field.safeGet<UInt64>());
+        case TypeIndex::Time:
+            return dumpValue(getTimeValueInMicroseconds(field, type));
         case TypeIndex::Int64:
             return dumpValue(field.safeGet<Int64>());
+        case TypeIndex::Time64:
+            return dumpValue(getTimeValueInMicroseconds(field, type));
         case TypeIndex::DateTime64:
             return dumpValue(field.safeGet<DecimalField<DateTime64>>().getValue().value);
         case TypeIndex::String:
@@ -219,7 +266,16 @@ void extendSchemaForPartitions(
         Poco::JSON::Object::Ptr field = new Poco::JSON::Object;
         field->set(Iceberg::f_field_id, 1000 + i);
         field->set(Iceberg::f_name, partition_columns[i]);
-        field->set(Iceberg::f_type, getAvroType(partition_types[i]));
+        auto logical_type = getAvroLogicalType(partition_types[i]);
+        if (!logical_type.isEmpty())
+        {
+            Poco::JSON::Object::Ptr type_field = new Poco::JSON::Object;
+            type_field->set(Iceberg::f_type, getAvroType(partition_types[i]));
+            type_field->set(Iceberg::f_logicalType, logical_type);
+            field->set(Iceberg::f_type, type_field);
+        }
+        else
+            field->set(Iceberg::f_type, getAvroType(partition_types[i]));
         partition_fields->add(field);
     }
@@ -385,6 +441,14 @@ void generateManifestFile(
     avro::GenericRecord & partition_record = data_file.field("partition").value<avro::GenericRecord>();
     for (size_t i = 0; i < partition_columns.size(); ++i)
     {
+        auto partition_time_type = getTimeTypeOrNull(partition_types[i]);
+        if (!partition_values[i].isNull() && partition_time_type)
+        {
+            partition_record.field(partition_columns[i]) =
+                avro::GenericDatum(getTimeValueInMicroseconds(partition_values[i], partition_types[i]));
+            continue;
+        }
+
         switch (partition_values[i].getType())
         {
             case Field::Types::Int64:
@@ -412,7 +476,6 @@ void generateManifestFile(
                 partition_record.field(partition_columns[i]) =
                     avro::GenericDatum(partition_values[i].safeGet<DecimalField<Decimal64>>().getValue());
                 break;
-
             case Field::Types::Null:
                 break;
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp
index 4efa9b46615a..dda2050c9024 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp
@@ -243,7 +243,7 @@ DataTypePtr IcebergSchemaProcessor::getSimpleType(const String & type_name, Cont
     if (type_name == f_date)
         return std::make_shared<DataTypeDate>();
     if (type_name == f_time)
-        return std::make_shared<DataTypeTime>();
+        return std::make_shared<DataTypeTime64>(6);
     if (type_name == f_timestamp)
         return std::make_shared<DataTypeDateTime64>(6);
     if (type_name == f_timestamptz)
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
index 050d4f70b7c1..c8df8a4ef825 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include <DataTypes/DataTypeTime64.h>
 #include
 #include
 #include
@@ -403,6 +404,10 @@ std::pair<String, bool> getIcebergType(DataTypePtr type, Int32 & ite
             return {"timestamp", true};
         case TypeIndex::Time:
             return {"time", true};
+        case TypeIndex::Time64:
+            if (getDecimalScale(*type) <= 6)
+                return {"time", true};
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported type for iceberg {}", type->getName());
         case TypeIndex::String:
             return {"string", true};
         case TypeIndex::UUID:
@@ -476,13 +481,21 @@ Poco::Dynamic::Var getAvroType(DataTypePtr type)
         case TypeIndex::Int32:
         case TypeIndex::Date:
         case TypeIndex::Date32:
-        case TypeIndex::Time:
            return "int";
         case TypeIndex::UInt64:
         case TypeIndex::Int64:
         case TypeIndex::DateTime:
         case TypeIndex::DateTime64:
+        case TypeIndex::Time:
            return "long";
+        case TypeIndex::Time64:
+        {
+            auto scale = getDecimalScale(*type);
+            if (scale <= 6)
+                return "long";
+            else
+                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported type for iceberg {}", type->getName());
+        }
         case TypeIndex::Float32:
             return "float";
         case TypeIndex::Float64:
@@ -500,6 +513,27 @@ Poco::Dynamic::Var getAvroType(DataTypePtr type)
     }
 }

+Poco::Dynamic::Var getAvroLogicalType(DataTypePtr type)
+{
+    if (type->isNullable())
+    {
+        auto type_nullable = std::static_pointer_cast<const DataTypeNullable>(type);
+        return getAvroLogicalType(type_nullable->getNestedType());
+    }
+
+    const WhichDataType which(type);
+    if (which.isTime())
+        return "time-micros";
+    if (which.isTime64())
+    {
+        auto scale = getDecimalScale(*type);
+        if (scale <= 6)
+            return "time-micros";
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unsupported time precision for avro {}({})", type->getName(), scale);
+    }
+    return Poco::Dynamic::Var();
+}
+
 Poco::JSON::Object::Ptr getPartitionField(
     ASTPtr partition_by_element,
     const std::unordered_map & column_name_to_source_id,
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
index 6aa8013a4214..5cea603be3c2 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
@@ -87,6 +87,7 @@ Poco::JSON::Object::Ptr getMetadataJSONObject(
 std::pair<String, bool> getIcebergType(DataTypePtr type, Int32 & iter);
 Poco::Dynamic::Var getAvroType(DataTypePtr type);
+Poco::Dynamic::Var getAvroLogicalType(DataTypePtr type);

 /// Spec: https://iceberg.apache.org/spec/?h=metadata.json#table-metadata-fields
 std::pair createEmptyMetadataFile(
diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py
index 701b3bc6e8d8..ec68d174d4ce 100644
--- a/tests/integration/test_database_iceberg/test.py
+++ b/tests/integration/test_database_iceberg/test.py
@@ -5,7 +5,7 @@
 import random
 import time
 import uuid
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, time as dtime

 import pyarrow as pa
 import pytest
@@ -26,7 +26,8 @@
     StringType,
     StructType,
     TimestampType,
-    TimestamptzType
+    TimestamptzType,
+    TimeType,
 )
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER
@@ -1015,3 +1016,99 @@ def _test_cluster_joins(started_cluster):
     )

     assert res == "Jack\tBlack\nJack\tSilver\nJohn\tBlack\nJohn\tSilver\n"
+
+
+def test_partitioning_by_time(started_cluster):
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_partitioning_by_time_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    root_namespace = f"{test_ref}_namespace"
f"{test_ref}_namespace" + + namespace = f"{root_namespace}.A" + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(namespace) + + schema = Schema( + NestedField( + field_id=1, + name="key", + field_type=TimeType(), + required=False + ), + NestedField( + field_id=2, + name="value", + field_type=StringType(), + required=False, + ), + ) + + partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key" + ) + ) + + table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec) + data = [{"key": dtime(12,0,0), "value": "test1"}, + {"key": dtime(13,0,0), "value": "test2"}, + {"key": dtime(14,0,0), "value": "test3"}, + ] + df = pa.Table.from_pylist(data) + table.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}` ORDER BY key") == "12:00:00.000000\ttest1\n13:00:00.000000\ttest2\n14:00:00.000000\ttest3\n" + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}` WHERE key = '13:00:00.000000' ORDER BY key") == "13:00:00.000000\ttest2\n" + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}` WHERE key >= '13:00:00.000000' ORDER BY key") == "13:00:00.000000\ttest2\n14:00:00.000000\ttest3\n" + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}` WHERE key <= '13:00:00.000000' ORDER BY key") == "12:00:00.000000\ttest1\n13:00:00.000000\ttest2\n" + + +def test_partitioning_by_string(started_cluster): + node = started_cluster.instances["node1"] + + test_ref = f"test_partitioning_by_string_{uuid.uuid4()}" + table_name = f"{test_ref}_table" + root_namespace = f"{test_ref}_namespace" + + namespace = f"{root_namespace}.A" + catalog = load_catalog_impl(started_cluster) + catalog.create_namespace(namespace) + + schema = Schema( + NestedField( + field_id=1, + name="key", + field_type=StringType(), + required=False + ), + NestedField( + field_id=2, + name="value", + field_type=StringType(), + required=False, + ), + NestedField( + field_id=3, + name="time_value", + field_type=TimeType(), + required=False, + ), + ) + + partition_spec = PartitionSpec( + PartitionField( + source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_key" + ) + ) + + table = create_table(catalog, namespace, table_name, schema=schema, partition_spec=partition_spec) + data = [{"key": "a:b,c[d=e/f%g?h", "value": "test", "time_value": dtime(12,0,0)}] + df = pa.Table.from_pylist(data) + table.append(df) + + create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME) + + assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{namespace}.{table_name}`") == "a:b,c[d=e/f%g?h\ttest\t12:00:00.000000\n" diff --git a/tests/integration/test_storage_iceberg_no_spark/test_write_time.py b/tests/integration/test_storage_iceberg_no_spark/test_write_time.py new file mode 100644 index 000000000000..c4562fede71e --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/test_write_time.py @@ -0,0 +1,41 @@ +import pytest +from helpers.config_cluster import minio_secret_key, minio_access_key +from helpers.iceberg_utils import get_uuid_str + + +@pytest.mark.parametrize( + "time_type", + [ + "Time", + "Time64(0)", + "Time64(3)", + "Time64(6)", + "Nullable(Time)", + "Nullable(Time64(0))", + "Nullable(Time64(3))", + "Nullable(Time64(6))", + ], +) +def test_write_time(started_cluster_iceberg_no_spark, time_type): + node = 
started_cluster_iceberg_no_spark.instances["node1"] + + TABLE_NAME = "test_partitioning_by_time_" + get_uuid_str() + + node.query( + f"CREATE TABLE `{TABLE_NAME}` (key {time_type}, value {time_type}, comment String) ENGINE = IcebergLocal('/var/lib/clickhouse/user_files/iceberg_data/default/{TABLE_NAME}', 'Parquet') PARTITION BY key", + settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1} + ) + node.query( + f"INSERT INTO `{TABLE_NAME}` VALUES ('12:00:00', '12:10:00', 'test1'), ('13:00:00', '13:10:00', 'test2'), ('14:00:00', '14:10:00', 'test3');", + settings={"allow_experimental_insert_into_iceberg": 1, 'write_full_path_in_iceberg_metadata': 1} + ) + + assert node.query(f"SELECT * FROM `{TABLE_NAME}` ORDER BY key") == "12:00:00.000000\t12:10:00.000000\ttest1\n13:00:00.000000\t13:10:00.000000\ttest2\n14:00:00.000000\t14:10:00.000000\ttest3\n" + # Partition pruning + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key = '13:00:00.000000' ORDER BY key") == "13:00:00.000000\t13:10:00.000000\ttest2\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key >= '13:00:00.000000' ORDER BY key") == "13:00:00.000000\t13:10:00.000000\ttest2\n14:00:00.000000\t14:10:00.000000\ttest3\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE key <= '13:00:00.000000' ORDER BY key") == "12:00:00.000000\t12:10:00.000000\ttest1\n13:00:00.000000\t13:10:00.000000\ttest2\n" + # Min/Max pruning + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE value = '13:10:00.000000' ORDER BY key") == "13:00:00.000000\t13:10:00.000000\ttest2\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE value >= '13:10:00.000000' ORDER BY key") == "13:00:00.000000\t13:10:00.000000\ttest2\n14:00:00.000000\t14:10:00.000000\ttest3\n" + assert node.query(f"SELECT * FROM `{TABLE_NAME}` WHERE value <= '13:10:00.000000' ORDER BY key") == "12:00:00.000000\t12:10:00.000000\ttest1\n13:00:00.000000\t13:10:00.000000\ttest2\n"