diff --git a/README.md b/README.md index e790ba31d..f255a2f81 100644 --- a/README.md +++ b/README.md @@ -35,231 +35,14 @@ Springtail replicas are well-suited for traditional read replica workloads: Any read-only transaction that might cause contention on your primary database is an ideal candidate for Springtail. -## Prerequisites +## Documentation -- Docker and Docker Compose -- Python 3 -- AWS CLI (for local cluster only) +Full documentation is available at **[docs.springtail.io](https://docs.springtail.io/)**, including: -## Quick Start: Local Development Environment - -The local dev environment runs a PostgreSQL 16 instance and a development container with all build tooling pre-installed. - -### 1. Build the base Docker image - -From the repository root: - -```shell -cd docker -./docker_base.sh -``` - -This builds the `springtail:base` image from `Dockerfile.base`, which includes a patched PostgreSQL 16 (built from source with RLS support for foreign tables), Redis, the C++ toolchain, and all Ansible-provisioned dependencies. - -### 2. Start the dev environment - -```shell -export SPRINGTAIL_SRC=/path/to/springtail # absolute path to your repo checkout -docker compose up -d -``` - -This starts two services: - -| Service | Container | Description | Host Port | -|------------|-------------------|----------------------------------|-----------| -| `postgres` | `pg16` | PostgreSQL 16 with logical replication enabled | 5432 | -| `dev` | `dev-springtail` | Development container with build tools | 2222 (SSH) | - -The dev container mounts your source tree at `/home/dev/springtail` and starts PostgreSQL, Redis, and SSH automatically via its entrypoint. - -### 3. Build Springtail inside the container - -Shell into the dev container and run the debug build: - -```shell -docker exec -it dev-springtail bash - -# Inside the container: -cd ~/springtail -./vcpkg.sh # one-time: install C++ dependencies -./debug.sh # build debug binaries into ./debug/ -``` - -### 4. Run the unit tests (C++ / CTest) - -```shell -cd ~/springtail/debug -make build_tests -ctest -``` - -Or build and run in one step: - -```shell -cmake --build debug --target check -``` - -The `check` target kills any running Springtail processes, installs SQL triggers, builds the tests, and runs them via CTest. - -### 5. Run the integration tests - -The integration test runner is a Python script that exercises Springtail end-to-end against a real PostgreSQL instance. It must be run from its own directory: - -```shell -cd ~/springtail/python/testing -python3 test_runner.py -``` - -This runs the **default** test configuration, which includes the test sets `basic`, `framework`, `preload`, `enum_bits`, `complex`, `numeric`, `query_benchmark`, and `recovery` (with various overlay configurations). - -#### Common test_runner.py options - -```shell -# Run the default configuration (same as no arguments) -python3 test_runner.py - -# Run a specific named configuration (e.g., nightly, github_ci_p1) -python3 test_runner.py -c nightly - -# Run a single test set -python3 test_runner.py basic - -# Run specific test cases within a test set -python3 test_runner.py basic test_create.sql test_insert.sql - -# Run with a specific overlay -python3 test_runner.py -o small_log_rotate recovery - -# Skip downloading test data from S3 (useful offline or in CI) -python3 test_runner.py --skip-downloads - -# Output JUnit XML report -python3 test_runner.py -j results.xml -``` - -Available test sets: `basic`, `complex`, `enum_bits`, `framework`, `include_schema`, `large_data`, `live_startup`, `numeric`, `policy_roles`, `preload`, `query_benchmark`, `recovery`, `text_tables`. - -Available overlays: `small_log_rotate`, `small_log_rotate_with_streaming`, `small_cache_size`, `streaming_postgres_config`, `integration_test_config`, `include_schema_config`. - -### 6. Tear down - -```shell -cd /path/to/springtail/docker -docker compose down -v -``` - ---- - -## Quick Start: Local Cluster (Multi-Node) - -The local cluster simulates a full multi-node Springtail deployment using Docker Compose. It runs a primary database, Redis, a mock AWS environment, and the full set of Springtail services (proxy, ingestion, FDW nodes, and controller). - -All cluster commands are run from the `local-cluster/` directory: - -```shell -cd local-cluster -``` - -### 1. Build the required Docker images - -From the repository root, build the base service image if it doesn't already exist: - -```shell -docker build -t local-cluster-img:latest -f docker/Dockerfile.local-cluster . -``` - -The `cluster up` command will also build the controller image (`local-cluster-controller:latest`) and custom PostgreSQL image (`postgres-custom:16`) automatically if needed. - -### 2. Build a Springtail package - -```shell -./cluster build-package /tmp/springtail-packages -``` - -This runs the full build and packaging process inside the base image and outputs a tarball named `springtail--.tar.gz` into the specified directory. - -### 3. Start the cluster - -```shell -./cluster up /tmp/springtail-packages/springtail--.tar.gz -``` - -Optionally disable SSL for inter-service connections: - -```shell -./cluster up /path/to/package.tar.gz --disable-ssl -``` - -Startup takes 1-2 minutes. The cluster will: -1. Start the mock AWS service (Moto), Redis, and primary PostgreSQL -2. Upload the package to mock S3 -3. Run the bootstrap container to configure secrets, Redis auth, and shared environment -4. Launch the proxy, ingestion, and FDW services - -### 4. Check cluster status - -```shell -./cluster status -``` - -### 5. Interact with services - -```shell -# Shell into a service -./cluster sh proxy -./cluster sh ingestion -./cluster sh fdw1 -./cluster sh controller - -# View logs -./cluster logs proxy -./cluster logs ingestion - -# Restart a service -./cluster restart proxy - -# List all services -./cluster ls -``` - -### 6. Host ports - -Services are exposed on the host at these ports: - -| Service | Host Port | -|----------------|-----------| -| Primary DB | 15432 | -| Redis | 16379 | -| Proxy | 55432 | -| FDW 1 | 45432 | -| FDW 2 | 45433 | -| AWS Mock (Moto)| 29999 | -| Controller API | 19824 | - -Connect to the proxy from the host with any PostgreSQL client: - -```shell -psql -h localhost -p 55432 -U postgres -``` - -### 7. Tear down the cluster - -```shell -# Stop Springtail services but keep the primary DB and dependencies -./cluster down - -# Stop a specific service -./cluster down proxy - -# Stop everything and remove all data volumes and networks -./cluster down all -``` - -## Quick Start: Production Deployment - -In a production deployment, each Springtail host runs a `coordinator` process that supervises the daemons for one service tier (`ingestion`, `fdw`, or `proxy`), installs binaries from S3, monitors component liveness via Redis, and reacts to lifecycle state changes (startup, reload, drain, shutdown) driven by the controller. - -For details on starting Springtail, adding replica (FDW) nodes, removing replica nodes, and stopping Springtail, see [`python/coordinator/README.md`](python/coordinator/README.md). +- **[Quickstart](https://docs.springtail.io/quickstart)** — the fastest way to get a local cluster running for testing. +- **[Local dev environment](https://docs.springtail.io/local-dev-environment)** — build Springtail and run the unit and integration tests in a development container. +- **[Local cluster deployment](https://docs.springtail.io/local-cluster-deployment)** — run a full multi-node deployment locally with Docker Compose. +- **[Production deployment](https://docs.springtail.io/deployment)** — deploy and manage Springtail in production with the [coordinator](python/coordinator/README.md). ## License diff --git a/wiki/client-session.mdx b/wiki/client-session.mdx index fe9da0c39..f2ff7f52a 100644 --- a/wiki/client-session.mdx +++ b/wiki/client-session.mdx @@ -29,30 +29,24 @@ The Client Session builds upon a base Session class which provides: ### Primary States -``` -┌─────────────┐ -│ STARTUP │ Initial state, handling authentication -└──────┬──────┘ - │ - ↓ -┌─────────────┐ -│ AUTH_SERVER │ Waiting for server authentication -└──────┬──────┘ - │ - ↓ -┌─────────────┐ -│ READY │ Idle, waiting for next query -└──────┬──────┘ - │ - ↓ -┌─────────────┐ -│ QUERY │ Processing query (delegated to server session) -└──────┬──────┘ - │ - ↓ -┌─────────────┐ -│ ERROR │ Fatal error, connection closing -└─────────────┘ +```mermaid +stateDiagram-v2 + [*] --> STARTUP + STARTUP --> AUTH_SERVER: client auth completes + AUTH_SERVER --> READY: server auth completes + READY --> QUERY: client sends query message + QUERY --> READY: server sends ReadyForQuery + STARTUP --> ERROR: fatal error / disconnect + AUTH_SERVER --> ERROR: fatal error / disconnect + READY --> ERROR: fatal error / disconnect + QUERY --> ERROR: fatal error / disconnect + ERROR --> [*] + + note right of STARTUP: Initial state, handling authentication + note right of AUTH_SERVER: Waiting for server authentication + note right of READY: Idle, waiting for next query + note right of QUERY: Processing query (delegated to server session) + note right of ERROR: Fatal error, connection closing ``` ### State Transitions diff --git a/wiki/extension-support.mdx b/wiki/extension-support.mdx index 2418ad8d2..5cfbd6cad 100644 --- a/wiki/extension-support.mdx +++ b/wiki/extension-support.mdx @@ -66,38 +66,21 @@ Initialization flow during database setup that loads extensions configured in `s ## Extension Loading Flow -``` -Application Startup - │ - ▼ -PgCopyTable::init_pg_extn_registry(db_id, xid) - → Reads extension config from system.json - → For each extension: - │ - ├─→ PgExtnRegistry::init_libraries() - │ ├─ dlopen() loads extension .so file - │ └─ Stores library handle in _library_map - │ - ├─→ _load_extn_types() - │ ├─ Queries pg_type for extension types - │ ├─ For each type: - │ │ ├─ dlsym() loads type I/O functions (typinput, typoutput, typreceive, typsend) - │ │ └─ PgExtnRegistry::add_type() - │ └─ Creates extension types in Server::create_usertype() - │ - ├─→ _load_extn_operators() - │ ├─ Queries pg_operator for extension operators - │ ├─ For each operator: - │ │ ├─ dlsym() loads operator implementation function - │ │ └─ PgExtnRegistry::add_operator() - │ └─ Stores operator_name → function_ptr mappings - │ - └─→ _load_extn_opclasses() - ├─ Queries pg_opclass for GIN/GIST opclasses - ├─ For each opclass method (compress, penalty, union, etc.): - │ ├─ dlsym() loads support function - │ └─ PgExtnRegistry::add_opclass() - └─ Stores (opclass_name, support_number) → method mappings +```mermaid +flowchart TD + Start["Application Startup"] + Init["PgCopyTable::init_pg_extn_registry(db_id, xid)
Reads extension config from system.json
For each extension:"] + + Libs["PgExtnRegistry::init_libraries()
- dlopen() loads extension .so file
- Stores library handle in _library_map"] + Types["_load_extn_types()
- Queries pg_type for extension types
- dlsym() loads type I/O functions (typinput, typoutput, typreceive, typsend)
- PgExtnRegistry::add_type()
- Creates extension types in Server::create_usertype()"] + Ops["_load_extn_operators()
- Queries pg_operator for extension operators
- dlsym() loads operator implementation function
- PgExtnRegistry::add_operator()
- Stores operator_name → function_ptr mappings"] + OpClasses["_load_extn_opclasses()
- Queries pg_opclass for GIN/GIST opclasses
- dlsym() loads support function (compress, penalty, union, etc.)
- PgExtnRegistry::add_opclass()
- Stores (opclass_name, support_number) → method mappings"] + + Start --> Init + Init --> Libs + Init --> Types + Init --> Ops + Init --> OpClasses ``` --- diff --git a/wiki/gin-index-support.mdx b/wiki/gin-index-support.mdx index 4855d8f82..c2ece1833 100644 --- a/wiki/gin-index-support.mdx +++ b/wiki/gin-index-support.mdx @@ -54,65 +54,39 @@ Currently, only `gin_trgm_ops` opclass is supported, enabling `LIKE` and `ILIKE` ### Index Building -``` -INDEX CREATION - │ - ▼ -Server::_create_index() - → _check_gin_index_columns() validates opclass - → _upsert_index_name() persists metadata - │ - ▼ -Indexer::_build_index() - → Detects INDEX_TYPE_GIN - → Calls _build_gin_index() - │ - ▼ -Indexer::_build_gin_index() - (builds index at the XID x1 in a separate thread in Indexer) - → create_gin_index_root() initializes BTree - → For each row, for each indexed column: - → extract_trgm_from_value() extracts trigrams - → Insert (position, token, row_id) into BTree - -RECONCILIATION - │ - ▼ - → committer triggers process_index_reconciliation() - → Calls _reconcile_index() - → Reconciles index using new mutations happed post XID x1 - → Insert (position, token, row_id) into BTree for inserts/updates - → Remove (position, token, row_id) from BTree for deletes - -INCREMENTAL MAINTENANCE - │ - ▼ -MutableTable::apply_mutation() - → index_mutation_handler() checks index type via _index_lookup - → For GIN: extracts trigrams, inserts/removes tuples - → For BTree: standard key-value operation +```mermaid +flowchart TD + subgraph Creation [INDEX CREATION] + direction TB + Create["Server::_create_index()
→ _check_gin_index_columns() validates opclass
→ _upsert_index_name() persists metadata"] + BuildIdx["Indexer::_build_index()
→ Detects INDEX_TYPE_GIN
→ Calls _build_gin_index()"] + BuildGin["Indexer::_build_gin_index()
(builds index at XID x1 in a separate Indexer thread)
→ create_gin_index_root() initializes BTree
→ For each row, for each indexed column:
→ extract_trgm_from_value() extracts trigrams
→ Insert (position, token, row_id) into BTree"] + Create --> BuildIdx --> BuildGin + end + + subgraph Recon [RECONCILIATION] + direction TB + ReconStep["committer triggers process_index_reconciliation()
→ Calls _reconcile_index()
→ Reconciles index using new mutations after XID x1
→ Insert (position, token, row_id) for inserts/updates
→ Remove (position, token, row_id) for deletes"] + end + + subgraph Maint [INCREMENTAL MAINTENANCE] + direction TB + Apply["MutableTable::apply_mutation<INSERT/DELETE>()
→ index_mutation_handler() checks index type via _index_lookup
→ For GIN: extracts trigrams, inserts/removes tuples
→ For BTree: standard key-value operation"] + end + + Creation --> Recon --> Maint ``` ### Index Scanning (in-progress) -``` -QUERY: SELECT * FROM t WHERE col LIKE '%pattern%' - │ - ▼ -FDW::_init_quals() - → Selects GIN index with matching column - │ - ▼ -FDW::_set_scan_iterators() - → extract_gin_keys_from_string() extracts query trigrams - → Table::begin(index_id, tokens) - │ - ▼ -Table::Iterator (GINSecondary) - → Iterates BTree entries matching tokens - → Deduplicates rows via _visited_internal_row_ids - → Resolves row location via look_aside_index - → Returns matching table rows +```mermaid +flowchart TD + Query["QUERY: SELECT * FROM t WHERE col LIKE '%pattern%'"] + InitQuals["FDW::_init_quals()
→ Selects GIN index with matching column"] + SetIter["FDW::_set_scan_iterators()
→ extract_gin_keys_from_string() extracts query trigrams
→ Table::begin(index_id, tokens)"] + Iter["Table::Iterator (GINSecondary)
→ Iterates BTree entries matching tokens
→ Deduplicates rows via _visited_internal_row_ids
→ Resolves row location via look_aside_index
→ Returns matching table rows"] + + Query --> InitQuals --> SetIter --> Iter ``` --- diff --git a/wiki/gist-index-support.mdx b/wiki/gist-index-support.mdx index b509137fe..2da6239ba 100644 --- a/wiki/gist-index-support.mdx +++ b/wiki/gist-index-support.mdx @@ -70,57 +70,27 @@ struct GistEntry { ### Index Building -``` -INDEX CREATION - │ - ▼ -Server::_create_index() - → Validates opclass compatibility - → _upsert_index_name() persists metadata - │ - ▼ -MutableTable::create_gist_index_root() - → Creates schema via create_gist_index_schema() - → Initializes MutableBTree with INDEX_TYPE_GIST - → Stores opclass names for each indexed column - │ - ▼ -Indexer::_build_index() - → Detects INDEX_TYPE_GIST - → Calls build logic (to be implemented) +```mermaid +flowchart TD + Create["INDEX CREATION"] + CreateIdx["Server::_create_index()
→ Validates opclass compatibility
→ _upsert_index_name() persists metadata"] + Root["MutableTable::create_gist_index_root()
→ Creates schema via create_gist_index_schema()
→ Initializes MutableBTree with INDEX_TYPE_GIST
→ Stores opclass names for each indexed column"] + BuildIdx["Indexer::_build_index()
→ Detects INDEX_TYPE_GIST
→ Calls build logic (to be implemented)"] + + Create --> CreateIdx --> Root --> BuildIdx ``` ### Index Insertion -``` -INSERT/UPDATE Operation - │ - ▼ -MutableBTree::insert() - → Detects INDEX_TYPE_GIST - → extract_gist_entry_from_tuple() - ├─ For each indexed column: - │ ├─ make_datum_from_field() converts Springtail field to Datum - │ └─ Invokes GIST_COMPRESS via opclass method - └─ Returns GistEntry with compressed keys - │ - ▼ -Page::insert_gist() - → Marks page as dirty - → Delegates to StorageCache::Page::insert_gist() - │ - ▼ -StorageCache::Page::gist_choose_subtree() - → For each child page (internal nodes): - ├─ read_branch_entry_from_row() extracts child predicate - ├─ compute_gist_penalty() calculates insertion cost - └─ Selects child with minimum penalty - → Returns iterator to chosen subtree - │ - ▼ -StorageCache::Page::insert_gist() - → Inserts tuple into chosen extent/subtree - → (Split logic and tree rebalancing: TBD) +```mermaid +flowchart TD + Op["INSERT/UPDATE Operation"] + Insert["MutableBTree::insert()
→ Detects INDEX_TYPE_GIST
→ extract_gist_entry_from_tuple()
— For each indexed column:
— make_datum_from_field() converts Springtail field to Datum
— Invokes GIST_COMPRESS via opclass method
→ Returns GistEntry with compressed keys"] + PageInsert["Page::insert_gist()
→ Marks page as dirty
→ Delegates to StorageCache::Page::insert_gist()"] + Choose["StorageCache::Page::gist_choose_subtree()
→ For each child page (internal nodes):
— read_branch_entry_from_row() extracts child predicate
— compute_gist_penalty() calculates insertion cost
— Selects child with minimum penalty
→ Returns iterator to chosen subtree"] + CacheInsert["StorageCache::Page::insert_gist()
→ Inserts tuple into chosen extent/subtree
→ (Split logic and tree rebalancing: TBD)"] + + Op --> Insert --> PageInsert --> Choose --> CacheInsert ``` ### Incremental Maintenance diff --git a/wiki/index-support.mdx b/wiki/index-support.mdx index 0e4e3ecfb..a6cac361c 100644 --- a/wiki/index-support.mdx +++ b/wiki/index-support.mdx @@ -41,48 +41,38 @@ enum class IndexStatus { ``` **State Transitions:** -``` - create_index drop_index - │ (index NOT in _work_set) - ▼ │ - ┌──────────┐ ▼ - │ BUILDING │ ┌──────────┐ - └────┬─────┘ │ DELETING │ - │ └────┬─────┘ - │ │ - ├──────────────────┐ │ - │ │ │ - │ drop_index │ build complete │ - │ (index IN │ │ - │ _work_set) │ │ - │ │ │ - ▼ ▼ │ - ┌──────────┐ ┌───────────────┐ │ - │ ABORTING │───►│ pending │◄────────────┘ - └──────────┘ │ reconciliation│ - └───────┬───────┘ - │ - ▼ - ┌───────────────┐ - │_reconcile_ │ - │ index() │ - └───────┬───────┘ - │ - ┌─────────────┼─────────────┐ - │ │ │ - ▼ ▼ ▼ - ┌───────────┐ ┌───────────┐ ┌───────────┐ - │ DELETING │ │ ABORTING │ │ BUILDING │ - │ ↓ │ │ ↓ │ │ ↓ │ - │ _drop() │ │_commit_ │ │_commit_ │ - │ │ │build() │ │build() │ - │ │ │(truncate) │ │(finalize) │ - └─────┬─────┘ └─────┬─────┘ └─────┬─────┘ - │ │ │ - ▼ ▼ ▼ - ┌─────────┐ ┌─────────┐ ┌───────┐ - │ DELETED │ │ DELETED │ │ READY │ - └─────────┘ └─────────┘ └───────┘ +```mermaid +flowchart TD + BUILDING(["BUILDING"]) + DELETING1(["DELETING"]) + ABORTING(["ABORTING"]) + Pending["pending reconciliation"] + Recon["_reconcile_index()"] + + DELETING2(["DELETING
_drop()"]) + ABORTING2(["ABORTING
_commit_build() (truncate)"]) + BUILDING2(["BUILDING
_commit_build() (finalize)"]) + + DELETED1(["DELETED"]) + DELETED2(["DELETED"]) + READY(["READY"]) + + Start1["create_index"] --> BUILDING + Start2["drop_index
(index NOT in _work_set)"] --> DELETING1 + + BUILDING -->|"drop_index (index IN _work_set)"| ABORTING + BUILDING -->|build complete| Pending + ABORTING --> Pending + DELETING1 --> Pending + + Pending --> Recon + Recon --> DELETING2 + Recon --> ABORTING2 + Recon --> BUILDING2 + + DELETING2 --> DELETED1 + ABORTING2 --> DELETED2 + BUILDING2 --> READY ``` **Notes:** @@ -154,276 +144,85 @@ using Key = std::pair; // (db_id, index_id) ### Index Creation Flow -``` -┌─────────────────────────────────────────────────────────────────────────────┐ -│ INDEX CREATION FLOW │ -└─────────────────────────────────────────────────────────────────────────────┘ - - Committer._commit_batch() - │ - │ Collects index requests across batch XIDs - │ Calls at final_xid - ▼ - process_requests(db_id, final_xid, combined_index_requests) - │ - │ Sets DDL counter = request count - │ Routes each request by action type - ▼ - ┌──────────────┐ - │ build() │ - └──────┬───────┘ - │ - │ 1. Check if index already READY → skip if so - │ 2. Add key to _table_idx_map (for abort tracking) - │ 3. Add IndexParams to _work_set - │ 4. Push key to _queue - │ 5. Register with RedisDDL - │ 6. Notify workers via _cv - ▼ - ┌──────────────────────┐ - │ Worker: task() │ ◄─── Worker thread picks up key from queue - └──────────┬───────────┘ - │ - │ Fetches IndexParams from _work_set - │ Calls _build() if status is BUILDING - ▼ - ┌──────────────────────┐ - │ _build() │ - │ │ - │ 1. Invalidate table │ - │ cache │ - │ 2. Create B-tree │ - │ root │ - │ 3. Build look-aside │ - │ (if first index) │ - │ 4. Scan all rows │ - │ 5. Insert into │ - │ B-tree │ - └──────────┬───────────┘ - │ - │ Returns IndexState with B-tree root - ▼ - ┌────────────────────────────┐ - │ _add_to_pending_ │ - │ reconciliation() │ - │ │ - │ 1. Add to pending map │ - │ 2. Decrement DDL counter │ - │ 3. If counter == 0: │ - │ Push IndexReconcile- │ - │ Request to queue │ - └────────────────────────────┘ - │ - │ Queue read by pg_log_mgr - │ (via IndexReconciliationQueueManager) - ▼ - ┌────────────────────────────┐ - │ pg_log_mgr enqueues │ - │ RECONCILE_INDEX message │ - │ for pg_log_reader │ - └──────────┬─────────────────┘ - │ - │ pg_log_reader pushes - │ msg to committer queue - │ - ▼ - ┌────────────────────────────┐ - │ Committer receives │ - │ RECONCILE_INDEX message │ - │ │ - │ Calls process_index_ │ - │ reconciliation() │ - └──────────┬─────────────────┘ - │ - ▼ - ┌────────────────────────────┐ - │ _reconcile_index() │ - │ │ - │ Catches up with changes │ - │ made during build: │ - │ - Invalidate old extents │ - │ - Populate new extents │ - └──────────┬─────────────────┘ - │ - ▼ - ┌────────────────────────────┐ - │ _commit_build() │ - │ │ - │ 1. Finalize B-tree │ - │ 2. Update table roots │ - │ 3. Set index state READY │ - │ 4. Cleanup tracking maps │ - └────────────────────────────┘ +```mermaid +flowchart TD + CB["Committer._commit_batch()
Collects index requests across batch XIDs, calls at final_xid"] + PR["process_requests(db_id, final_xid, combined_index_requests)
Sets DDL counter = request count, routes each request by action type"] + Build["build()
1. Skip if index already READY
2. Add key to _table_idx_map (abort tracking)
3. Add IndexParams to _work_set
4. Push key to _queue
5. Register with RedisDDL
6. Notify workers via _cv"] + Worker["Worker: task()
Picks up key from queue, fetches IndexParams from _work_set,
calls _build() if status is BUILDING"] + BuildFn["_build()
1. Invalidate table cache
2. Create B-tree root
3. Build look-aside (if first index)
4. Scan all rows
5. Insert into B-tree"] + AddPending["_add_to_pending_reconciliation()
1. Add to pending map
2. Decrement DDL counter
3. If counter == 0: push IndexReconcileRequest to queue"] + LogMgr["pg_log_mgr enqueues RECONCILE_INDEX message for pg_log_reader
(via IndexReconciliationQueueManager)"] + CommitterRecv["Committer receives RECONCILE_INDEX message
Calls process_index_reconciliation()"] + Recon["_reconcile_index()
Catches up with changes made during build:
- Invalidate old extents
- Populate new extents"] + Commit["_commit_build()
1. Finalize B-tree
2. Update table roots
3. Set index state READY
4. Cleanup tracking maps"] + + CB --> PR --> Build --> Worker + Worker --> BuildFn + BuildFn -->|Returns IndexState with B-tree root| AddPending + AddPending -->|Queue read by pg_log_mgr| LogMgr + LogMgr -->|pg_log_reader pushes msg to committer queue| CommitterRecv + CommitterRecv --> Recon --> Commit ``` ### Index Drop Flow -``` -┌─────────────────────────────────────────────────────────────────────────────┐ -│ INDEX DROP FLOW │ -└─────────────────────────────────────────────────────────────────────────────┘ - - process_requests() - │ - │ action == "drop_index" - ▼ - ┌──────────────┐ - │ drop() │ - └──────┬───────┘ - │ - ├───────────────────────────────────────┐ - │ │ - ▼ ▼ - ┌────────────────────┐ ┌────────────────────────┐ - │ Index NOT in │ │ Index IS in │ - │ _work_set │ │ _work_set (building) │ - │ │ │ │ - │ Fresh drop: │ │ Concurrent drop: │ - │ Index exists and │ │ Build in progress │ - │ not being built │ │ │ - └─────────┬──────────┘ └───────────┬────────────┘ - │ │ - ▼ ▼ - ┌────────────────────┐ ┌────────────────────────┐ - │ Create work item │ │ Mark existing item │ - │ with DELETING │ │ as ABORTING │ - │ status │ │ │ - │ │ │ Decrement DDL counter │ - │ Push to queue │ │ (no new work item) │ - │ Register RedisDDL │ │ │ - └─────────┬──────────┘ └───────────┬────────────┘ - │ │ - ▼ ▼ - ┌────────────────────┐ ┌────────────────────────┐ - │ Worker picks up │ │ _build() detects │ - │ │ │ ABORTING via │ - │ Since !BUILDING: │ │ _was_dropped() check │ - │ → Add to pending │ │ every 1000 rows │ - │ reconciliation │ │ │ - │ directly │ │ Returns early with │ - └─────────┬──────────┘ │ partial B-tree │ - │ └───────────┬────────────┘ - │ ..... Reconciliation pipe │ - | similar to | - | create_index flow ..... | - ▼ ▼ - ┌────────────────────┐ ┌────────────────────────┐ - │ _reconcile_index() │ │ _reconcile_index() │ - │ │ │ │ - │ Detects DELETING │ │ Detects ABORTING │ - │ → calls _drop() │ │ → calls _commit_build()│ - └─────────┬──────────┘ └───────────┬────────────┘ - │ │ - ▼ ▼ - ┌────────────────────┐ ┌────────────────────────┐ - │ _drop() │ │ _commit_build() │ - │ │ │ │ - │ 1. Truncate B-tree │ │ 1. Truncate B-tree │ - │ 2. Truncate look- │ │ 2. Truncate look-aside │ - │ aside if last │ │ if present │ - │ secondary index │ │ 3. Set state DELETED │ - │ 3. Set DELETED │ │ │ - │ 4. Update roots │ │ │ - └────────────────────┘ └────────────────────────┘ +```mermaid +flowchart TD + PR["process_requests()
action == 'drop_index'"] + Drop["drop()"] + + NotIn["Index NOT in _work_set
Fresh drop: index exists and not being built"] + IsIn["Index IS in _work_set (building)
Concurrent drop: build in progress"] + + CreateItem["Create work item with DELETING status
Push to queue, register RedisDDL"] + MarkAbort["Mark existing item as ABORTING
Decrement DDL counter (no new work item)"] + + WorkerPicks["Worker picks up
Since !BUILDING: add to pending reconciliation directly"] + BuildDetect["_build() detects ABORTING via _was_dropped() check every 1000 rows
Returns early with partial B-tree"] + + ReconDel["_reconcile_index()
Detects DELETING → calls _drop()"] + ReconAbort["_reconcile_index()
Detects ABORTING → calls _commit_build()"] + + DropFn["_drop()
1. Truncate B-tree
2. Truncate look-aside if last secondary index
3. Set DELETED
4. Update roots"] + CommitFn["_commit_build()
1. Truncate B-tree
2. Truncate look-aside if present
3. Set state DELETED"] + + PR --> Drop + Drop --> NotIn + Drop --> IsIn + NotIn --> CreateItem --> WorkerPicks + IsIn --> MarkAbort --> BuildDetect + WorkerPicks -.->|Reconciliation pipe similar to create_index flow| ReconDel + BuildDetect -.->|Reconciliation pipe similar to create_index flow| ReconAbort + ReconDel --> DropFn + ReconAbort --> CommitFn ``` ### Index Recovery Flow -``` -┌─────────────────────────────────────────────────────────────────────────────┐ -│ INDEX RECOVERY FLOW │ -└─────────────────────────────────────────────────────────────────────────────┘ - - Committer receives INDEX_RECOVERY_TRIGGER - (pg_log_mgr pushes this message when the db gets added) - │ - ▼ - recover_indexes(db_id) - │ - ▼ - ┌──────────────────────┐ - │ _cleanup_for_db() │ - │ │ - │ Clears stale data: │ - │ - _work_set entries │ - │ - Pending recon map │ - │ - Table-index map │ - └──────────┬───────────┘ - │ - ▼ - ┌────────────────────────────────────┐ - │ sys_tbl_mgr::Server:: │ - │ get_unfinished_indexes_info() │ - │ │ - │ Returns indexes in states: │ - │ - NOT_READY (incomplete builds) │ - │ - BEING_DELETED (incomplete drops) │ - └──────────┬─────────────────────────┘ - │ - ▼ - ┌────────────────────────────────────┐ - │ For each XID with unfinished │ - │ indexes: │ - │ │ - │ Build IndexProcessRequest list: │ - │ - NOT_READY → action="create_index│ - │ - BEING_DELETED → action="drop_ │ - │ index" │ - └──────────┬─────────────────────────┘ - │ - ▼ - ┌────────────────────────────────────┐ - │ process_requests() │ - │ │ - │ Schedules recovery work through │ - │ normal build/drop paths │ - └────────────────────────────────────┘ +```mermaid +flowchart TD + Trigger["Committer receives INDEX_RECOVERY_TRIGGER
(pg_log_mgr pushes this message when the db gets added)"] + Recover["recover_indexes(db_id)"] + Cleanup["_cleanup_for_db()
Clears stale data:
- _work_set entries
- Pending recon map
- Table-index map"] + GetUnfinished["sys_tbl_mgr::Server::get_unfinished_indexes_info()
Returns indexes in states:
- NOT_READY (incomplete builds)
- BEING_DELETED (incomplete drops)"] + ForEach["For each XID with unfinished indexes:
Build IndexProcessRequest list:
- NOT_READY → action='create_index'
- BEING_DELETED → action='drop_index'"] + PR["process_requests()
Schedules recovery work through normal build/drop paths"] + + Trigger --> Recover --> Cleanup --> GetUnfinished --> ForEach --> PR ``` ### Abort Indexes Flow (Table Resync) -``` -┌─────────────────────────────────────────────────────────────────────────────┐ -│ ABORT INDEXES FLOW │ -│ (During Table Resync) │ -└─────────────────────────────────────────────────────────────────────────────┘ - - pg_log_reader pushes abort_index message into - committer queue for the table - as part of table_resync processing - │ - │ - ▼ - committer calls process_requests() - │ - │ action == "abort_index" - ▼ - ┌──────────────────────────────┐ - │ abort_indexes() │ - │ (db_id, table_id, xid) │ - └──────────┬───────────────────┘ - │ - ▼ - ┌──────────────────────────────┐ - │ 1. Decrement DDL counter │ - │ │ - │ 2. Find all index keys for │ - │ (db_id, table_id) in │ - │ _table_idx_map │ - │ │ - │ 3. For each key: │ - │ Set work_item._status │ - │ = ABORTING │ - └──────────────────────────────┘ - │ - ▼ - ┌──────────────────────────────┐ - │ Workers detect ABORTING │ - │ status during build via │ - │ _was_dropped() checks │ - │ │ - │ → Clean up and mark DELETED │ - └──────────────────────────────┘ +```mermaid +flowchart TD + Push["pg_log_reader pushes abort_index message into
committer queue for the table as part of table_resync processing"] + PR["committer calls process_requests()
action == 'abort_index'"] + Abort["abort_indexes()(db_id, table_id, xid)"] + Steps["1. Decrement DDL counter
2. Find all index keys for (db_id, table_id) in _table_idx_map
3. For each key: set work_item._status = ABORTING"] + Workers["Workers detect ABORTING status during build
via _was_dropped() checks
→ Clean up and mark DELETED"] + + Push --> PR --> Abort --> Steps --> Workers ``` --- diff --git a/wiki/shared-memory-caches.mdx b/wiki/shared-memory-caches.mdx index cb85eb6b3..ad7ca3e76 100644 --- a/wiki/shared-memory-caches.mdx +++ b/wiki/shared-memory-caches.mdx @@ -20,35 +20,25 @@ The Shared Memory Cache Subsystem enables efficient inter-process communication ### High-Level Data Flow -``` -┌─────────────────────────────────────┐ -│ XID Manager (xid_mgr_daemon) │ -│ - Manages transaction commitments │ -└────────────────┬────────────────────┘ - │ gRPC: XidPushResponse - ↓ -┌─────────────────────────────────────────────────────────────┐ -│ PgXidSubscriberMgr (daemon, separate thread) │ -│ - Receives XID notifications │ -│ - Creates 5 shared memory caches │ -│ - Populates caches on transaction commit │ -│ - Worker threads fetch metadata proactively │ -└───┬─────────────────────────────────────────────────────────┘ - │ Shared Memory IPC - ├─ SHM_CACHE_ROOTS (table metadata) - ├─ SHM_CACHE_SCHEMAS (table schemas) - ├─ SHM_CACHE_USERTYPES (user-defined types) - ├─ SHM_CACHE_TABLE_IDS (pending mutations) - └─ SHM_CACHE_EXTENTS (mutation records) - │ - ↓ (Opens and reads) -┌──────────────────────────────┐ -│ PgFdwMgr (FDW process) │ -│ - Runs in PostgreSQL │ -│ - Opens existing caches │ -│ - Queries pending XIDs │ -│ - Applies mutations │ -└──────────────────────────────┘ +```mermaid +flowchart TD + XID["XID Manager (xid_mgr_daemon)
Manages transaction commitments"] + Sub["PgXidSubscriberMgr (daemon, separate thread)
- Receives XID notifications
- Creates 5 shared memory caches
- Populates caches on transaction commit
- Worker threads fetch metadata proactively"] + + subgraph SHM [Shared Memory IPC] + direction TB + Roots["SHM_CACHE_ROOTS (table metadata)"] + Schemas["SHM_CACHE_SCHEMAS (table schemas)"] + UserTypes["SHM_CACHE_USERTYPES (user-defined types)"] + TableIds["SHM_CACHE_TABLE_IDS (pending mutations)"] + Extents["SHM_CACHE_EXTENTS (mutation records)"] + end + + FDW["PgFdwMgr (FDW process)
- Runs in PostgreSQL
- Opens existing caches
- Queries pending XIDs
- Applies mutations"] + + XID -->|"gRPC: XidPushResponse"| Sub + Sub --> SHM + SHM -->|opens and reads| FDW ``` ### Components diff --git a/wiki/syncing-tables.mdx b/wiki/syncing-tables.mdx index 2c42edd34..e71ff7bf5 100644 --- a/wiki/syncing-tables.mdx +++ b/wiki/syncing-tables.mdx @@ -353,39 +353,28 @@ The stall mechanism uses **inter-thread coordination** via state synchronizer an **Detailed Stall Sequence:** -``` -PgLogMgr Copy Thread PgLogMgr Reader Thread Committer -───────────────────── ────────────────────── ───────── -1. Detect sync request - (from Redis queue) - -2. State transition: - RUNNING → SYNC_STALL - -3. SyncTracker::block_commits() - ├─ Push TABLE_SYNC_START ──────────────────────────────────────→ Blocks commits - │ to committer queue - └─ Returns immediately - -4. Push STALL to logger queue ──→ 5. Pop STALL message - 6. Set state: SYNCING - 7. Block in wait loop: - _internal_state.wait_for_state( - {STATE_REPLAYING, STATE_RUNNING}) - -8. Detect state == SYNCING - (stall acknowledged) - -9. Execute table copies: - - PgCopyTable::copy_tables() - - Workers capture snapshots - - Workers mark_inflight() - - Workers emit sync messages - -10. Set state: REPLAYING ──→ 11. Detect state change - 12. Exit wait loop - 13. Set state: RUNNING - 14. Resume processing logs +```mermaid +sequenceDiagram + participant Copy as PgLogMgr Copy Thread + participant Reader as PgLogMgr Reader Thread + participant Committer + + Note over Copy: 1. Detect sync request (from Redis queue) + Note over Copy: 2. State: RUNNING → SYNC_STALL + Copy->>Committer: 3. block_commits(): push TABLE_SYNC_START to committer queue + Note over Committer: Blocks commits + Note over Copy: block_commits() returns immediately + Copy->>Reader: 4. Push STALL to logger queue + Note over Reader: 5. Pop STALL message + Note over Reader: 6. Set state: SYNCING + Note over Reader: 7. Block in wait loop:
wait_for_state({REPLAYING, RUNNING}) + Note over Copy: 8. Detect state == SYNCING (stall acknowledged) + Note over Copy: 9. Execute table copies:
copy_tables(), capture snapshots,
mark_inflight(), emit sync messages + Copy->>Reader: 10. Set state: REPLAYING + Note over Reader: 11. Detect state change + Note over Reader: 12. Exit wait loop + Note over Reader: 13. Set state: RUNNING + Note over Reader: 14. Resume processing logs ``` **Why This Works:** @@ -495,31 +484,26 @@ PgCopyResult { **XID Flow Through System:** -``` -PostgreSQL PgCopyTable SyncTracker PgLogReader -────────── ─────────── ─────────── ─────────── -pg_xid = → Capture snapshot → mark_inflight() - target_xid = - Store xmin/xmax - pg_xid = - Store xips - - Store schema - - → Emit sync message → → Process mutations: - {target_xid: T, if (pg_xid < xmax && - pg_xid: X} !in xips): - skip mutation - - → → add_sync() → Check if X committed - - Move to - _sync_map[X] - - → check_commit(X) → If commit seen at X: - - Return return SwapRequest - SwapRequest - - → → Assign new XID - swap_sync_table(S) - - → clear_tables() +```mermaid +sequenceDiagram + participant PG as PostgreSQL + participant Copy as PgCopyTable + participant ST as SyncTracker + participant Reader as PgLogReader + + PG->>Copy: pg_xid = X + Note over Copy: Capture snapshot
(target_xid = T, pg_xid = X) + Copy->>ST: mark_inflight() + Note over ST: Store xmin/xmax, xips, schema + Copy->>Reader: Emit sync message {target_xid: T, pg_xid: X} + Note over Reader: Process mutations:
if (pg_xid < xmax && !in xips): skip mutation + Reader->>ST: add_sync() + Note over ST: Move to _sync_map[X] + Note over Reader: Check if X committed + Reader->>ST: check_commit(X) + ST-->>Reader: SwapRequest (if commit seen at X) + Note over Reader: Assign new XID S
swap_sync_table(S) + Reader->>ST: clear_tables() ``` #### Snapshot Visibility Rules @@ -815,130 +799,65 @@ This prevents double-application of transactions during recovery. ### Full Sync Lifecycle -``` -Time Copy Thread Logger Queue Reader Thread SyncTracker PostgreSQL -──── ─────────── ──────────── ───────────── ─────────── ────────── -T0 Wait on Redis - sync request - -T1 Receive request - table_oids=[100] - -T2 State: - RUNNING→SYNC_STALL - -T3 block_commits() ────────────────────────────────────────→ _table_map[100] ←─ Lock table - = resync -T4 Push STALL ────→ [STALL] - │ -T5 └──────────→ Pop STALL - State: SYNCING - Enter wait loop - -T6 Detect SYNCING - -T7 xid=5000 - copy_tables(xid) ───────────────────────────────────────────────────────────→ pg_current_xact_id() - = 2000 - pg_current_snapshot() - = "1990:2000:1995" - -T8 Worker captures ─────────────────────────────────────────→ mark_inflight( - snapshot table=100, - pg_xid=2000 xid=5000, - xmin=1990 xmin=1990, - xmax=2000 xmax=2000, - xips=[1995] xips=[1995]) - - _inflight_map[100] - = Inflight{2000, - 2000, - [1995]} - -T9 Worker executes ────────────────────────────────────────────────────────────→ COPY table TO STDOUT - COPY Returns 1M rows - -T10 Worker inserts - to snapshot_table - -T11 Worker emits ────────────────────────────────────────────────────────────────→ pg_logical_emit_message( - sync message 'table_sync', - '{"target_xid":5000, - "pg_xid":2000}') - - Message enters ←────────────────────────────────────────────────────────────── Replication stream - replication log - -T12 All workers - complete - -T13 State: - SYNCING→REPLAYING - -T14 Detect REPLAYING - Exit wait loop - State: RUNNING - Resume processing - -T15 Process queued - replication logs - -T16 [BEGIN 1996] ──→ _process_begin(1996) - Create batch - -T17 [INSERT t100] ─→ add_mutation(t100) - pg_xid=1996 should_skip(100,1996)? - ├─ 1996 < 2000 ✓ - └─ 1996 in [1995]? ✗ - → SKIP ✓ - -T18 [COMMIT 1996] ─→ _process_commit(1996) - check_commit(1996)? - → No sync at 1996 - -T19 [BEGIN 1995] ──→ _process_begin(1995) - -T20 [INSERT t100] ─→ add_mutation(t100) - pg_xid=1995 should_skip(100,1995)? - ├─ 1995 < 2000 ✓ - └─ 1995 in [1995]? ✓ - → APPLY ✓ - -T21 [COMMIT 1995] ─→ _process_commit(1995) - check_commit(1995)? - → No sync at 1995 - -T22 [BEGIN 2001] ──→ _process_begin(2001) - -T23 [INSERT t100] ─→ add_mutation(t100) - pg_xid=2001 should_skip(100,2001)? - ├─ 2001 >= 2000 ✓ - → APPLY ✓ - -T24 [TABLE_SYNC] ──→ Process COPY_SYNC ──→ add_sync( - msg from T11 table=100, - pg_xid=2000, - xmin=1990, - xmax=2000, - xips=[1995]) - - _inflight_map.erase(100) - _sync_map[2000] = XidRecord - _table_map[100] = XidRecord - -T25 [COMMIT 2000] ─→ _process_commit(2000) - check_commit(2000)? ─→ YES! Found sync - at pg_xid=2000 - - clear_tables() ─────→ _sync_map.erase(2000) - - xid=5001 - swap_sync_table(5001) - - Notify committer ───→ SWAP table 100@5001 - -T26 ─→ Table 100 now - visible to queries +```mermaid +sequenceDiagram + participant Copy as Copy Thread + participant LQ as Logger Queue + participant Reader as Reader Thread + participant ST as SyncTracker + participant PG as PostgreSQL + + Note over Copy: T0 Wait on Redis sync request + Note over Copy: T1 Receive request, table_oids=[100] + Note over Copy: T2 State: RUNNING → SYNC_STALL + Copy->>ST: T3 block_commits() + Note over ST: _table_map[100] = resync (lock table) + Copy->>LQ: T4 Push STALL + LQ->>Reader: T5 Pop STALL + Note over Reader: State: SYNCING, enter wait loop + Note over Copy: T6 Detect SYNCING + + Copy->>PG: T7 copy_tables(xid=5000) + PG-->>Copy: pg_current_xact_id()=2000
snapshot "1990:2000:1995" + Copy->>ST: T8 mark_inflight(table=100, xid=5000,
xmin=1990, xmax=2000, xips=[1995]) + Note over ST: _inflight_map[100] = Inflight{2000, 2000, [1995]} + Copy->>PG: T9 COPY table TO STDOUT + PG-->>Copy: returns 1M rows + Note over Copy: T10 Worker inserts to snapshot_table + Copy->>PG: T11 pg_logical_emit_message('table_sync',
{target_xid:5000, pg_xid:2000}) + Note over Copy: T12 All workers complete + Note over Copy: T13 State: SYNCING → REPLAYING + + Note over Reader: T14 Detect REPLAYING, exit wait loop,
State: RUNNING, resume processing + Note over Reader: T15 Process queued replication logs + + LQ->>Reader: T16 [BEGIN 1996] + Note over Reader: _process_begin(1996), create batch + LQ->>Reader: T17 [INSERT t100] pg_xid=1996 + Note over Reader: should_skip? 1996 < 2000 ✓,
1996 in [1995]? ✗ → SKIP + LQ->>Reader: T18 [COMMIT 1996] + Note over Reader: check_commit(1996)? → No sync at 1996 + LQ->>Reader: T19 [BEGIN 1995] + LQ->>Reader: T20 [INSERT t100] pg_xid=1995 + Note over Reader: should_skip? 1995 < 2000 ✓,
1995 in [1995]? ✓ → APPLY + LQ->>Reader: T21 [COMMIT 1995] + Note over Reader: check_commit(1995)? → No sync at 1995 + LQ->>Reader: T22 [BEGIN 2001] + LQ->>Reader: T23 [INSERT t100] pg_xid=2001 + Note over Reader: should_skip? 2001 >= 2000 ✓ → APPLY + + LQ->>Reader: T24 [TABLE_SYNC] (msg from T11) + Reader->>ST: add_sync(table=100, pg_xid=2000,
xmin=1990, xmax=2000, xips=[1995]) + Note over ST: _inflight_map.erase(100)
_sync_map[2000] = XidRecord
_table_map[100] = XidRecord + + LQ->>Reader: T25 [COMMIT 2000] + Reader->>ST: check_commit(2000)? + ST-->>Reader: YES! Found sync at pg_xid=2000 + Reader->>ST: clear_tables() + Note over ST: _sync_map.erase(2000) + Note over Reader: xid=5001, swap_sync_table(5001) + Reader->>PG: Notify committer → SWAP table 100@5001 + Note over PG: T26 Table 100 now visible to queries ``` ## Performance Considerations diff --git a/wiki/system-metadata.mdx b/wiki/system-metadata.mdx index b46c38e4b..1ab4e4c27 100644 --- a/wiki/system-metadata.mdx +++ b/wiki/system-metadata.mdx @@ -4,17 +4,6 @@ title: "System Metadata" # Schema Management Architecture -## Table of Contents -1. [Overview](#overview) -2. [System Tables](#system-tables) -3. [Schema Information Management](#schema-information-management) -4. [Caching Architecture](#caching-architecture) -5. [System Table Manager](#system-table-manager) -6. [Data Flow](#data-flow) -7. [Key Design Patterns](#key-design-patterns) - ---- - ## Overview The Springtail schema management system provides versioned metadata tracking for all database objects including tables, schemas, indexes, and user-defined types. The architecture is designed to support: @@ -651,232 +640,98 @@ Allows client to opt-in to specific SHM caches. ### Schema Read Flow (FDW → Server) -``` -┌─────────────────────────────────────────────────────────────┐ -│ FDW (PostgreSQL Foreign Data Wrapper) │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Access table schema - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ TableMgrClient (client-side) │ -│ get_schema(db_id, table_id, xid) │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Check SchemaCache (in-process) - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ SchemaCache::get() │ -│ - Cache hit? Return constructed SchemaMetadata │ -│ - Cache miss? Call populate function │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Cache miss - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Client::get_schema() - Check SHM Cache │ -│ - ShmCache hit? Deserialize protobuf, return │ -│ - ShmCache miss? Issue gRPC call │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ gRPC: GetSchemaRequest - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ sys_tbl_mgr::Service (server-side gRPC handler) │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Route to Server - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Server::get_schema(db_id, table_id, xid) │ -│ - Acquire read lock (_read_mutex) │ -│ - Check uncommitted caches first │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Not in uncommitted caches - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Read from System Tables (via SystemTableMgr) │ -│ - Read Schemas table: columns at XID │ -│ - Read Indexes table: indexes at XID │ -│ - Read IndexNames table: index metadata │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Construct SchemaMetadata - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ SchemaMetadata object constructed │ -│ - columns: vector │ -│ - indexes: vector │ -│ - access_range: XID validity range │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Serialize to protobuf - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ proto::GetSchemaResponse │ -│ - Serialized schema columns │ -│ - Serialized indexes │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ gRPC response - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Client receives response │ -│ - Store in SHM cache (for other processes) │ -│ - Store in SchemaCache (for local reuse) │ -│ - Return SchemaMetadataPtr to caller │ -└─────────────────────────────────────────────────────────────┘ +```mermaid +flowchart TD + FDW["FDW (PostgreSQL Foreign Data Wrapper)"] + Client1["TableMgrClient.get_schema(db_id, table_id, xid)"] + SC{"SchemaCache.get()
(in-process)"} + SHM{"Client.get_schema()
check SHM cache"} + GRPC["gRPC: GetSchemaRequest"] + Svc["sys_tbl_mgr::Service
(server-side gRPC handler)"] + Srv["Server.get_schema()
read lock, check uncommitted caches"] + Disk["Read system tables
Schemas / Indexes / IndexNames"] + Build["Construct SchemaMetadata
(columns, indexes, access_range)"] + Ser["Serialize to proto::GetSchemaResponse"] + Ret["Client stores in SHM + SchemaCache,
returns SchemaMetadataPtr"] + + FDW --> Client1 --> SC + SC -->|hit| Ret + SC -->|miss| SHM + SHM -->|"hit (deserialize)"| Ret + SHM -->|miss| GRPC --> Svc --> Srv + Srv -->|hit in uncommitted| Ser + Srv -->|miss| Disk --> Build --> Ser --> Ret ``` ### Table Roots Read Flow -``` -FDW - ↓ -TableMgrClient::get_roots(db_id, table_id, xid) - ↓ -Check SHM cache (springtail.roots) - ↓ (miss) -gRPC: GetRootsRequest - ↓ -Server::get_roots() - ↓ -Check _roots_cache (uncommitted) - ↓ (miss) -Read TableRoots system table - ↓ -Read TableStats system table - ↓ -Construct TableMetadata - ↓ -Serialize to proto::GetRootsResponse - ↓ -Return via gRPC - ↓ -Client stores in SHM cache - ↓ -Return TableMetadataPtr +```mermaid +flowchart TD + FDW["FDW"] + C["TableMgrClient.get_roots(db_id, table_id, xid)"] + SHM{"Check SHM cache
(springtail.roots)"} + GRPC["gRPC: GetRootsRequest"] + Srv["Server.get_roots()"] + UC{"Check _roots_cache
(uncommitted)"} + RR["Read TableRoots system table"] + RS["Read TableStats system table"] + Build["Construct TableMetadata"] + Ser["Serialize to proto::GetRootsResponse"] + Ret["Client stores in SHM cache,
returns TableMetadataPtr"] + + FDW --> C --> SHM + SHM -->|hit| Ret + SHM -->|miss| GRPC --> Srv --> UC + UC -->|hit| Ser + UC -->|miss| RR --> RS --> Build --> Ser --> Ret ``` ### DDL Write Flow (CREATE TABLE) -``` -┌─────────────────────────────────────────────────────────────┐ -│ PostgreSQL Event Trigger │ -│ - Captures CREATE TABLE event │ -│ - Sends PgMsgTable via replication stream │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Replication message - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Server::create_table(db_id, xid, msg) │ -│ - Acquire write lock (_write_mutex) │ -│ - Assign new table_id │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Populate uncommitted caches - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ _table_cache[db][table_id][xid] = TableCacheRecord │ -│ - name, namespace_id, exists=true, etc. │ -└─────────────────────────────────────────────────────────────┘ - │ -┌─────────────────────────────────────────────────────────────┐ -│ _schema_cache[db][table_id][col_id] = vector │ -│ - For each column in the table │ -└─────────────────────────────────────────────────────────────┘ - │ -┌─────────────────────────────────────────────────────────────┐ -│ _index_cache[db][table_id][PRIMARY_INDEX] = IndexCacheItem │ -│ - Primary index definition │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Generate DDL JSON for DDL manager - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Return DDL JSON string │ -│ - DDL manager will track and apply on commit │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Transaction commits... - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Server::finalize(db_id, xid, call_sync=true) │ -│ - Acquire unique lock on _write_mutex │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Persist to disk - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Write to System Tables │ -│ - TableNames: Insert (table_id, name, xid, exists=true) │ -│ - Schemas: Insert (table_id, col, xid) for each column │ -│ - Indexes: Insert (table_id, PRIMARY_INDEX, xid) │ -│ - IndexNames: Insert (PRIMARY_INDEX, state=READY) │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Sync to disk - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ SystemTable::sync() for each modified system table │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Update SHM caches - ▼ -┌──────────────────────────────────────────────────────────────────┐ -│ ShmCache::update_committed_xid(db, xid, has_schema_changes=true) │ -│ - Records schema change at this XID │ -│ - Updates committed XID timestamp │ -└────────────────────┬─────────────────────────────────────────────┘ - │ - │ Clear uncommitted caches - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ _table_cache.clear(db) │ -│ _schema_cache.clear(db) │ -│ _index_cache.clear(db) │ -│ ... (but NOT _table_existence_cache) │ -└─────────────────────────────────────────────────────────────┘ +```mermaid +flowchart TD + ET["PostgreSQL Event Trigger
captures CREATE TABLE, sends PgMsgTable"] + CT["Server.create_table(db_id, xid, msg)
write lock, assign table_id"] + + subgraph UnCache [Populate uncommitted caches] + direction TB + TC["_table_cache[db][table_id][xid]"] + SCH["_schema_cache[db][table_id][col_id]"] + IDX["_index_cache[db][table_id][PRIMARY_INDEX]"] + end + + DDL["Return DDL JSON for DDL manager"] + FIN["Server.finalize(db_id, xid, call_sync=true)
unique write lock"] + + subgraph Persist [Write to system tables] + direction TB + TN["TableNames"] + SchT["Schemas"] + IdxT["Indexes"] + InT["IndexNames"] + end + + Sync["SystemTable.sync() per modified table"] + Upd["ShmCache.update_committed_xid(has_schema_changes=true)"] + Clr["Clear uncommitted caches
(except _table_existence_cache)"] + + ET --> CT --> UnCache --> DDL --> FIN --> Persist --> Sync --> Upd --> Clr ``` ### Cache Invalidation Flow When DDL changes occur, caches must be invalidated: -``` -┌─────────────────────────────────────────────────────────────┐ -│ DDL Operation Committed (e.g., ALTER TABLE ADD COLUMN) │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ DDL manager notifies FDW - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ FDW detects schema change at new XID │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Invalidate local cache - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ Client::invalidate_table(db_id, table_id, xid) │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ Propagate to SchemaCache - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ SchemaCache::invalidate_table(db, tid, xid) │ -│ - Marks schema entry as ending at xid │ -│ - Future access beyond xid triggers refetch │ -└────────────────────┬────────────────────────────────────────┘ - │ - │ SHM cache already updated by Server during finalize - ▼ -┌─────────────────────────────────────────────────────────────┐ -│ ShmCache has new schema version cached │ -│ - Next get_schema(xid > old_xid) will hit SHM cache │ -│ - Or fetch from server if not in SHM │ -└─────────────────────────────────────────────────────────────┘ +```mermaid +flowchart TD + DDL["DDL operation committed
(e.g. ALTER TABLE ADD COLUMN)"] + Notify["DDL manager notifies FDW"] + Detect["FDW detects schema change at new XID"] + Inv["Client.invalidate_table(db_id, table_id, xid)"] + SCInv["SchemaCache.invalidate_table(db, tid, xid)
marks entry ending at xid; next access refetches"] + SHM["SHM cache already updated by Server during finalize
next get_schema(xid > old) hits SHM or server"] + + DDL --> Notify --> Detect --> Inv --> SCInv --> SHM ``` **Key Points:** diff --git a/wiki/vacuumer.mdx b/wiki/vacuumer.mdx index bcf293b4d..468f21565 100644 --- a/wiki/vacuumer.mdx +++ b/wiki/vacuumer.mdx @@ -45,49 +45,16 @@ title: "Vacuumer" 4. VACUUM RUN (background thread, every 1 second) ``` -``` - _do_vacuum_run() - │ - ▼ - ┌─────────────────────────────────────────────┐ - │ Flush in-memory expired entries to global │ - │ vacuum file if count exceeds threshold │ - └─────────────────────────────────────────────┘ - │ - ▼ - ┌─────────────────────────────────────────────┐ - │ Read expired extents from global vacuum │ - │ file │ - └─────────────────────────────────────────────┘ - │ - ▼ - ┌─────────────────────────────────────────────┐ - │ For each file with expired extents: │ - │ │ - │ 1. Merge current expired extents with │ - │ leftover partials from previous runs │ - │ │ - │ 2. Align extent boundaries to multiples │ - │ of filesystem block size (4KB) │ - │ │ - │ 3. _hole_punch_file() -> fallocate() │ - │ to reclaim aligned blocks │ - │ │ - │ 4. Save unaligned remainders as partials │ - │ for future coalescing │ - └─────────────────────────────────────────────┘ - │ - ▼ - ┌─────────────────────────────────────────────┐ - │ Delete expired snapshot directories │ - │ (dropped tables/indexes) │ - └─────────────────────────────────────────────┘ - │ - ▼ - ┌─────────────────────────────────────────────┐ - │ Rotate/truncate global vacuum file │ - │ (clear processed, keep unprocessed) │ - └─────────────────────────────────────────────┘ +```mermaid +flowchart TD + Start["_do_vacuum_run()"] + Flush["Flush in-memory expired entries to global
vacuum file if count exceeds threshold"] + ReadE["Read expired extents from
global vacuum file"] + PerFile["For each file with expired extents:
1. Merge current expired extents with leftover partials
2. Align extent boundaries to filesystem block size (4KB)
3. _hole_punch_file() → fallocate() to reclaim aligned blocks
4. Save unaligned remainders as partials for future coalescing"] + DelSnap["Delete expired snapshot directories
(dropped tables/indexes)"] + Rotate["Rotate/truncate global vacuum file
(clear processed, keep unprocessed)"] + + Start --> Flush --> ReadE --> PerFile --> DelSnap --> Rotate ``` ---