Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,11 @@ public void testBinaryVector() {
InternalVector vector2 = row.getVector(0);
assertThat(vector2.size()).isEqualTo(vector.size());
assertThat(vector2.toFloatArray()).isEqualTo(vector.toFloatArray());
assertThat(
DataFormatTestUtil.toStringNoRowKind(
row,
RowType.of(DataTypes.VECTOR(vector.size(), DataTypes.FLOAT()))))
.isEqualTo(Arrays.toString(vector.toFloatArray()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VectorType;
import org.apache.paimon.utils.StringUtils;

import java.util.Arrays;
Expand All @@ -48,16 +49,7 @@ public static String toStringNoRowKind(InternalRow row, RowType type) {
if (field instanceof byte[]) {
build.append(Arrays.toString((byte[]) field));
} else if (field instanceof InternalArray) {
InternalArray internalArray = (InternalArray) field;
ArrayType arrayType = (ArrayType) type.getTypeAt(i);
InternalArray.ElementGetter elementGetter =
InternalArray.createElementGetter(arrayType.getElementType());
String[] result = new String[internalArray.size()];
for (int j = 0; j < internalArray.size(); j++) {
Object object = elementGetter.getElementOrNull(internalArray, j);
result[j] = null == object ? null : object.toString();
}
build.append(Arrays.toString(result));
build.append(getArrayLikeString((InternalArray) field, type.getTypeAt(i)));
} else {
build.append(field);
}
Expand Down Expand Up @@ -90,19 +82,7 @@ public static String getDataFieldString(Object field, DataType type) {
if (field instanceof byte[]) {
return Arrays.toString((byte[]) field);
} else if (field instanceof InternalArray) {
InternalArray internalArray = (InternalArray) field;
ArrayType arrayType = (ArrayType) type;
InternalArray.ElementGetter elementGetter =
InternalArray.createElementGetter(arrayType.getElementType());
String[] result = new String[internalArray.size()];
for (int j = 0; j < internalArray.size(); j++) {
Object object = elementGetter.getElementOrNull(internalArray, j);
result[j] =
null == object
? null
: getDataFieldString(object, arrayType.getElementType());
}
return Arrays.toString(result);
return getArrayLikeString((InternalArray) field, type);
} else if (field instanceof InternalRow) {
return String.format("(%s)", toStringWithRowKind((InternalRow) field, (RowType) type));
} else if (field instanceof InternalMap) {
Expand Down Expand Up @@ -139,6 +119,24 @@ public static String getDataFieldString(Object field, DataType type) {
}
}

/**
 * Renders the contents of an array-backed value (either ARRAY or VECTOR typed) as a
 * bracketed, comma-separated string via {@link Arrays#toString}.
 *
 * @param internalArray the array data to stringify
 * @param type the logical type of the data; must be a {@code VectorType} or {@code ArrayType}
 * @return the string rendering, with {@code null} elements printed as "null"
 * @throws IllegalArgumentException if {@code type} is neither an array nor a vector type
 */
private static String getArrayLikeString(InternalArray internalArray, DataType type) {
    final DataType elementType;
    if (type instanceof VectorType) {
        elementType = ((VectorType) type).getElementType();
    } else if (type instanceof ArrayType) {
        elementType = ((ArrayType) type).getElementType();
    } else {
        throw new IllegalArgumentException("Unsupported type for array data: " + type);
    }

    InternalArray.ElementGetter getter = InternalArray.createElementGetter(elementType);
    int size = internalArray.size();
    String[] rendered = new String[size];
    for (int idx = 0; idx < size; idx++) {
        Object element = getter.getElementOrNull(internalArray, idx);
        // Recurse through getDataFieldString so nested rows/arrays/maps render consistently.
        rendered[idx] = element == null ? null : getDataFieldString(element, elementType);
    }
    return Arrays.toString(rendered);
}

/** Stringify the given {@link InternalRow}. */
public static String internalRowToString(InternalRow row, RowType type) {
return row.getRowKind().shortString() + "[" + toStringNoRowKind(row, type) + ']';
Expand Down
53 changes: 53 additions & 0 deletions paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.BinaryVector;
import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
Expand Down Expand Up @@ -798,6 +799,58 @@ public void testJavaWriteCompressedTextAppendTable() throws Exception {
}
}

@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testJavaWriteVectorAppendTable() throws Exception {
    // E2E interop: Java writes an unaware-bucket (bucket = -1) Avro append table with a
    // fixed-size FLOAT vector column, then reads it back and checks the rendered rows.
    // A companion Python test reads the same table from the shared warehouse.
    Identifier identifier = identifier("mixed_test_vector_append_tablej_avro");
    catalog.dropTable(identifier, true);
    Schema schema =
            Schema.newBuilder()
                    .column("id", DataTypes.INT())
                    .column("embedding", DataTypes.VECTOR(3, DataTypes.FLOAT()))
                    .column("label", DataTypes.STRING())
                    .option("file.format", "avro")
                    .option("bucket", "-1")
                    .build();

    catalog.createTable(identifier, schema, true);
    FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);

    // Row fixtures: ids are 1-based positions into these parallel arrays.
    float[][] embeddings = {
        {1.0f, 2.0f, 3.0f},
        {4.0f, 5.0f, 6.0f},
        {-1.0f, 0.5f, 2.5f}
    };
    String[] labels = {"first", "second", "third"};

    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
    try (BatchTableWrite write = writeBuilder.newWrite();
            BatchTableCommit commit = writeBuilder.newCommit()) {
        for (int i = 0; i < labels.length; i++) {
            write.write(
                    GenericRow.of(
                            i + 1,
                            BinaryVector.fromPrimitiveArray(embeddings[i]),
                            BinaryString.fromString(labels[i])));
        }
        commit.commit(write.prepareCommit());
    }

    // Read everything back and compare the stringified rows, ignoring row order.
    List<Split> splits = new ArrayList<>(table.newSnapshotReader().read().dataSplits());
    TableRead read = table.newRead();
    List<String> res =
            getResult(
                    read,
                    splits,
                    row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
    assertThat(res)
            .containsExactlyInAnyOrder(
                    "1, [1.0, 2.0, 3.0], first",
                    "2, [4.0, 5.0, 6.0], second",
                    "3, [-1.0, 0.5, 2.5], third");
}

@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testBlobWriteAlterCompact() throws Exception {
Expand Down
39 changes: 38 additions & 1 deletion paimon-python/dev/run_mixed_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,29 @@ run_compressed_text_test() {
fi
}

# Vector append table interop test: Java writes a table with a VECTOR column into the
# shared warehouse, then Python reads it back. Returns 1 as soon as either half fails.
run_vector_append_table_test() {
    echo -e "${YELLOW}=== Running Vector Append Table Test (Java Write, Python Read) ===${NC}"

    cd "$PROJECT_ROOT"

    # Java half: guarded by -Drun.e2e.tests=true, same as the other JavaPyE2ETest cases.
    echo "Running Maven test for JavaPyE2ETest.testJavaWriteVectorAppendTable..."
    if ! mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testJavaWriteVectorAppendTable -pl paimon-core -q -Drun.e2e.tests=true; then
        echo -e "${RED}✗ Java test failed${NC}"
        return 1
    fi
    echo -e "${GREEN}✓ Java test completed successfully${NC}"

    # Python half: reads the table the Java test just wrote.
    cd "$PAIMON_PYTHON_DIR"
    echo "Running Python test for JavaPyReadWriteTest.test_read_vector_append_table..."
    if ! python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_read_vector_append_table -v; then
        echo -e "${RED}✗ Python test failed${NC}"
        return 1
    fi
    echo -e "${GREEN}✓ Python test completed successfully${NC}"
    return 0
}

# Function to run Tantivy full-text index test (Java write index, Python read and search)
run_tantivy_fulltext_test() {
echo -e "${YELLOW}=== Step 8: Running Tantivy Full-Text Index Test (Java Write, Python Read) ===${NC}"
Expand Down Expand Up @@ -504,6 +527,7 @@ main() {
local pk_dv_result=0
local btree_index_result=0
local compressed_text_result=0
local vector_append_table_result=0
local tantivy_fulltext_result=0
local lumina_vector_result=0
local lumina_vector_btree_result=0
Expand Down Expand Up @@ -576,6 +600,13 @@ main() {

echo ""

# Run Vector append table test (Java write, Python read)
if ! run_vector_append_table_test; then
vector_append_table_result=1
fi

echo ""

# Run Tantivy full-text index test (requires Python >= 3.10)
if [[ "$PYTHON_MINOR" -ge 10 ]]; then
if ! run_tantivy_fulltext_test; then
Expand Down Expand Up @@ -693,6 +724,12 @@ main() {
echo -e "${RED}✗ Compressed Text Test (Java Write, Python Read): FAILED${NC}"
fi

if [[ $vector_append_table_result -eq 0 ]]; then
echo -e "${GREEN}✓ Vector Append Table Test (Java Write, Python Read): PASSED${NC}"
else
echo -e "${RED}✗ Vector Append Table Test (Java Write, Python Read): FAILED${NC}"
fi

if [[ $tantivy_fulltext_result -eq 0 ]]; then
echo -e "${GREEN}✓ Tantivy Full-Text Index Test (Java Write, Python Read): PASSED${NC}"
else
Expand Down Expand Up @@ -752,7 +789,7 @@ main() {
# Clean up warehouse directory after all tests
cleanup_warehouse

if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $lumina_vector_btree_result -eq 0 && $compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 && $data_evolution_result -eq 0 && $data_evolution_py_write_result -eq 0 && $java_variant_write_py_read_result -eq 0 && $py_variant_write_java_read_result -eq 0 ]]; then
if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && $btree_index_result -eq 0 && $compressed_text_result -eq 0 && $tantivy_fulltext_result -eq 0 && $lumina_vector_result -eq 0 && $lumina_vector_btree_result -eq 0 && $compact_conflict_result -eq 0 && $blob_alter_compact_result -eq 0 && $data_evolution_result -eq 0 && $data_evolution_py_write_result -eq 0 && $java_variant_write_py_read_result -eq 0 && $py_variant_write_java_read_result -eq 0 && $vector_append_table_result -eq 0 ]]; then
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}"
return 0
else
Expand Down
78 changes: 77 additions & 1 deletion paimon-python/pypaimon/schema/data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,66 @@ def __str__(self) -> str:
return "ARRAY<{}>{}".format(self.element, null_suffix)


@dataclass
class VectorType(DataType):
    """A fixed-length vector whose elements are a single atomic type.

    Mirrors the Java-side ``VectorType``: the element type is restricted to
    boolean/integral/floating atomic types and the length must be >= 1.
    """

    element: DataType
    length: int

    # Upper-cased atomic type names a vector is allowed to carry.
    VALID_ELEMENT_TYPES = {
        "BOOLEAN",
        "TINYINT",
        "SMALLINT",
        "INT",
        "INTEGER",
        "BIGINT",
        "FLOAT",
        "DOUBLE",
    }

    def __init__(self, nullable: bool, element_type: DataType, length: int):
        super().__init__(nullable)
        # Validate up front so an invalid vector type can never be constructed.
        if length < 1:
            raise ValueError("Vector length must be greater than or equal to 1.")
        if not self.is_valid_element_type(element_type):
            raise ValueError("Invalid element type for vector: {}".format(element_type))
        self.element = element_type
        self.length = length

    @classmethod
    def is_valid_element_type(cls, element_type: DataType) -> bool:
        # Only atomic types qualify; compound types (arrays, rows, maps) are rejected.
        return (isinstance(element_type, AtomicType)
                and element_type.type.upper() in cls.VALID_ELEMENT_TYPES)

    def __eq__(self, other):
        if self is other:
            return True
        if not isinstance(other, VectorType):
            return False
        return ((self.element, self.length, self.nullable)
                == (other.element, other.length, other.nullable))

    def __hash__(self):
        # Must stay consistent with __eq__: same identity triple.
        return hash((self.element, self.length, self.nullable))

    def to_dict(self) -> Dict[str, Any]:
        # Nullability is encoded twice on purpose: in the type string (Java parser
        # convention) and as an explicit boolean field.
        type_name = "VECTOR" if self.nullable else "VECTOR NOT NULL"
        return {
            "type": type_name,
            "element": self.element.to_dict() if self.element else None,
            "length": self.length,
            "nullable": self.nullable
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "VectorType":
        # Delegates to the shared parser so nested element types are handled uniformly.
        return DataTypeParser.parse_data_type(data)

    def __str__(self) -> str:
        suffix = "" if self.nullable else " NOT NULL"
        return "VECTOR<{}, {}>{}".format(self.element, self.length, suffix)


@dataclass
class MultisetType(DataType):
element: DataType
Expand Down Expand Up @@ -387,6 +447,14 @@ def parse_data_type(
nullable = "NOT NULL" not in type_string
return ArrayType(nullable, element)

elif type_string.startswith("VECTOR"):
element = DataTypeParser.parse_data_type(
json_data.get("element"), field_id
)
length = int(json_data.get("length"))
nullable = "NOT NULL" not in type_string
return VectorType(nullable, element, length)

elif type_string.startswith("MULTISET"):
element = DataTypeParser.parse_data_type(
json_data.get("element"), field_id
Expand Down Expand Up @@ -536,6 +604,8 @@ def from_paimon_type(data_type: DataType) -> pyarrow.DataType:
return pyarrow.time32('ms')
elif isinstance(data_type, ArrayType):
return pyarrow.list_(PyarrowFieldParser.from_paimon_type(data_type.element))
elif isinstance(data_type, VectorType):
return pyarrow.list_(PyarrowFieldParser.from_paimon_type(data_type.element), data_type.length)
elif isinstance(data_type, MapType):
key_type = PyarrowFieldParser.from_paimon_type(data_type.key)
value_type = PyarrowFieldParser.from_paimon_type(data_type.value)
Expand Down Expand Up @@ -603,6 +673,10 @@ def to_paimon_type(pa_type: pyarrow.DataType, nullable: bool) -> DataType:
type_name = 'DATE'
elif types.is_time(pa_type):
type_name = 'TIME(0)'
elif types.is_fixed_size_list(pa_type):
pa_type: pyarrow.FixedSizeListType
element_type = PyarrowFieldParser.to_paimon_type(pa_type.value_type, pa_type.value_field.nullable)
return VectorType(nullable, element_type, pa_type.list_size)
elif types.is_list(pa_type) or types.is_large_list(pa_type):
pa_type: pyarrow.ListType
element_type = PyarrowFieldParser.to_paimon_type(pa_type.value_type, nullable)
Expand Down Expand Up @@ -697,7 +771,9 @@ def to_avro_type(field_type: pyarrow.DataType, field_name: str,
return {"type": "long", "logicalType": "local-timestamp-micros"}
else:
raise ValueError(f"Avro does not support pyarrow timestamp with unit {unit}.")
elif pyarrow.types.is_list(field_type) or pyarrow.types.is_large_list(field_type):
elif pyarrow.types.is_fixed_size_list(field_type) or \
pyarrow.types.is_list(field_type) or \
pyarrow.types.is_large_list(field_type):
value_field = field_type.value_field
return {
"type": "array",
Expand Down
Loading
Loading