Skip to content

Commit 45dcc9a

Browse files
Merge branch 'main' of https://github.com/NHSDigital/data-validation-engine into build/pyspark_35
2 parents 2ae7fd1 + a6a0c36 commit 45dcc9a

30 files changed

Lines changed: 352 additions & 286 deletions

CHANGELOG.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,27 @@
1+
## v0.7.6 (2026-04-30)
2+
3+
### Fix
4+
5+
- ddb xml reader connection args consistent with other ddb readers
6+
- remove default connection from ddb json reader
7+
- remove default connection in csv reader causing hanging in multithreaded setups
8+
- configured refdata loader to be instantiated when required without class vars #99
9+
10+
### Refactor
11+
12+
- add in backend kwargs for readers to allow reader args not determinable at config write time to be passed
13+
14+
## v0.7.5 (2026-04-29)
15+
16+
### Fix
17+
18+
- configured refdata loader to be instantiated when required without need for class vars (#98)
19+
120
## v0.7.4 (2026-04-17)
221

322
### Fix
423

5-
- fix failures not being properly audited in foundry pipeline
24+
- fix failures not being properly audited in foundry pipeline (#94)
625

726
## v0.7.3 (2026-04-16)
827

poetry.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ Issues = "https://github.com/NHSDigital/data-validation-engine/issues"
2424
Changelog = "https://github.com/NHSDigital/data-validation-engine/blob/main/CHANGELOG.md"
2525

2626
[tool.poetry]
27-
version = "0.7.4"
27+
version = "0.7.6"
2828
packages = [
2929
{ include = "dve", from = "src" },
3030
]

src/dve/core_engine/backends/base/backend.py

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44
import warnings
55
from abc import ABC, abstractmethod
6-
from collections.abc import Mapping, MutableMapping
6+
from collections.abc import MutableMapping
77
from typing import Any, ClassVar, Generic, Optional
88

99
from pyspark.sql import DataFrame, SparkSession
@@ -41,14 +41,12 @@ def __init__( # pylint: disable=unused-argument
4141
self,
4242
contract: BaseDataContract[EntityType],
4343
steps: BaseStepImplementations[EntityType],
44-
reference_data_loader_type: Optional[type[BaseRefDataLoader[EntityType]]],
4544
logger: Optional[logging.Logger] = None,
4645
**kwargs: Any,
4746
) -> None:
4847
for component_name, component in (
4948
("Contract", contract),
5049
("Step implementation", steps),
51-
("Reference data loader", reference_data_loader_type),
5250
):
5351
component_entity_type = getattr(component, "__entity_type__", None)
5452
if component_entity_type != self.__entity_type__:
@@ -61,42 +59,16 @@ def __init__( # pylint: disable=unused-argument
6159
"""The data contract implementation used by the backend."""
6260
self.step_implementations = steps
6361
"""The step implementations used by the backend."""
64-
self.reference_data_loader_type = reference_data_loader_type
65-
"""
66-
The loader type to use for the reference data. If `None`, do not
67-
load any reference data and error if it is provided.
68-
69-
"""
7062
self.logger = logger or get_logger(type(self).__name__)
7163
"""The `logging.Logger instance for the backend."""
7264

7365
def load_reference_data(
7466
self,
7567
reference_entity_config: dict[EntityName, ReferenceConfigUnion],
7668
submission_info: Optional[SubmissionInfo],
77-
) -> Mapping[EntityName, EntityType]:
78-
"""Load the reference data as specified in the reference entity config."""
79-
sub_info_entity: Optional[EntityType] = None
80-
if submission_info:
81-
sub_info_entity = self.convert_submission_info(submission_info)
82-
83-
if self.reference_data_loader_type is None:
84-
if reference_entity_config:
85-
raise ValueError(
86-
"Reference data has been specified but no reference data loader is "
87-
+ "configured for this backend"
88-
)
89-
90-
reference_data_dict = {}
91-
if sub_info_entity is not None:
92-
reference_data_dict["dve_submission_info"] = sub_info_entity
93-
return reference_data_dict
94-
95-
reference_data_loader = self.reference_data_loader_type(reference_entity_config)
96-
if sub_info_entity is not None:
97-
reference_data_loader.entity_cache["dve_submission_info"] = sub_info_entity
98-
99-
return reference_data_loader
69+
) -> BaseRefDataLoader[EntityType]:
70+
"""Supply configured reference data loader for use with business rules"""
71+
raise NotImplementedError()
10072

10173
@abstractmethod
10274
def convert_submission_info(self, submission_info: SubmissionInfo) -> EntityType:

src/dve/core_engine/backends/base/reference_data.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from dve.core_engine.backends.base.core import get_entity_type
1212
from dve.core_engine.backends.exceptions import (
1313
MissingRefDataEntity,
14+
NoRefDataConfigSupplied,
1415
RefdataLacksFileExtensionSupport,
1516
)
1617
from dve.core_engine.backends.types import EntityType
@@ -147,11 +148,11 @@ class variable for the subclass.
147148
# pylint: disable=unused-argument
148149
def __init__(
149150
self,
150-
reference_entity_config: dict[EntityName, ReferenceConfig],
151-
dataset_config_uri: Optional[URI] = None,
151+
reference_data_config: dict[EntityName, ReferenceConfig],
152+
dataset_config_uri: URI,
152153
**kwargs,
153154
) -> None:
154-
self.reference_entity_config = reference_entity_config
155+
self.reference_entity_config = reference_data_config
155156
self.dataset_config_uri = dataset_config_uri
156157
"""
157158
Configuration options for the reference data. This is likely to vary
@@ -207,6 +208,8 @@ def __getitem__(self, key: EntityName) -> EntityType:
207208
try:
208209
config = self.reference_entity_config[key]
209210
return self.load_entity(entity_name=key, config=config)
211+
except TypeError as err:
212+
raise NoRefDataConfigSupplied() from err
210213
except Exception as err:
211214
raise MissingRefDataEntity(entity_name=key) from err
212215

src/dve/core_engine/backends/exceptions.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,20 @@ def get_message_preamble(self) -> str:
119119
return f"Missing reference data entity {self.entity_name!r}"
120120

121121

122+
class NoRefDataConfigSupplied(BackendError):
123+
"""An error raised when trying to load a refdata entity when no refdata
124+
config has been supplied.
125+
126+
"""
127+
128+
def __init__(self, *args: object) -> None:
129+
super().__init__(*args)
130+
131+
def get_message_preamble(self) -> EntityName:
132+
"""Message for logging purposes"""
133+
return "Refdata loader not supplied with refdata config - unable to load refdata entities"
134+
135+
122136
class ConstraintError(ValueError, BackendErrorMixin):
123137
"""Raised when a given constraint is violated."""
124138

src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ def get_duckdb_cast_statement_from_annotation(
411411
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{date_regex}') THEN TRY_CAST(TRIM({quoted_name}) as DATE) ELSE NULL END" # pylint: disable=C0301
412412
return stmt
413413
if issubclass(type_, time):
414-
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{time_regex}') THEN TRY_CAST(TRIM({quoted_name}) as TIME) ELSE NULL END" # pylint: disable=C0301
414+
stmt = rf"CASE WHEN REGEXP_MATCHES(TRIM({quoted_name}), '{time_regex}') THEN TRY_CAST(TRIM({quoted_name}) as TIME) ELSE NULL END" # pylint: disable=C0301
415415
return stmt
416416
duck_type = get_duckdb_type_from_annotation(type_)
417417
if duck_type:

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,7 @@
66

77
import duckdb as ddb
88
import polars as pl
9-
from duckdb import (
10-
DuckDBPyConnection,
11-
DuckDBPyRelation,
12-
StarExpression,
13-
default_connection,
14-
read_csv,
15-
)
9+
from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression, read_csv
1610
from pydantic import BaseModel
1711

1812
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
@@ -61,7 +55,7 @@ def __init__(
6155
self.header = header
6256
self.delim = delim
6357
self.quotechar = quotechar
64-
self._connection = connection if connection else default_connection
58+
self._connection = connection if connection else ddb.connect(":memory:")
6559
self.field_check = field_check
6660
self.field_check_error_code = field_check_error_code
6761
self.field_check_error_message = field_check_error_message
@@ -181,7 +175,7 @@ def read_to_relation( # pylint: disable=unused-argument
181175
] + [pl.col(RECORD_INDEX_COLUMN_NAME)]
182176
df = df.select(pl_exprs)
183177

184-
return ddb.sql("SELECT * FROM df")
178+
return self._connection.sql("SELECT * FROM df")
185179

186180

187181
class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):

src/dve/core_engine/backends/implementations/duckdb/readers/json.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
from collections.abc import Iterator
55
from typing import Any, Optional
66

7-
from duckdb import DuckDBPyRelation, read_json
7+
import duckdb
8+
from duckdb import DuckDBPyConnection, DuckDBPyRelation
89
from pydantic import BaseModel
910

1011
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
@@ -26,9 +27,11 @@ def __init__(
2627
self,
2728
*,
2829
json_format: Optional[str] = "array",
30+
connection: Optional[DuckDBPyConnection] = None,
2931
**_,
3032
):
3133
self._json_format = json_format
34+
self._connection = duckdb.connect(":memory:") if not connection else connection
3235

3336
super().__init__()
3437

@@ -50,5 +53,7 @@ def read_to_relation( # pylint: disable=unused-argument
5053
}
5154

5255
return self.add_record_index(
53-
read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore
56+
self._connection.read_json(
57+
resource, columns=ddb_schema, format=self._json_format # type: ignore
58+
)
5459
)

src/dve/core_engine/backends/implementations/duckdb/readers/xml.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33

44
from typing import Optional
55

6+
import duckdb
67
import polars as pl
7-
from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection
8+
from duckdb import DuckDBPyConnection, DuckDBPyRelation
89
from pydantic import BaseModel
910

1011
from dve.core_engine.backends.base.reader import read_function
@@ -24,8 +25,8 @@
2425
class DuckDBXMLStreamReader(XMLStreamReader):
2526
"""A reader for XML files"""
2627

27-
def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
28-
self.ddb_connection = ddb_connection if ddb_connection else default_connection
28+
def __init__(self, *, connection: Optional[DuckDBPyConnection] = None, **kwargs):
29+
self._connection = connection if connection else duckdb.connect(":memory:")
2930
super().__init__(**kwargs)
3031

3132
@read_function(DuckDBPyRelation)
@@ -49,4 +50,4 @@ def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseMod
4950
data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema
5051
)
5152
)
52-
return self.ddb_connection.sql("select * from _lazy_frame")
53+
return self._connection.sql("select * from _lazy_frame")

0 commit comments

Comments
 (0)