[FLINK-39732] Introduce TableDiscoverer SPI for flexible table subscription (with default JdbcTableDiscoverer) #4409
Open
loserwang1024 wants to merge 2 commits into
…iption (with default JdbcTableDiscoverer)
@leonardBang, CC
…iption (with default JdbcTableDiscoverer)
7d3ee3a to 8903047
fix https://issues.apache.org/jira/browse/FLINK-39732
Summary
Today, every Flink CDC source connector hard-codes its own way of deciding which tables to subscribe to — typically a regex on table names, a hard-coded list, or a connector-specific include/exclude DSL. As Flink CDC grows beyond a single source family (MySQL, Postgres, Fluss, …), this duplication becomes painful:
This issue proposes a small, connector-agnostic, pluggable SPI, `TableDiscoverer`, together with a default JDBC-backed implementation that covers the most common "subscription metadata table" pattern. The SPI is loaded via Java `ServiceLoader`, selected by a single config option `table.discoverer.type`, and shares a unified `table.discoverer.*` configuration namespace.
Motivation
The pluggable subscription mechanism aims to satisfy three demands at once:
A non-goal of this issue is to refactor existing source connectors to use the new SPI. That migration will happen incrementally in follow-up issues; this PR only introduces and ships the SPI plus a default implementation so that downstream connectors can opt in.
Public API
TableDiscoverer
Lifecycle contract:
- `open(Context)` is invoked exactly once before the first `discover()`.
- `discover()` may be called multiple times during the source lifetime; implementations are encouraged to be idempotent and fast.
- `close()` is called when the discoverer is no longer needed (job teardown, source restart) and must release I/O resources held in `open()`.
TableDiscovererFactory
Resolution rules:
- Factories are loaded via `ServiceLoader<TableDiscovererFactory>`.
- `table.discoverer.type` is matched case-insensitively against `identifier()`.
- A conflict between factories raises `IllegalStateException` at startup (a clear error message lists the offending classes).
- An unmatched type raises `IllegalArgumentException` listing the set of known identifiers, which makes config typos easy to debug.
Configuration namespace
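Since `table.discoverer.type` is the key that drives factory selection, the resolution rules for `TableDiscovererFactory` can be sketched as follows. This is a minimal illustration, not the PR's code: the nested `TableDiscovererFactory` interface is a hypothetical stand-in that keeps only `identifier()`, the `resolve` helper and its error messages are invented for illustration, and treating a duplicated identifier as the startup conflict is an assumption. In the actual SPI the candidates would presumably come from `ServiceLoader.load(TableDiscovererFactory.class)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

final class DiscovererResolutionSketch {

    /** Hypothetical stand-in for the SPI factory; only identifier() comes from the description. */
    interface TableDiscovererFactory {
        String identifier();
    }

    /** Returns the factory whose identifier() matches the configured type, ignoring case. */
    static TableDiscovererFactory resolve(
            String configuredType, Iterable<TableDiscovererFactory> candidates) {
        // Group factories by lower-cased identifier so matching is case-insensitive.
        Map<String, List<TableDiscovererFactory>> byId = new TreeMap<>();
        for (TableDiscovererFactory f : candidates) {
            String id = f.identifier().toLowerCase(Locale.ROOT);
            byId.computeIfAbsent(id, k -> new ArrayList<>()).add(f);
        }
        // Assumption: the conflict reported at startup is a duplicated identifier.
        for (Map.Entry<String, List<TableDiscovererFactory>> e : byId.entrySet()) {
            if (e.getValue().size() > 1) {
                List<String> classes = new ArrayList<>();
                for (TableDiscovererFactory f : e.getValue()) {
                    classes.add(f.getClass().getName());
                }
                throw new IllegalStateException(
                        "Multiple factories for identifier '" + e.getKey() + "': " + classes);
            }
        }
        String key = configuredType == null
                ? ""
                : configuredType.trim().toLowerCase(Locale.ROOT);
        List<TableDiscovererFactory> match = byId.get(key);
        if (match == null) {
            // Listing the known identifiers makes a typo in the option easy to spot.
            throw new IllegalArgumentException(
                    "Unknown table.discoverer.type '" + configuredType
                            + "'. Known identifiers: " + byId.keySet());
        }
        return match.get(0);
    }
}
```

Checking for duplicates before looking up the requested type means a misconfigured classpath fails fast even when the selected identifier itself is unambiguous; whether the PR checks eagerly in this way is not stated.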
All built-in and third-party discoverers are expected to use the unified prefix `table.discoverer.*`. The single global key is:
- `table.discoverer.type`: selects the discoverer implementation by identifier (for example `jdbc`).
Implementation-specific keys live under `table.discoverer.<identifier>.*`.
Default implementation: JdbcTableDiscoverer
The JDBC discoverer reads the subscription list from any JDBC-compatible database. It supports two modes, chosen implicitly by the configuration:
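One way the implicit choice could work, using the `subscribe-id` and `subscribe-query` keys named in the two mode descriptions that follow, is sketched here. The class, enum, and method names are hypothetical, and both the precedence (a custom query wins when it is set) and the exception type for a blank `subscribe-id` are assumptions; the description only says that a blank `subscribe-id` is an error.

```java
import java.util.Map;

final class JdbcModeSketch {

    enum Mode { SHARED_TABLE, CUSTOM_QUERY }

    static final String SUBSCRIBE_ID = "table.discoverer.jdbc.subscribe-id";
    static final String SUBSCRIBE_QUERY = "table.discoverer.jdbc.subscribe-query";

    /** Derives the mode from which options are present (hypothetical helper). */
    static Mode chooseMode(Map<String, String> options) {
        String query = options.get(SUBSCRIBE_QUERY);
        if (query != null && !query.trim().isEmpty()) {
            // Assumption: an explicit custom query takes precedence over Mode A.
            return Mode.CUSTOM_QUERY;
        }
        String id = options.get(SUBSCRIBE_ID);
        if (id == null || id.trim().isEmpty()) {
            // No whole-table fallback: a blank subscribe-id is rejected.
            throw new IllegalArgumentException(
                    "'" + SUBSCRIBE_ID + "' must be set unless '"
                            + SUBSCRIBE_QUERY + "' is provided");
        }
        return Mode.SHARED_TABLE;
    }
}
```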
Mode A: Shared subscription table (default & recommended)
Designed for the common multi-tenant pattern where many CDC jobs share one metadata table and each job filters by a `subscribe-id`.
Effective query (built with a `PreparedStatement`, injection-safe on the parameter):
Options:
- `table.discoverer.jdbc.url`
- `table.discoverer.jdbc.username`
- `table.discoverer.jdbc.password`
- `table.discoverer.jdbc.table-name`
- `table.discoverer.jdbc.subscribe-id`
- `table.discoverer.jdbc.column-name` (default: `subscribe_table_name`)
- `table.discoverer.jdbc.subscribe-id-column` (default: `subscribe_id`; used in the `WHERE` clause)
Recommended schema:
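A layout consistent with the defaults above would have at least a `subscribe_id` column and a `subscribe_table_name` column. Assuming that layout, the sketch below shows how the Mode A query could be assembled from the options. The SQL shape is inferred from the option names rather than taken from the PR, the class and method names are hypothetical, and the identifier check is an added precaution of this sketch, since table and column names cannot be bound as `PreparedStatement` parameters.

```java
import java.util.regex.Pattern;

final class SharedTableQuerySketch {

    // Plain or dot-qualified identifiers only; anything else is rejected.
    private static final Pattern IDENTIFIER =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)*");

    /**
     * Builds the parameterized query (assumed shape). The subscribe-id value is
     * not concatenated; it is left as a '?' placeholder to be bound later.
     */
    static String buildQuery(String tableName, String columnName, String subscribeIdColumn) {
        return "SELECT " + identifier(columnName)
                + " FROM " + identifier(tableName)
                + " WHERE " + identifier(subscribeIdColumn) + " = ?";
    }

    private static String identifier(String name) {
        if (name == null || !IDENTIFIER.matcher(name).matches()) {
            throw new IllegalArgumentException("Not a plain SQL identifier: " + name);
        }
        return name;
    }
}
```

With a `java.sql.Connection` in hand, the result would be passed to `prepareStatement(...)` and the configured `subscribe-id` bound with `setString(1, ...)` before calling `executeQuery()`, which is what keeps the parameter injection-safe.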
The legacy "unset `subscribe-id` ⇒ scan the whole metadata table" fallback has been intentionally removed: leaving `subscribe-id` blank is now an error, which prevents silent over-subscription in shared deployments.
Mode B: Custom SELECT (escape hatch)
For uncommon layouts (JOINs, soft filters, multi-column metadata) users may set:
- `table.discoverer.jdbc.subscribe-query`
Example