diff --git a/tests/conftest.py b/tests/conftest.py index 4450efa6..baa5b488 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,6 +3,7 @@ import logging import os import re +from typing import Optional from urllib.parse import parse_qs, unquote, urlencode, urlparse, urlunparse import pytest @@ -10,7 +11,7 @@ logger = logging.getLogger(__name__) -def _worker_schema_name(worker_id: str) -> str | None: +def _worker_schema_name(worker_id: str) -> Optional[str]: """Return a safe SQL schema name for an xdist worker ID.""" match = re.fullmatch(r"gw(\d+)", worker_id) if match is None: @@ -18,88 +19,103 @@ def _worker_schema_name(worker_id: str) -> str | None: return f"test_worker_{match.group(1)}" -@pytest.fixture(autouse=True) -def inject_worker_schema_for_sql_tests(monkeypatch, request): - """Automatically inject worker-specific schema into SQL connection string. +def _build_worker_url(original_url: str, schema_name: str) -> str: + """Return a copy of original_url with search_path set to schema_name.""" + parsed = urlparse(original_url) + query_params = parse_qs(parsed.query) + + if "options" in query_params: + current_options = unquote(query_params["options"][0]) + new_options = f"{current_options} -csearch_path={schema_name}" + else: + new_options = f"-csearch_path={schema_name}" + + query_params["options"] = [new_options] + new_query = urlencode(query_params, doseq=True) + return urlunparse( + ( + parsed.scheme, + parsed.netloc, + parsed.path, + parsed.params, + new_query, + parsed.fragment, + ) + ) + - This fixture enables parallel SQL test execution by giving each pytest-xdist worker its own PostgreSQL schema, - preventing table creation conflicts. +@pytest.fixture(scope="session") +def worker_sql_connection(request: pytest.FixtureRequest) -> Optional[str]: + """Create the worker-specific PostgreSQL schema once per xdist worker session. + + Returns the worker-specific connection URL, or None when schema isolation is not + needed (serial run or non-PostgreSQL backend). The schema is created with + ``CREATE SCHEMA IF NOT EXISTS`` so this fixture is safe to run even if the schema + already exists from a previous interrupted run. + + A non-None return value means "use this URL"; schema creation is attempted but may + fail silently (e.g. if SQLAlchemy is not installed or the DB is unreachable). Tests + that depend on the schema will fail at the DB level with a diagnostic error. """ - # Only apply to SQL tests - if "sql" not in request.node.keywords: - yield - return + # Avoid touching SQL backends entirely when no SQL tests are collected. + has_sql_tests = any("sql" in item.keywords for item in request.session.items) + if not has_sql_tests: + return None worker_id = os.environ.get("PYTEST_XDIST_WORKER", "master") - if worker_id == "master": - # Not running in parallel, no schema isolation needed - yield - return + return None - # Get the original SQL connection string original_url = os.environ.get("SQLALCHEMY_DATABASE_URL", "sqlite:///:memory:") + if "postgresql" not in original_url: + return None - if "postgresql" in original_url: - # Create worker-specific schema name - schema_name = _worker_schema_name(worker_id) - if schema_name is None: - logger.warning("Unexpected worker ID for SQL schema isolation: %s", worker_id) - yield - return - - # Parse the URL - parsed = urlparse(original_url) - - # Get existing query parameters - query_params = parse_qs(parsed.query) - - # Add or update the options parameter to set search_path - if "options" in query_params: - # Append to existing options - current_options = unquote(query_params["options"][0]) - new_options = f"{current_options} -csearch_path={schema_name}" - else: - # Create new options - new_options = f"-csearch_path={schema_name}" - - query_params["options"] = [new_options] - - # Rebuild the URL with updated query parameters - new_query = urlencode(query_params, doseq=True) - new_url = urlunparse( - ( - parsed.scheme, - parsed.netloc, - parsed.path, - parsed.params, - new_query, - parsed.fragment, - ) - ) + schema_name = _worker_schema_name(worker_id) + if schema_name is None: + logger.warning("Unexpected worker ID for SQL schema isolation: %s", worker_id) + return None + + new_url = _build_worker_url(original_url, schema_name) - # Override both the environment variable and the module constant - monkeypatch.setenv("SQLALCHEMY_DATABASE_URL", new_url) + engine = None + try: + from sqlalchemy import create_engine, text - # Also patch the SQL_CONN_STR constant used in tests - import tests.sql_tests.test_sql_core + engine = create_engine(original_url) + with engine.connect() as conn: + conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) + conn.commit() + except Exception as e: + logger.debug("Failed to create schema %s: %s", schema_name, e) + finally: + if engine is not None: + engine.dispose() - monkeypatch.setattr(tests.sql_tests.test_sql_core, "SQL_CONN_STR", new_url) + return new_url - # Ensure schema creation by creating it before tests run - try: - from sqlalchemy import create_engine, text - # Use original URL to create schema (without search_path) - engine = create_engine(original_url) - with engine.connect() as conn: - conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")) - conn.commit() - engine.dispose() - except Exception as e: - # If we can't create the schema, the test will fail anyway - logger.debug(f"Failed to create schema {schema_name}: {e}") +@pytest.fixture(autouse=True) +def inject_worker_schema_for_sql_tests(monkeypatch, request, worker_sql_connection): + """Automatically inject worker-specific schema into SQL connection string. + + This fixture enables parallel SQL test execution by giving each pytest-xdist worker + its own PostgreSQL schema, preventing table creation conflicts. + + Schema creation is handled once per worker session by + :func:`worker_sql_connection`. This fixture only performs lightweight + per-test monkeypatching of the environment variable and module constant. + + """ + if "sql" not in request.node.keywords or worker_sql_connection is None: + yield + return + + monkeypatch.setenv("SQLALCHEMY_DATABASE_URL", worker_sql_connection) + + import tests.sql_tests.test_sql_core + + monkeypatch.setattr(tests.sql_tests.test_sql_core, "SQL_CONN_STR", worker_sql_connection) yield @@ -193,4 +209,4 @@ def cleanup_test_schemas(request): engine.dispose() except Exception as e: # If cleanup fails, it's not critical - logger.debug(f"Failed to cleanup schema {schema_name}: {e}") + logger.debug("Failed to cleanup schema %s: %s", schema_name, e) diff --git a/tests/test_conftest_helpers.py b/tests/test_conftest_helpers.py new file mode 100644 index 00000000..5d936cbb --- /dev/null +++ b/tests/test_conftest_helpers.py @@ -0,0 +1,54 @@ +"""Unit tests for private helpers in tests/conftest.py.""" + +from tests.conftest import _build_worker_url, _worker_schema_name + + +class TestWorkerSchemaName: + def test_valid_gw0(self): + """Return a schema name for the first xdist worker.""" + assert _worker_schema_name("gw0") == "test_worker_0" + + def test_valid_gw99(self): + """Return a schema name for multi-digit xdist workers.""" + assert _worker_schema_name("gw99") == "test_worker_99" + + def test_master_returns_none(self): + """Return ``None`` when pytest is not running under xdist.""" + assert _worker_schema_name("master") is None + + def test_non_gw_id_returns_none(self): + """Ignore worker IDs that do not match the xdist pattern.""" + assert _worker_schema_name("worker1") is None + + def test_partial_match_returns_none(self): + """Require a full worker ID match before creating a schema name.""" + # "gw0extra" should not match the fullmatch pattern + assert _worker_schema_name("gw0extra") is None + + +class TestBuildWorkerUrl: + def test_url_without_options(self): + """Add a search path option when the URL has no existing options.""" + url = "postgresql://user:pass@localhost/testdb" + result = _build_worker_url(url, "test_worker_0") + assert "options" in result + assert "search_path%3Dtest_worker_0" in result or "search_path=test_worker_0" in result + + def test_url_with_existing_options_appends(self): + """Append the worker search path to existing connection options.""" + url = "postgresql://user:pass@localhost/testdb?options=-cstatement_timeout%3D5000" + result = _build_worker_url(url, "test_worker_1") + # The new search_path must be present + assert "search_path" in result + assert "test_worker_1" in result + # The original option must still be present + assert "statement_timeout" in result + + def test_url_encoded_roundtrip(self): + """Preserve existing URL-encoded options while adding the worker search path.""" + url = "postgresql://user:pass@localhost/testdb?options=-cwork_mem%3D64MB" + result = _build_worker_url(url, "test_worker_2") + # Scheme, host, and path must be preserved + assert result.startswith("postgresql://user:pass@localhost/testdb") + assert "test_worker_2" in result + assert "work_mem" in result