
Conversation

@MehulBatra
Contributor

Purpose

Linked issue: close #978

Brief change log

Read support for the virtual $binlog table

Added:

  • BinlogRowConverter — the core stateful converter that buffers -U records and
    merges them with +U into a single row with before/after nested fields (a
    simplified sketch follows this list)
  • BinlogDeserializationSchema — thin wrapper over the converter
  • BinlogFlinkTableSource — Flink ScanTableSource implementation
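
Below is a minimal sketch of the buffering/merge idea, to make the data flow concrete. It is illustrative only; the class name, method, and column layout here are assumptions, not the actual BinlogRowConverter implementation:

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

import javax.annotation.Nullable;

class SimpleBinlogConverter {
    // UPDATE_BEFORE buffered until its matching UPDATE_AFTER arrives.
    private @Nullable RowData pendingUpdateBefore;

    /** Returns the merged binlog row, or null when the record was only buffered. */
    @Nullable
    RowData convert(RowData record) {
        switch (record.getRowKind()) {
            case UPDATE_BEFORE:
                // Buffer and emit nothing; the emitter skips null results.
                pendingUpdateBefore = record;
                return null;
            case UPDATE_AFTER:
                if (pendingUpdateBefore == null) {
                    throw new IllegalStateException("UPDATE_AFTER without UPDATE_BEFORE");
                }
                RowData merged = wrap("update", pendingUpdateBefore, record);
                pendingUpdateBefore = null;
                return merged;
            case DELETE:
                return wrap("delete", record, null);
            default: // INSERT
                return wrap("insert", null, record);
        }
    }

    private static RowData wrap(
            String changeType, @Nullable RowData before, @Nullable RowData after) {
        // Simplified layout: (_change_type, before ROW, after ROW); metadata columns omitted.
        GenericRowData row = new GenericRowData(RowKind.INSERT, 3);
        row.setField(0, StringData.fromString(changeType));
        row.setField(1, before);
        row.setField(2, after);
        return row;
    }
}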

Modified:

  • TableDescriptor — add BEFORE_COLUMN and AFTER_COLUMN constants
  • FlinkCatalog — wire $binlog detection, add PK-only validation, build nested
    ROW schema
  • FlinkTableFactory — add binlog source creation

Tests

Catalog tests - FlinkCatalogIT, getting the virtual $binlog table
Unit tests — BinlogRowConverterTest.java
Integration tests — BinlogVirtualTableITCase.java

API and Format

Documentation

@MehulBatra MehulBatra requested a review from wuchong January 29, 2026 16:32

@wuchong wuchong left a comment


Thanks @MehulBatra for the great pull request. I left some comments inline.

Besides, regarding the metadata column _change_type in the $changelog and $binlog virtual tables, the current design uses abbreviations like +I, -U, +U, and -D.

I reconsidered this: while the abbreviations align with Flink's internal RowKind representation, I think using full descriptive strings instead would be better, such as:

  • insert (instead of +I and +A)
  • update_before (instead of -U)
  • update_after (instead of +U)
  • delete (instead of -D)

And for $binlog tables

  • insert (instead of I)
  • update (instead of U)
  • delete (instead of D)

Reasoning:

  • Improved Readability: Full words are much more intuitive for SQL users, especially those who may not be familiar with Flink's internal row-kind symbols.

  • Better User Experience: It makes the audit and incremental ETL queries more self-documenting (e.g., WHERE _change_type = 'insert').

  • Consistency: This approach is widely adopted by other query engines, e.g. Databricks (insert, delete, update_preimage), Trino (insert, delete), and Snowflake (INSERT, DELETE).

What do you think about this change? It requires changing the previous $changelog table implementation, but I think it is worth doing before the release.

[1] databricks: https://docs.databricks.com/aws/en/sql/language-manual/functions/table_changes
[2] trino: https://trino.io/docs/current/connector/iceberg.html#table-changes
[3] snowflake: https://docs.snowflake.com/en/user-guide/streams-intro#label-streams-changes-clause
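
A minimal illustration of the proposed mapping from Flink's RowKind to the descriptive $changelog strings (the helper class and method name below are hypothetical):

import org.apache.flink.types.RowKind;

final class ChangeTypeNames {
    /** Maps Flink's RowKind to the proposed descriptive $changelog strings. */
    static String forChangelog(RowKind kind) {
        switch (kind) {
            case INSERT:
                return "insert";
            case UPDATE_BEFORE:
                return "update_before";
            case UPDATE_AFTER:
                return "update_after";
            case DELETE:
                return "delete";
            default:
                throw new IllegalArgumentException("Unknown row kind: " + kind);
        }
    }
}

For a $binlog table, both UPDATE_BEFORE and UPDATE_AFTER would collapse into a single "update" row, per the proposal above.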

Comment on lines +267 to +268
assertThat(results.get(0)).startsWith("+I[I, 0, ");
assertThat(results.get(0)).contains("null, +I[1, Alice]");

Can't we assert a full row string?

Comment on lines +340 to +343
// All should be INSERT change types
for (String result : earliestResults) {
    assertThat(result).startsWith("+I[I,");
}

Please assert the full row. If you want to make the result deterministic and ordered, you can change the TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM config to 1 in the before() method.
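
For reference, a minimal sketch of that config change (assuming a JUnit 5 test base with a StreamTableEnvironment field named tEnv; both of those names are assumptions):

import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.junit.jupiter.api.BeforeEach;

@BeforeEach
void before() {
    // Run with a single parallel task so streaming results arrive in a deterministic order.
    tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
}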

List<String> timestampResults = collectRowsWithTimeout(rowIterTimestamp, 2, true);
assertThat(timestampResults).hasSize(2);
// Sort results for deterministic assertion
Collections.sort(timestampResults);

Ditto. You can change the TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM config to 1 in the before() method.


// Verify metadata columns
assertThat(result.getString(0)).isEqualTo(StringData.fromString("D"));
assertThat(result.getLong(1)).isEqualTo(300L);

Assert the timestamp column as well?

* source. The value is a comma-separated list of column names.
*/
public static final ConfigOption<String> BINLOG_PARTITION_KEYS =
        ConfigOptions.key("binlog.partition-keys")

We don't need this option.

        FactoryUtil.TableFactoryHelper helper, ReadableConfig tableOptions) {
    Optional<DataLakeFormat> datalakeFormat = getDatalakeFormat(tableOptions);
    List<String> prefixesToSkip =
            new ArrayList<>(Arrays.asList("table.", "client.", "fields.", "binlog."));

Please rebase onto the latest main branch; we have removed the helper.validateExcept check in the connector. See 336216c.

Comment on lines +389 to +397
int[] partitionKeyIndexes;
if (partitionKeysOption != null && !partitionKeysOption.isEmpty()) {
    String[] partitionKeyNames = partitionKeysOption.split(",");
    partitionKeyIndexes = new int[partitionKeyNames.length];
    for (int i = 0; i < partitionKeyNames.length; i++) {
        partitionKeyIndexes[i] = dataColumnsType.getFieldIndex(partitionKeyNames[i]);
    }
} else {
    partitionKeyIndexes = new int[0];

A boolean flag isPartitionedTable is enough.
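
For illustration, the suggested simplification could look roughly like this (reusing the variable name from the snippet above):

// Downstream code only needs to know whether the table is partitioned, so a flag suffices.
boolean isPartitionedTable = partitionKeysOption != null && !partitionKeysOption.isEmpty();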

public static final String LOG_OFFSET_COLUMN = "_log_offset";
public static final String COMMIT_TIMESTAMP_COLUMN = "_commit_timestamp";

// Reserved column names for $binlog virtual table nested row fields

Suggested change
// Reserved column names for $binlog virtual table nested row fields
// column names for $binlog virtual table nested row fields

They are not "reserved" names; "reserved" would mean users are not allowed to use them.

Comment on lines +36 to +39
* <p>This schema is stateful: it buffers UPDATE_BEFORE (-U) records and returns {@code null} for
* them. When the subsequent UPDATE_AFTER (+U) record arrives, it merges both into a single binlog
* row. The {@link org.apache.fluss.flink.source.emitter.FlinkRecordEmitter} handles null returns by
* skipping emission.

I noticed a potential data consistency issue in the current implementation of FlinkRecordEmitter regarding how it handles offsets for $binlog tables.

The Problem:
Currently, FlinkRecordEmitter advances the log offset in LogSplitState by 1 immediately after a record is processed, regardless of whether the record was fully emitted as a complete transaction (e.g., an UPDATE pair).

Specifically, for $binlog tables, an UPDATE operation is split into two logical events: UPDATE_BEFORE and UPDATE_AFTER. If a checkpoint occurs exactly between these two events and the job subsequently fails:

  1. The LogSplitState will have already been updated to offset + 1.
  2. Upon recovery, the source will skip the original log record and start reading from the next offset.
  3. The UPDATE_AFTER event is lost, or more critically, BinlogRowConverter#toBinlogRowData will throw an IllegalStateException because pendingUpdateBefore is null when it expects to complete an update sequence.

Suggested Solution:
We should modify FlinkRecordEmitter to only advance the log offset in the state when deserializationSchema.deserialize actually returns a non-null record (signaling that the logical processing for that physical record is complete and emitted).


Revised Code Implementation

I suggest refining the logic to ensure the emitted flag correctly dictates the state update. Note that I've added a comment to clarify that "emitted" here implies the full logical unit has been sent downstream.

} else if (splitState.isLogSplitState()) {
    // Attempt to process and emit the record. 
    // For $binlog, this returns true only when a complete row (or the final part of a split) is emitted.
    boolean emitted = processAndEmitRecord(recordAndPosition.record(), sourceOutput);
    
    if (emitted) {
        // Only advance the offset in state if the record was successfully emitted.
        // This ensures that if a crash occurs mid-update (between BEFORE and AFTER),
        // the source will re-read the same log offset upon recovery, 
        // allowing the BinlogDeserializationSchema to correctly reconstruct the state.
        splitState.asLogSplitState().setNextOffset(recordAndPosition.record().logOffset() + 1);
    }
}


Please also add tests in FlinkRecordEmitterTest to verify the state offset in the UPDATE_BEFORE and UPDATE_AFTER case.

Contributor Author


Thank you @wuchong for the quick feedback. I will go through the comments and address them one by one.

@MehulBatra
Contributor Author


Agreed. Following industry standards (Databricks, Trino, Snowflake) makes integration and migration much easier for users, and the improved readability is worth the change before the release.
I'll update this PR accordingly and create a separate one for $changelog.



Development

Successfully merging this pull request may close these issues.

Add binlog system table support for Fluss tables
