From 6480a1d3bcfefa05cd3dc13934a404c7561854da Mon Sep 17 00:00:00 2001 From: Kevin James Date: Wed, 25 Mar 2026 14:55:58 +0000 Subject: [PATCH 1/3] feat(bigquery): support DATE columns Fixes #987 --- bigquery/gcloud/aio/bigquery/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigquery/gcloud/aio/bigquery/utils.py b/bigquery/gcloud/aio/bigquery/utils.py index 3248befe4..0216384d4 100644 --- a/bigquery/gcloud/aio/bigquery/utils.py +++ b/bigquery/gcloud/aio/bigquery/utils.py @@ -77,6 +77,7 @@ def parse(field: dict[str, Any], value: Any) -> Any: ), 'BOOLEAN': lambda x: x == 'true', 'BYTES': bytes, + 'DATE': datetime.date.fromisoformat, 'FLOAT': float, 'INTEGER': int, 'NUMERIC': lambda x: decimal.Decimal( @@ -90,7 +91,6 @@ def parse(field: dict[str, Any], value: Any) -> Any: }[field['type']] except KeyError: # TODO: determine the proper methods for converting the following: - # DATE -> datetime? # DATETIME -> datetime? # GEOGRAPHY -> ?? # TIME -> datetime? From 81175ff605194cccb9027e517510430bcd8717ff Mon Sep 17 00:00:00 2001 From: Kevin James Date: Wed, 25 Mar 2026 14:57:15 +0000 Subject: [PATCH 2/3] feat(pubsub): allow explicit ack/nack on messages (#931) Adds new ack() and nack() methods on SubscriberMessage which allow users to forcibly mark a messsage as getting acked or nacked, regardless of the success of the callback. In plain English: When `callback` terminates, if `.ack()` has been called it acks. Else, if `.nack()` has been called it nacks. Else, it acks if no exception was raised and nacks otherwise. Fixes #837 --- pubsub/gcloud/aio/pubsub/subscriber.py | 32 +++++++++++++------ .../gcloud/aio/pubsub/subscriber_message.py | 23 +++++++++++++ pubsub/tests/unit/subscriber_test.py | 1 + 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/pubsub/gcloud/aio/pubsub/subscriber.py b/pubsub/gcloud/aio/pubsub/subscriber.py index 29b328e53..daa0218ff 100644 --- a/pubsub/gcloud/aio/pubsub/subscriber.py +++ b/pubsub/gcloud/aio/pubsub/subscriber.py @@ -10,8 +10,8 @@ import time from collections.abc import Awaitable from collections.abc import Callable - from typing import Optional from typing import TYPE_CHECKING + from typing import Optional from typing import TypeVar from . import metrics @@ -286,11 +286,26 @@ async def maybe_nack(ack_id: str) -> None: ack_ids = [] + async def ack_or_nack( + message: SubscriberMessage, + ack_queue: 'asyncio.Queue[str]', + nack_queue: Optional['asyncio.Queue[str]'], + ack: bool = False, + ) -> None: + if message.force_ack_nack is None: + # if we've not forced the ack status, set it here + message.force_ack_nack = ack + + if message.force_ack_nack: + await ack_queue.put(message.ack_id) + elif nack_queue: + await nack_queue.put(message.ack_id) + async def _execute_callback( message: SubscriberMessage, callback: ApplicationHandler, ack_queue: 'asyncio.Queue[str]', - nack_queue: 'Optional[asyncio.Queue[str]]', + nack_queue: Optional['asyncio.Queue[str]'], insertion_time: float, ) -> None: try: @@ -300,18 +315,15 @@ async def _execute_callback( ) with metrics.CONSUME_LATENCY.labels(phase='runtime').time(): await callback(message) - await ack_queue.put(message.ack_id) + await ack_or_nack(message, ack_queue, nack_queue, ack=True) metrics.CONSUME.labels(outcome='succeeded').inc() - except asyncio.CancelledError: - if nack_queue: - await nack_queue.put(message.ack_id) + await ack_or_nack(message, ack_queue, nack_queue, ack=False) log.warning('application callback was cancelled') metrics.CONSUME.labels(outcome='cancelled').inc() except Exception as e: - if nack_queue: - await nack_queue.put(message.ack_id) + await ack_or_nack(message, ack_queue, nack_queue, ack=False) log.warning( 'application callback raised an exception', @@ -326,7 +338,7 @@ async def consumer( # pylint: disable=too-many-locals ack_queue: 'asyncio.Queue[str]', ack_deadline_cache: AckDeadlineCache, max_tasks: int, - nack_queue: 'Optional[asyncio.Queue[str]]', + nack_queue: Optional['asyncio.Queue[str]'], ) -> None: try: semaphore = asyncio.Semaphore(max_tasks) @@ -450,7 +462,7 @@ async def subscribe( ack_queue: 'asyncio.Queue[str]' = asyncio.Queue( maxsize=(max_messages_per_producer * num_producers), ) - nack_queue: 'Optional[asyncio.Queue[str]]' = None + nack_queue: Optional['asyncio.Queue[str]'] = None ack_deadline_cache = AckDeadlineCache( subscriber_client, subscription, diff --git a/pubsub/gcloud/aio/pubsub/subscriber_message.py b/pubsub/gcloud/aio/pubsub/subscriber_message.py index 7186e0de9..4f6f9d73e 100644 --- a/pubsub/gcloud/aio/pubsub/subscriber_message.py +++ b/pubsub/gcloud/aio/pubsub/subscriber_message.py @@ -29,6 +29,8 @@ def __init__( self.attributes = attributes self.delivery_attempt = delivery_attempt + self.force_ack_nack: bool | None = None + @staticmethod def from_repr( received_message: dict[str, Any], @@ -66,3 +68,24 @@ def to_repr(self) -> dict[str, Any]: if self.delivery_attempt is not None: r['deliveryAttempt'] = self.delivery_attempt return r + + def ack(self) -> None: + """ + Forcibly mark a message as acked. + + By default, we only ack a message if the callback returns without + raising an exception. If this method has been called on the Message, we + will instead ack it regardless of exception status. + """ + self.force_ack_nack = True + + def nack(self) -> None: + """ + Forcibly mark a message as nacked. + + By default, we only nack a message if the callback raises an exception. + If this method has been called on the Message, we will instead nack it + regardless of exception status, ie. including if it completes + successfully. + """ + self.force_ack_nack = False diff --git a/pubsub/tests/unit/subscriber_test.py b/pubsub/tests/unit/subscriber_test.py index cd58cef27..9030832a5 100644 --- a/pubsub/tests/unit/subscriber_test.py +++ b/pubsub/tests/unit/subscriber_test.py @@ -29,6 +29,7 @@ def make_message_mock(): mock = MagicMock() mock.ack_id = 'ack_id' mock.publish_time.timestamp = MagicMock(return_value=time.time()) + mock.force_ack_nack = None return mock @pytest.fixture(scope='function') From 9c2e7f8225d64a5df906bfafcd62bbe7888341cc Mon Sep 17 00:00:00 2001 From: TalkIQ Date: Wed, 25 Mar 2026 16:02:05 +0000 Subject: [PATCH 3/3] chore(deps): update node.js to v25.8.2 (#1013) Co-authored-by: Renovate Bot --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 4697c61bf..67f5d6c77 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -116,7 +116,7 @@ jobs: pages: docker: - - image: node:25.8.1 + - image: node:25.8.2 steps: - checkout - attach_workspace: