Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 157 additions & 1 deletion documentation/components/core/partitioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,160 @@ $dataFrame->partitionBy('id'); // If IDs are unique, creates many tiny partition

// Good partitioning - moderate cardinality
$dataFrame->partitionBy('department'); // Assuming reasonable number of departments
```
```

## Save Modes with Partitioning

When writing partitioned data, the save mode determines how existing partition directories are handled.

### Overwrite Mode

```php
<?php

use function Flow\ETL\DSL\{data_frame, from_array, overwrite, ref};
use function Flow\ETL\Adapter\CSV\to_csv;

data_frame()
->read(from_array([
['date' => '2024-01-01', 'amount' => 100],
['date' => '2024-01-02', 'amount' => 200],
]))
->partitionBy(ref('date'))
->mode(overwrite())
->write(to_csv(__DIR__ . '/output/sales.csv'))
->run();
```

**Behavior:**
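- Removes existing files in the partition directories being written to that share the output file's name (including files with randomized suffixes created by `append()`)
- Files with a different name in those directories are left untouched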
- Partitions NOT in the current dataset are preserved
- Running twice with the same partition values replaces the first write completely

**Common pitfall:** If you write two separate DataFrames to the same partitions using `overwrite()`, the second write deletes data from the first:

```php
<?php

// First write - creates date=2024-01-01/sales.csv with amount=100
data_frame()
->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
->partitionBy(ref('date'))
->mode(overwrite())
->write(to_csv(__DIR__ . '/output/sales.csv'))
->run();

// Second write - DELETES the 100 and writes 200
data_frame()
->read(from_array([['date' => '2024-01-01', 'amount' => 200]]))
->partitionBy(ref('date'))
->mode(overwrite())
->write(to_csv(__DIR__ . '/output/sales.csv'))
->run();

// Result: date=2024-01-01/sales.csv contains ONLY amount=200
```

To combine data from multiple sources into the same partition, use `append()` mode or merge data before writing.

### Append Mode

```php
<?php

data_frame()
->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
->partitionBy(ref('date'))
->mode(append())
->write(to_csv(__DIR__ . '/output/sales.csv'))
->run();
```

**Behavior:**
- Creates new files with randomized suffixes in existing partition directories
- Does not remove existing files
- Multiple runs accumulate files (may cause duplicates)

### Ignore Mode

```php
<?php

data_frame()
->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
->partitionBy(ref('date'))
->mode(ignore())
->write(to_csv(__DIR__ . '/output/sales.csv'))
->run();
```

**Behavior:**
- Skips writing if partition directory already exists
- No error thrown, silently continues

### Exception If Exists Mode (Default)

```php
<?php

data_frame()
->read(from_array([['date' => '2024-01-01', 'amount' => 100]]))
->partitionBy(ref('date'))
->write(to_csv(__DIR__ . '/output/sales.csv')) // Default mode
->run();
```

**Behavior:**
- Throws `RuntimeException` if any partition path already exists
- Safest option to prevent accidental overwrites

## Reading Partitioned Data

Read partitioned data using glob patterns to match partition directories:

```php
<?php

use function Flow\ETL\Adapter\CSV\from_csv;
use function Flow\ETL\DSL\{data_frame, to_output};

data_frame()
->read(from_csv(__DIR__ . '/output/date=*/*.csv'))
->write(to_output())
->run();
```

### Partition Pruning

Skip entire partitions without reading their contents using `filterPartitions()`:

```php
<?php

data_frame()
->read(from_csv(__DIR__ . '/output/date=*/department=*/*.csv'))
->filterPartitions(ref('date')->greaterThanEqual(lit('2024-01-01')))
->write(to_output())
->run();
```

Unlike `filter()`, which reads all data and then discards non-matching rows, `filterPartitions()` evaluates partition metadata first and reads only the matching partitions, significantly improving performance for large datasets.

### Path Partitions

Extract partition metadata without reading file contents using `from_path_partitions()`:

```php
<?php

use function Flow\ETL\DSL\{data_frame, from_path_partitions, to_output};

data_frame()
->read(from_path_partitions(__DIR__ . '/output/date=*/department=*/*.csv'))
->write(to_output())
->run();

// Output includes 'path' and 'partitions' columns
```

Useful for discovering available partitions or building file manifests before processing data.
9 changes: 7 additions & 2 deletions src/core/etl/src/Flow/ETL/Filesystem/FilesystemStreams.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ public function closeStreams(Path $path) : void

if ($this->saveMode === SaveMode::Overwrite) {
if ($fileStream->path()->partitions()->count()) {
$partitionFilesPatter = \Flow\Filesystem\DSL\path($fileStream->path()->parentDirectory()->uri() . '/*', $fileStream->path()->options());
$filename = \str_replace(self::FLOW_TMP_FILE_PREFIX, '', $fileStream->path()->filename());

foreach ($fs->list($partitionFilesPatter) as $partitionFile) {
$partitionFilesPattern = \Flow\Filesystem\DSL\path(
$fileStream->path()->parentDirectory()->uri() . '/' . $filename . '*.' . $fileStream->path()->extension(),
$fileStream->path()->options()
);

foreach ($fs->list($partitionFilesPattern) as $partitionFile) {
if (\str_contains($partitionFile->path->path(), self::FLOW_TMP_FILE_PREFIX)) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Tests\Integration\Filesystem\FilesystemStreams\Partitioned;

use function Flow\ETL\DSL\overwrite;
use function Flow\Filesystem\DSL\path;
use Flow\ETL\Filesystem\FilesystemStreams;
use Flow\ETL\Tests\Integration\Filesystem\FilesystemStreams\FilesystemStreamsTestCase;
use Flow\Filesystem\Partition;

/**
 * Integration tests for FilesystemStreams in overwrite mode when the target
 * partition directory may already contain files.
 *
 * Every scenario writes into the single partition `partition=value` and then
 * inspects what is left in that directory after the streams are closed.
 */
final class OverwriteMultipleFilesTest extends FilesystemStreamsTestCase
{
    #[\Override]
    protected function tearDown() : void
    {
        parent::tearDown();
        $this->cleanFiles();
    }

    /**
     * Two differently named files written through separate overwrite streams
     * must both be present, each with its own content.
     */
    public function test_multiple_writes_to_same_partition_with_different_filenames() : void
    {
        $this->setupFiles([
            __FUNCTION__ => [],
        ]);

        $this->writePartitionedFile($this->streams(), __FUNCTION__ . '/sales.csv', 'sales data');
        $this->writePartitionedFile($this->streams(), __FUNCTION__ . '/orders.csv', 'orders data');

        $partitionFiles = $this->listPartitionFiles(__FUNCTION__);

        self::assertCount(2, $partitionFiles);
        self::assertSame(['orders.csv', 'sales.csv'], $this->sortedBasenames($partitionFiles));

        $contents = [];

        foreach ($partitionFiles as $partitionFile) {
            $contents[$partitionFile->path->basename()] = \file_get_contents($partitionFile->path->path());
        }

        self::assertSame('sales data', $contents['sales.csv']);
        self::assertSame('orders data', $contents['orders.csv']);
    }

    /**
     * A pre-existing file whose name is the target name plus a suffix must be
     * removed together with the replaced target file.
     */
    public function test_overwrite_cleans_up_randomized_files_with_same_basename() : void
    {
        $overwriteStreams = $this->streams();

        $this->setupFiles([
            __FUNCTION__ => [
                'partition=value' => [
                    'file_abc123.csv' => 'randomized file content',
                    'file.csv' => 'original file content',
                ],
            ],
        ]);

        $this->writePartitionedFile($overwriteStreams, __FUNCTION__ . '/file.csv', 'overwritten content');

        $partitionFiles = $this->listPartitionFiles(__FUNCTION__);

        self::assertCount(1, $partitionFiles);
        self::assertSame('file.csv', $partitionFiles[0]->path->basename());
        self::assertSame('overwritten content', \file_get_contents($partitionFiles[0]->path->path()));
    }

    /**
     * Writing `orders.csv` must leave an existing `sales.csv` in the same
     * partition directory in place.
     */
    public function test_overwrite_does_not_delete_files_with_different_basename() : void
    {
        $overwriteStreams = $this->streams();

        $this->setupFiles([
            __FUNCTION__ => [
                'partition=value' => [
                    'sales.csv' => 'sales data',
                ],
            ],
        ]);

        $this->writePartitionedFile($overwriteStreams, __FUNCTION__ . '/orders.csv', 'orders data');

        $partitionFiles = $this->listPartitionFiles(__FUNCTION__);

        self::assertCount(2, $partitionFiles);
        self::assertSame(['orders.csv', 'sales.csv'], $this->sortedBasenames($partitionFiles));
    }

    /**
     * Writing a file that already exists in the partition must replace its content.
     */
    public function test_overwrite_replaces_file_with_same_basename() : void
    {
        $overwriteStreams = $this->streams();

        $this->setupFiles([
            __FUNCTION__ => [
                'partition=value' => [
                    'file.csv' => 'old content',
                ],
            ],
        ]);

        $this->writePartitionedFile($overwriteStreams, __FUNCTION__ . '/file.csv', 'new content');

        $partitionFiles = $this->listPartitionFiles(__FUNCTION__);

        self::assertCount(1, $partitionFiles);
        self::assertSame('file.csv', $partitionFiles[0]->path->basename());
        self::assertSame('new content', \file_get_contents($partitionFiles[0]->path->path()));
    }

    /**
     * Builds a fresh FilesystemStreams instance switched to overwrite mode.
     */
    protected function streams() : FilesystemStreams
    {
        $overwriteStreams = new FilesystemStreams($this->fstab());
        $overwriteStreams->setMode(overwrite());

        return $overwriteStreams;
    }

    /**
     * Lists everything found in the `partition=value` directory of the given test.
     *
     * @param string $testDirectory directory (named after the test method) under the files directory
     *
     * @return array<mixed> entries yielded by the filesystem listing, keys preserved
     */
    private function listPartitionFiles(string $testDirectory) : array
    {
        return \iterator_to_array(
            $this->fs()->list(
                path($this->filesDirectory() . '/' . $testDirectory . '/partition=value/*')
            )
        );
    }

    /**
     * Collects the basenames of the listed files in ascending order.
     *
     * @param array<mixed> $partitionFiles entries returned by listPartitionFiles()
     *
     * @return array<mixed> sorted basenames, re-indexed from 0
     */
    private function sortedBasenames(array $partitionFiles) : array
    {
        $names = [];

        foreach ($partitionFiles as $partitionFile) {
            $names[] = $partitionFile->path->basename();
        }

        \sort($names);

        return $names;
    }

    /**
     * Writes $content to $relativePath inside the `partition=value` partition
     * and closes the streams opened for that path.
     */
    private function writePartitionedFile(FilesystemStreams $streams, string $relativePath, string $content) : void
    {
        $target = $this->getPath($relativePath);

        $streams
            ->writeTo($target, partitions: [new Partition('partition', 'value')])
            ->append($content);

        $streams->closeStreams($target);
    }
}
2 changes: 1 addition & 1 deletion web/landing/.php-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
8.3
8.3.30
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Skip entire partitions without reading their data using filterPartitions(). Unlike filter(), which reads all data and then filters it, partition pruning evaluates metadata first and reads only the matching partitions, dramatically improving performance for large datasets.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
priority: 2
hidden: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\{append, data_frame, from_array, ref};

require __DIR__ . '/vendor/autoload.php';

// Sample rows: eight products across three colors and three SKUs.
$products = [
    ['id' => 1, 'color' => 'red', 'sku' => 'PRODUCT01'],
    ['id' => 2, 'color' => 'red', 'sku' => 'PRODUCT02'],
    ['id' => 3, 'color' => 'red', 'sku' => 'PRODUCT03'],
    ['id' => 4, 'color' => 'green', 'sku' => 'PRODUCT01'],
    ['id' => 5, 'color' => 'green', 'sku' => 'PRODUCT02'],
    ['id' => 6, 'color' => 'green', 'sku' => 'PRODUCT03'],
    ['id' => 7, 'color' => 'blue', 'sku' => 'PRODUCT01'],
    ['id' => 8, 'color' => 'blue', 'sku' => 'PRODUCT02'],
];

// Partition by color and sku, then write the CSV output in append mode.
data_frame()
    ->read(from_array($products))
    ->partitionBy(ref('color'), ref('sku'))
    ->mode(append())
    ->write(to_csv(__DIR__ . '/output/products.csv'))
    ->run();
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Write partitioned data with append mode. New files with randomized suffixes are created in partition directories without removing existing data. Useful for incremental updates.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- [Partitioning](/documentation/components/core/partitioning)
- [Save Mode](/documentation/components/core/save-mode)
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
- [Partitioning](/documentation/components/core/partitioning)
- [Save Mode](/documentation/components/core/save-mode)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
priority: 4
hidden: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

use function Flow\ETL\Adapter\CSV\to_csv;
use function Flow\ETL\DSL\{data_frame, exception_if_exists, from_array, ref};

require __DIR__ . '/vendor/autoload.php';

// Sample rows: eight products across three colors and three SKUs.
$products = [
    ['id' => 1, 'color' => 'red', 'sku' => 'PRODUCT01'],
    ['id' => 2, 'color' => 'red', 'sku' => 'PRODUCT02'],
    ['id' => 3, 'color' => 'red', 'sku' => 'PRODUCT03'],
    ['id' => 4, 'color' => 'green', 'sku' => 'PRODUCT01'],
    ['id' => 5, 'color' => 'green', 'sku' => 'PRODUCT02'],
    ['id' => 6, 'color' => 'green', 'sku' => 'PRODUCT03'],
    ['id' => 7, 'color' => 'blue', 'sku' => 'PRODUCT01'],
    ['id' => 8, 'color' => 'blue', 'sku' => 'PRODUCT02'],
];

// Partition by color and sku, then write the CSV output with the
// exception_if_exists save mode set explicitly.
data_frame()
    ->read(from_array($products))
    ->partitionBy(ref('color'), ref('sku'))
    ->mode(exception_if_exists())
    ->write(to_csv(__DIR__ . '/output/products.csv'))
    ->run();
Loading
Loading