From 4909eade291a45add15484d7ff9cbb5ecc4b096c Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Mon, 20 Apr 2026 17:06:37 +0200 Subject: [PATCH 1/2] fix: normalize PEM line endings in certificate reading --- influxdb_client_3/query/query_api.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/influxdb_client_3/query/query_api.py b/influxdb_client_3/query/query_api.py index 7631795..ad50293 100644 --- a/influxdb_client_3/query/query_api.py +++ b/influxdb_client_3/query/query_api.py @@ -61,7 +61,9 @@ def __init__(self, root_certs_path: str, def _read_certs(self, path: str) -> bytes: with open(path, "rb") as certs_file: - return certs_file.read() + certs = certs_file.read() + # Normalize PEM line endings so behavior is stable across platforms. + return certs.replace(b"\r\n", b"\n").replace(b"\r", b"\n") class QueryApiOptionsBuilder(object): From f622a489cb4ee3306e511c97466ef0a9fdcedfea Mon Sep 17 00:00:00 2001 From: Adrian Immer Date: Mon, 20 Apr 2026 17:07:13 +0200 Subject: [PATCH 2/2] feat: add CLI for querying InfluxDB 3 Add a new query CLI to support quick read/debug workflows from terminal and AI agents. - add influx3 query with json, jsonl, csv, and pretty output - add module execution path via python -m influxdb_client_3 - wire console entry point in setup - add CLI tests --- CHANGELOG.md | 5 + README.md | 38 +++++++ influxdb_client_3/__main__.py | 5 + influxdb_client_3/cli.py | 180 ++++++++++++++++++++++++++++++++++ setup.py | 3 + tests/test_cli.py | 140 ++++++++++++++++++++++++++ 6 files changed, 371 insertions(+) create mode 100644 influxdb_client_3/__main__.py create mode 100644 influxdb_client_3/cli.py create mode 100644 tests/test_cli.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 54c278f..7ae7f4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ 1. [#198](https://github.com/InfluxCommunity/influxdb3-python/pull/198): Support custom tag order via `tag_order` write option. See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more. 1. [#202](https://github.com/InfluxCommunity/influxdb3-python/pull/202): Add escape for field keys when serializing to line protocol in `PolarsDataframeSerializer`. +1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Add `influx3 query` CLI support for executing SQL/InfluxQL queries with JSON/JSONL/CSV/pretty output, including module execution via `python -m influxdb_client_3`. + +### Bug Fixes + +1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Normalize PEM certificate line endings when loading Flight query root certificates to ensure consistent SSL option behavior on Windows. ## 0.18.0 [2026-02-19] diff --git a/README.md b/README.md index 807ce13..21cff2c 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,44 @@ Note: This does not include Pandas support. If you would like to use key feature *Note: Please make sure you are using 3.9 or above. For the best performance use 3.11+* +## CLI (Agent-Friendly Query Tool) + +This package includes an `influx3` CLI for read/query workflows. + +### Run a query + +```bash +influx3 query -d my_database "SELECT * FROM cpu LIMIT 5" +``` + +By default, output is JSON to stdout. + +### Supported formats + +- `json` (default) +- `jsonl` +- `csv` +- `pretty` + +```bash +influx3 query -d my_database --format csv "SELECT * FROM cpu LIMIT 5" +``` + +### Config precedence + +Configuration values are resolved in this order: + +1. CLI flags +2. `INFLUXDB3_*` environment variables +3. legacy `INFLUX_*` environment variables +4. built-in defaults (host defaults to `http://127.0.0.1:8181`) + +Relevant environment variables: + +- `INFLUXDB3_HOST_URL` (legacy fallback: `INFLUX_HOST`) +- `INFLUXDB3_DATABASE_NAME` (legacy fallback: `INFLUX_DATABASE`) +- `INFLUXDB3_AUTH_TOKEN` (legacy fallback: `INFLUX_TOKEN`) + # Usage One of the easiest ways to get started is to checkout the ["Pokemon Trainer Cookbook"](https://github.com/InfluxCommunity/influxdb3-python/blob/main/Examples/pokemon-trainer/cookbook.ipynb). This scenario takes you through the basics of both the client library and Pyarrow. diff --git a/influxdb_client_3/__main__.py b/influxdb_client_3/__main__.py new file mode 100644 index 0000000..f36f60c --- /dev/null +++ b/influxdb_client_3/__main__.py @@ -0,0 +1,5 @@ +from influxdb_client_3.cli import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/influxdb_client_3/cli.py b/influxdb_client_3/cli.py new file mode 100644 index 0000000..1c0b672 --- /dev/null +++ b/influxdb_client_3/cli.py @@ -0,0 +1,180 @@ +import argparse +import csv +import io +import json +import os +import sys +from typing import Mapping, Optional + +import pyarrow as pa + +from influxdb_client_3 import ( + INFLUX_DATABASE, + INFLUX_HOST, + INFLUX_TOKEN, + InfluxDBClient3, +) +from influxdb_client_3.exceptions import InfluxDB3ClientQueryError, InfluxDBError + + +def _resolve_option( + cli_value: Optional[str], + env: Mapping[str, str], + primary_env: str, + secondary_env: Optional[str] = None, + default: Optional[str] = None, +) -> Optional[str]: + if cli_value is not None: + return cli_value + + for var in (primary_env, secondary_env): + if not var: + continue + value = env.get(var) + if value not in (None, ""): + return value + + return default + + +def _rows_to_csv(rows, fieldnames): + buff = io.StringIO() + writer = csv.DictWriter(buff, fieldnames=fieldnames) + writer.writeheader() + for row in rows: + writer.writerow(row) + return buff.getvalue() + + +def _rows_to_pretty(rows, fieldnames): + if not rows: + return "(0 rows)" + + widths = {name: len(name) for name in fieldnames} + for row in rows: + for name in fieldnames: + widths[name] = max(widths[name], len(str(row.get(name, "")))) + + header = " | ".join(name.ljust(widths[name]) for name in fieldnames) + sep = "-+-".join("-" * widths[name] for name in fieldnames) + lines = [header, sep] + for row in rows: + lines.append(" | ".join(str(row.get(name, "")).ljust(widths[name]) for name in fieldnames)) + return "\n".join(lines) + + +def _rows_to_json(rows, fieldnames): + return json.dumps(rows, default=str) + + +def _rows_to_jsonl(rows, fieldnames): + return "\n".join(json.dumps(row, default=str) for row in rows) + + +_FORMATTERS = { + "json": _rows_to_json, + "jsonl": _rows_to_jsonl, + "csv": _rows_to_csv, + "pretty": _rows_to_pretty, +} + + +def _format_table(table: pa.Table, output_format: str) -> str: + rows = table.to_pylist() + fieldnames = table.schema.names + return _FORMATTERS[output_format](rows, fieldnames) + + +def _ensure_trailing_nl(text: str) -> str: + return text if text.endswith("\n") else text + "\n" + + +def _write_error(stderr, message: str): + stderr.write(json.dumps({"error": str(message)}) + "\n") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="influx3", description="InfluxDB 3 query CLI") + subparsers = parser.add_subparsers(dest="command", required=True) + + query_parser = subparsers.add_parser("query", aliases=["q"], help="Run a SQL or InfluxQL query") + query_parser.add_argument("query", nargs="?", help="The query string to execute") + query_parser.add_argument("-f", "--file", dest="file_path", help="File containing the query") + query_parser.add_argument("-H", "--host", dest="host", help="InfluxDB host URL") + query_parser.add_argument("-d", "--database", dest="database", help="Database name") + query_parser.add_argument("--token", dest="token", help="Authentication token") + query_parser.add_argument( + "-l", + "--language", + dest="language", + choices=["sql", "influxql"], + default="sql", + help="Query language", + ) + query_parser.add_argument( + "--format", + dest="output_format", + choices=list(_FORMATTERS), + default="json", + help="Output format", + ) + query_parser.add_argument("-o", "--output", dest="output_file_path", help="Write output to file") + query_parser.add_argument("--query-timeout", dest="query_timeout", type=int, help="Query timeout in ms") + query_parser.set_defaults(func=_run_query) + return parser + + +def _run_query(args, stdout, stderr, env: Optional[Mapping[str, str]] = None) -> int: + if env is None: + env = os.environ + + host = _resolve_option(args.host, env, "INFLUXDB3_HOST_URL", INFLUX_HOST, "http://127.0.0.1:8181") + database = _resolve_option(args.database, env, "INFLUXDB3_DATABASE_NAME", INFLUX_DATABASE) + token = _resolve_option(args.token, env, "INFLUXDB3_AUTH_TOKEN", INFLUX_TOKEN) + + if (args.query is None) == (args.file_path is None): + _write_error(stderr, "Provide exactly one of query or --file.") + return 1 + + if not database: + _write_error(stderr, "Database is required. Set --database or INFLUXDB3_DATABASE_NAME.") + return 1 + + try: + query = args.query + if args.file_path: + with open(args.file_path, "r", encoding="utf-8") as file_handle: + query = file_handle.read() + + query_kwargs = {} + if args.query_timeout is not None: + query_kwargs["query_timeout"] = args.query_timeout + + with InfluxDBClient3(host=host, database=database, token=token, **query_kwargs) as client: + table = client.query( + query=query, + language=args.language, + mode="all", + database=database, + ) + + payload = _ensure_trailing_nl(_format_table(table, args.output_format)) + if args.output_file_path: + with open(args.output_file_path, "w", encoding="utf-8", newline="") as file_handle: + file_handle.write(payload) + else: + stdout.write(payload) + return 0 + except (InfluxDB3ClientQueryError, InfluxDBError, OSError, pa.ArrowException) as error: + _write_error(stderr, str(error)) + return 1 + + +def main(argv=None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + return args.func(args, sys.stdout, sys.stderr) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/setup.py b/setup.py index 2920869..5883e43 100644 --- a/setup.py +++ b/setup.py @@ -60,6 +60,9 @@ def get_version(): ] }, install_requires=requires, + entry_points={ + 'console_scripts': ['influx3 = influxdb_client_3.cli:main'], + }, python_requires='>=3.9', classifiers=[ 'Development Status :: 4 - Beta', diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..4422f3f --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,140 @@ +import io +import json +from argparse import Namespace +from unittest.mock import MagicMock, patch + +import pyarrow as pa + +from influxdb_client_3.cli import _run_query, build_parser, main +from influxdb_client_3.exceptions import InfluxDB3ClientQueryError + + +def _args(**overrides): + base = dict( + command="query", + query=None, + file_path=None, + host=None, + database=None, + token=None, + language="sql", + output_format="json", + output_file_path=None, + query_timeout=None, + ) + return Namespace(**{**base, **overrides}) + + +def _mock_client(return_value=None, side_effect=None): + client = MagicMock() + client.__enter__.return_value = client + if side_effect is not None: + client.query.side_effect = side_effect + else: + client.query.return_value = return_value + return client + + +def test_build_parser_query_alias(): + parser = build_parser() + args = parser.parse_args(["q", "SELECT 1"]) + assert args.command == "q" + assert args.query == "SELECT 1" + + +def test_run_query_uses_env_precedence(): + args = _args(query="SELECT 1") + env = { + "INFLUXDB3_HOST_URL": "http://from-primary:8181", + "INFLUX_HOST": "http://from-legacy:8181", + "INFLUXDB3_DATABASE_NAME": "db_primary", + "INFLUX_DATABASE": "db_legacy", + "INFLUXDB3_AUTH_TOKEN": "token_primary", + "INFLUX_TOKEN": "token_legacy", + } + mock_client = _mock_client(return_value=pa.Table.from_pylist([{"value": 1}])) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client) as client_ctor: + rc = _run_query(args, stdout, stderr, env=env) + + assert rc == 0 + client_ctor.assert_called_once_with( + host="http://from-primary:8181", + database="db_primary", + token="token_primary", + ) + mock_client.query.assert_called_once_with( + query="SELECT 1", + language="sql", + mode="all", + database="db_primary", + ) + + +def test_run_query_formats_jsonl(): + args = _args( + query="SELECT * FROM m", + host="http://localhost:8181", + database="db1", + output_format="jsonl", + ) + mock_client = _mock_client(return_value=pa.Table.from_pylist([{"a": 1}, {"a": 2}])) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client): + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 0 + assert stdout.getvalue() == '{"a": 1}\n{"a": 2}\n' + assert stderr.getvalue() == "" + + +def test_run_query_reads_query_from_file(tmp_path): + query_file = tmp_path / "query.sql" + query_file.write_text("SELECT * FROM cpu LIMIT 1", encoding="utf-8") + + args = _args( + file_path=str(query_file), + host="http://localhost:8181", + database="db1", + ) + mock_client = _mock_client(return_value=pa.Table.from_pylist([{"ok": True}])) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client): + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 0 + mock_client.query.assert_called_once_with( + query="SELECT * FROM cpu LIMIT 1", + language="sql", + mode="all", + database="db1", + ) + + +def test_run_query_writes_json_error_for_query_exception(): + args = _args(query="SELECT bad", host="http://localhost:8181", database="db1") + mock_client = _mock_client(side_effect=InfluxDB3ClientQueryError("bad query")) + + stdout, stderr = io.StringIO(), io.StringIO() + + with patch("influxdb_client_3.cli.InfluxDBClient3", return_value=mock_client): + rc = _run_query(args, stdout, stderr, env={}) + + assert rc == 1 + assert json.loads(stderr.getvalue()) == {"error": "bad query"} + + +def test_main_returns_1_when_database_missing(): + with patch("influxdb_client_3.cli.sys.stdout", new=io.StringIO()), patch( + "influxdb_client_3.cli.sys.stderr", new=io.StringIO() + ) as stderr: + rc = main(["query", "SELECT 1"]) + + assert rc == 1 + assert "Database is required" in stderr.getvalue()