diff --git a/fastasyncpg/core.py b/fastasyncpg/core.py index 938be07..7df9d9c 100644 --- a/fastasyncpg/core.py +++ b/fastasyncpg/core.py @@ -736,10 +736,14 @@ async def claim_one(self:Table, status_col='status', pending='pending', complete tbl = txn.t[self.name] p.db = txn p.evt = await tbl.claim(where=f'"{status_col}"=$1', where_args=[pending], order_by=order_by) - try: yield p + try: + async with txn.conn.transaction(): yield p # return a nested tx so any tx abort does not prevent us from marking the row as failed except Exception as e: p.failed, p.exc, p.tb = True, e, traceback.format_exc() if p.evt is None: return pk = self.pks[0] new = failed if p.failed else (None if p.retry else completed) stmt = f'UPDATE {self} SET "{status_col}"=$1 WHERE "{pk}"=$2' - if new: await txn.execute(stmt, new, get_field(p.evt, pk)) + if new: + await txn.execute(stmt, new, get_field(p.evt, pk)) + p.evt = await tbl.selectone(f'"{pk}"=$1', [get_field(p.evt, pk)]) + diff --git a/nbs/00_core.ipynb b/nbs/00_core.ipynb index d790f8e..1060fa1 100644 --- a/nbs/00_core.ipynb +++ b/nbs/00_core.ipynb @@ -5093,13 +5093,16 @@ " tbl = txn.t[self.name]\n", " p.db = txn\n", " p.evt = await tbl.claim(where=f'\"{status_col}\"=$1', where_args=[pending], order_by=order_by)\n", - " try: yield p\n", + " try:\n", + " async with txn.conn.transaction(): yield p # return a nested tx so any tx abort does not prevent us from marking the row as failed\n", " except Exception as e: p.failed, p.exc, p.tb = True, e, traceback.format_exc()\n", " if p.evt is None: return\n", " pk = self.pks[0]\n", " new = failed if p.failed else (None if p.retry else completed)\n", " stmt = f'UPDATE {self} SET \"{status_col}\"=$1 WHERE \"{pk}\"=$2'\n", - " if new: await txn.execute(stmt, new, get_field(p.evt, pk))" + " if new: \n", + " await txn.execute(stmt, new, get_field(p.evt, pk))\n", + " p.evt = await tbl.selectone(f'\"{pk}\"=$1', [get_field(p.evt, pk)])\n" ] }, { @@ -5293,6 +5296,31 @@ "await jobs(\"payload=$1\", [pl])" ] }, + { + "cell_type": "markdown", + "id": "a866bdaa", + "metadata": {}, + "source": [ + "Test that when the transaction is aborted the claimed event row is still set to failed." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e1e45f08", + "metadata": {}, + "outputs": [], + "source": [ + "await jobs.delete_where()\n", + "await jobs.inserts([Job(payload='repro', status='pending')])\n", + "\n", + "async with jobs.claim_one(order_by='id') as p:\n", + " await p.db.execute('select definitely_not_a_real_function()')\n", + "\n", + "test_eq(p.evt.status, 'failed')\n", + "test_eq(p.failed, True)" + ] + }, { "cell_type": "code", "execution_count": null,