Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions examples/benchmark_perf/benchmark_workers_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def main():
print("\n[1/2] Loading MIMIC-IV base dataset...")
dataset_start = time.time()

base_cache_dir = f"{cache_root}/base_dataset"
base_dataset = MIMIC4Dataset(
ehr_root="/srv/local/data/physionet.org/files/mimiciv/2.2/",
ehr_tables=[
Expand All @@ -127,7 +128,7 @@ def main():
"labevents",
],
dev=dev,
cache_dir=f"{cache_root}/base_dataset",
cache_dir=base_cache_dir,
)

dataset_time = time.time() - dataset_start
Expand All @@ -140,20 +141,17 @@ def main():
sample_dataset = base_dataset.set_task(
MortalityPredictionStageNetMIMIC4(),
num_workers=1,
cache_dir=f"{cache_root}/task_samples",
)

task_time = time.time() - task_start
print(f"✓ Task processing completed in {task_time:.2f} seconds")

# Measure cache sizes
print("\n[3/3] Measuring cache sizes...")
base_cache_dir = f"{cache_root}/base_dataset"
task_cache_dir = f"{cache_root}/task_samples"

base_cache_size = get_directory_size(base_cache_dir)
task_cache_size = get_directory_size(task_cache_dir)
total_cache_size = base_cache_size + task_cache_size
total_cache_size = get_directory_size(base_cache_dir)
task_cache_size = get_directory_size(f"{base_cache_dir}/tasks")
base_cache_size = total_cache_size - task_cache_size

print(f"✓ Base dataset cache: {format_size(base_cache_size)}")
print(f"✓ Task samples cache: {format_size(task_cache_size)}")
Expand Down
12 changes: 5 additions & 7 deletions examples/benchmark_perf/benchmark_workers_12.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def main():
print("\n[1/2] Loading MIMIC-IV base dataset...")
dataset_start = time.time()

base_cache_dir = f"{cache_root}/base_dataset"
base_dataset = MIMIC4Dataset(
ehr_root="/srv/local/data/physionet.org/files/mimiciv/2.2/",
ehr_tables=[
Expand All @@ -127,7 +128,7 @@ def main():
"labevents",
],
dev=dev,
cache_dir=f"{cache_root}/base_dataset",
cache_dir=base_cache_dir,
)

dataset_time = time.time() - dataset_start
Expand All @@ -140,20 +141,17 @@ def main():
sample_dataset = base_dataset.set_task(
MortalityPredictionStageNetMIMIC4(),
num_workers=12,
cache_dir=f"{cache_root}/task_samples",
)

task_time = time.time() - task_start
print(f"✓ Task processing completed in {task_time:.2f} seconds")

# Measure cache sizes
print("\n[3/3] Measuring cache sizes...")
base_cache_dir = f"{cache_root}/base_dataset"
task_cache_dir = f"{cache_root}/task_samples"

base_cache_size = get_directory_size(base_cache_dir)
task_cache_size = get_directory_size(task_cache_dir)
total_cache_size = base_cache_size + task_cache_size
total_cache_size = get_directory_size(base_cache_dir)
task_cache_size = get_directory_size(f"{base_cache_dir}/tasks")
base_cache_size = total_cache_size - task_cache_size

print(f"✓ Base dataset cache: {format_size(base_cache_size)}")
print(f"✓ Task samples cache: {format_size(task_cache_size)}")
Expand Down
12 changes: 5 additions & 7 deletions examples/benchmark_perf/benchmark_workers_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def main():
print("\n[1/2] Loading MIMIC-IV base dataset...")
dataset_start = time.time()

base_cache_dir = f"{cache_root}/base_dataset"
base_dataset = MIMIC4Dataset(
ehr_root="/srv/local/data/physionet.org/files/mimiciv/2.2/",
ehr_tables=[
Expand All @@ -127,7 +128,7 @@ def main():
"labevents",
],
dev=dev,
# cache_dir=f"{cache_root}/base_dataset",
cache_dir=base_cache_dir,
)

dataset_time = time.time() - dataset_start
Expand All @@ -140,20 +141,17 @@ def main():
sample_dataset = base_dataset.set_task(
MortalityPredictionStageNetMIMIC4(),
num_workers=4,
cache_dir=f"{cache_root}/task_samples_old",
)

task_time = time.time() - task_start
print(f"✓ Task processing completed in {task_time:.2f} seconds")

# Measure cache sizes
print("\n[3/3] Measuring cache sizes...")
base_cache_dir = f"{cache_root}/base_dataset"
task_cache_dir = f"{cache_root}/task_samples"

base_cache_size = get_directory_size(base_cache_dir)
task_cache_size = get_directory_size(task_cache_dir)
total_cache_size = base_cache_size + task_cache_size
total_cache_size = get_directory_size(base_cache_dir)
task_cache_size = get_directory_size(f"{base_cache_dir}/tasks")
base_cache_size = total_cache_size - task_cache_size

print(f"✓ Base dataset cache: {format_size(base_cache_size)}")
print(f"✓ Task samples cache: {format_size(task_cache_size)}")
Expand Down
15 changes: 2 additions & 13 deletions examples/benchmark_perf/benchmark_workers_n.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,6 @@ def parse_workers(value: str) -> list[int]:
return workers


def ensure_empty_dir(path: str | Path) -> None:
p = Path(path)
if p.exists():
shutil.rmtree(p)
p.mkdir(parents=True, exist_ok=True)


def remove_dir(path: str | Path, retries: int = 3, delay: float = 1.0) -> None:
"""Remove a directory with retry logic for busy file handles."""
p = Path(path)
Expand Down Expand Up @@ -282,11 +275,8 @@ def main() -> None:
print("\n[1/1] Sweeping num_workers (each run reloads dataset + task)...")
for w in args.workers:
for r in range(args.repeats):
task_cache_dir = cache_root / f"task_samples_w{w}"

# Ensure no cache artifacts before this run.
remove_dir(base_cache_dir)
ensure_empty_dir(task_cache_dir)

tracker.reset()
run_start = time.time()
Expand All @@ -311,13 +301,13 @@ def main() -> None:
sample_dataset = base_dataset.set_task(
MortalityPredictionStageNetMIMIC4(),
num_workers=w,
cache_dir=str(task_cache_dir),
)

task_process_s = time.time() - task_start
total_s = time.time() - run_start
peak_rss_bytes = tracker.peak_bytes()
task_cache_bytes = get_directory_size(task_cache_dir)
tasks_dir = base_cache_dir / "tasks"
task_cache_bytes = get_directory_size(tasks_dir)

# Capture sample count BEFORE cleaning up the cache (litdata needs it).
num_samples = len(sample_dataset)
Expand All @@ -327,7 +317,6 @@ def main() -> None:
del base_dataset

# Clean up to avoid disk growth across an overnight sweep.
remove_dir(task_cache_dir)
remove_dir(base_cache_dir)

results.append(
Expand Down
15 changes: 2 additions & 13 deletions examples/benchmark_perf/benchmark_workers_n_drug_recommendation.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,6 @@ def parse_workers(value: str) -> list[int]:
return workers


def ensure_empty_dir(path: str | Path) -> None:
p = Path(path)
if p.exists():
shutil.rmtree(p)
p.mkdir(parents=True, exist_ok=True)


def remove_dir(path: str | Path, retries: int = 3, delay: float = 1.0) -> None:
"""Remove a directory with retry logic for busy file handles."""
p = Path(path)
Expand Down Expand Up @@ -284,11 +277,8 @@ def main() -> None:
print("\n[1/1] Sweeping num_workers (each run reloads dataset + task)...")
for w in args.workers:
for r in range(args.repeats):
task_cache_dir = cache_root / f"task_samples_drug_rec_w{w}"

# Ensure no cache artifacts before this run.
remove_dir(base_cache_dir)
ensure_empty_dir(task_cache_dir)

tracker.reset()
run_start = time.time()
Expand All @@ -313,13 +303,13 @@ def main() -> None:
sample_dataset = base_dataset.set_task(
DrugRecommendationMIMIC4(),
num_workers=w,
cache_dir=str(task_cache_dir),
)

task_process_s = time.time() - task_start
total_s = time.time() - run_start
peak_rss_bytes = tracker.peak_bytes()
task_cache_bytes = get_directory_size(task_cache_dir)
tasks_dir = base_cache_dir / "tasks"
task_cache_bytes = get_directory_size(tasks_dir)

# Capture sample count BEFORE cleaning up the cache (litdata needs it).
num_samples = len(sample_dataset)
Expand All @@ -329,7 +319,6 @@ def main() -> None:
del base_dataset

# Clean up to avoid disk growth across an overnight sweep.
remove_dir(task_cache_dir)
remove_dir(base_cache_dir)

results.append(
Expand Down
15 changes: 2 additions & 13 deletions examples/benchmark_perf/benchmark_workers_n_length_of_stay.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,6 @@ def parse_workers(value: str) -> list[int]:
return workers


def ensure_empty_dir(path: str | Path) -> None:
p = Path(path)
if p.exists():
shutil.rmtree(p)
p.mkdir(parents=True, exist_ok=True)


def remove_dir(path: str | Path, retries: int = 3, delay: float = 1.0) -> None:
"""Remove a directory with retry logic for busy file handles."""
p = Path(path)
Expand Down Expand Up @@ -284,11 +277,8 @@ def main() -> None:
print("\n[1/1] Sweeping num_workers (each run reloads dataset + task)...")
for w in args.workers:
for r in range(args.repeats):
task_cache_dir = cache_root / f"task_samples_los_w{w}"

# Ensure no cache artifacts before this run.
remove_dir(base_cache_dir)
ensure_empty_dir(task_cache_dir)

tracker.reset()
run_start = time.time()
Expand All @@ -313,13 +303,13 @@ def main() -> None:
sample_dataset = base_dataset.set_task(
LengthOfStayPredictionMIMIC4(),
num_workers=w,
cache_dir=str(task_cache_dir),
)

task_process_s = time.time() - task_start
total_s = time.time() - run_start
peak_rss_bytes = tracker.peak_bytes()
task_cache_bytes = get_directory_size(task_cache_dir)
tasks_dir = base_cache_dir / "tasks"
task_cache_bytes = get_directory_size(tasks_dir)

# Capture sample count BEFORE cleaning up the cache (litdata needs it).
num_samples = len(sample_dataset)
Expand All @@ -329,7 +319,6 @@ def main() -> None:
del base_dataset

# Clean up to avoid disk growth across an overnight sweep.
remove_dir(task_cache_dir)
remove_dir(base_cache_dir)

results.append(
Expand Down
Loading