From 1596ed905cac26f32531e7508bbedaaa63b5d2c2 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Tue, 21 Apr 2026 10:22:34 +0200 Subject: [PATCH] fix: Prevent `iterate_items` from terminating early under server-side filters When `skip_empty`, `skip_hidden`, or `clean=True` is used, the API post-filters each scanned window, so a page with fewer than 1000 items does not imply the dataset is exhausted. Track the scanned window explicitly, advance the offset by the scanned size (not the returned count), and terminate only on an empty response. Known limitation: an empty response can still occur mid-dataset when every item in a scanned window is filtered out — e.g. a long run of consecutive empty items, or (more likely) the shrunken `limit - read_items` scan window used once a caller-supplied `limit` is combined with a filter — so iteration can still end early in those cases. A follow-up should always scan full `cache_size` windows and truncate the yielded items client-side once `limit` matching items have been produced. Co-authored-by: Claude Opus 4.7 (1M context) --- src/apify_client/_resource_clients/dataset.py | 32 ++++++---- tests/unit/test_dataset_list_items.py | 58 ++++++++++++++++++- 2 files changed, 77 insertions(+), 13 deletions(-) diff --git a/src/apify_client/_resource_clients/dataset.py b/src/apify_client/_resource_clients/dataset.py index f4e0e204..bb07bbd5 100644 --- a/src/apify_client/_resource_clients/dataset.py +++ b/src/apify_client/_resource_clients/dataset.py @@ -268,13 +268,16 @@ def iterate_items( """ cache_size = 1000 - should_finish = False read_items = 0 + # Server-side filters (skip_empty, skip_hidden, clean) drop items after the [offset, offset+limit) + # window is scanned, so a short page does not imply the dataset is exhausted. Track the scanned + # window explicitly and stop only when the server returns no items at all. + scanned_items = 0 # We can't rely on DatasetItemsPage.total because that is updated with a delay, # so if you try to read the dataset items right after a run finishes, you could miss some. # Instead, we just read and read until we reach the limit, or until there are no more items to read. 
- while not should_finish: + while True: effective_limit = cache_size if limit is not None: if read_items == limit: @@ -282,7 +285,7 @@ def iterate_items( effective_limit = min(cache_size, limit - read_items) current_items_page = self.list_items( - offset=offset + read_items, + offset=offset + scanned_items, limit=effective_limit, clean=clean, desc=desc, @@ -298,10 +301,11 @@ def iterate_items( yield from current_items_page.items current_page_item_count = len(current_items_page.items) - read_items += current_page_item_count + if current_page_item_count == 0: + break - if current_page_item_count < cache_size: - should_finish = True + read_items += current_page_item_count + scanned_items += effective_limit def download_items( self, @@ -945,13 +949,16 @@ async def iterate_items( """ cache_size = 1000 - should_finish = False read_items = 0 + # Server-side filters (skip_empty, skip_hidden, clean) drop items after the [offset, offset+limit) + # window is scanned, so a short page does not imply the dataset is exhausted. Track the scanned + # window explicitly and stop only when the server returns no items at all. + scanned_items = 0 # We can't rely on DatasetItemsPage.total because that is updated with a delay, # so if you try to read the dataset items right after a run finishes, you could miss some. # Instead, we just read and read until we reach the limit, or until there are no more items to read. 
- while not should_finish: + while True: effective_limit = cache_size if limit is not None: if read_items == limit: @@ -959,7 +966,7 @@ async def iterate_items( effective_limit = min(cache_size, limit - read_items) current_items_page = await self.list_items( - offset=offset + read_items, + offset=offset + scanned_items, limit=effective_limit, clean=clean, desc=desc, @@ -976,10 +983,11 @@ async def iterate_items( yield item current_page_item_count = len(current_items_page.items) - read_items += current_page_item_count + if current_page_item_count == 0: + break - if current_page_item_count < cache_size: - should_finish = True + read_items += current_page_item_count + scanned_items += effective_limit async def get_items_as_bytes( self, diff --git a/tests/unit/test_dataset_list_items.py b/tests/unit/test_dataset_list_items.py index 19831a15..78b3356c 100644 --- a/tests/unit/test_dataset_list_items.py +++ b/tests/unit/test_dataset_list_items.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING import pytest -from werkzeug import Response +from werkzeug import Request, Response from apify_client import ApifyClient, ApifyClientAsync @@ -87,3 +87,59 @@ async def test_list_items_desc_true_async(httpserver: HTTPServer, desc_header_va result = await client.dataset(DATASET_ID).list_items() assert result.desc is True + + +def _make_filtered_pagination_handler(*, dataset_size: int) -> Callable: + """Simulate an API that scans `[offset, offset+limit)` then post-filters out odd-indexed items. + + Each returned page therefore contains ~half of the scanned window — a situation that + only `skip_empty`, `skip_hidden`, or `clean=True` can produce server-side. 
+ """ + + def handler(request: Request) -> Response: + offset = int(request.args.get('offset', '0')) + limit = int(request.args.get('limit', '1000')) + scanned_end = min(offset + limit, dataset_size) + items = [{'i': i} for i in range(offset, scanned_end) if i % 2 == 0] + return Response( + status=200, + headers={ + 'x-apify-pagination-total': str(dataset_size), + 'x-apify-pagination-offset': str(offset), + 'x-apify-pagination-count': str(len(items)), + 'x-apify-pagination-limit': str(limit), + 'x-apify-pagination-desc': 'false', + 'content-type': 'application/json', + }, + response=json.dumps(items), + ) + + return handler + + +def test_iterate_items_with_filter_does_not_terminate_early_sync(httpserver: HTTPServer) -> None: + dataset_size = 2500 + httpserver.expect_request(ITEMS_PATH).respond_with_handler( + _make_filtered_pagination_handler(dataset_size=dataset_size), + ) + api_url = httpserver.url_for('/').removesuffix('/') + + client = ApifyClient(token='test-token', api_url=api_url) + items = list(client.dataset(DATASET_ID).iterate_items(skip_empty=True)) + + expected = [{'i': i} for i in range(dataset_size) if i % 2 == 0] + assert items == expected + + +async def test_iterate_items_with_filter_does_not_terminate_early_async(httpserver: HTTPServer) -> None: + dataset_size = 2500 + httpserver.expect_request(ITEMS_PATH).respond_with_handler( + _make_filtered_pagination_handler(dataset_size=dataset_size), + ) + api_url = httpserver.url_for('/').removesuffix('/') + + client = ApifyClientAsync(token='test-token', api_url=api_url) + items = [item async for item in client.dataset(DATASET_ID).iterate_items(skip_empty=True)] + + expected = [{'i': i} for i in range(dataset_size) if i % 2 == 0] + assert items == expected