Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
1. [#198](https://github.com/InfluxCommunity/influxdb3-python/pull/198): Support custom tag order via `tag_order` write option.
See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more.
1. [#202](https://github.com/InfluxCommunity/influxdb3-python/pull/202): Add escape for field keys when serializing to line protocol in `PolarsDataframeSerializer`.
1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Add `influx3 query` CLI support for executing SQL/InfluxQL queries with JSON/JSONL/CSV/pretty output, including module execution via `python -m influxdb_client_3`.

### Bug Fixes

1. [#208](https://github.com/InfluxCommunity/influxdb3-python/pull/208): Normalize PEM certificate line endings when loading Flight query root certificates to ensure consistent SSL option behavior on Windows.

## 0.18.0 [2026-02-19]

Expand Down
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,44 @@ Note: This does not include Pandas support. If you would like to use key feature

*Note: Please make sure you are using 3.9 or above. For the best performance use 3.11+*

## CLI (Agent-Friendly Query Tool)

This package includes an `influx3` CLI for read/query workflows.

### Run a query

```bash
influx3 query -d my_database "SELECT * FROM cpu LIMIT 5"
```

By default, output is JSON to stdout.

### Supported formats

- `json` (default)
- `jsonl`
- `csv`
- `pretty`

```bash
influx3 query -d my_database --format csv "SELECT * FROM cpu LIMIT 5"
```

### Config precedence

Configuration values are resolved in this order:

1. CLI flags
2. `INFLUXDB3_*` environment variables
3. legacy `INFLUX_*` environment variables
4. built-in defaults (host defaults to `http://127.0.0.1:8181`)

Relevant environment variables:

- `INFLUXDB3_HOST_URL` (legacy fallback: `INFLUX_HOST`)
- `INFLUXDB3_DATABASE_NAME` (legacy fallback: `INFLUX_DATABASE`)
- `INFLUXDB3_AUTH_TOKEN` (legacy fallback: `INFLUX_TOKEN`)

# Usage
One of the easiest ways to get started is to checkout the ["Pokemon Trainer Cookbook"](https://github.com/InfluxCommunity/influxdb3-python/blob/main/Examples/pokemon-trainer/cookbook.ipynb). This scenario takes you through the basics of both the client library and Pyarrow.

Expand Down
5 changes: 5 additions & 0 deletions influxdb_client_3/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from influxdb_client_3.cli import main


if __name__ == "__main__":
raise SystemExit(main())
180 changes: 180 additions & 0 deletions influxdb_client_3/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import argparse
import csv
import io
import json
import os
import sys
from typing import Mapping, Optional

import pyarrow as pa

from influxdb_client_3 import (
INFLUX_DATABASE,
INFLUX_HOST,
INFLUX_TOKEN,
InfluxDBClient3,
)
from influxdb_client_3.exceptions import InfluxDB3ClientQueryError, InfluxDBError


def _resolve_option(
cli_value: Optional[str],
env: Mapping[str, str],
primary_env: str,
secondary_env: Optional[str] = None,
default: Optional[str] = None,
) -> Optional[str]:
if cli_value is not None:
return cli_value

for var in (primary_env, secondary_env):
if not var:
continue
value = env.get(var)
if value not in (None, ""):
return value

return default


def _rows_to_csv(rows, fieldnames):
buff = io.StringIO()
writer = csv.DictWriter(buff, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)
return buff.getvalue()


def _rows_to_pretty(rows, fieldnames):
if not rows:
return "(0 rows)"

widths = {name: len(name) for name in fieldnames}
for row in rows:
for name in fieldnames:
widths[name] = max(widths[name], len(str(row.get(name, ""))))

header = " | ".join(name.ljust(widths[name]) for name in fieldnames)
sep = "-+-".join("-" * widths[name] for name in fieldnames)
lines = [header, sep]
for row in rows:
lines.append(" | ".join(str(row.get(name, "")).ljust(widths[name]) for name in fieldnames))
return "\n".join(lines)


def _rows_to_json(rows, fieldnames):
return json.dumps(rows, default=str)


def _rows_to_jsonl(rows, fieldnames):
return "\n".join(json.dumps(row, default=str) for row in rows)


_FORMATTERS = {
"json": _rows_to_json,
"jsonl": _rows_to_jsonl,
"csv": _rows_to_csv,
"pretty": _rows_to_pretty,
}


def _format_table(table: pa.Table, output_format: str) -> str:
rows = table.to_pylist()
fieldnames = table.schema.names
return _FORMATTERS[output_format](rows, fieldnames)


def _ensure_trailing_nl(text: str) -> str:
return text if text.endswith("\n") else text + "\n"


def _write_error(stderr, message: str):
stderr.write(json.dumps({"error": str(message)}) + "\n")


def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="influx3", description="InfluxDB 3 query CLI")
subparsers = parser.add_subparsers(dest="command", required=True)

query_parser = subparsers.add_parser("query", aliases=["q"], help="Run a SQL or InfluxQL query")
query_parser.add_argument("query", nargs="?", help="The query string to execute")
query_parser.add_argument("-f", "--file", dest="file_path", help="File containing the query")
query_parser.add_argument("-H", "--host", dest="host", help="InfluxDB host URL")
query_parser.add_argument("-d", "--database", dest="database", help="Database name")
query_parser.add_argument("--token", dest="token", help="Authentication token")
query_parser.add_argument(
"-l",
"--language",
dest="language",
choices=["sql", "influxql"],
default="sql",
help="Query language",
)
query_parser.add_argument(
"--format",
dest="output_format",
choices=list(_FORMATTERS),
default="json",
help="Output format",
)
query_parser.add_argument("-o", "--output", dest="output_file_path", help="Write output to file")
query_parser.add_argument("--query-timeout", dest="query_timeout", type=int, help="Query timeout in ms")
query_parser.set_defaults(func=_run_query)
return parser


def _run_query(args, stdout, stderr, env: Optional[Mapping[str, str]] = None) -> int:
if env is None:
env = os.environ

host = _resolve_option(args.host, env, "INFLUXDB3_HOST_URL", INFLUX_HOST, "http://127.0.0.1:8181")
database = _resolve_option(args.database, env, "INFLUXDB3_DATABASE_NAME", INFLUX_DATABASE)
token = _resolve_option(args.token, env, "INFLUXDB3_AUTH_TOKEN", INFLUX_TOKEN)

if (args.query is None) == (args.file_path is None):
_write_error(stderr, "Provide exactly one of query or --file.")
return 1

if not database:
_write_error(stderr, "Database is required. Set --database or INFLUXDB3_DATABASE_NAME.")
return 1

try:
query = args.query
if args.file_path:
with open(args.file_path, "r", encoding="utf-8") as file_handle:
query = file_handle.read()

query_kwargs = {}
if args.query_timeout is not None:
query_kwargs["query_timeout"] = args.query_timeout

with InfluxDBClient3(host=host, database=database, token=token, **query_kwargs) as client:
table = client.query(
query=query,
language=args.language,
mode="all",
database=database,
)

payload = _ensure_trailing_nl(_format_table(table, args.output_format))
if args.output_file_path:
with open(args.output_file_path, "w", encoding="utf-8", newline="") as file_handle:
file_handle.write(payload)
else:
stdout.write(payload)
return 0
except (InfluxDB3ClientQueryError, InfluxDBError, OSError, pa.ArrowException) as error:
_write_error(stderr, str(error))
return 1


def main(argv=None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
return args.func(args, sys.stdout, sys.stderr)


if __name__ == "__main__":
raise SystemExit(main())
4 changes: 3 additions & 1 deletion influxdb_client_3/query/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ def __init__(self, root_certs_path: str,

def _read_certs(self, path: str) -> bytes:
with open(path, "rb") as certs_file:
return certs_file.read()
certs = certs_file.read()
# Normalize PEM line endings so behavior is stable across platforms.
return certs.replace(b"\r\n", b"\n").replace(b"\r", b"\n")


class QueryApiOptionsBuilder(object):
Expand Down
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ def get_version():
]
},
install_requires=requires,
entry_points={
'console_scripts': ['influx3 = influxdb_client_3.cli:main'],
},
python_requires='>=3.9',
classifiers=[
'Development Status :: 4 - Beta',
Expand Down
Loading
Loading