Skip to content

Commit fb62b7d

Browse files
fix: add greater error handling around polars and duckdb csv reader
1 parent 65dbb8e commit fb62b7d

4 files changed

Lines changed: 121 additions & 11 deletions

File tree

src/dve/core_engine/backends/exceptions.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,33 @@ def __init__(self, *args: object, messages: Messages) -> None:
3333
"""The messages to be returned as part of the error."""
3434

3535

36+
class UnableToParseCSVError(MessageBearingError):
    """An error raised when unable to parse a CSV file.

    The error carries exactly one non-informational, submission-level
    ``FeedbackMessage`` with ``error_type="csv read"``.
    """

    def __init__(
        self,
        entity_name: str,
        field_check_error_message: str,
        field_check_error_code: str,
    ) -> None:
        """Build the error together with its single feedback message.

        Args:
            entity_name: Used both as the key of the feedback record and as
                the ``error_location`` of the feedback message.
            field_check_error_message: Text reported as ``error_message``.
            field_check_error_code: Code reported as ``error_code``.
        """
        super().__init__(
            # Give the exception a human-readable description so that
            # ``str(exc)`` is not empty in logs and tracebacks. The parent
            # signature accepts ``*args``; presumably they are forwarded to
            # ``Exception`` -- verify against MessageBearingError.
            f"Unable to parse CSV for {entity_name!r}: {field_check_error_message}",
            messages=[
                FeedbackMessage(
                    entity="csv_structure",
                    record={
                        entity_name: "Unable to parse file. Please check the structure of the file."
                    },
                    failure_type="submission",
                    is_informational=False,
                    error_type="csv read",
                    error_location=entity_name,
                    error_message=field_check_error_message,
                    error_code=field_check_error_code,
                )
            ],
        )
61+
62+
3663
class BackendErrorMixin(ABC, BackendError):
3764
"""A mixin used to create backend error type."""
3865

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,21 @@
66

77
import duckdb as ddb
88
import polars as pl
9-
from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression, read_csv
9+
from duckdb import (
10+
DuckDBPyConnection,
11+
DuckDBPyRelation,
12+
InvalidInputException,
13+
StarExpression,
14+
read_csv,
15+
)
1016
from pydantic import BaseModel
1117

1218
from dve.core_engine.backends.base.reader import read_function
13-
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
19+
from dve.core_engine.backends.exceptions import (
20+
EmptyFileError,
21+
MessageBearingError,
22+
UnableToParseCSVError,
23+
)
1424
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
1525
duckdb_record_index,
1626
duckdb_write_parquet,
@@ -107,7 +117,14 @@ def read_to_relation( # pylint: disable=unused-argument
107117

108118
reader_options["columns"] = ddb_schema
109119

110-
rel = self.add_record_index(read_csv(resource, **reader_options, parallel=False))
120+
try:
121+
rel = self.add_record_index(read_csv(resource, **reader_options, parallel=False))
122+
except InvalidInputException as exc:
123+
raise UnableToParseCSVError(
124+
entity_name="csv_structure",
125+
field_check_error_message=self.field_check_error_message,
126+
field_check_error_code=self.field_check_error_code,
127+
) from exc
111128

112129
if self.null_empty_strings:
113130
cleaned_cols = ",".join(
@@ -156,11 +173,18 @@ def read_to_relation( # pylint: disable=unused-argument
156173

157174
# there is a raise_if_empty arg for 0.18+. Future reference when upgrading. Makes L85
158175
# redundant
159-
df = self.add_record_index( # pylint: disable=W0612
160-
pl.scan_csv(resource, **reader_options).select( # type: ignore
161-
list(polars_types.keys())
176+
try:
177+
df = self.add_record_index( # pylint: disable=W0612
178+
pl.scan_csv(resource, **reader_options).select( # type: ignore
179+
list(polars_types.keys())
180+
)
162181
)
163-
)
182+
except pl.exceptions.PolarsError as exc:
183+
raise UnableToParseCSVError(
184+
entity_name="csv_structure",
185+
field_check_error_message=self.field_check_error_message,
186+
field_check_error_code=self.field_check_error_code,
187+
) from exc
164188

165189
if self.null_empty_strings:
166190
pl_exprs = [
@@ -170,7 +194,16 @@ def read_to_relation( # pylint: disable=unused-argument
170194
] + [pl.col(RECORD_INDEX_COLUMN_NAME)]
171195
df = df.select(pl_exprs)
172196

173-
return self._connection.sql("SELECT * FROM df")
197+
entity = self._connection.sql("SELECT * FROM df")
198+
199+
if entity.pl().shape[0] == 0:
200+
raise UnableToParseCSVError(
201+
entity_name="csv_structure",
202+
field_check_error_message=self.field_check_error_message,
203+
field_check_error_code=self.field_check_error_code,
204+
)
205+
206+
return entity
174207

175208

176209
class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):

tests/test_core_engine/test_backends/test_readers/test_csv.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@
55
import csv
66
from pathlib import Path
77
from typing import Dict, Iterator, Optional
8+
from uuid import uuid4
89

910
import pandas as pd
1011
import pytest
1112
from pydantic import BaseModel
1213

13-
from dve.core_engine.backends.exceptions import EmptyFileError, FieldCountMismatch, MessageBearingError
14+
from dve.core_engine.backends.exceptions import (
15+
EmptyFileError,
16+
FieldCountMismatch,
17+
MessageBearingError,
18+
UnableToParseCSVError,
19+
)
1420
from dve.core_engine.backends.readers import CSVFileReader
1521
from dve.core_engine.backends.readers.utilities import get_all_model_fields
1622
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME

tests/test_core_engine/test_backends/test_readers/test_duckdb/test_ddb_csv.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@
22
from datetime import date, datetime
33
from pathlib import Path
44
from tempfile import TemporaryDirectory
5+
from uuid import uuid4
56

67
import duckdb
78
import polars as pl
89
import pytest
910
from duckdb import DuckDBPyRelation
1011
from pydantic import BaseModel
1112

12-
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
13+
from dve.core_engine.backends.exceptions import (
14+
EmptyFileError,
15+
MessageBearingError,
16+
UnableToParseCSVError,
17+
)
1318
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
1419
get_duckdb_type_from_annotation,
1520
)
@@ -21,7 +26,7 @@
2126
from dve.core_engine.backends.utilities import stringify_model
2227
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
2328

24-
# pylint: disable=C0115,C0116,W0621
29+
# pylint: disable=C0103,C0115,C0116,W0621
2530

2631

2732
class SimpleModel(BaseModel):
@@ -164,6 +169,25 @@ def test_DuckDBCSVReader_with_null_empty_strings(self, temp_dir):
164169
assert entity.shape[0] == 3
165170
assert entity.filter("test_col IS NULL").shape[0] == 2
166171

172+
def test_DuckDBCSVReader_with_malformed_header(self, temp_dir):
    """A header row quoted as one single field must raise UnableToParseCSVError."""
    # The entire header line is wrapped in double quotes, so it is one quoted
    # field rather than a comma-separated list of column names.
    test_data_headers = '"varchar_field,bigint_field,date_field,timestamp_field"'
    row_data = "hello,1,2023-04-01,2023-04-01T12:30:00"
    temp_id = uuid4().hex
    fqp = Path(temp_dir, f"{temp_id}.csv")

    with open(fqp, mode="w", encoding="utf-8") as f:
        f.write(f"{test_data_headers}\n{row_data}")

    reader = DuckDBCSVReader(
        header=True,
        delim=",",
        connection=duckdb.connect(),
    )

    with pytest.raises(UnableToParseCSVError) as err:
        reader.read_to_relation(fqp.as_posix(), "test", SimpleModel)

    # `err` is a pytest ExceptionInfo; the raised exception lives on
    # `err.value`. The assertion sits outside the `with` block because
    # nothing after the raising call inside the block is ever executed.
    assert len(err.value.messages) == 1
190+
167191

168192
class TestPolarsToDuckDBCSVReader:
169193
"""Test PolarsToDuckDBCSVReader"""
@@ -199,6 +223,26 @@ def test_PolarsToDuckDBCSVReader_with_null_empty_strings(self, temp_dir):
199223
assert entity.shape[0] == 3
200224
assert entity.filter("test_col IS NULL").shape[0] == 2
201225

226+
def test_PolarsToDuckDBCSVReader_with_malformed_header(self, temp_dir):
    """A header row quoted as one single field must raise UnableToParseCSVError."""
    # The entire header line is wrapped in double quotes, so it is one quoted
    # field rather than a comma-separated list of column names.
    test_data_headers = '"varchar_field,bigint_field,date_field,timestamp_field"'
    row_data = "hello,1,2023-04-01,2023-04-01T12:30:00"
    temp_id = uuid4().hex
    fqp = Path(temp_dir, f"{temp_id}.csv")

    with open(fqp, mode="w", encoding="utf-8") as f:
        f.write(f"{test_data_headers}\n{row_data}")

    reader = PolarsToDuckDBCSVReader(
        header=True,
        delim=",",
        connection=duckdb.connect(),
    )

    with pytest.raises(UnableToParseCSVError) as err:
        reader.read_to_relation(fqp.as_posix(), "test", SimpleModel)

    # `err` is a pytest ExceptionInfo; the raised exception lives on
    # `err.value`. The assertion sits outside the `with` block because
    # nothing after the raising call inside the block is ever executed.
    assert len(err.value.messages) == 1
244+
245+
202246
class TestDuckDBCSVRepeatingHeaderReader:
203247
"""Test DuckDBCSVRepeatingHeaderReader"""
204248

0 commit comments

Comments
 (0)