Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3886,6 +3886,16 @@ definitions:
type: array
items:
type: string
skip_rows_before_header:
title: Skip Rows Before Header
description: The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.
type: integer
default: 0
skip_rows_after_header:
title: Skip Rows After Header
description: The number of rows to skip after the header row.
type: integer
default: 0
AsyncJobStatusMap:
description: Matches the api job status to Async Job Status.
type: object
Expand Down
23 changes: 17 additions & 6 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,35 @@ class CsvParser(Parser):
# Text encoding used when wrapping the raw byte stream (defaults to UTF-8).
encoding: Optional[str] = "utf-8"
# Field delimiter; escaped forms such as "\\t" are decoded in _get_delimiter().
delimiter: Optional[str] = ","
# Cell values that should be replaced with None when seen in a record.
set_values_to_none: Optional[List[str]] = None
# Number of raw lines to skip before the header row is read.
skip_rows_before_header: int = 0
# Number of parsed rows to discard immediately after the header row.
skip_rows_after_header: int = 0

def _get_delimiter(self) -> Optional[str]:
"""
Get delimiter from the configuration. Check for the escape character and decode it.
"""
"""Get delimiter from the configuration. Check for the escape character and decode it."""
if self.delimiter is not None:
if self.delimiter.startswith("\\"):
self.delimiter = self.delimiter.encode("utf-8").decode("unicode_escape")

return self.delimiter

@staticmethod
def _skip_rows(text_data: io.TextIOBase, rows_to_skip: int) -> None:
"""Skip a specified number of rows from the current position in the text stream."""
for _ in range(rows_to_skip):
text_data.readline()

def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
    """Yield CSV records parsed from a (decompressed) byte stream.

    Honors ``skip_rows_before_header`` (raw lines consumed before the header
    is read), ``skip_rows_after_header`` (parsed rows discarded immediately
    after the header), and ``set_values_to_none`` (matching cell values are
    replaced with ``None``).  The diff-artifact duplicate docstring that
    followed the original docstring has been removed.
    """
    text_data = TextIOWrapper(data, encoding=self.encoding)  # type: ignore
    self._skip_rows(text_data, self.skip_rows_before_header)
    reader = csv.DictReader(text_data, delimiter=self._get_delimiter() or ",")
    # DictReader consumes the header lazily on first iteration, so rows to
    # skip after the header are discarded here by counting parsed rows.
    skipped_after_header = 0
    for row in reader:
        if skipped_after_header < self.skip_rows_after_header:
            skipped_after_header += 1
            continue
        if self.set_values_to_none:
            row = {k: (None if v in self.set_values_to_none else v) for k, v in row.items()}
        yield row
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1200,6 +1198,16 @@ class CsvDecoder(BaseModel):
encoding: Optional[str] = "utf-8"
delimiter: Optional[str] = ","
set_values_to_none: Optional[List[str]] = None
skip_rows_before_header: Optional[int] = Field(
0,
description="The number of rows to skip before the header row. For example, if the header row is on the 3rd row, enter 2 in this field.",
title="Skip Rows Before Header",
)
skip_rows_after_header: Optional[int] = Field(
0,
description="The number of rows to skip after the header row.",
title="Skip Rows After Header",
)


class AsyncJobStatusMap(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2672,6 +2672,12 @@ def _get_parser(model: BaseModel, config: Config) -> Parser:
encoding=model.encoding,
delimiter=model.delimiter,
set_values_to_none=model.set_values_to_none,
skip_rows_before_header=model.skip_rows_before_header
if model.skip_rows_before_header
else 0,
skip_rows_after_header=model.skip_rows_after_header
if model.skip_rows_after_header
else 0,
)
elif isinstance(model, GzipDecoderModel):
return GzipParser(
Expand Down
77 changes: 77 additions & 0 deletions unit_tests/sources/declarative/decoders/test_composite_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,83 @@ def test_composite_raw_decoder_csv_parser_without_mocked_response():
thread.join(timeout=5) # ensure thread is cleaned up


def _generate_csv_with_metadata(
metadata_lines: int = 0,
post_header_lines: int = 0,
encoding: str = "utf-8",
delimiter: str = ",",
) -> bytes:
"""Generate CSV bytes with metadata lines before and/or after the header."""
lines = []
for i in range(metadata_lines):
lines.append(f"# metadata line {i + 1}")
header = delimiter.join(["id", "name", "age"])
lines.append(header)
for i in range(post_header_lines):
lines.append(f"# post-header line {i + 1}")
lines.append(delimiter.join(["1", "John", "28"]))
lines.append(delimiter.join(["2", "Alice", "34"]))
return ("\n".join(lines) + "\n").encode(encoding)


@pytest.mark.parametrize(
    "skip_before,skip_after,metadata_lines,post_header_lines,expected_count",
    [
        pytest.param(0, 0, 0, 0, 2, id="no_skip_rows_default_behavior"),
        pytest.param(3, 0, 3, 0, 2, id="skip_3_rows_before_header"),
        pytest.param(0, 2, 0, 2, 2, id="skip_2_rows_after_header"),
        pytest.param(2, 1, 2, 1, 2, id="skip_both_before_and_after"),
        pytest.param(1, 0, 1, 0, 2, id="skip_1_row_before_header"),
    ],
)
def test_csv_parser_skip_rows(
    requests_mock,
    skip_before: int,
    skip_after: int,
    metadata_lines: int,
    post_header_lines: int,
    expected_count: int,
):
    """CsvParser should drop metadata rows before and/or after the header row."""
    payload = _generate_csv_with_metadata(
        metadata_lines=metadata_lines,
        post_header_lines=post_header_lines,
    )
    requests_mock.register_uri("GET", "https://airbyte.io/", content=payload)

    decoder = CompositeRawDecoder(
        parser=CsvParser(
            skip_rows_before_header=skip_before,
            skip_rows_after_header=skip_after,
        )
    )
    response = requests.get("https://airbyte.io/", stream=True)
    records = list(decoder.decode(response))

    assert len(records) == expected_count
    assert records[0] == {"id": "1", "name": "John", "age": "28"}
    assert records[1] == {"id": "2", "name": "Alice", "age": "34"}


def test_csv_parser_skip_rows_before_header_with_tsv(requests_mock):
    """skip_rows_before_header should also work for tab-delimited payloads, matching the reporter's use case."""
    payload = _generate_csv_with_metadata(metadata_lines=3, delimiter="\t")
    requests_mock.register_uri("GET", "https://airbyte.io/", content=payload)

    decoder = CompositeRawDecoder(
        parser=CsvParser(delimiter="\t", skip_rows_before_header=3)
    )
    response = requests.get("https://airbyte.io/", stream=True)
    records = list(decoder.decode(response))

    assert len(records) == 2
    assert records[0] == {"id": "1", "name": "John", "age": "28"}


def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock):
requests_mock.register_uri(
"GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()
Expand Down