[lake/lance] Add Array type support for Lance #2537
Conversation
Force-pushed dece07f to 0146245
wuchong left a comment:
@luoyuxia could you help to review this?
```java
}

@Test
void testTieringWriteTableWithArrayType() throws Exception {
```
This test looks unusual: it manually instantiates components like LakeWriter and LakeCommitter using LanceLakeTieringFactory to perform tiering. However, this approach is highly error-prone and doesn't represent the actual end-to-end path that needs to be covered when adding support for a new type like ARRAY. Consider this: if we were to add 10 new types, would we really need to write 10 separate, hand-wired integration tests like this? Such tests are not maintainable.
To properly validate ARRAY type support, we should perform an end-to-end verification. Specifically, we can simply add a few ARRAY-type columns (like array<string> and array<int>) to the logTable created in org.apache.fluss.lake.lance.tiering.LanceTieringITCase#testTiering, and then verify that these array fields are correctly written and read in the result assertions.
Additionally, for the Lance format, ARRAY types are especially important for vector embeddings. Therefore, we should also include a dedicated test case for ARRAY<FLOAT> to ensure proper handling of vector data.
Pull request overview
This PR adds support for Array data types in the Lance storage format for Fluss lake storage, addressing issue #2186. The implementation enables storing and retrieving tables with array columns by adding proper type mapping and data conversion logic between Fluss's internal representation and Arrow's List format used by Lance.
Changes:
- Added ArrayType to Arrow List type conversion in LanceArrowUtils with proper recursive handling for nested array structures
- Implemented specialized ListVector data copying in ArrowDataConverter to bridge shaded and non-shaded Arrow implementations
- Added test coverage for basic array functionality with STRING and INT array types
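The recursive conversion in the first bullet can be illustrated with a small self-contained sketch. It is a string-based stand-in, not the real `LanceArrowUtils#toArrowField` (which builds Arrow `Field` objects); the concrete Arrow type names (`Utf8`, `Int32`, `Float32`) are the usual Arrow equivalents but are assumptions here:

```java
public class ArrowListMappingSketch {
    // Recursive mapping in the spirit of LanceArrowUtils#toArrowField: an ARRAY
    // becomes an Arrow List with a single "element" child of the mapped element
    // type, so nested arrays fall out of the recursion for free.
    static String toArrowTypeName(String flussType) {
        if (flussType.startsWith("ARRAY<") && flussType.endsWith(">")) {
            String element = flussType.substring("ARRAY<".length(), flussType.length() - 1);
            return "List<element: " + toArrowTypeName(element) + ">";
        }
        switch (flussType) {
            case "STRING": return "Utf8";
            case "INT": return "Int32";
            case "FLOAT": return "Float32";
            default: throw new IllegalArgumentException("unmapped type: " + flussType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toArrowTypeName("ARRAY<STRING>"));    // List<element: Utf8>
        System.out.println(toArrowTypeName("ARRAY<ARRAY<INT>>")); // List<element: List<element: Int32>>
    }
}
```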
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java | Adds ArrayType conversion to Arrow List type with recursive child field creation for array element types |
| fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java | Implements ListVector-specific data copying logic with recursive handling for nested array elements |
| fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java | Adds test case for array type support with helper methods for generating and verifying array data |
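The ListVector copying described for `ArrowDataConverter.java` hinges on Arrow's list layout: an offsets array indexing into a flat child vector. The toy model below (plain Java arrays, not the shaded/non-shaded Arrow classes themselves) sketches why copying between two incompatible class hierarchies reduces to duplicating offsets and child data:

```java
import java.util.Arrays;

public class ListVectorCopySketch {
    // Toy stand-in for Arrow's list layout: offsets has length rowCount + 1, and
    // row i spans childValues[offsets[i] .. offsets[i + 1]).
    static final class ToyListVector {
        final int[] offsets;
        final int[] childValues;

        ToyListVector(int[] offsets, int[] childValues) {
            this.offsets = offsets;
            this.childValues = childValues;
        }
    }

    // Copying reduces to copying the offsets and the child data; in the real
    // converter the child copy recurses for nested lists.
    static ToyListVector copy(ToyListVector src) {
        return new ToyListVector(
                Arrays.copyOf(src.offsets, src.offsets.length),
                Arrays.copyOf(src.childValues, src.childValues.length));
    }

    static int[] getList(ToyListVector v, int row) {
        return Arrays.copyOfRange(v.childValues, v.offsets[row], v.offsets[row + 1]);
    }

    public static void main(String[] args) {
        // Two rows: [1, 2] and [3].
        ToyListVector src = new ToyListVector(new int[] {0, 2, 3}, new int[] {1, 2, 3});
        ToyListVector dst = copy(src);
        System.out.println(Arrays.toString(getList(dst, 0))); // [1, 2]
        System.out.println(Arrays.toString(getList(dst, 1))); // [3]
    }
}
```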
```java
ListVector tagsVector = (ListVector) root.getVector(2);
Object tagsObject = tagsVector.getObject(i);
assertThat(tagsObject).isNotNull();

ListVector scoresVector = (ListVector) root.getVector(3);
Object scoresObject = scoresVector.getObject(i);
assertThat(scoresObject).isNotNull();
```
Copilot AI commented on Feb 1, 2026:
The test verification only checks that array objects are not null, but doesn't verify the actual array contents. This means the test would pass even if the arrays are empty or contain incorrect values. The verification should compare the actual array elements against the expected values from the generated records. For example, for the tags array, you should extract the list of strings and compare them with the expected BinaryString values from the expectRecord.
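One way to sketch the suggested content check (plain Java stand-in: on a real Arrow ListVector, `getObject(i)` returns a `java.util.List`, and string elements arrive as Arrow `Text` objects, so a `toString()` conversion before comparing is assumed):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ArrayContentAssertSketch {
    // Compare the list read back from the vector against expected string values,
    // normalizing each element via toString() (Arrow Text -> String).
    static void assertListEquals(List<?> actualFromVector, List<String> expected) {
        List<String> actual = new ArrayList<>();
        for (Object o : actualFromVector) {
            actual.add(o == null ? null : o.toString());
        }
        if (!actual.equals(expected)) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
    }

    public static void main(String[] args) {
        // Stand-in for tagsVector.getObject(i); in the real test the expected
        // side would be derived from the expectRecord's BinaryString values.
        List<?> fromVector = Arrays.asList("tag-0", "tag-1");
        assertListEquals(fromVector, Arrays.asList("tag-0", "tag-1"));
        System.out.println("contents verified");
    }
}
```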
```java
@Test
void testTieringWriteTableWithArrayType() throws Exception {
    int bucketNum = 2;
    TablePath tablePath = TablePath.of("lance", "arrayTable");
    Map<String, String> customProperties = new HashMap<>();
    customProperties.put("lance.batch_size", "256");
    LanceConfig config =
            LanceConfig.from(
                    configuration.toMap(),
                    customProperties,
                    tablePath.getDatabaseName(),
                    tablePath.getTableName());

    List<Schema.Column> columns = new ArrayList<>();
    columns.add(new Schema.Column("id", DataTypes.INT()));
    columns.add(new Schema.Column("name", DataTypes.STRING()));
    columns.add(new Schema.Column("tags", DataTypes.ARRAY(DataTypes.STRING())));
    columns.add(new Schema.Column("scores", DataTypes.ARRAY(DataTypes.INT())));
    Schema.Builder schemaBuilder = Schema.newBuilder().fromColumns(columns);
    Schema schema = schemaBuilder.build();

    WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
    LanceDatasetAdapter.createDataset(
            config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);

    TableDescriptor descriptor =
            TableDescriptor.builder()
                    .schema(schema)
                    .distributedBy(bucketNum)
                    .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                    .customProperties(customProperties)
                    .build();
    TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);

    List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
    SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
            lanceLakeTieringFactory.getWriteResultSerializer();
    SimpleVersionedSerializer<LanceCommittable> committableSerializer =
            lanceLakeTieringFactory.getCommittableSerializer();

    Map<Integer, List<LogRecord>> recordsByBucket = new HashMap<>();

    for (int bucket = 0; bucket < bucketNum; bucket++) {
        try (LakeWriter<LanceWriteResult> lakeWriter =
                createLakeWriter(tablePath, bucket, null, tableInfo)) {
            List<LogRecord> writtenRecords = genArrayTableRecords(bucket, 5);
            recordsByBucket.put(bucket, writtenRecords);
            for (LogRecord logRecord : writtenRecords) {
                lakeWriter.write(logRecord);
            }
            LanceWriteResult lanceWriteResult = lakeWriter.complete();
            byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
            lanceWriteResults.add(
                    writeResultSerializer.deserialize(
                            writeResultSerializer.getVersion(), serialized));
        }
    }

    try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
            createLakeCommitter(tablePath, tableInfo)) {
        LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
        byte[] serialized = committableSerializer.serialize(lanceCommittable);
        lanceCommittable =
                committableSerializer.deserialize(
                        committableSerializer.getVersion(), serialized);
        Map<String, String> snapshotProperties =
                Collections.singletonMap(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets");
        long snapshot = lakeCommitter.commit(lanceCommittable, snapshotProperties);
        assertThat(snapshot).isEqualTo(2);
    }

    try (Dataset dataset =
            Dataset.open(
                    new RootAllocator(),
                    config.getDatasetUri(),
                    LanceConfig.genReadOptionFromConfig(config))) {
        ArrowReader reader = dataset.newScan().scanBatches();
        VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();

        for (int bucket = 0; bucket < bucketNum; bucket++) {
            reader.loadNextBatch();
            List<LogRecord> expectRecords = recordsByBucket.get(bucket);
            verifyArrayTableRecords(readerRoot, expectRecords);
        }
        assertThat(reader.loadNextBatch()).isFalse();
    }
}
```
Copilot AI commented on Feb 1, 2026:
The test only covers non-null arrays with values. Consider adding test cases for edge cases such as:
- Null array values (e.g., `genericRow.setField(2, null)`)
- Empty arrays (e.g., `new GenericArray(new BinaryString[0])`)
- Nested arrays (e.g., `DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))`)

These edge cases are important to verify that the Arrow schema conversion and data copying logic handle all scenarios correctly.
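A toy illustration of why null and empty arrays need separate coverage: in an Arrow-style list layout both contribute no child values, and they differ only in the validity bit. This is a simplified model, not the Fluss or Arrow API:

```java
public class NullVsEmptyListSketch {
    // Toy Arrow-style list layout: per-row validity flags plus offsets into the
    // child values. An empty list is valid with offsets[i] == offsets[i + 1];
    // a null list has its validity bit cleared (offsets stay non-decreasing so
    // later rows remain addressable).
    static String describeRow(boolean[] validity, int[] offsets, int row) {
        if (!validity[row]) {
            return "null";
        }
        int len = offsets[row + 1] - offsets[row];
        return len == 0 ? "empty" : "list of " + len;
    }

    public static void main(String[] args) {
        boolean[] validity = {true, true, false};
        int[] offsets = {0, 2, 2, 2}; // row 0 has 2 elements, row 1 empty, row 2 null
        for (int i = 0; i < 3; i++) {
            System.out.println(describeRow(validity, offsets, i));
        }
    }
}
```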
Purpose
Linked issue: close #2186
Add Array type support for Lance storage format to enable storing and retrieving tables with array columns.
Brief change log
LanceArrowUtils.java:
- `toArrowType()` method to convert Fluss ArrayType to Arrow List type
- `toArrowField()` method to create children fields for ArrayType, properly handling nested array structures by recursively creating element fields

ArrowDataConverter.java:
- `copyListVectorData()` method to handle copying of ListVector data between shaded and non-shaded Arrow implementations
- `copyVectorData()` method to detect and delegate ListVector copying to the specialized handler

LanceTieringTest.java:
- `testTieringWriteTableWithArrayType()` to verify array type support
- `genArrayTableRecords()` and `verifyArrayTableRecords()` for array data generation and verification

Tests
- `LanceTieringTest#testTieringWriteTableWithArrayType` - Tests writing and reading tables with array columns (STRING array and INT array) through Lance storage

API and Format
Documentation