diff --git a/src/apify_client/_resource_clients/dataset.py b/src/apify_client/_resource_clients/dataset.py
index f4e0e204..bb07bbd5 100644
--- a/src/apify_client/_resource_clients/dataset.py
+++ b/src/apify_client/_resource_clients/dataset.py
@@ -268,13 +268,16 @@ def iterate_items(
         """
         cache_size = 1000
 
-        should_finish = False
         read_items = 0
+        # Server-side filters (skip_empty, skip_hidden, clean) drop items after the [offset, offset+limit)
+        # window is scanned, so a short page does not imply the dataset is exhausted. Track the scanned
+        # window explicitly and stop only when the server returns no items at all.
+        scanned_items = 0
 
         # We can't rely on DatasetItemsPage.total because that is updated with a delay,
         # so if you try to read the dataset items right after a run finishes, you could miss some.
         # Instead, we just read and read until we reach the limit, or until there are no more items to read.
-        while not should_finish:
+        while True:
             effective_limit = cache_size
             if limit is not None:
                 if read_items == limit:
@@ -282,7 +285,7 @@
                 effective_limit = min(cache_size, limit - read_items)
 
             current_items_page = self.list_items(
-                offset=offset + read_items,
+                offset=offset + scanned_items,
                 limit=effective_limit,
                 clean=clean,
                 desc=desc,
@@ -298,10 +301,11 @@
             yield from current_items_page.items
 
             current_page_item_count = len(current_items_page.items)
-            read_items += current_page_item_count
+            if current_page_item_count == 0:
+                break
 
-            if current_page_item_count < cache_size:
-                should_finish = True
+            read_items += current_page_item_count
+            scanned_items += effective_limit
 
     def download_items(
         self,
@@ -945,13 +949,16 @@ async def iterate_items(
         """
         cache_size = 1000
 
-        should_finish = False
         read_items = 0
+        # Server-side filters (skip_empty, skip_hidden, clean) drop items after the [offset, offset+limit)
+        # window is scanned, so a short page does not imply the dataset is exhausted. Track the scanned
+        # window explicitly and stop only when the server returns no items at all.
+        scanned_items = 0
 
         # We can't rely on DatasetItemsPage.total because that is updated with a delay,
         # so if you try to read the dataset items right after a run finishes, you could miss some.
         # Instead, we just read and read until we reach the limit, or until there are no more items to read.
-        while not should_finish:
+        while True:
             effective_limit = cache_size
             if limit is not None:
                 if read_items == limit:
@@ -959,7 +966,7 @@
                 effective_limit = min(cache_size, limit - read_items)
 
             current_items_page = await self.list_items(
-                offset=offset + read_items,
+                offset=offset + scanned_items,
                 limit=effective_limit,
                 clean=clean,
                 desc=desc,
@@ -976,10 +983,11 @@
                 yield item
 
             current_page_item_count = len(current_items_page.items)
-            read_items += current_page_item_count
+            if current_page_item_count == 0:
+                break
 
-            if current_page_item_count < cache_size:
-                should_finish = True
+            read_items += current_page_item_count
+            scanned_items += effective_limit
 
     async def get_items_as_bytes(
         self,
diff --git a/tests/unit/test_dataset_list_items.py b/tests/unit/test_dataset_list_items.py
index 19831a15..78b3356c 100644
--- a/tests/unit/test_dataset_list_items.py
+++ b/tests/unit/test_dataset_list_items.py
@@ -4,7 +4,7 @@
 from typing import TYPE_CHECKING
 
 import pytest
-from werkzeug import Response
+from werkzeug import Request, Response
 
 from apify_client import ApifyClient, ApifyClientAsync
 
@@ -87,3 +87,59 @@ async def test_list_items_desc_true_async(httpserver: HTTPServer, desc_header_va
     result = await client.dataset(DATASET_ID).list_items()
 
     assert result.desc is True
+
+
+def _make_filtered_pagination_handler(*, dataset_size: int) -> Callable:
+    """Simulate an API that scans `[offset, offset+limit)` then post-filters out odd-indexed items.
+
+    Each returned page therefore contains ~half of the scanned window — a situation that
+    only `skip_empty`, `skip_hidden`, or `clean=True` can produce server-side.
+    """
+
+    def handler(request: Request) -> Response:
+        offset = int(request.args.get('offset', '0'))
+        limit = int(request.args.get('limit', '1000'))
+        scanned_end = min(offset + limit, dataset_size)
+        items = [{'i': i} for i in range(offset, scanned_end) if i % 2 == 0]
+        return Response(
+            status=200,
+            headers={
+                'x-apify-pagination-total': str(dataset_size),
+                'x-apify-pagination-offset': str(offset),
+                'x-apify-pagination-count': str(len(items)),
+                'x-apify-pagination-limit': str(limit),
+                'x-apify-pagination-desc': 'false',
+                'content-type': 'application/json',
+            },
+            response=json.dumps(items),
+        )
+
+    return handler
+
+
+def test_iterate_items_with_filter_does_not_terminate_early_sync(httpserver: HTTPServer) -> None:
+    dataset_size = 2500
+    httpserver.expect_request(ITEMS_PATH).respond_with_handler(
+        _make_filtered_pagination_handler(dataset_size=dataset_size),
+    )
+    api_url = httpserver.url_for('/').removesuffix('/')
+
+    client = ApifyClient(token='test-token', api_url=api_url)
+    items = list(client.dataset(DATASET_ID).iterate_items(skip_empty=True))
+
+    expected = [{'i': i} for i in range(dataset_size) if i % 2 == 0]
+    assert items == expected
+
+
+async def test_iterate_items_with_filter_does_not_terminate_early_async(httpserver: HTTPServer) -> None:
+    dataset_size = 2500
+    httpserver.expect_request(ITEMS_PATH).respond_with_handler(
+        _make_filtered_pagination_handler(dataset_size=dataset_size),
+    )
+    api_url = httpserver.url_for('/').removesuffix('/')
+
+    client = ApifyClientAsync(token='test-token', api_url=api_url)
+    items = [item async for item in client.dataset(DATASET_ID).iterate_items(skip_empty=True)]
+
+    expected = [{'i': i} for i in range(dataset_size) if i % 2 == 0]
+    assert items == expected