diff --git a/src/apify_client/_streamed_log.py b/src/apify_client/_streamed_log.py
index f57ba074..92178e9a 100644
--- a/src/apify_client/_streamed_log.py
+++ b/src/apify_client/_streamed_log.py
@@ -5,9 +5,9 @@
 import re
 import threading
 from asyncio import Task
-from datetime import UTC, datetime
+from datetime import UTC, datetime, timedelta
 from threading import Thread
-from typing import TYPE_CHECKING, Self, cast
+from typing import TYPE_CHECKING, ClassVar, Self, cast
 
 from apify_client._docs import docs_group
 
@@ -90,6 +90,10 @@ class StreamedLog(StreamedLogBase):
     call `start` and `stop` manually. Obtain an instance via `RunClient.get_streamed_log`.
     """
 
+    # Caps how long `iter_bytes()` can block on a silent stream so `stop()` can unblock within
+    # this window instead of waiting for the long-polling default.
+    _read_timeout: ClassVar[timedelta] = timedelta(seconds=30)
+
     def __init__(self, log_client: LogClient, *, to_logger: logging.Logger, from_start: bool = True) -> None:
         """Initialize `StreamedLog`.
 
@@ -138,17 +142,17 @@ def __exit__(
         self.stop()
 
     def _stream_log(self) -> None:
-        with self._log_client.stream(raw=True) as log_stream:
+        with self._log_client.stream(raw=True, timeout=self._read_timeout) as log_stream:
             if not log_stream:
                 return
-            for data in log_stream.iter_bytes():
-                self._process_new_data(data)
-                if self._stop_logging:
-                    break
-
-            # If the stream is finished, then the last part will be also processed.
-            self._log_buffer_content(include_last_part=True)
-            return
+            try:
+                for data in log_stream.iter_bytes():
+                    self._process_new_data(data)
+                    if self._stop_logging:
+                        break
+            finally:
+                # Flush the last buffered part even if the read timed out or was stopped.
+                self._log_buffer_content(include_last_part=True)
 
 
 @docs_group('Other')
@@ -214,8 +218,9 @@ async def _stream_log(self) -> None:
         async with self._log_client.stream(raw=True) as log_stream:
             if not log_stream:
                 return
-            async for data in log_stream.aiter_bytes():
-                self._process_new_data(data)
-
-            # If the stream is finished, then the last part will be also processed.
-            self._log_buffer_content(include_last_part=True)
+            try:
+                async for data in log_stream.aiter_bytes():
+                    self._process_new_data(data)
+            finally:
+                # Flush the last buffered part even if the task is cancelled by `stop()`.
+                self._log_buffer_content(include_last_part=True)
diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py
index cf537795..9bc9a81e 100644
--- a/tests/unit/test_logging.py
+++ b/tests/unit/test_logging.py
@@ -3,6 +3,7 @@
 import asyncio
 import json
 import logging
+import threading
 import time
 from datetime import datetime, timedelta
 from typing import TYPE_CHECKING
@@ -15,7 +16,7 @@
 from apify_client._logging import RedirectLogFormatter
 from apify_client._models import ActorJobStatus
 from apify_client._status_message_watcher import StatusMessageWatcherBase
-from apify_client._streamed_log import StreamedLogBase
+from apify_client._streamed_log import StreamedLog, StreamedLogBase
 
 if TYPE_CHECKING:
     from collections.abc import Iterator
@@ -717,3 +718,147 @@
     elapsed = time.monotonic() - start
 
     assert elapsed < _FAST_EXIT_THRESHOLD_S, f'__aexit__ should skip final sleep on exception, took {elapsed:.2f}s'
+
+
+_TAIL_FIRST_MESSAGE = '2025-05-13T07:24:12.588Z tail_test first complete line'
+_TAIL_SECOND_MESSAGE = '2025-05-13T07:24:13.132Z tail_test trailing partial line'
+
+
+def _register_run_and_actor_endpoints(httpserver: HTTPServer) -> None:
+    """Register the minimal run and actor endpoints required by `get_streamed_log`."""
+    httpserver.expect_request(f'/v2/actor-runs/{_MOCKED_RUN_ID}', method='GET').respond_with_json(
+        {
+            'data': {
+                'id': _MOCKED_RUN_ID,
+                'actId': _MOCKED_ACTOR_ID,
+                'userId': 'test_user_id',
+                'startedAt': '2019-11-30T07:34:24.202Z',
+                'finishedAt': '2019-12-12T09:30:12.202Z',
+                'status': 'RUNNING',
+                'statusMessage': 'Running',
+                'isStatusMessageTerminal': False,
+                'meta': {'origin': 'WEB'},
+                'stats': {'restartCount': 0, 'resurrectCount': 0, 'computeUnits': 0.1},
+                'options': {'build': 'latest', 'timeoutSecs': 300, 'memoryMbytes': 1024, 'diskMbytes': 2048},
+                'buildId': 'test_build_id',
+                'generalAccess': 'RESTRICTED',
+                'defaultKeyValueStoreId': 'test_kvs_id',
+                'defaultDatasetId': 'test_dataset_id',
+                'defaultRequestQueueId': 'test_rq_id',
+                'buildNumber': '0.0.1',
+                'containerUrl': 'https://test.runs.apify.net',
+            }
+        }
+    )
+    httpserver.expect_request(f'/v2/acts/{_MOCKED_ACTOR_ID}', method='GET').respond_with_json(
+        {
+            'data': {
+                'id': _MOCKED_ACTOR_ID,
+                'userId': 'test_user_id',
+                'name': _MOCKED_ACTOR_NAME,
+                'username': 'test_user',
+                'isPublic': False,
+                'createdAt': '2019-07-08T11:27:57.401Z',
+                'modifiedAt': '2019-07-08T14:01:05.546Z',
+                'stats': {
+                    'totalBuilds': 0,
+                    'totalRuns': 0,
+                    'totalUsers': 0,
+                    'totalUsers7Days': 0,
+                    'totalUsers30Days': 0,
+                    'totalUsers90Days': 0,
+                    'totalMetamorphs': 0,
+                    'lastRunStartedAt': '2019-07-08T14:01:05.546Z',
+                },
+                'versions': [],
+                'defaultRunOptions': {'build': 'latest', 'timeoutSecs': 3600, 'memoryMbytes': 2048},
+                'deploymentKey': 'test_key',
+            }
+        }
+    )
+
+
+@pytest.mark.usefixtures('propagate_stream_logs')
+async def test_streamed_log_async_stop_flushes_buffered_tail(
+    caplog: LogCaptureFixture,
+    httpserver: HTTPServer,
+) -> None:
+    """Verify the buffered tail is flushed to the logger when the async task is cancelled by `stop`."""
+    stop_emitting = threading.Event()
+
+    def _tail_handler(_request: Request) -> Response:
+        def generate_logs() -> Iterator[bytes]:
+            yield f'{_TAIL_FIRST_MESSAGE}\n'.encode()
+            # Second marker has no trailing newline/next-marker, so it stays in the buffer.
+            yield _TAIL_SECOND_MESSAGE.encode()
+            # Block until the test tears the server down (or stop releases us).
+            stop_emitting.wait(timeout=30)
+
+        return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
+
+    httpserver.expect_request(
+        f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
+    ).respond_with_handler(_tail_handler)
+    _register_run_and_actor_endpoints(httpserver)
+
+    api_url = httpserver.url_for('/').removesuffix('/')
+    run_client = ApifyClientAsync(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
+    streamed_log = await run_client.get_streamed_log()
+
+    logger_name = f'apify.{_MOCKED_ACTOR_NAME}-{_MOCKED_RUN_ID}'
+
+    try:
+        with caplog.at_level(logging.DEBUG, logger=logger_name):
+            async with streamed_log:
+                # Wait long enough for both chunks to arrive and be processed.
+                await asyncio.sleep(1)
+            # Context exit calls stop(), which cancels the task mid-stream.
+    finally:
+        stop_emitting.set()
+
+    messages = [record.message for record in caplog.records]
+    assert any(_TAIL_FIRST_MESSAGE in m for m in messages), f'First message missing. Got: {messages}'
+    assert any(_TAIL_SECOND_MESSAGE in m for m in messages), f'Buffered tail dropped on async stop(). Got: {messages}'
+
+
+@pytest.mark.usefixtures('propagate_stream_logs')
+def test_streamed_log_sync_stop_does_not_hang_on_silent_stream(
+    httpserver: HTTPServer,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """Verify `stop()` returns promptly even when the underlying stream is silent (no chunks)."""
+    # Shorten the read timeout so the test doesn't wait for the production default.
+    monkeypatch.setattr(StreamedLog, '_read_timeout', timedelta(seconds=1))
+
+    release_server = threading.Event()
+
+    def _silent_handler(_request: Request) -> Response:
+        def generate_logs() -> Iterator[bytes]:
+            # Yield an empty chunk so werkzeug flushes headers and the client sees a streaming
+            # response; then block without emitting any log data.
+            yield b''
+            release_server.wait(timeout=30)
+
+        return Response(response=generate_logs(), status=200, mimetype='application/octet-stream')
+
+    httpserver.expect_request(
+        f'/v2/actor-runs/{_MOCKED_RUN_ID}/log', method='GET', query_string='stream=true&raw=true'
+    ).respond_with_handler(_silent_handler)
+    _register_run_and_actor_endpoints(httpserver)
+
+    api_url = httpserver.url_for('/').removesuffix('/')
+    run_client = ApifyClient(token='mocked_token', api_url=api_url).run(run_id=_MOCKED_RUN_ID)
+    streamed_log = run_client.get_streamed_log()
+
+    streamed_log.start()
+    try:
+        # Give the streaming thread time to start and block inside iter_bytes.
+        time.sleep(0.3)
+
+        # Call stop() from a helper thread so the test cannot hang indefinitely if the fix regresses.
+        stop_thread = threading.Thread(target=streamed_log.stop)
+        stop_thread.start()
+        stop_thread.join(timeout=5)
+        assert not stop_thread.is_alive(), 'stop() hangs when the underlying stream is silent'
+    finally:
+        release_server.set()