Skip to content

[FLINK-39732] Introduce TableDiscoverer SPI for flexible table subscription (with default JdbcTableDiscoverer)#4409

Open
loserwang1024 wants to merge 2 commits into
apache:masterfrom
loserwang1024:plugin-discover
Open

[FLINK-39732] Introduce TableDiscoverer SPI for flexible table subscription (with default JdbcTableDiscoverer)#4409
loserwang1024 wants to merge 2 commits into
apache:masterfrom
loserwang1024:plugin-discover

Conversation

@loserwang1024
Copy link
Copy Markdown
Contributor

@loserwang1024 loserwang1024 commented May 22, 2026

fix https://issues.apache.org/jira/browse/FLINK-39732

Summary

Today, every Flink CDC source connector hard-codes its own way of deciding which tables to subscribe to — typically a regex on table names, a hard-coded list, or a connector-specific include/exclude DSL. As Flink CDC grows beyond a single source family (MySQL, Postgres, Fluss, …), this duplication becomes painful:

  • Users have to learn a different "table selector" syntax per connector.
  • New use cases (multi-tenant subscription tables, runtime-driven table sets, custom SQL filters) cannot be expressed without forking the connector.
  • There is no shared extension point for the community to plug in custom table-selection strategies.

This issue proposes a small, connector-agnostic, pluggable SPI — TableDiscoverer — together with a default JDBC-backed implementation that covers the most common "subscription metadata table" pattern. The SPI is loaded via Java ServiceLoader, selected by a single config option table.discoverer.type, and shares a unified table.discoverer.* configuration namespace.

Motivation

The pluggable subscription mechanism aims to satisfy three demands at once:

  1. Decouple table-selection logic from source connectors. A user who wants "subscribe to the table list returned by this SQL" should not have to fork MySQL CDC, Postgres CDC, or Fluss CDC separately.
  2. Provide a sensible default. The vast majority of users today maintain a small DB table that lists which tables a CDC job should follow; a JDBC-backed default avoids reinventing this every time.
  3. Leave room for custom strategies. Service providers, multi-tenant platforms and feature-flag systems should be able to provide their own discoverer (e.g., REST API, ZK, configcenter) by adding a single SPI implementation jar.

A non-goal of this issue is to refactor existing source connectors to use the new SPI. That migration will happen incrementally in follow-up issues; this PR only introduces and ships the SPI plus a default implementation so that downstream connectors can opt in.

Public API

TableDiscoverer

@PublicEvolving
public interface TableDiscoverer extends Serializable, AutoCloseable {

    /** Initialise resources (e.g., open a JDBC connection). */
    void open(Context context) throws Exception;

    /** Return the set of tables to subscribe to. */
    Set<TableId> discover() throws Exception;

    /** Release resources. */
    @Override
    void close() throws Exception;

    interface Context {
        Configuration getConfiguration();
        ClassLoader getUserCodeClassLoader();
    }
}

Lifecycle contract:

  • open(Context) is invoked exactly once before the first discover().
  • discover() may be called multiple times during the source lifetime; implementations are encouraged to be idempotent and fast.
  • close() is called when the discoverer is no longer needed (job teardown, source restart) and must release I/O resources held in open().

TableDiscovererFactory

@PublicEvolving
public interface TableDiscovererFactory {
    String identifier();                                // e.g. "jdbc"
    TableDiscoverer createDiscoverer();

    static TableDiscoverer createDiscoverer(String type, ClassLoader cl);
    static TableDiscoverer.Context createContext(Configuration config, ClassLoader cl);
}

Resolution rules:

  • Factories are discovered through ServiceLoader<TableDiscovererFactory>.
  • table.discoverer.type is matched case-insensitively against identifier().
  • Multiple factories sharing the same identifier ⇒ IllegalStateException at startup (clear error message lists offending classes).
  • Unknown identifier ⇒ IllegalArgumentException listing the set of known identifiers, which makes config typos easy to debug.

Configuration namespace

All built-in and third-party discoverers are expected to use the unified prefix table.discoverer.*. The single global key is:

Key Description
table.discoverer.type Identifier of the discoverer to load (e.g. jdbc).

Implementation-specific keys live under table.discoverer.<identifier>.*.

Default implementation: JdbcTableDiscoverer

The JDBC discoverer reads the subscription list from any JDBC-compatible database. It supports two modes, chosen implicitly by the configuration:

Mode A — Shared subscription table (default & recommended)

Designed for the common multi-tenant pattern where many CDC jobs share one metadata table and each job filters by a subscribe-id.

Effective query (built with a PreparedStatement, injection-safe on the parameter):

SELECT <column-name>
  FROM <table-name>
 WHERE <subscribe-id-column> = ?
Key Required Default Notes
table.discoverer.jdbc.url yes JDBC connection URL
table.discoverer.jdbc.username yes
table.discoverer.jdbc.password yes
table.discoverer.jdbc.table-name yes Shared metadata table
table.discoverer.jdbc.subscribe-id yes Identifies this subscription set; mandatory in Mode A
table.discoverer.jdbc.column-name no subscribe_table_name Column holding fully-qualified table names
table.discoverer.jdbc.subscribe-id-column no subscribe_id Column used in the WHERE clause

Recommended schema:

CREATE TABLE cdc_subscriptions (
    subscribe_id         VARCHAR(64)  NOT NULL,
    subscribe_table_name VARCHAR(255) NOT NULL,
    PRIMARY KEY (subscribe_id, subscribe_table_name)
);

The legacy "unset subscribe-id ⇒ scan the whole metadata table" fallback has been intentionally removed: leaving subscribe-id blank is now an error, which prevents silent over-subscription in shared deployments.

Mode B — Custom SELECT (escape hatch)

For uncommon layouts (JOINs, soft filters, multi-column metadata) users may set:

Key Required
table.discoverer.jdbc.subscribe-query yes (when used)

Example

# pipeline.yaml — Mode A (shared subscription table)
source:
  type: ...
  table.discoverer.type: jdbc
  table.discoverer.jdbc.url: jdbc:mysql://meta:3306/cdc_meta
  table.discoverer.jdbc.username: cdc
  table.discoverer.jdbc.password: ******
  table.discoverer.jdbc.table-name: cdc_subscriptions
  table.discoverer.jdbc.subscribe-id: orders-subscription
# pipeline.yaml — Mode B (custom query)
source:
  type: ...
  table.discoverer.type: jdbc
  table.discoverer.jdbc.url: jdbc:mysql://meta:3306/cdc_meta
  table.discoverer.jdbc.username: cdc
  table.discoverer.jdbc.password: ******
  table.discoverer.jdbc.subscribe-query: |
    SELECT CONCAT(db, '.', tbl) FROM ops.cdc_meta
    WHERE enabled = 1 AND owner = 'team-orders'

@loserwang1024
Copy link
Copy Markdown
Contributor Author

@leonardBang , CC

@loserwang1024 loserwang1024 requested a review from leonardBang May 22, 2026 11:12
@github-actions github-actions Bot added the docs Improvements or additions to documentation label May 26, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

common docs Improvements or additions to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant