Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 86 additions & 70 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,103 +3,119 @@
import logging
import os
import re
from typing import Optional
from urllib.parse import parse_qs, unquote, urlencode, urlparse, urlunparse

import pytest

logger = logging.getLogger(__name__)


def _worker_schema_name(worker_id: str) -> str | None:
def _worker_schema_name(worker_id: str) -> Optional[str]:
"""Return a safe SQL schema name for an xdist worker ID."""
match = re.fullmatch(r"gw(\d+)", worker_id)
if match is None:
return None
return f"test_worker_{match.group(1)}"


@pytest.fixture(autouse=True)
def inject_worker_schema_for_sql_tests(monkeypatch, request):
"""Automatically inject worker-specific schema into SQL connection string.
def _build_worker_url(original_url: str, schema_name: str) -> str:
"""Return a copy of original_url with search_path set to schema_name."""
parsed = urlparse(original_url)
query_params = parse_qs(parsed.query)

if "options" in query_params:
current_options = unquote(query_params["options"][0])
new_options = f"{current_options} -csearch_path={schema_name}"
else:
new_options = f"-csearch_path={schema_name}"

query_params["options"] = [new_options]
new_query = urlencode(query_params, doseq=True)
return urlunparse(
(
parsed.scheme,
parsed.netloc,
parsed.path,
parsed.params,
new_query,
parsed.fragment,
)
)


This fixture enables parallel SQL test execution by giving each pytest-xdist worker its own PostgreSQL schema,
preventing table creation conflicts.
@pytest.fixture(scope="session")
def worker_sql_connection(request: pytest.FixtureRequest) -> Optional[str]:
"""Create the worker-specific PostgreSQL schema once per xdist worker session.

Returns the worker-specific connection URL, or None when schema isolation is not
needed (serial run or non-PostgreSQL backend). The schema is created with
``CREATE SCHEMA IF NOT EXISTS`` so this fixture is safe to run even if the schema
already exists from a previous interrupted run.

A non-None return value means "use this URL"; schema creation is attempted but may
fail silently (e.g. if SQLAlchemy is not installed or the DB is unreachable). Tests
that depend on the schema will fail at the DB level with a diagnostic error.

"""
# Only apply to SQL tests
if "sql" not in request.node.keywords:
yield
return
# Avoid touching SQL backends entirely when no SQL tests are collected.
has_sql_tests = any("sql" in item.keywords for item in request.session.items)
if not has_sql_tests:
return None

worker_id = os.environ.get("PYTEST_XDIST_WORKER", "master")

if worker_id == "master":
# Not running in parallel, no schema isolation needed
yield
return
return None

# Get the original SQL connection string
original_url = os.environ.get("SQLALCHEMY_DATABASE_URL", "sqlite:///:memory:")
if "postgresql" not in original_url:
return None

if "postgresql" in original_url:
# Create worker-specific schema name
schema_name = _worker_schema_name(worker_id)
if schema_name is None:
logger.warning("Unexpected worker ID for SQL schema isolation: %s", worker_id)
yield
return

# Parse the URL
parsed = urlparse(original_url)

# Get existing query parameters
query_params = parse_qs(parsed.query)

# Add or update the options parameter to set search_path
if "options" in query_params:
# Append to existing options
current_options = unquote(query_params["options"][0])
new_options = f"{current_options} -csearch_path={schema_name}"
else:
# Create new options
new_options = f"-csearch_path={schema_name}"

query_params["options"] = [new_options]

# Rebuild the URL with updated query parameters
new_query = urlencode(query_params, doseq=True)
new_url = urlunparse(
(
parsed.scheme,
parsed.netloc,
parsed.path,
parsed.params,
new_query,
parsed.fragment,
)
)
schema_name = _worker_schema_name(worker_id)
if schema_name is None:
logger.warning("Unexpected worker ID for SQL schema isolation: %s", worker_id)
return None

new_url = _build_worker_url(original_url, schema_name)

# Override both the environment variable and the module constant
monkeypatch.setenv("SQLALCHEMY_DATABASE_URL", new_url)
engine = None
try:
from sqlalchemy import create_engine, text

# Also patch the SQL_CONN_STR constant used in tests
import tests.sql_tests.test_sql_core
engine = create_engine(original_url)
with engine.connect() as conn:
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
conn.commit()
except Exception as e:
logger.debug("Failed to create schema %s: %s", schema_name, e)
finally:
if engine is not None:
engine.dispose()

monkeypatch.setattr(tests.sql_tests.test_sql_core, "SQL_CONN_STR", new_url)
return new_url

# Ensure schema creation by creating it before tests run
try:
from sqlalchemy import create_engine, text

# Use original URL to create schema (without search_path)
engine = create_engine(original_url)
with engine.connect() as conn:
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
conn.commit()
engine.dispose()
except Exception as e:
# If we can't create the schema, the test will fail anyway
logger.debug(f"Failed to create schema {schema_name}: {e}")
@pytest.fixture(autouse=True)
def inject_worker_schema_for_sql_tests(monkeypatch, request, worker_sql_connection):
"""Automatically inject worker-specific schema into SQL connection string.

This fixture enables parallel SQL test execution by giving each pytest-xdist worker
its own PostgreSQL schema, preventing table creation conflicts.

Schema creation is handled once per worker session by
:func:`worker_sql_connection`. This fixture only performs lightweight
per-test monkeypatching of the environment variable and module constant.

"""
if "sql" not in request.node.keywords or worker_sql_connection is None:
yield
return

monkeypatch.setenv("SQLALCHEMY_DATABASE_URL", worker_sql_connection)

import tests.sql_tests.test_sql_core

monkeypatch.setattr(tests.sql_tests.test_sql_core, "SQL_CONN_STR", worker_sql_connection)

yield

Expand Down Expand Up @@ -193,4 +209,4 @@ def cleanup_test_schemas(request):
engine.dispose()
except Exception as e:
# If cleanup fails, it's not critical
logger.debug(f"Failed to cleanup schema {schema_name}: {e}")
logger.debug("Failed to cleanup schema %s: %s", schema_name, e)
54 changes: 54 additions & 0 deletions tests/test_conftest_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Unit tests for private helpers in tests/conftest.py."""

from tests.conftest import _build_worker_url, _worker_schema_name


class TestWorkerSchemaName:
def test_valid_gw0(self):
"""Return a schema name for the first xdist worker."""
assert _worker_schema_name("gw0") == "test_worker_0"

def test_valid_gw99(self):
"""Return a schema name for multi-digit xdist workers."""
assert _worker_schema_name("gw99") == "test_worker_99"

def test_master_returns_none(self):
"""Return ``None`` when pytest is not running under xdist."""
assert _worker_schema_name("master") is None

def test_non_gw_id_returns_none(self):
"""Ignore worker IDs that do not match the xdist pattern."""
assert _worker_schema_name("worker1") is None

def test_partial_match_returns_none(self):
"""Require a full worker ID match before creating a schema name."""
# "gw0extra" should not match the fullmatch pattern
assert _worker_schema_name("gw0extra") is None


class TestBuildWorkerUrl:
def test_url_without_options(self):
"""Add a search path option when the URL has no existing options."""
url = "postgresql://user:pass@localhost/testdb"
result = _build_worker_url(url, "test_worker_0")
assert "options" in result
assert "search_path%3Dtest_worker_0" in result or "search_path=test_worker_0" in result

def test_url_with_existing_options_appends(self):
"""Append the worker search path to existing connection options."""
url = "postgresql://user:pass@localhost/testdb?options=-cstatement_timeout%3D5000"
result = _build_worker_url(url, "test_worker_1")
# The new search_path must be present
assert "search_path" in result
assert "test_worker_1" in result
# The original option must still be present
assert "statement_timeout" in result

def test_url_encoded_roundtrip(self):
"""Preserve existing URL-encoded options while adding the worker search path."""
url = "postgresql://user:pass@localhost/testdb?options=-cwork_mem%3D64MB"
result = _build_worker_url(url, "test_worker_2")
# Scheme, host, and path must be preserved
assert result.startswith("postgresql://user:pass@localhost/testdb")
assert "test_worker_2" in result
assert "work_mem" in result
Loading