From f18b55032b3f5b10f432d836810807e6c1c20aba Mon Sep 17 00:00:00 2001 From: Norbert Orzechowicz Date: Fri, 6 Mar 2026 09:48:19 -0600 Subject: [PATCH] fix: readLines() dropping falsy values in S3 and Azure source streams array_filter() without callback was removing lines containing '0' or empty strings, causing schema detection to produce different results compared to local filesystem. --- .../Bridge/AsyncAWS/AsyncAWSS3SourceStream.php | 16 +++++++--------- .../Integration/AsyncAWSS3SourceStreamTest.php | 14 ++++++++++++++ .../Bridge/Azure/AzureBlobSourceStream.php | 16 +++++++--------- .../Integration/AzureBlobSourceStreamTest.php | 14 ++++++++++++++ 4 files changed, 42 insertions(+), 18 deletions(-) diff --git a/src/bridge/filesystem/async-aws/src/Flow/Filesystem/Bridge/AsyncAWS/AsyncAWSS3SourceStream.php b/src/bridge/filesystem/async-aws/src/Flow/Filesystem/Bridge/AsyncAWS/AsyncAWSS3SourceStream.php index b0acf3df64..dc8f4cea44 100644 --- a/src/bridge/filesystem/async-aws/src/Flow/Filesystem/Bridge/AsyncAWS/AsyncAWSS3SourceStream.php +++ b/src/bridge/filesystem/async-aws/src/Flow/Filesystem/Bridge/AsyncAWS/AsyncAWSS3SourceStream.php @@ -72,18 +72,16 @@ public function readLines(string $separator = "\n", ?int $length = null) : \Gene } if (\substr_count($content, $separator) > 1) { - /** - * @phpstan-ignore-next-line - */ - $lines = \array_filter(\explode($separator, $content)); + /** @phpstan-ignore argument.type */ + $lines = \explode($separator, $content); + + $lastIndex = \count($lines) - 1; - // Yield all lines except the last one - foreach (\array_slice($lines, 0, -1) as $line) { - yield $line; + for ($i = 0; $i < $lastIndex; $i++) { + yield $lines[$i]; } - // The last line is incomplete, so we need to keep it for the next iteration - $content = \end($lines); + $content = $lines[$lastIndex]; } elseif (\substr_count($content, $separator) === 1) { // Split the content by the separator /** diff --git a/src/bridge/filesystem/async-aws/tests/Flow/Filesystem/Bridge/AsyncAWS/Tests/Integration/AsyncAWSS3SourceStreamTest.php b/src/bridge/filesystem/async-aws/tests/Flow/Filesystem/Bridge/AsyncAWS/Tests/Integration/AsyncAWSS3SourceStreamTest.php index 3c8a1244d8..fc81e62749 100644 --- a/src/bridge/filesystem/async-aws/tests/Flow/Filesystem/Bridge/AsyncAWS/Tests/Integration/AsyncAWSS3SourceStreamTest.php +++ b/src/bridge/filesystem/async-aws/tests/Flow/Filesystem/Bridge/AsyncAWS/Tests/Integration/AsyncAWSS3SourceStreamTest.php @@ -85,4 +85,18 @@ public function test_reading_lines_from_blob(int $lineLength) : void $stream->close(); } + + public function test_reading_lines_with_falsy_values_preserves_all_lines() : void + { + $content = "header\n0\n\nvalue\n0\nlast"; + $this->givenFileExists(path('aws-s3://falsy.csv'), $content); + + $stream = aws_s3_filesystem($this->bucket(), $this->s3Client())->readFrom(path('aws-s3://falsy.csv')); + + $lines = \iterator_to_array($stream->readLines()); + + self::assertSame(['header', '0', '', 'value', '0', 'last'], $lines); + + $stream->close(); + } } diff --git a/src/bridge/filesystem/azure/src/Flow/Filesystem/Bridge/Azure/AzureBlobSourceStream.php b/src/bridge/filesystem/azure/src/Flow/Filesystem/Bridge/Azure/AzureBlobSourceStream.php index 74e687115a..c86afe8669 100644 --- a/src/bridge/filesystem/azure/src/Flow/Filesystem/Bridge/Azure/AzureBlobSourceStream.php +++ b/src/bridge/filesystem/azure/src/Flow/Filesystem/Bridge/Azure/AzureBlobSourceStream.php @@ -74,18 +74,16 @@ public function readLines(string $separator = "\n", ?int $length = null) : \Gene } if (\substr_count($content, $separator) > 1) { - /** - * @phpstan-ignore-next-line - */ - $lines = \array_filter(\explode($separator, $content)); + /** @phpstan-ignore argument.type */ + $lines = \explode($separator, $content); + + $lastIndex = \count($lines) - 1; - // Yield all lines except the last one - foreach (\array_slice($lines, 0, -1) as $line) { - yield $line; + for ($i = 0; $i < $lastIndex; $i++) { + yield $lines[$i]; } - // The last line is incomplete, so we need to keep it for the next iteration - $content = \end($lines); + $content = $lines[$lastIndex]; } elseif (\substr_count($content, $separator) === 1) { // Split the content by the separator /** diff --git a/src/bridge/filesystem/azure/tests/Flow/Filesystem/Bridge/Azure/Tests/Integration/AzureBlobSourceStreamTest.php b/src/bridge/filesystem/azure/tests/Flow/Filesystem/Bridge/Azure/Tests/Integration/AzureBlobSourceStreamTest.php index 29183d4d59..02d5fc6bda 100644 --- a/src/bridge/filesystem/azure/tests/Flow/Filesystem/Bridge/Azure/Tests/Integration/AzureBlobSourceStreamTest.php +++ b/src/bridge/filesystem/azure/tests/Flow/Filesystem/Bridge/Azure/Tests/Integration/AzureBlobSourceStreamTest.php @@ -85,4 +85,18 @@ public function test_reading_lines_from_blob(int $lineLength) : void $stream->close(); } + + public function test_reading_lines_with_falsy_values_preserves_all_lines() : void + { + $content = "header\n0\n\nvalue\n0\nlast"; + $this->givenFileExists('flow-php', 'falsy.csv', $content); + + $stream = azure_filesystem($this->blobService('flow-php'))->readFrom(path('azure-blob://falsy.csv')); + + $lines = \iterator_to_array($stream->readLines()); + + self::assertSame(['header', '0', '', 'value', '0', 'last'], $lines); + + $stream->close(); + } }