Skip to content

feat(connectors): Implement influxdb v2 and v3 connector with separate source and sink crates#3140

Open
ryerraguntla wants to merge 73 commits into
apache:masterfrom
ryerraguntla:feat/influxdb_v2_v3_connector
Open

feat(connectors): Implement influxdb v2 and v3 connector with separate source and sink crates#3140
ryerraguntla wants to merge 73 commits into
apache:masterfrom
ryerraguntla:feat/influxdb_v2_v3_connector

Conversation

@ryerraguntla
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes # 3062

Rationale

This PR implements a unified InfluxDB connector supporting both InfluxDB V2 (Flux) and V3 (SQL) in a single crate per component (sink/source), eliminating code duplication while preserving full backward compatibility with existing V2 deployments.

Key Features:

  • Zero breaking changes for V2 users (backward-compatible config deserialization)
  • V3 stuck-timestamp detection with automatic batch inflation + circuit breaker
  • Performance improvements: SIMD JSON parsing (+40% in source), inlined hot paths (+3% in sink)
  • Enhanced safety: #[must_use] on critical functions, version-strict cursor validation
  • 95%+ test coverage maintained with 55+ new tests

What changed?

Architecture
Before (V2-only):
influxdb_sink/src/lib.rs (single flat config, 1,625 LOC)
influxdb_source/src/lib.rs (single flat config, 1,400 LOC)

After (V2 + V3)
influxdb_sink/src/
├── lib.rs (enum dispatch, 1,330 LOC)
└── protocol.rs (shared line-protocol escaping, 115 LOC)

influxdb_source/src/
├── lib.rs (enum dispatch, 817 LOC)
├── common.rs (shared config/validation, 815 LOC)
├── row.rs (CSV/JSONL parsing, 193 LOC)
├── v2.rs (Flux query logic, 374 LOC)
└── v3.rs (SQL query + stuck detection, 506 LOC)

Benefits:

Single .so per component (no InfluxClient trait overhead)
Zero code duplication (shared validation, escaping, retry logic)
Asymmetric structure (sink: 30-line diff; source: separate modules for V2/V3 query semantics)

For more details , please refer to the #3062 Comments section.
#3062 (comment)

Local Execution

  • Passed
  • Pre-commit hooks ran

AI Usage

AI Tools Used - Claude and Copilot

Scope of usage - Code review for quality and identifying performance issues. Generation of test cases, Documentation and summary notes generation.

Generated code is tested with actual test execution.

Can you explain every line of the code - yes

GaneshPatil7517 and others added 30 commits January 13, 2026 08:43
Implements Issue apache#2540 - Redshift Sink Connector with S3 staging support.

Features:
- S3 staging with automatic CSV file upload
- Redshift COPY command execution via PostgreSQL wire protocol
- IAM role authentication (recommended) or access key credentials
- Configurable batch size and compression (gzip, lzop, bzip2, zstd)
- Automatic table creation with customizable schema
- Retry logic with exponential backoff for transient failures
- Automatic cleanup of staged S3 files

Configuration options:
- connection_string: Redshift cluster connection URL
- target_table: Destination table name
- iam_role: IAM role ARN for S3 access (recommended)
- s3_bucket/s3_region/s3_prefix: S3 staging location
- batch_size: Messages per batch (default: 10000)
- compression: COPY compression format
- delete_staged_files: Auto-cleanup toggle (default: true)
- auto_create_table: Create table if missing (default: true)

Closes apache#2540
- Fix markdown lint issues in README.md (table formatting, blank lines, code fence language)
- Fix trailing newline in Cargo.toml
- Apply TOML formatting via taplo
- Add missing dependencies to DEPENDENCIES.md (rust-s3, rxml, rxml_validation, static_assertions)
- Add Redshift sink integration test using PostgreSQL (Redshift-compatible) and LocalStack for S3
- Add s3_endpoint config option to support custom endpoints (LocalStack, MinIO)
- Add path-style S3 access for custom endpoints
- Add localstack feature to testcontainers-modules
- Create test configuration files for Redshift connector
- Add s3_endpoint: None to test_config() in lib.rs (fixes E0063)
- Add endpoint parameter to S3Uploader tests in s3.rs
- Fix formatting for long line in init_s3_uploader()
- Add iggy_connector_redshift_sink to DEPENDENCIES.md
- Add maybe-async, md5, minidom to DEPENDENCIES.md
Critical fixes:
- Change Rust edition from 2024 to 2021 in Cargo.toml
- Fix S3 cleanup to happen regardless of COPY result (prevents orphaned files)

Moderate fixes:
- Remove zstd from valid compression options (not supported by Redshift)
- Update README to remove zstd from compression list
- Handle bucket creation error in integration tests with expect()
- Log JSON serialization errors instead of silent unwrap_or_default()

Performance:
- Cache escaped quote string to avoid repeated format! allocations

Windows compatibility (for local testing):
- Add #[cfg(unix)] conditionals for Unix-specific code in sender/mod.rs
Fixes clippy warning about unused 'runtime' field in test setup struct.
The runtime field is kept for future test expansion.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Changed CONFIG_ to PLUGIN_CONFIG_ for plugin configuration fields
- Changed TOPICS_0 to TOPICS with proper JSON array format
- Added CONSUMER_GROUP environment variable
…ort with latest S3 crate

Migrate S3 usage from rust-s3 to s3-tokio and update related dependencies. Top-level Cargo.toml updated (http, lz4_flex, toml) and DEPENDENCIES.md adjusted. redshift_sink/Cargo.toml switched to s3-tokio, made sqlx a workspace dependency and added rustls as a dev-dependency. Code changes: S3Uploader now owns Bucket (removed Box) and tests install the rustls crypto provider. Integration tests were refactored to remove the manual testcontainers setup in favor of the iggy_harness-based test harness.
Introduce a new core/connectors/influxdb_common crate that provides a version-abstraction layer for InfluxDB (InfluxDB V2 and V3). Adds the InfluxDbAdapter trait, ApiVersion factory, line-protocol escaping helpers, CSV/JSONL response parsers, and concrete V2/V3 adapters plus unit tests and architecture notes. Wire the new crate into the workspace (Cargo.toml/Cargo.lock) and update existing influxdb sink/source connector manifests/sources to depend on it. Also add integration test fixtures and v3-specific integration tests and configs to exercise V3 behavior.
Remove the external influxdb common adapter and refactor the sink to natively support both V2 and V3 configurations.

Key changes:
- Removed iggy_connector_influxdb_common dependency (Cargo.toml & Cargo.lock) and inlined adapter logic.
- Introduced InfluxDbSinkConfig enum with V2/V3 variants and helper methods (url, auth header, build_write_url/health_url, precision mapping, feature flags, etc.).
- Reworked InfluxDbSink struct: store unified config, auth_header, measurement/precision, metadata flags, batch size limit, and other derived fields.
- Added line-protocol escaping helpers (write_measurement, write_tag_value, write_field_string) and simplified PayloadFormat handling.
- Adjusted client initialization, connectivity checks, retry middleware setup, and improved error messages and transient vs permanent error handling.
- Updated Sink impl: open(), consume(), process/ batching, circuit breaker interactions, and close() behavior.
- Expanded and updated unit tests to cover v2/v3 config behavior, URL/precision mapping, escaping, and append_line error/success cases.
- Added new source modules and test script files related to InfluxDB connectors.

This refactor centralises version-specific behaviour, improves configurability, and prepares the connector for V3 line-protocol and auth differences.
Delete influx_dB_test_proc_docs/scripts/test-connectors.sh — an interactive Bash end-to-end test harness for InfluxDB v2/v3 connector scenarios (Iggy messaging, polling, and five connector tests). Removes helper functions, polling logic and all test cases bundled in the script.
Extract shared parsing and protocol logic into the influxdb_common crate and update sinks/sources to consume it. Introduces delegate! macros to remove repetitive variant matching, unifies URL/auth handling via InfluxDbAdapter (including V3 precision mapping), and centralises line-protocol escaping/row parsing. Optimises body construction (build_body) and Bytes usage, adds extensive unit & HTTP integration tests (axum dev-dep), and updates Cargo.toml entries accordingly to reflect the new shared dependency.
Ensure health_url trims a trailing '/' from the base URL in both V2 and V3 adapters to avoid double slashes when appending /health, and add tests verifying the behavior. Add tests that verify write_url percent-encodes bucket/org/db query parameters and that decoding recovers the original values. Improve CSV row parsing by preallocating Row with capacity based on active headers. Clean up influxdb_source Cargo.toml by removing unused csv and futures deps, add a comment explaining dashmap/once_cell are required due to macro expansion, and update the ignored list.
Refactor and harden InfluxDB connector common code: move Row type into row.rs and re-export it; make ApiVersion::from_config return Result and error on unknown values (avoid silent defaulting); make V3 precision mapping return Result and reject invalid precisions; validate sink precision early in open() to prevent silent timestamp mistakes. Add tab escaping to line-protocol writers and expand unit tests (empty inputs, tab escapes, unicode). Make CSV parser flexible for multi-table results and handle header updates. Strengthen RFC3339 cursor regex to reject out-of-range date parts. Improve test fixture container port handling to support IPv6 mappings and better error messages. Misc: minor visibility changes, JSONL format constant, Cargo description tweak, and additional tests to cover URL/health/build_query error cases.
Add validation and runtime fixes across InfluxDB connectors:

- Require timezone suffix for cursor/initial_offset timestamps to avoid UTC-vs-local ambiguity and update regex/tests accordingly.
- Validate V2 sink config to reject empty or whitespace-only orgs at open() to prevent runtime 400s.
- Validate initial_offset early in source open() and add tests for invalid/timezone-free offsets.
- Warn when a V2 Flux query lacks an explicit sort() because Skip-N dedup relies on stable ordering.
- In V3 source row processing, emit a warning when no row contains the cursor column and ensure messages are still emitted while max_cursor remains None; add tests.
- Simplify auth header and health URL construction (removed dynamic adapter usage for these paths).
- Ensure circuit breaker records successes for successful batches and move record_success into the per-batch success path; add a test to prevent tripping on intermittent failures.
- Change several atomic counter loads to SeqCst for correctness in tests and tighten an unreachable branch where precision is validated.
- Minor protocol.rs doc clarifications about tab escaping in line protocol.

Includes multiple unit/integration tests covering the new validations and circuit-breaker behavior.
core/connectors/influxdb_common: broaden CSV header detection to recognize any of `_time`, `_start`, or `_stop` so Flux window-aggregate results are parsed correctly; add tests covering _start/_stop-only headers and aggregation queries.

core/connectors/sinks/influxdb_sink: strengthen atomic orderings (use AcqRel for fetch_add and Acquire for loads) to ensure correct cross-thread visibility of counters; update tests to use Acquire loads.

core/connectors/sources/influxdb_source: derive Debug for RowProcessingResult and change process_rows to return an Err(Error::InvalidRecordValue) when no row contains the configured cursor field (instead of silently leaving max_cursor None). Update tests to expect the error — this prevents silent infinite re-delivery and surfaces misconfigured queries to the operator.
Delete the shared iggy_connector_influxdb_common crate and fold its functionality into the sink and source connectors. protocol.rs was moved/renamed into core/connectors/sinks/influxdb_sink/src/protocol.rs (helper functions made crate-private); row parsing was moved into core/connectors/sources/influxdb_source/src/row.rs and made crate-private. Adapter/config/v2/v3 logic was inlined into the respective sink/source code (URL builders, auth header generation, precision mapping, query builders, health URL checks), and relevant visibility and call sites were updated. Workspace Cargo.toml and Cargo.lock were updated to remove the member/dependency and to add CSV where needed; tests were adapted/added for the inlined helpers and validation behavior.
Implement backward-compatible deserialization for InfluxDB configs by adding custom Deserialize impls for InfluxDbSinkConfig and InfluxDbSourceConfig that default missing version to "v2" and reject unknown versions with a clear error. Add V3-specific options and safety checks: introduce include_metadata to omit the cursor field from emitted payloads, add QUERY_FORMAT_JSONL, and enforce MAX_STUCK_CAP_FACTOR (100) with validation on open to avoid extremely large queries. Make timestamp comparison conservative (return false on parse failure) to avoid skipping data. Switch message ID generation to per-message UUIDs (remove uuid_base usage), adjust payload building to filter cursor when include_metadata=false, and small sink fix to append lines without producing trailing newlines. Update and add tests covering config deserialization, timestamp behavior, stuck-cap validation, and other affected behaviors.
Various refactors and improvements to InfluxDB source/sink connectors:

- Make many config fields pub(crate) to improve encapsulation.
- Add toml as a dev-dependency for connectors and add default "version = \"v2\"" to example config.toml files.
- Introduce base_url() helpers to normalize URLs (strip trailing slashes) and use them when building endpoints; validate V2 org is non-empty in sink config.
- Introduce RowContext to consolidate per-poll parameters passed to row-processing routines; simplify signatures for process_rows and poll functions and propagate include_metadata consistently.
- Optimize per-message UUID generation by deriving IDs from a single per-poll base UUID to reduce PRNG calls.
- Add query_has_sort_call heuristic to detect Flux sort() calls (avoids false positives on identifier prefixes) and use it when checking V2 queries.
- Improve error messages for cursor_field validation to be version-specific and add related tests.
- Add comments clarifying escaping rules and rationale for using simd_json in the sink hot path.
- Update integration test TOML keys from api_version to version and add unit tests verifying TOML deserialization defaults and behavior.

These changes are focused on robustness, performance, and clearer configuration/validation behavior.
ryerraguntla and others added 2 commits May 13, 2026 04:07
Remove an extra space before the inline comment in normalize_v3_timestamp (core/connectors/sources/influxdb_source/src/v3.rs). Pure formatting change, no behavioral impact.
@ryerraguntla
Copy link
Copy Markdown
Contributor Author

@hubcio - I was traveling on family events and got delayed. I have implemented the review comments , please review and do the needful.

@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 13, 2026

@ryerraguntla no worries. take your time, family always should come first. i'll this review in upcoming days.

@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 14, 2026

please resolve conflicts

/author

@github-actions github-actions Bot added the S-waiting-on-author PR is waiting on author response label May 14, 2026
@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 14, 2026

for future: see CONTRIBUTING.md. you can mark PRs as /ready once it's ready for review :) i'll do it now.

/review

@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 14, 2026

/ready

@github-actions github-actions Bot added S-waiting-on-review PR is waiting on a reviewer and removed S-waiting-on-author PR is waiting on author response labels May 14, 2026
Comment thread core/connectors/sources/influxdb_source/src/v3.rs
Comment thread core/connectors/sources/influxdb_source/src/v3.rs
Comment thread core/connectors/sources/influxdb_source/src/v2.rs
Comment thread core/connectors/influxdb_v3_architecture.md Outdated
Comment thread core/connectors/sources/influxdb_source/src/common.rs
Comment thread core/connectors/sources/influxdb_source/src/v3.rs Outdated
Comment thread core/connectors/sources/influxdb_source/src/lib.rs
Comment thread core/connectors/sinks/influxdb_sink/src/lib.rs Outdated
Comment thread core/connectors/sources/influxdb_source/src/row.rs Outdated
Comment thread core/connectors/sources/influxdb_source/src/row.rs
@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 20, 2026

/author

@github-actions github-actions Bot added S-waiting-on-author PR is waiting on author response and removed S-waiting-on-review PR is waiting on a reviewer labels May 20, 2026
Add comprehensive READMEs for InfluxDB sink and source and register them in the connectors index; remove obsolete influxdb_v3_architecture.md. Fix and harden connector code: switch atomic counters to Relaxed ordering for lower overhead, document deliberate escaping of `\n`/`\r` in line-protocol string fields, and prevent accidental placeholder collisions by matching whole placeholder tokens when applying query params. Improve V3 timestamp handling: normalize_v3_timestamp now returns Result (append `Z` when appropriate, and error on invalid input) and update processing and tests accordingly. Misc: clarify V3 query offset error message, add TODO to avoid wakeup churn while circuit-breaker is open, and note a perf TODO in row handling. TODO Performance improvement will be considered in later commits.
@hubcio
Copy link
Copy Markdown
Contributor

hubcio commented May 22, 2026

@ryerraguntla please check discord, I sent you PM

Improve InfluxDB v2/v3 connector robustness and tests. Key changes: strip the top-level "version" discriminator before deserializing and enable #[serde(deny_unknown_fields)] on V2/V3 config structs so unknown keys are rejected; make sink/source custom deserializers extract version as an owned String. Tighten line-protocol handling: write_measurement/write_tag_value now return Result and explicitly reject tab characters (tabs would corrupt LP parsing), and tests updated accordingly. Several sink improvements: use base64::Engine::encode_string to avoid intermediate allocation, warn once per batch for messages with timestamp==0 and reduce initial body capacity, relax atomic loads to Relaxed for counters, and add many unit/integration tests covering config validation, escaping, payload formats, timestamps, build/close/get_client, and append_line behavior (including simd-json JSON payload path). Source changes: add state_restore_error to record state-restore rejection causes, validate restored cursor values and return descriptive error if invalid, and #[serde(default)] V2State/V3State to maintain backward compatibility. Dependency updates: add ahash to the source crate and simd-json to sink dev deps, and add dependencies.md documents for both connectors. Misc: small refactors and helpers (strip_version_key), and many test assertions updated to reflect the new APIs and error paths.
@ryerraguntla
Copy link
Copy Markdown
Contributor Author

Thanks @hubcio . Let me go through the note before requesting the review.

ryerraguntla and others added 8 commits May 22, 2026 20:27
Normalize blockquote formatting in core/connectors/sources/influxdb_source/dependencies.md by removing trailing spaces and adjusting line breaks. Whitespace-only change; no functional impact.
Reflowed and wrapped long test assertions and function calls across influxdb sink and source tests to improve readability and adhere to line length. Changes are cosmetic only (whitespace/formatting) and do not alter logic or behavior. Affected files: core/connectors/sinks/influxdb_sink/src/lib.rs, core/connectors/sources/influxdb_source/src/lib.rs, core/connectors/sources/influxdb_source/src/v2.rs, core/connectors/sources/influxdb_source/src/v3.rs.
Normalize Markdown table separator rows and escape inline pipe characters in InfluxDB connector docs so tables and code snippets render correctly. Updates README and dependencies files for both sinks and sources: core/connectors/sinks/influxdb_sink/{README.md,dependencies.md} and core/connectors/sources/influxdb_source/{README.md,dependencies.md}.
Remove trailing spaces and normalize line wrapping in core/connectors/sinks/influxdb_sink/dependencies.md. This is a pure formatting change (no dependency or behavior changes); it preserves the note about switching to base64::Engine::encode_string in the feat/influxdb_v2_v3_connector branch.
Normalize V3 timestamp handling and simplify stuck-batch detection, plus docs and minor fixes.

- v3: normalize_v3_timestamp now returns (string, DateTime<Utc>) to avoid re-parsing and preserve zero-allocation behavior; processing uses the parsed DateTime for stable IDs and cursor comparisons.
- Stuck-detection: simplified logic to only treat a batch as stuck when rows_at_max_cursor >= effective_batch (prevents false positives for partial batches and avoids data loss). Added tests covering small/full stuck scenarios and offset/inflation behavior.
- Add warning logs when state serialization/persistence fails so operators see potential re-delivery risk.
- Fix apply_query_params capacity to account for offset length.
- README: clarify JSON payload metadata behavior, payload value type differences between v2/v3, note breaking schema change, and document stuck_batch_cap_factor=0 behavior.

These changes fix timestamp parsing inefficiencies, prevent incorrect stuck detection that could drop or duplicate rows, and improve operational visibility.
Update comments in core/connectors/sources/influxdb_source/src/v3.rs to clarify V3 cursor behavior and test intent. Reword the note about stuck batches to specify the case of a full batch where all rows share the same timestamp, and replace a reference to validate_cursor with normalize_v3_timestamp while noting the RFC 3339 + trailing "Z" check. Also tweak a test comment to clearly state the non-stuck condition (rows_at_max_cursor < effective_batch). These are documentation/comment only changes with no functional impact.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

S-waiting-on-author PR is waiting on author response

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants