From b53e7cdfcb3c8337b4d02675354651489960bc30 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Wed, 25 Feb 2026 17:32:37 +0100 Subject: [PATCH 01/17] Create submissions_cancelled table --- .../20260225164600_submissions_cancelled.sql | 10 ++++++++++ opsqueue/opsqueue_example_database_schema.db | Bin 94208 -> 102400 bytes 2 files changed, 10 insertions(+) create mode 100644 opsqueue/migrations/20260225164600_submissions_cancelled.sql diff --git a/opsqueue/migrations/20260225164600_submissions_cancelled.sql b/opsqueue/migrations/20260225164600_submissions_cancelled.sql new file mode 100644 index 0000000..56de6c9 --- /dev/null +++ b/opsqueue/migrations/20260225164600_submissions_cancelled.sql @@ -0,0 +1,10 @@ +CREATE TABLE submissions_cancelled +( + id BIGINT PRIMARY KEY NOT NULL, + prefix TEXT, + chunks_total INTEGER NOT NULL DEFAULT 0, + chunks_done INTEGER NOT NULL DEFAULT 0, + metadata BLOB, + cancelled_at DATETIME NOT NULL -- Unix Timestamp +); + diff --git a/opsqueue/opsqueue_example_database_schema.db b/opsqueue/opsqueue_example_database_schema.db index bced270be72bff4c21d3460188c2ca31890d4c69..63eb0a6fe6789cdac88cf616b5ddc15cf2a2dfa8 100644 GIT binary patch delta 1019 zcmZp8z}m2YO(!_eC$l6~AuYcsH?c&)m_dMnk&(ecL4kpRK^TZ7ffxpqC+Zk83QtTB zkzmQ=Wnl8*+0C_=Q<{AfyAs=OHf`4BOfH)h1$HoUunDk%wQOF>sl>$9WX8%aZf?%l z!o6Kyj8Tt~QFF5oQzSc^CmWl1toh_XUX{tZJe-q%aZ7CW=H1C8#>o@Fz`uasmG3TJ z9`7&SeqJq}T|5Cmlg)VQGdS5<&5b#8n1ynU+f*%zOOtXli;FY!^NJOc6Z4W&b8=Es zj0}v-bPbGjjZ75`&8!S8tW3<#KY6l@%izl%8*!=9-NlZpI(*tn{Nk?vvRom%pMiVX zSc+Be_*3Ne^D>9k$7c^(Hlt25Z9Y{s|0dXiF0N>Ym^pOulQM>qVm+I?1| z{m<5BarZb}-}boTzsfPGD29VyZn55dS^PRl+E?hxy+w;(JdJfvy**ts;nAwP~p_c{jwSY z=x&*OT-Jcud)3;>zhn(DC9UNQn0?abPR^6l!IWGDlyr;#Jo&DiDyF26yn(h)kw_#H zJF6tv-5JY&MI?e`I`xT##gG{VV0kAeRi|7-pS z{8#x;@$cu~!oQM#-eyIG7JhCnW>!W>G;?lVsh>81QGBzY!F&GA|LqwC7CC^>CI?0V zb*{Y(oXr0j_^$9x=A6On&Ed_j9Q<=Htu>Oc$7Zm|Qk13hZFyU=v^iYuWxvi?Nn*vkLni*3CXl zN=%b@y*KBvSTb(*=H1C8!ojnHfqwzNE8ktdJl znS5AArQTH6z(Uu+Si#WT%GA`#&~RCwc;8L7oHP}Sp9iM6dl^oeQfuuOEyy@BojNUB+%W=LK(|{MI?ef% z9Q-R8_`mVL=6}F{mH!m~e*P`|D>n-oOyu9ZQa^11qZp9=o`3Uydq#mp4j{D2fl)x6 zJ&A#n`91^R6~4(lYP{ZTqTI)s?{g+_Zp4Uu6{hLHIM3TID#obF2qK%AnK+kj-!H}J z$i%}V#K0qNXvo-}S(3Q@zc8a1C_#wuHZh2Zo0~JXaF-+|<)jvuCgo;spCJj-?=8!? zjY$^hYHp^*4194sU%CHr=W)GaFJs%x63dgy+{m;TnBZEOwl7v<%+O^8B~@Oq2PXfw W=MfbHaz%ky1c-&9SP&@vfFA&5TF#^Z From 6b34ce913bcb0bd55e37fc2c6e245829903d750c Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Wed, 25 Feb 2026 17:32:55 +0100 Subject: [PATCH 02/17] Add rust cancel_submission function --- opsqueue/src/common/submission.rs | 52 +++++++++++++++++++++++++++++++ opsqueue/src/prometheus.rs | 2 ++ 2 files changed, 54 insertions(+) diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 497d77d..897f69d 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -634,6 +634,58 @@ pub mod db { .await } + #[tracing::instrument(skip(conn))] + pub async fn cancel_submission( + id: SubmissionId, + mut conn: impl WriterConnection, + ) -> sqlx::Result<()> { + conn.transaction(move |mut tx| { + Box::pin( + async move { cancel_submission_notx(id, &mut tx).await }, + ) + }) + .await + } + + /// Do not call directly! Must be called inside a transaction. + pub async fn cancel_submission_notx( + id: SubmissionId, + mut conn: impl WriterConnection, + ) -> sqlx::Result<()> { + cancel_submission_raw(id, &mut conn).await?; + super::chunk::db::skip_remaining_chunks(id, conn).await?; + Ok(()) + } + + #[tracing::instrument(skip(conn))] + pub(super) async fn cancel_submission_raw( + id: SubmissionId, + mut conn: impl WriterConnection, + ) -> sqlx::Result<()> { + let now = chrono::prelude::Utc::now(); + + query!( + " + INSERT INTO submissions_cancelled + (id, chunks_total, prefix, metadata, cancelled_at, chunks_done) + SELECT id, chunks_total, prefix, metadata, julianday($1), chunks_done FROM submissions WHERE id = $3; + + DELETE FROM submissions WHERE id = $4 RETURNING *; + ", + now, + id, + id, + ) + .fetch_one(conn.get_inner()) + .await?; + counter!(crate::prometheus::SUBMISSIONS_CANCELLED_COUNTER).increment(1); + histogram!(crate::prometheus::SUBMISSIONS_DURATION_CANCEL_HISTOGRAM).record( + crate::prometheus::time_delta_as_f64(Utc::now() - id.timestamp()), + ); + + Ok(()) + } + #[tracing::instrument(skip(conn))] /// Do not call directly! MUST be called inside a transaction. pub(super) async fn complete_submission_raw( diff --git a/opsqueue/src/prometheus.rs b/opsqueue/src/prometheus.rs index 74d7514..a74860c 100644 --- a/opsqueue/src/prometheus.rs +++ b/opsqueue/src/prometheus.rs @@ -17,8 +17,10 @@ use crate::db::DBPools; pub const SUBMISSIONS_TOTAL_COUNTER: &str = "submissions_total_count"; pub const SUBMISSIONS_COMPLETED_COUNTER: &str = "submissions_completed_count"; pub const SUBMISSIONS_FAILED_COUNTER: &str = "submissions_failed_count"; +pub const SUBMISSIONS_CANCELLED_COUNTER: &str = "submissions_failed_count"; pub const SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM: &str = "submissions_complete_duration_seconds"; pub const SUBMISSIONS_DURATION_FAIL_HISTOGRAM: &str = "submissions_fail_duration_seconds"; +pub const SUBMISSIONS_DURATION_CANCEL_HISTOGRAM: &str = "submissions_cancel_duration_seconds"; pub const CHUNKS_COMPLETED_COUNTER: &str = "chunks_completed_count"; pub const CHUNKS_FAILED_COUNTER: &str = "chunks_failed_count"; From 085bd84d2c286caa96386b4595e74e532752cee9 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 11:58:54 +0100 Subject: [PATCH 03/17] Add endpoint to cancel a submission --- .../python/opsqueue/producer.py | 9 ++++++++ libs/opsqueue_python/src/producer.rs | 19 +++++++++++++++ opsqueue/src/common/submission.rs | 4 ++-- opsqueue/src/producer/client.rs | 23 +++++++++++++++++++ opsqueue/src/producer/server.rs | 13 +++++++++++ 5 files changed, 66 insertions(+), 2 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index 5b8eabb..1edcab9 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -314,6 +314,15 @@ def count_submissions(self) -> int: """ return self.inner.count_submissions() # type: ignore[no-any-return] + def cancel_submission(self, submission_id: SubmissionId) -> None: + """ + Cancel a specific submission if it's in progress. + + Raises: + - `InternalProducerClientError` if there is a low-level internal error. + """ + return self.inner.cancel_submission(submission_id) + def get_submission_status( self, submission_id: SubmissionId ) -> SubmissionStatus | None: diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index 2ea6078..a8347a5 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -118,6 +118,25 @@ impl ProducerClient { }) } + /// TODO docstring + pub fn cancel_submission( + &self, + py: Python<'_>, + id: SubmissionId, + ) -> CPyResult<(), E> + { + py.allow_threads(|| { + self.block_unless_interrupted(async { + self.producer_client + .cancel_submission(id.into()) + .await + .map_err(|e| CError(R(e))) + }) + // .map(|opt| opt.map(Into::into)) + // .map_err(|e| ProducerClientError::new_err(e.to_string())) + }) + } + /// Retrieve the status (in progress, completed or failed) of a specific submission. /// /// The returned SubmissionStatus object also includes the number of chunks finished so far, diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 897f69d..edc5b3b 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -668,9 +668,9 @@ pub mod db { " INSERT INTO submissions_cancelled (id, chunks_total, prefix, metadata, cancelled_at, chunks_done) - SELECT id, chunks_total, prefix, metadata, julianday($1), chunks_done FROM submissions WHERE id = $3; + SELECT id, chunks_total, prefix, metadata, julianday($1), chunks_done FROM submissions WHERE id = $2; - DELETE FROM submissions WHERE id = $4 RETURNING *; + DELETE FROM submissions WHERE id = $3 RETURNING *; ", now, id, diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index 0b94b66..2790949 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -101,6 +101,29 @@ impl Client { }) .retry(retry_policy()) .when(InternalProducerClientError::is_ephemeral) + .notify(|err, dur| { + tracing::debug!("retrying error {err:?} with sleeping {dur:?}");}) .await + } + + /// TODO docstring + pub async fn cancel_submission( + &self, + submission_id: SubmissionId, + ) -> Result<(), InternalProducerClientError> { + (|| async { + let base_url = &self.base_url; + self + .http_client + .post(format!("{base_url}/submissions/cancel/{submission_id}")) + .send() + .await? + .error_for_status()?; + // let bytes = resp.bytes().await?; + // let body = serde_json::from_slice(&bytes)?; + Ok(()) + }) + .retry(retry_policy()) + .when(InternalProducerClientError::is_ephemeral) .notify(|err, dur| { tracing::debug!("retrying error {err:?} with sleeping {dur:?}"); }) diff --git a/opsqueue/src/producer/server.rs b/opsqueue/src/producer/server.rs index d32fee1..053f31f 100644 --- a/opsqueue/src/producer/server.rs +++ b/opsqueue/src/producer/server.rs @@ -46,6 +46,10 @@ impl ServerState { pub fn build_router(self) -> Router<()> { Router::new() .route("/submissions", post(insert_submission)) + .route( + "/submissions/cancel/{submission_id}", + post(cancel_submission), + ) .route( "/submissions/count_completed", get(submissions_count_completed), @@ -85,6 +89,15 @@ where } } +async fn cancel_submission( + State(state): State, + Path(submission_id): Path, +) -> Result<(), ServerError> { + let mut conn = state.pool.writer_conn().await?; + submission::db::cancel_submission(submission_id, &mut conn).await?; + Ok(()) +} + async fn submission_status( State(state): State, Path(submission_id): Path, From 104b90b0b76f8daacdf8ff5ecf710b75b26f12ca Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 14:12:56 +0100 Subject: [PATCH 04/17] Add SubmissionCancelled to SubmissionStatus --- libs/opsqueue_python/src/common.rs | 37 ++++++++++++++++++++++++++++++ opsqueue/src/common/submission.rs | 37 ++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index caa5128..457511b 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -318,6 +318,18 @@ impl From for SubmissionFailed { } } +impl From for SubmissionCancelled { + fn from(value: opsqueue::common::submission::SubmissionCancelled) -> Self { + Self { + id: value.id.into(), + chunks_total: value.chunks_total.into(), + chunks_done: value.chunks_done.into(), + metadata: value.metadata, + cancelled_at: value.cancelled_at, + } + } +} + #[pyclass(frozen, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub enum SubmissionStatus { @@ -331,6 +343,9 @@ pub enum SubmissionStatus { submission: SubmissionFailed, chunk: ChunkFailed, }, + Cancelled { + submission: SubmissionCancelled, + } } impl From for SubmissionStatus { @@ -347,6 +362,9 @@ impl From for SubmissionStatus { let chunk = ChunkFailed::from_internal(c, &s); let submission = s.into(); SubmissionStatus::Failed { submission, chunk } + }, + Cancelled(s) => SubmissionStatus::Cancelled { + submission: s.into(), } } } @@ -414,6 +432,14 @@ impl SubmissionFailed { } } +#[pymethods] +impl SubmissionCancelled { + fn __repr__(&self) -> String { + format!("SubmissionCancelled(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, cancelled_at={4})", + self.id.__repr__(), self.chunks_total, self.chunks_done, self.metadata, self.cancelled_at) + } +} + #[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct SubmissionCompleted { @@ -435,6 +461,17 @@ pub struct SubmissionFailed { pub failed_chunk_id: u64, } +#[pyclass(frozen, get_all, module = "opsqueue")] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SubmissionCancelled { + pub id: SubmissionId, + pub chunks_total: u64, + pub chunks_done: u64, + pub metadata: Option, + // TODO pub strategic_metadata: Option, + pub cancelled_at: DateTime, +} + pub async fn run_unless_interrupted( future: impl IntoFuture>, ) -> Result diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index edc5b3b..0cf6748 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -183,11 +183,27 @@ pub struct SubmissionFailed { pub otel_trace_carrier: String, } +/// A submission that has been cancelled. +/// +/// Once a submission is cancelled, it gets moved to the `submissions_cancelled` +/// table, and its old `submissions` record gets deleted. +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct SubmissionCancelled { + pub id: SubmissionId, + pub prefix: Option, + pub chunks_total: ChunkCount, + pub chunks_done: ChunkCount, + pub metadata: Option, + // TODO pub strategic_metadata: Option, + pub cancelled_at: DateTime, +} + #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum SubmissionStatus { InProgress(Submission), Completed(SubmissionCompleted), Failed(SubmissionFailed, ChunkFailed), + Cancelled(SubmissionCancelled) } impl Default for Submission { @@ -607,6 +623,27 @@ pub mod db { failed_chunk, ))); } + + let cancelled = query_as!( + SubmissionCancelled, + r#" + SELECT + id AS "id: SubmissionId" + , prefix + , chunks_total AS "chunks_total: ChunkCount" + , chunks_done AS "chunks_done: ChunkCount" + , metadata + , cancelled_at AS "cancelled_at: DateTime" + FROM submissions_cancelled WHERE id = $1 + "#, + id + ) + .fetch_optional(conn.get_inner()) + .await?; + if let Some(cancelled) = cancelled { + return Ok(Some(SubmissionStatus::Cancelled(cancelled))); + } + Ok(None) } From 772fa196903351c6ff27ba72d2da2dfac9451224 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 14:24:21 +0100 Subject: [PATCH 05/17] fixup! Add SubmissionCancelled to SubmissionStatus Add strategic metadata to SubmissionCancelled --- libs/opsqueue_python/src/common.rs | 3 ++- opsqueue/src/common/submission.rs | 22 +++++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 457511b..2fd04db 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -325,6 +325,7 @@ impl From for SubmissionCance chunks_total: value.chunks_total.into(), chunks_done: value.chunks_done.into(), metadata: value.metadata, + strategic_metadata: value.strategic_metadata, cancelled_at: value.cancelled_at, } } @@ -468,7 +469,7 @@ pub struct SubmissionCancelled { pub chunks_total: u64, pub chunks_done: u64, pub metadata: Option, - // TODO pub strategic_metadata: Option, + pub strategic_metadata: Option, pub cancelled_at: DateTime, } diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 0cf6748..657b247 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -194,7 +194,7 @@ pub struct SubmissionCancelled { pub chunks_total: ChunkCount, pub chunks_done: ChunkCount, pub metadata: Option, - // TODO pub strategic_metadata: Option, + pub strategic_metadata: Option, pub cancelled_at: DateTime, } @@ -624,8 +624,7 @@ pub mod db { ))); } - let cancelled = query_as!( - SubmissionCancelled, + let cancelled_row_opt = query!( r#" SELECT id AS "id: SubmissionId" @@ -633,6 +632,10 @@ pub mod db { , chunks_total AS "chunks_total: ChunkCount" , chunks_done AS "chunks_done: ChunkCount" , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_cancelled.id + ) AS "strategic_metadata: sqlx::types::Json" , cancelled_at AS "cancelled_at: DateTime" FROM submissions_cancelled WHERE id = $1 "#, @@ -640,8 +643,17 @@ pub mod db { ) .fetch_optional(conn.get_inner()) .await?; - if let Some(cancelled) = cancelled { - return Ok(Some(SubmissionStatus::Cancelled(cancelled))); + if let Some(row) = cancelled_row_opt { + let cancelled_submission = SubmissionCancelled { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunks_done: row.chunks_done, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.map(|json| json.0), + cancelled_at: row.cancelled_at, + }; + return Ok(Some(SubmissionStatus::Cancelled(cancelled_submission))); } Ok(None) From 5d538132e0667e05e30581c7916aa143f745567e Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 14:24:48 +0100 Subject: [PATCH 06/17] fixup! Add SubmissionCancelled to SubmissionStatus precommit --- libs/opsqueue_python/src/common.rs | 6 +++--- libs/opsqueue_python/src/producer.rs | 3 +-- .../migrations/20260225164600_submissions_cancelled.sql | 1 - opsqueue/src/common/submission.rs | 6 ++---- opsqueue/src/producer/client.rs | 7 ++++--- 5 files changed, 10 insertions(+), 13 deletions(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 2fd04db..b7c26bd 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -346,7 +346,7 @@ pub enum SubmissionStatus { }, Cancelled { submission: SubmissionCancelled, - } + }, } impl From for SubmissionStatus { @@ -363,10 +363,10 @@ impl From for SubmissionStatus { let chunk = ChunkFailed::from_internal(c, &s); let submission = s.into(); SubmissionStatus::Failed { submission, chunk } - }, + } Cancelled(s) => SubmissionStatus::Cancelled { submission: s.into(), - } + }, } } } diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index a8347a5..97f7bc2 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -123,8 +123,7 @@ impl ProducerClient { &self, py: Python<'_>, id: SubmissionId, - ) -> CPyResult<(), E> - { + ) -> CPyResult<(), E> { py.allow_threads(|| { self.block_unless_interrupted(async { self.producer_client diff --git a/opsqueue/migrations/20260225164600_submissions_cancelled.sql b/opsqueue/migrations/20260225164600_submissions_cancelled.sql index 56de6c9..34de8a7 100644 --- a/opsqueue/migrations/20260225164600_submissions_cancelled.sql +++ b/opsqueue/migrations/20260225164600_submissions_cancelled.sql @@ -7,4 +7,3 @@ CREATE TABLE submissions_cancelled metadata BLOB, cancelled_at DATETIME NOT NULL -- Unix Timestamp ); - diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 657b247..fc1ef23 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -203,7 +203,7 @@ pub enum SubmissionStatus { InProgress(Submission), Completed(SubmissionCompleted), Failed(SubmissionFailed, ChunkFailed), - Cancelled(SubmissionCancelled) + Cancelled(SubmissionCancelled), } impl Default for Submission { @@ -689,9 +689,7 @@ pub mod db { mut conn: impl WriterConnection, ) -> sqlx::Result<()> { conn.transaction(move |mut tx| { - Box::pin( - async move { cancel_submission_notx(id, &mut tx).await }, - ) + Box::pin(async move { cancel_submission_notx(id, &mut tx).await }) }) .await } diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index 2790949..0470110 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -102,7 +102,9 @@ impl Client { .retry(retry_policy()) .when(InternalProducerClientError::is_ephemeral) .notify(|err, dur| { - tracing::debug!("retrying error {err:?} with sleeping {dur:?}");}) .await + tracing::debug!("retrying error {err:?} with sleeping {dur:?}"); + }) + .await } /// TODO docstring @@ -112,8 +114,7 @@ impl Client { ) -> Result<(), InternalProducerClientError> { (|| async { let base_url = &self.base_url; - self - .http_client + self.http_client .post(format!("{base_url}/submissions/cancel/{submission_id}")) .send() .await? From b4acd8fa734f63bc2f203d11daae53de6d9da32e Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 15:36:48 +0100 Subject: [PATCH 07/17] fixup! Add SubmissionCancelled to SubmissionStatus Add strategic metadata to SubmissionCancelled --- libs/opsqueue_python/src/common.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index b7c26bd..1ccbdf5 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -436,8 +436,8 @@ impl SubmissionFailed { #[pymethods] impl SubmissionCancelled { fn __repr__(&self) -> String { - format!("SubmissionCancelled(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, cancelled_at={4})", - self.id.__repr__(), self.chunks_total, self.chunks_done, self.metadata, self.cancelled_at) + format!("SubmissionCancelled(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, strategic_metadata={4:?}, cancelled_at={5})", + self.id.__repr__(), self.chunks_total, self.chunks_done, self.metadata, self.strategic_metadata, self.cancelled_at) } } From e95658c3ca55d8dd1b8393ac75bc2b56e562bb4d Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 15:50:48 +0100 Subject: [PATCH 08/17] Cleanup cancelled submissions/chunks --- opsqueue/src/common/submission.rs | 20 ++++++++++++++++++-- opsqueue/src/producer/client.rs | 2 +- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index fc1ef23..0737b35 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -889,6 +889,15 @@ pub mod db { ) .execute(tx.get_inner()) .await?; + query!( + "DELETE FROM submissions_metadata + WHERE submission_id = ( + SELECT id FROM submissions_cancelled WHERE cancelled_at < julianday($1) + );", + older_than + ) + .execute(tx.get_inner()) + .await?; // Clean up old submissions: let n_submissions_completed = query!( @@ -903,6 +912,12 @@ pub mod db { ) .execute(tx.get_inner()) .await?.rows_affected(); + let n_submissions_cancelled = query!( + "DELETE FROM submissions_cancelled WHERE cancelled_at < julianday($1);", + older_than + ) + .execute(tx.get_inner()) + .await?.rows_affected(); let n_chunks_completed = query!( "DELETE FROM chunks_completed WHERE completed_at < julianday($1);", @@ -917,8 +932,9 @@ pub mod db { .execute(tx.get_inner()) .await?.rows_affected(); - tracing::info!("Deleted {n_submissions_completed} completed submissions (with {n_chunks_completed} chunks)"); - tracing::info!("Deleted {n_submissions_failed} failed submissions (with {n_chunks_failed} chunks)"); + tracing::info!("Deleted {n_submissions_completed} completed submissions (with {n_chunks_completed} chunks completed)"); + tracing::info!("Deleted {n_submissions_failed} failed submissions (with {n_chunks_failed} chunks failed)"); + tracing::info!("Deleted {n_submissions_cancelled} cancelled submissions (with {n_chunks_completed} chunks completed and {n_chunks_failed} chunks failed)"); Ok(()) }) }) diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index 0470110..e5d7557 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -367,7 +367,7 @@ mod tests { .expect("Should be OK") .expect("Should be Some"); match status { - SubmissionStatus::Completed(_) | SubmissionStatus::Failed(_, _) => { + SubmissionStatus::Completed(_) | SubmissionStatus::Failed(_, _) | SubmissionStatus::Cancelled(_) => { panic!("Expected a SubmissionStatus that is still Inprogress, got: {status:?}"); } SubmissionStatus::InProgress(submission) => { From 97d1317f84ebde9e3742e30190d1e01918457bdc Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Thu, 26 Feb 2026 17:40:14 +0100 Subject: [PATCH 09/17] Error handling when cancelling submission --- .../python/opsqueue/exceptions.py | 24 ++++++++++++++++++ .../python/opsqueue/producer.py | 12 ++++++--- libs/opsqueue_python/src/errors.rs | 18 ++++++++++++- libs/opsqueue_python/src/lib.rs | 1 + opsqueue/opsqueue_example_database_schema.db | Bin 102400 -> 102400 bytes opsqueue/src/common/errors.rs | 11 +++++++- opsqueue/src/common/submission.rs | 20 +++++++++------ opsqueue/src/prometheus.rs | 2 +- 8 files changed, 74 insertions(+), 14 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index 14603b3..ad03e88 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -37,6 +37,30 @@ def __str__(self) -> str: def __repr__(self) -> str: return str(self) +class SubmissionNotCancellableError(Exception): + __slots__ = ["submission"] + """Raised when a submission could not be cancelled due to already being + completed, failed or cancelled. + + """ + + def __init__( + self, + submission: opsqueue_internal.SubmissionNotCancellableError, + ): + super().__init__() + self.submission = submission + + def __str__(self) -> str: + return f""" + Submission {self.submission.id} was not cancelled because: + + {self.submission} + """ + + def __repr__(self) -> str: + return str(self) + ## Usage errors: diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index 1edcab9..b5c54fc 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -14,7 +14,7 @@ ) from . import opsqueue_internal from . import tracing -from opsqueue.exceptions import SubmissionFailedError +from opsqueue.exceptions import SubmissionFailedError, SubmissionNotCancellableError, SubmissionNotFoundError from .opsqueue_internal import ( # type: ignore[import-not-found] SubmissionId, SubmissionStatus, @@ -30,6 +30,8 @@ "SubmissionCompleted", "SubmissionFailedError", "SubmissionFailed", + "SubmissionNotCancellableError", + "SubmissionNotFoundError", "ChunkFailed", ] @@ -316,10 +318,14 @@ def count_submissions(self) -> int: def cancel_submission(self, submission_id: SubmissionId) -> None: """ - Cancel a specific submission if it's in progress. + Cancel a specific submission if it's still in progress. + + Returns None if the submission was succesfully cancelled. Raises: - - `InternalProducerClientError` if there is a low-level internal error. + - `SubmissionNotCancellableError` if the submission could not be + cancelled because it was already completed, failed or cancelled. + - `SubmissionNotFoundError` """ return self.inner.cancel_submission(submission_id) diff --git a/libs/opsqueue_python/src/errors.rs b/libs/opsqueue_python/src/errors.rs index 0c99f63..d62e2b6 100644 --- a/libs/opsqueue_python/src/errors.rs +++ b/libs/opsqueue_python/src/errors.rs @@ -7,7 +7,7 @@ use opsqueue::common::errors::{ ChunkNotFound, IncorrectUsage, SubmissionNotFound, UnexpectedOpsqueueConsumerServerResponse, E, }; use pyo3::exceptions::PyBaseException; -use pyo3::{import_exception, Bound, PyErr, Python}; +use pyo3::{import_exception, Bound, PyErr, Python, prelude::*}; use crate::common::{ChunkIndex, SubmissionId}; @@ -19,6 +19,7 @@ import_exception!(opsqueue.exceptions, IncorrectUsageError); import_exception!(opsqueue.exceptions, TryFromIntError); import_exception!(opsqueue.exceptions, ChunkNotFoundError); import_exception!(opsqueue.exceptions, SubmissionNotFoundError); +import_exception!(opsqueue.exceptions, SubmissionNotCancellableError); import_exception!(opsqueue.exceptions, NewObjectStoreClientError); import_exception!(opsqueue.exceptions, SubmissionNotCompletedYetError); @@ -143,6 +144,21 @@ impl From> for PyErr { } } +#[pyclass(frozen, get_all, module = "opsqueue")] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SubmissionNotCancellable { + Cancelled(crate::common::SubmissionCancelled), + Completed(crate::common::SubmissionCompleted), + Failed(crate::common::SubmissionFailed), + // NotFound(SubmissionNotFoundError), +} + +impl From> for PyErr { + fn from(value: CError) -> Self { + SubmissionNotCancellableError::new_err(value.0) + } +} + impl From> for PyErr { fn from(value: CError) -> Self { let submission_id = value.0 .0; diff --git a/libs/opsqueue_python/src/lib.rs b/libs/opsqueue_python/src/lib.rs index 8800502..fd79803 100644 --- a/libs/opsqueue_python/src/lib.rs +++ b/libs/opsqueue_python/src/lib.rs @@ -23,6 +23,7 @@ fn opsqueue_internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/opsqueue/opsqueue_example_database_schema.db b/opsqueue/opsqueue_example_database_schema.db index 63eb0a6fe6789cdac88cf616b5ddc15cf2a2dfa8..4d6ad39aac49256a3bb2c0107d27d7f13b0ed702 100644 GIT binary patch delta 262 zcmZozz}B#UZ9}b$t(k(Msg<#bm5Ev5$C$^j{P(^8eV*Ayg!g!mP}o1!!r8a2Kk9d9 zeqC^ir$SFs6EUp`uCu_-Sz{MhEjhTZA zr%vvd)q{&20g8FAT08ldtT|lFLe7}kCvEQJJUJt{*m9tlTm0wAcjdI>~*$$@&b8fxB;htVxDnR Ln-rEQFcts+r9o66 diff --git a/opsqueue/src/common/errors.rs b/opsqueue/src/common/errors.rs index 38ee007..f07ba47 100644 --- a/opsqueue/src/common/errors.rs +++ b/opsqueue/src/common/errors.rs @@ -11,7 +11,7 @@ use thiserror::Error; use crate::consumer::common::SyncServerToClientResponse; -use super::{chunk::ChunkId, submission::SubmissionId}; +use super::{chunk::ChunkId, submission::{SubmissionId, SubmissionCompleted, SubmissionFailed, SubmissionCancelled}}; // #[derive(Error, Debug, Clone, Serialize, Deserialize)] // #[error("Low-level database error: {0:?}")] @@ -36,6 +36,15 @@ pub struct ChunkNotFound(pub ChunkId); #[error("Submission not found for ID {0:?}")] pub struct SubmissionNotFound(pub SubmissionId); +/// A submission could not be cancelled due to one of the enumerated reasons. +#[derive(Error, Debug)] +#[error("Submission could not be cancelled {0:?}")] +pub enum SubmissionNotCancellable { + Cancelled(SubmissionCancelled), + Failed(SubmissionFailed), + Completed(SubmissionCompleted), +} + #[derive(Error, Debug)] #[error("Unexpected opsqueue consumer server response. This indicates an error inside Opsqueue itself: {0:?}")] pub struct UnexpectedOpsqueueConsumerServerResponse(pub SyncServerToClientResponse); diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 0737b35..e8243a1 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -687,7 +687,7 @@ pub mod db { pub async fn cancel_submission( id: SubmissionId, mut conn: impl WriterConnection, - ) -> sqlx::Result<()> { + ) -> Result<(), E> { conn.transaction(move |mut tx| { Box::pin(async move { cancel_submission_notx(id, &mut tx).await }) }) @@ -698,7 +698,7 @@ pub mod db { pub async fn cancel_submission_notx( id: SubmissionId, mut conn: impl WriterConnection, - ) -> sqlx::Result<()> { + ) -> Result<(), E> { cancel_submission_raw(id, &mut conn).await?; super::chunk::db::skip_remaining_chunks(id, conn).await?; Ok(()) @@ -708,10 +708,10 @@ pub mod db { pub(super) async fn cancel_submission_raw( id: SubmissionId, mut conn: impl WriterConnection, - ) -> sqlx::Result<()> { + ) -> Result<(), E> { let now = chrono::prelude::Utc::now(); - query!( + let submission_opt = query!( " INSERT INTO submissions_cancelled (id, chunks_total, prefix, metadata, cancelled_at, chunks_done) @@ -723,14 +723,18 @@ pub mod db { id, id, ) - .fetch_one(conn.get_inner()) + .fetch_optional(conn.get_inner()) .await?; - counter!(crate::prometheus::SUBMISSIONS_CANCELLED_COUNTER).increment(1); histogram!(crate::prometheus::SUBMISSIONS_DURATION_CANCEL_HISTOGRAM).record( crate::prometheus::time_delta_as_f64(Utc::now() - id.timestamp()), ); - - Ok(()) + match submission_opt { + None => Err(E::R(SubmissionNotFound(id))), + Some(_) => { + counter!(crate::prometheus::SUBMISSIONS_CANCELLED_COUNTER).increment(1); + Ok(()) + }, + } } #[tracing::instrument(skip(conn))] diff --git a/opsqueue/src/prometheus.rs b/opsqueue/src/prometheus.rs index a74860c..77f367e 100644 --- a/opsqueue/src/prometheus.rs +++ b/opsqueue/src/prometheus.rs @@ -17,7 +17,7 @@ use crate::db::DBPools; pub const SUBMISSIONS_TOTAL_COUNTER: &str = "submissions_total_count"; pub const SUBMISSIONS_COMPLETED_COUNTER: &str = "submissions_completed_count"; pub const SUBMISSIONS_FAILED_COUNTER: &str = "submissions_failed_count"; -pub const SUBMISSIONS_CANCELLED_COUNTER: &str = "submissions_failed_count"; +pub const SUBMISSIONS_CANCELLED_COUNTER: &str = "submissions_cancelled_count"; pub const SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM: &str = "submissions_complete_duration_seconds"; pub const SUBMISSIONS_DURATION_FAIL_HISTOGRAM: &str = "submissions_fail_duration_seconds"; pub const SUBMISSIONS_DURATION_CANCEL_HISTOGRAM: &str = "submissions_cancel_duration_seconds"; From 9c9686ac3229b1791742c68764d7558762ed6a35 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Fri, 27 Feb 2026 11:20:52 +0100 Subject: [PATCH 10/17] fixup! Error handling when cancelling submission SubmissionNotFound compiles --- .../python/opsqueue/exceptions.py | 37 ++++++++++--------- .../python/opsqueue/producer.py | 8 +++- libs/opsqueue_python/src/errors.rs | 4 +- libs/opsqueue_python/src/lib.rs | 1 - libs/opsqueue_python/src/producer.rs | 3 +- opsqueue/src/common/errors.rs | 7 +++- opsqueue/src/common/submission.rs | 2 +- opsqueue/src/producer/client.rs | 35 ++++++++++++++---- opsqueue/src/producer/server.rs | 21 +++++++++-- 9 files changed, 79 insertions(+), 39 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index ad03e88..45734fc 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -37,29 +37,30 @@ def __str__(self) -> str: def __repr__(self) -> str: return str(self) -class SubmissionNotCancellableError(Exception): - __slots__ = ["submission"] - """Raised when a submission could not be cancelled due to already being - completed, failed or cancelled. - """ +# class SubmissionNotCancellableError(Exception): +# __slots__ = ["submission"] +# """Raised when a submission could not be cancelled due to already being +# completed, failed or cancelled. - def __init__( - self, - submission: opsqueue_internal.SubmissionNotCancellableError, - ): - super().__init__() - self.submission = submission +# """ - def __str__(self) -> str: - return f""" - Submission {self.submission.id} was not cancelled because: +# def __init__( +# self, +# submission: opsqueue_internal.SubmissionNotCancellableError, +# ): +# super().__init__() +# self.submission = submission - {self.submission} - """ +# def __str__(self) -> str: +# return f""" +# Submission {self.submission.id} was not cancelled because: - def __repr__(self) -> str: - return str(self) +# {self.submission} +# """ + +# def __repr__(self) -> str: +# return str(self) ## Usage errors: diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index b5c54fc..f5e0325 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -14,7 +14,11 @@ ) from . import opsqueue_internal from . import tracing -from opsqueue.exceptions import SubmissionFailedError, SubmissionNotCancellableError, SubmissionNotFoundError +from opsqueue.exceptions import ( + SubmissionFailedError, + # SubmissionNotCancellableError, + SubmissionNotFoundError, +) from .opsqueue_internal import ( # type: ignore[import-not-found] SubmissionId, SubmissionStatus, @@ -30,7 +34,7 @@ "SubmissionCompleted", "SubmissionFailedError", "SubmissionFailed", - "SubmissionNotCancellableError", + # "SubmissionNotCancellableError", "SubmissionNotFoundError", "ChunkFailed", ] diff --git a/libs/opsqueue_python/src/errors.rs b/libs/opsqueue_python/src/errors.rs index d62e2b6..aa35b6c 100644 --- a/libs/opsqueue_python/src/errors.rs +++ b/libs/opsqueue_python/src/errors.rs @@ -1,4 +1,4 @@ -/// NOTE: We defne the potentially raisable errors/exceptions in Python +/// NOTE: We define the potentially raisable errors/exceptions in Python /// so we have nice IDE support for docs-on-hover and for 'go to definition'. use std::error::Error; @@ -7,7 +7,7 @@ use opsqueue::common::errors::{ ChunkNotFound, IncorrectUsage, SubmissionNotFound, UnexpectedOpsqueueConsumerServerResponse, E, }; use pyo3::exceptions::PyBaseException; -use pyo3::{import_exception, Bound, PyErr, Python, prelude::*}; +use pyo3::{import_exception, prelude::*, Bound, PyErr, Python}; use crate::common::{ChunkIndex, SubmissionId}; diff --git a/libs/opsqueue_python/src/lib.rs b/libs/opsqueue_python/src/lib.rs index fd79803..8800502 100644 --- a/libs/opsqueue_python/src/lib.rs +++ b/libs/opsqueue_python/src/lib.rs @@ -23,7 +23,6 @@ fn opsqueue_internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index 97f7bc2..21dcf20 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -9,6 +9,7 @@ use pyo3::{ use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use opsqueue::{ + common::errors::{SubmissionNotFound}, common::errors::E::{self, L, R}, object_store::{ChunksStorageError, NewObjectStoreClientError}, producer::client::{Client as ActualClient, InternalProducerClientError}, @@ -123,7 +124,7 @@ impl ProducerClient { &self, py: Python<'_>, id: SubmissionId, - ) -> CPyResult<(), E> { + ) -> CPyResult<(), E>> { py.allow_threads(|| { self.block_unless_interrupted(async { self.producer_client diff --git a/opsqueue/src/common/errors.rs b/opsqueue/src/common/errors.rs index f07ba47..4e50b90 100644 --- a/opsqueue/src/common/errors.rs +++ b/opsqueue/src/common/errors.rs @@ -11,7 +11,10 @@ use thiserror::Error; use crate::consumer::common::SyncServerToClientResponse; -use super::{chunk::ChunkId, submission::{SubmissionId, SubmissionCompleted, SubmissionFailed, SubmissionCancelled}}; +use super::{ + chunk::ChunkId, + submission::{SubmissionCancelled, SubmissionCompleted, SubmissionFailed, SubmissionId}, +}; // #[derive(Error, Debug, Clone, Serialize, Deserialize)] // #[error("Low-level database error: {0:?}")] @@ -32,7 +35,7 @@ impl From for E { #[error("Chunk not found for ID {0:?}")] pub struct ChunkNotFound(pub ChunkId); -#[derive(Error, Debug)] +#[derive(Error, Debug, Deserialize, Serialize)] #[error("Submission not found for ID {0:?}")] pub struct SubmissionNotFound(pub SubmissionId); diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index e8243a1..7d087ef 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -733,7 +733,7 @@ pub mod db { Some(_) => { counter!(crate::prometheus::SUBMISSIONS_CANCELLED_COUNTER).increment(1); Ok(()) - }, + } } } diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index e5d7557..d422749 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -3,7 +3,10 @@ use std::time::Duration; use backon::BackoffBuilder; use backon::FibonacciBuilder; use backon::Retryable; +use http::StatusCode; +use crate::common::errors; +use crate::common::errors::E; use crate::common::submission::{SubmissionId, SubmissionStatus}; use crate::tracing::CarrierMap; @@ -111,20 +114,34 @@ impl Client { pub async fn cancel_submission( &self, submission_id: SubmissionId, - ) -> Result<(), InternalProducerClientError> { + ) -> Result<(), E> { (|| async { let base_url = &self.base_url; - self.http_client + let response = self + .http_client .post(format!("{base_url}/submissions/cancel/{submission_id}")) .send() - .await? - .error_for_status()?; - // let bytes = resp.bytes().await?; - // let body = serde_json::from_slice(&bytes)?; + .await + .map_err(|e| E::R(e.into()))?; + let status = response.status(); + if status.is_success() { + return Ok(()); + } + if status == StatusCode::NOT_FOUND { + let not_found_err = response + .json::() + .await + .map_err(|e| E::R(e.into()))?; + return Err(E::<_, InternalProducerClientError>::L(not_found_err)); + } + response.error_for_status().map_err(|e| E::R(e.into()))?; Ok(()) }) .retry(retry_policy()) - .when(InternalProducerClientError::is_ephemeral) + .when(|e| match e { + E::L(_) => false, + E::R(client_err) => client_err.is_ephemeral(), + }) .notify(|err, dur| { tracing::debug!("retrying error {err:?} with sleeping {dur:?}"); }) @@ -367,7 +384,9 @@ mod tests { .expect("Should be OK") .expect("Should be Some"); match status { - SubmissionStatus::Completed(_) | SubmissionStatus::Failed(_, _) | SubmissionStatus::Cancelled(_) => { + SubmissionStatus::Completed(_) + | SubmissionStatus::Failed(_, _) + | SubmissionStatus::Cancelled(_) => { panic!("Expected a SubmissionStatus that is still Inprogress, got: {status:?}"); } SubmissionStatus::InProgress(submission) => { diff --git a/opsqueue/src/producer/server.rs b/opsqueue/src/producer/server.rs index 053f31f..61165dd 100644 --- a/opsqueue/src/producer/server.rs +++ b/opsqueue/src/producer/server.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::common::errors::E; use crate::common::submission::{self, SubmissionId}; use crate::db::DBPools; use axum::extract::{Path, State}; @@ -89,13 +90,25 @@ where } } +// 200 if the submission was successfully cancelled. +// 404 if the submission could not be found. +// 500 if a DatabaseError occurred async fn cancel_submission( State(state): State, Path(submission_id): Path, -) -> Result<(), ServerError> { - let mut conn = state.pool.writer_conn().await?; - submission::db::cancel_submission(submission_id, &mut conn).await?; - Ok(()) +) -> Result<(), Response> { + let mut conn = state + .pool + .writer_conn() + .await + .map_err(|e| ServerError(e.into()).into_response())?; + match submission::db::cancel_submission(submission_id, &mut conn).await { + Ok(_) => Ok(()), + Err(E::L(db_err)) => Err(ServerError(db_err.into()).into_response()), + Err(E::R(not_found_err)) => { + Err((StatusCode::NOT_FOUND, Json(not_found_err)).into_response()) + } + } } async fn submission_status( From 693ec1e64a506a0db27772a91c3ebec76a51bd6b Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Fri, 27 Feb 2026 13:07:23 +0100 Subject: [PATCH 11/17] fixup! Error handling when cancelling submission Test cancel_submission --- .../python/opsqueue/exceptions.py | 13 +++++++- libs/opsqueue_python/src/errors.rs | 31 +++++++++---------- libs/opsqueue_python/tests/test_roundtrip.py | 22 +++++++++++++ 3 files changed, 49 insertions(+), 17 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index 45734fc..1cf02ce 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -100,9 +100,20 @@ class SubmissionNotFoundError(IncorrectUsageError): Raised when a method is used to look up information about a submission but the submission doesn't exist within the Opsqueue. """ + __slots = ["submission_id"] - pass + def __init__( + self, + submission_id: int, + ): + super().__init__() + self.submission_id = submission_id + def __str__(self) -> str: + return f"Submission {self.submission_id} could not be found" + + def __repr__(self) -> str: + return str(self) class ChunkCountIsZeroError(IncorrectUsageError): """ diff --git a/libs/opsqueue_python/src/errors.rs b/libs/opsqueue_python/src/errors.rs index aa35b6c..7665ba6 100644 --- a/libs/opsqueue_python/src/errors.rs +++ b/libs/opsqueue_python/src/errors.rs @@ -7,7 +7,7 @@ use opsqueue::common::errors::{ ChunkNotFound, IncorrectUsage, SubmissionNotFound, UnexpectedOpsqueueConsumerServerResponse, E, }; use pyo3::exceptions::PyBaseException; -use pyo3::{import_exception, prelude::*, Bound, PyErr, Python}; +use pyo3::{import_exception, Bound, PyErr, Python}; use crate::common::{ChunkIndex, SubmissionId}; @@ -19,7 +19,7 @@ import_exception!(opsqueue.exceptions, IncorrectUsageError); import_exception!(opsqueue.exceptions, TryFromIntError); import_exception!(opsqueue.exceptions, ChunkNotFoundError); import_exception!(opsqueue.exceptions, SubmissionNotFoundError); -import_exception!(opsqueue.exceptions, SubmissionNotCancellableError); +// import_exception!(opsqueue.exceptions, SubmissionNotCancellableError); import_exception!(opsqueue.exceptions, NewObjectStoreClientError); import_exception!(opsqueue.exceptions, SubmissionNotCompletedYetError); @@ -127,7 +127,7 @@ impl From>> for PyErr { impl From> for PyErr { fn from(value: CError) -> Self { let submission_id = value.0 .0; - SubmissionNotFoundError::new_err((value.0.to_string(), SubmissionId::from(submission_id))) + SubmissionNotFoundError::new_err(u64::from(submission_id)) } } @@ -144,20 +144,19 @@ impl From> for PyErr { } } -#[pyclass(frozen, get_all, module = "opsqueue")] -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum SubmissionNotCancellable { - Cancelled(crate::common::SubmissionCancelled), - Completed(crate::common::SubmissionCompleted), - Failed(crate::common::SubmissionFailed), - // NotFound(SubmissionNotFoundError), -} +// #[pyclass(frozen, get_all, module = "opsqueue")] +// #[derive(Debug, Clone, PartialEq, Eq)] +// pub enum SubmissionNotCancellable { +// Cancelled(crate::common::SubmissionCancelled), +// Completed(crate::common::SubmissionCompleted), +// Failed(crate::common::SubmissionFailed), +// } -impl From> for PyErr { - fn from(value: CError) -> Self { - SubmissionNotCancellableError::new_err(value.0) - } -} +// impl From> for PyErr { +// fn from(value: CError) -> Self { +// SubmissionNotCancellableError::new_err(value.0) +// } +// } impl From> for PyErr { fn from(value: CError) -> Self { diff --git a/libs/opsqueue_python/tests/test_roundtrip.py b/libs/opsqueue_python/tests/test_roundtrip.py index e13bcac..40f0478 100644 --- a/libs/opsqueue_python/tests/test_roundtrip.py +++ b/libs/opsqueue_python/tests/test_roundtrip.py @@ -2,6 +2,7 @@ # - use pytest's `--log-cli-level=info` (or `=debug`) argument to get more detailed logs from the producer/consumer clients # - use `RUST_LOG="opsqueue=info"` (or `opsqueue=debug` or `debug` for even more verbosity), together with to the pytest option `-s` AKA `--capture=no`, to debug the opsqueue binary itself. +import opsqueue from collections.abc import Iterator, Sequence from opsqueue.producer import ( ProducerClient, @@ -9,6 +10,7 @@ SubmissionFailed, ChunkFailed, SubmissionFailedError, + SubmissionNotFoundError, ) from opsqueue.consumer import ConsumerClient, Chunk from opsqueue.common import SerializationFormat @@ -354,3 +356,23 @@ def assert_submission_failed_has_metadata(x: SubmissionFailed) -> None: submission = producer_client.get_submission_status(submission_id) assert submission is not None assert_submission_failed_has_metadata(submission.submission) + +def test_cancel_submission( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """SubmissionCompleted should include the submission's metadata and + strategic metadata. + + """ + url = "file:///tmp/opsqueue/test_cancel_submission" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) + assert (type(producer_client.get_submission_status(submission_id)).__name__ + == "SubmissionStatus_InProgress") + # Cancel an in progress submission. + assert producer_client.cancel_submission(submission_id) is None + # Cancel an already cancelled submission. + # TODO this should return a NotCancellableError + with pytest.raises(SubmissionNotFoundError) as exc_info: + producer_client.cancel_submission(submission_id) + assert exc_info.value.submission_id == submission_id.id From 99768c2060aa0e59257dbc34a80162165cfc66da Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Fri, 27 Feb 2026 15:34:41 +0100 Subject: [PATCH 12/17] fixup! Error handling when cancelling submission SubmissionNotCancellableError --- .../python/opsqueue/exceptions.py | 41 ++++--- .../python/opsqueue/producer.py | 4 +- libs/opsqueue_python/src/common.rs | 56 +++++++-- libs/opsqueue_python/src/errors.rs | 28 ++--- libs/opsqueue_python/src/lib.rs | 1 + libs/opsqueue_python/src/producer.rs | 10 +- libs/opsqueue_python/tests/test_roundtrip.py | 115 +++++++++++++++--- opsqueue/src/common/errors.rs | 6 +- opsqueue/src/common/submission.rs | 31 ++++- opsqueue/src/producer/client.rs | 30 ++++- opsqueue/src/producer/server.rs | 8 +- 11 files changed, 257 insertions(+), 73 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index 1cf02ce..02faae4 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -1,6 +1,7 @@ ## Expected errors: from . import opsqueue_internal +from typing import Optional class SubmissionFailedError(Exception): @@ -38,29 +39,31 @@ def __repr__(self) -> str: return str(self) -# class SubmissionNotCancellableError(Exception): -# __slots__ = ["submission"] -# """Raised when a submission could not be cancelled due to already being -# completed, failed or cancelled. +class SubmissionNotCancellableError(Exception): + __slots__ = ["submission", "chunk"] + """Raised when a submission could not be cancelled due to already being + completed, failed or cancelled. -# """ + """ -# def __init__( -# self, -# submission: opsqueue_internal.SubmissionNotCancellableError, -# ): -# super().__init__() -# self.submission = submission + def __init__( + self, + submission: opsqueue_internal.SubmissionNotCancellable, + chunk: Optional[opsqueue_internal.ChunkFailed]=None, + ): + super().__init__() + self.submission = submission + self.chunk = chunk -# def __str__(self) -> str: -# return f""" -# Submission {self.submission.id} was not cancelled because: + def __str__(self) -> str: + return f""" + Submission {self.submission.id} was not cancelled because: -# {self.submission} -# """ + {self.submission} + """ -# def __repr__(self) -> str: -# return str(self) + def __repr__(self) -> str: + return str(self) ## Usage errors: @@ -100,6 +103,7 @@ class SubmissionNotFoundError(IncorrectUsageError): Raised when a method is used to look up information about a submission but the submission doesn't exist within the Opsqueue. """ + __slots = ["submission_id"] def __init__( @@ -115,6 +119,7 @@ def __str__(self) -> str: def __repr__(self) -> str: return str(self) + class ChunkCountIsZeroError(IncorrectUsageError): """ Raised when making an empty submission. diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index f5e0325..4ea2e69 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -16,7 +16,7 @@ from . import tracing from opsqueue.exceptions import ( SubmissionFailedError, - # SubmissionNotCancellableError, + SubmissionNotCancellableError, SubmissionNotFoundError, ) from .opsqueue_internal import ( # type: ignore[import-not-found] @@ -34,7 +34,7 @@ "SubmissionCompleted", "SubmissionFailedError", "SubmissionFailed", - # "SubmissionNotCancellableError", + "SubmissionNotCancellableError", "SubmissionNotFoundError", "ChunkFailed", ] diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 1ccbdf5..84a50ee 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -371,6 +371,13 @@ impl From for SubmissionStatus { } } +#[pymethods] +impl SubmissionStatus { + fn __repr__(&self) -> String { + format!("{self:?}") + } +} + #[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct Submission { @@ -404,13 +411,6 @@ impl Submission { } } -#[pymethods] -impl SubmissionStatus { - fn __repr__(&self) -> String { - format!("{self:?}") - } -} - #[pymethods] impl SubmissionCompleted { fn __repr__(&self) -> String { @@ -473,6 +473,48 @@ pub struct SubmissionCancelled { pub cancelled_at: DateTime, } +#[pyclass(frozen, module = "opsqueue")] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SubmissionNotCancellable { + Cancelled { + submission: SubmissionCancelled, + }, + Completed { + submission: SubmissionCompleted, + }, + Failed { + submission: SubmissionFailed, + chunk: ChunkFailed, + }, +} + +impl From for SubmissionNotCancellable { + fn from(value: opsqueue::common::errors::SubmissionNotCancellable) -> Self { + use opsqueue::common::errors::SubmissionNotCancellable::*; + match value { + Completed(s) => SubmissionNotCancellable::Completed { + submission: s.into(), + }, + Failed(s, c) => { + let chunk = ChunkFailed::from_internal(c, &s); + SubmissionNotCancellable::Failed { + submission: s.into(), chunk + } + }, + Cancelled(s) => SubmissionNotCancellable::Cancelled { + submission: s.into(), + }, + } + } +} + +#[pymethods] +impl SubmissionNotCancellable { + fn __repr__(&self) -> String { + format!("{self:?}") + } +} + pub async fn run_unless_interrupted( future: impl IntoFuture>, ) -> Result diff --git a/libs/opsqueue_python/src/errors.rs b/libs/opsqueue_python/src/errors.rs index 7665ba6..b276e66 100644 --- a/libs/opsqueue_python/src/errors.rs +++ b/libs/opsqueue_python/src/errors.rs @@ -4,11 +4,13 @@ use std::error::Error; use opsqueue::common::chunk::ChunkId; use opsqueue::common::errors::{ - ChunkNotFound, IncorrectUsage, SubmissionNotFound, UnexpectedOpsqueueConsumerServerResponse, E, + ChunkNotFound, IncorrectUsage, SubmissionNotFound, SubmissionNotCancellable, + UnexpectedOpsqueueConsumerServerResponse, E, }; use pyo3::exceptions::PyBaseException; use pyo3::{import_exception, Bound, PyErr, Python}; +use crate::common; use crate::common::{ChunkIndex, SubmissionId}; // Expected errors: @@ -19,7 +21,7 @@ import_exception!(opsqueue.exceptions, IncorrectUsageError); import_exception!(opsqueue.exceptions, TryFromIntError); import_exception!(opsqueue.exceptions, ChunkNotFoundError); import_exception!(opsqueue.exceptions, SubmissionNotFoundError); -// import_exception!(opsqueue.exceptions, SubmissionNotCancellableError); +import_exception!(opsqueue.exceptions, SubmissionNotCancellableError); import_exception!(opsqueue.exceptions, NewObjectStoreClientError); import_exception!(opsqueue.exceptions, SubmissionNotCompletedYetError); @@ -124,6 +126,14 @@ impl From>> for PyErr { } } +impl From> for PyErr { + fn from(value: CError) -> Self { + let s: common::SubmissionNotCancellable = value.0.into(); + // TODO pass the 'ChunkFailed' to the Python exception. + SubmissionNotCancellableError::new_err(s) + } +} + impl From> for PyErr { fn from(value: CError) -> Self { let submission_id = value.0 .0; @@ -144,20 +154,6 @@ impl From> for PyErr { } } -// #[pyclass(frozen, get_all, module = "opsqueue")] -// #[derive(Debug, Clone, PartialEq, Eq)] -// pub enum SubmissionNotCancellable { -// Cancelled(crate::common::SubmissionCancelled), -// Completed(crate::common::SubmissionCompleted), -// Failed(crate::common::SubmissionFailed), -// } - -// impl From> for PyErr { -// fn from(value: CError) -> Self { -// SubmissionNotCancellableError::new_err(value.0) -// } -// } - impl From> for PyErr { fn from(value: CError) -> Self { let submission_id = value.0 .0; diff --git a/libs/opsqueue_python/src/lib.rs b/libs/opsqueue_python/src/lib.rs index 8800502..34b29ea 100644 --- a/libs/opsqueue_python/src/lib.rs +++ b/libs/opsqueue_python/src/lib.rs @@ -23,6 +23,7 @@ fn opsqueue_internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index 21dcf20..a1cdd96 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -9,8 +9,8 @@ use pyo3::{ use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use opsqueue::{ - common::errors::{SubmissionNotFound}, common::errors::E::{self, L, R}, + common::errors::{SubmissionNotCancellable, SubmissionNotFound}, object_store::{ChunksStorageError, NewObjectStoreClientError}, producer::client::{Client as ActualClient, InternalProducerClientError}, }; @@ -124,7 +124,13 @@ impl ProducerClient { &self, py: Python<'_>, id: SubmissionId, - ) -> CPyResult<(), E>> { + ) -> CPyResult< + (), + E< + FatalPythonException, + E>, + >, + > { py.allow_threads(|| { self.block_unless_interrupted(async { self.producer_client diff --git a/libs/opsqueue_python/tests/test_roundtrip.py b/libs/opsqueue_python/tests/test_roundtrip.py index 40f0478..dcc5ed4 100644 --- a/libs/opsqueue_python/tests/test_roundtrip.py +++ b/libs/opsqueue_python/tests/test_roundtrip.py @@ -2,15 +2,16 @@ # - use pytest's `--log-cli-level=info` (or `=debug`) argument to get more detailed logs from the producer/consumer clients # - use `RUST_LOG="opsqueue=info"` (or `opsqueue=debug` or `debug` for even more verbosity), together with to the pytest option `-s` AKA `--capture=no`, to debug the opsqueue binary itself. -import opsqueue from collections.abc import Iterator, Sequence from opsqueue.producer import ( + SubmissionId, ProducerClient, SubmissionCompleted, SubmissionFailed, ChunkFailed, SubmissionFailedError, SubmissionNotFoundError, + SubmissionNotCancellableError ) from opsqueue.consumer import ConsumerClient, Chunk from opsqueue.common import SerializationFormat @@ -304,7 +305,7 @@ def run_consumer() -> None: strategy = strategy_from_description(any_consumer_strategy) consumer_client.run_each_op(lambda x: x, strategy=strategy) - with background_process(run_consumer) as _consumer: + with background_process(run_consumer): # Wait for the submission to complete. producer_client.blocking_stream_completed_submission(submission_id) submission = producer_client.get_submission_status(submission_id) @@ -341,7 +342,7 @@ def consume(x: int) -> None: strategy = strategy_from_description(any_consumer_strategy) consumer_client.run_each_op(consume, strategy=strategy) - with background_process(run_consumer) as _consumer: + with background_process(run_consumer): def assert_submission_failed_has_metadata(x: SubmissionFailed) -> None: assert isinstance(x, SubmissionFailed) @@ -357,22 +358,108 @@ def assert_submission_failed_has_metadata(x: SubmissionFailed) -> None: assert submission is not None assert_submission_failed_has_metadata(submission.submission) -def test_cancel_submission( +def test_cancel_submission_not_found( opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: - """SubmissionCompleted should include the submission's metadata and - strategic metadata. + """Attempting to cancel a submission that doesn't exist raises a + SubmissionNotFoundError. + + """ + url = "file:///tmp/opsqueue/test_cancel_submission_not_found" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = 0 + with pytest.raises(SubmissionNotFoundError) as exc_info: + producer_client.cancel_submission(SubmissionId(0)) + assert exc_info.value.submission_id == submission_id + +def test_cancel_in_progress_and_already_cancelled_submissions( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Cancelling a submission that is in progress succeeds and returns none. + Attempting to cancel a submission that is already cancelled should raises a + SubmissionNotCancellableError. """ - url = "file:///tmp/opsqueue/test_cancel_submission" + + url = "file:///tmp/opsqueue/test_cancel_in_progress_and_already_cancelled_submissions" producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) - assert (type(producer_client.get_submission_status(submission_id)).__name__ - == "SubmissionStatus_InProgress") - # Cancel an in progress submission. + # Sanity check submission is in progress before proceeding to cancel. + assert ( + type(producer_client.get_submission_status(submission_id)).__name__ + == "SubmissionStatus_InProgress" + ) + # Cancelling an in progress submission should succeed. assert producer_client.cancel_submission(submission_id) is None - # Cancel an already cancelled submission. - # TODO this should return a NotCancellableError - with pytest.raises(SubmissionNotFoundError) as exc_info: + # Submission status should now be cancelled. + assert ( + type(producer_client.get_submission_status(submission_id)).__name__ + == "SubmissionStatus_Cancelled" + ) + # Cancelling an already cancelled submission should fail. + with pytest.raises(SubmissionNotCancellableError) as exc_info: producer_client.cancel_submission(submission_id) - assert exc_info.value.submission_id == submission_id.id + assert ( + type(exc_info.value.submission).__name__ + == "SubmissionNotCancellable_Cancelled" + ) + +def test_cancel_complete_submission( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Attempting to cancel a submission that has already completed should raise + a SubmissionNotCancellableError. + + """ + url = "file:///tmp/opsqueue/test_cancel_complete_submission" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) + + def run_consumer() -> None: + consumer_client = ConsumerClient(f"localhost:{opsqueue.port}", url) + strategy = strategy_from_description(any_consumer_strategy) + consumer_client.run_each_op(lambda x: x, strategy=strategy) + + with background_process(run_consumer): + # Wait for the submission to complete. + producer_client.blocking_stream_completed_submission(submission_id) + submission = producer_client.get_submission_status(submission_id) + assert isinstance(submission.submission, SubmissionCompleted) + # Cancelling the already completed submission should fail. + with pytest.raises(SubmissionNotCancellableError) as exc_info: + producer_client.cancel_submission(submission_id) + assert ( + type(exc_info.value.submission).__name__ + == "SubmissionNotCancellable_Completed" + ) + +def test_cancel_failed_submission( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Attempting to cancel a submission that has failed should raise a + SubmissionNotCancellableError. + + """ + url = "file:///tmp/opsqueue/test_cancel_failed_submission" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) + + def run_consumer() -> None: + consumer_client = ConsumerClient(f"localhost:{opsqueue.port}", url) + + def consume(x: int) -> None: + raise ValueError(f"Couldn't consume {x}") + + strategy = strategy_from_description(any_consumer_strategy) + consumer_client.run_each_op(consume, strategy=strategy) + + with background_process(run_consumer): + with pytest.raises(SubmissionFailedError): + producer_client.blocking_stream_completed_submission(submission_id) + # Cancelling the failed submission should fail. + with pytest.raises(SubmissionNotCancellableError) as exc_info: + producer_client.cancel_submission(submission_id) + assert ( + type(exc_info.value.submission).__name__ + == "SubmissionNotCancellable_Failed" + ) diff --git a/opsqueue/src/common/errors.rs b/opsqueue/src/common/errors.rs index 4e50b90..f981f88 100644 --- a/opsqueue/src/common/errors.rs +++ b/opsqueue/src/common/errors.rs @@ -12,7 +12,7 @@ use thiserror::Error; use crate::consumer::common::SyncServerToClientResponse; use super::{ - chunk::ChunkId, + chunk::{ChunkFailed, ChunkId}, submission::{SubmissionCancelled, SubmissionCompleted, SubmissionFailed, SubmissionId}, }; @@ -40,11 +40,11 @@ pub struct ChunkNotFound(pub ChunkId); pub struct SubmissionNotFound(pub SubmissionId); /// A submission could not be cancelled due to one of the enumerated reasons. -#[derive(Error, Debug)] +#[derive(Error, Debug, Deserialize, Serialize)] #[error("Submission could not be cancelled {0:?}")] pub enum SubmissionNotCancellable { Cancelled(SubmissionCancelled), - Failed(SubmissionFailed), + Failed(SubmissionFailed, ChunkFailed), Completed(SubmissionCompleted), } diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 7d087ef..bc5dcd2 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -260,7 +260,7 @@ impl Submission { pub mod db { use crate::{ common::{ - errors::{DatabaseError, SubmissionNotFound, E}, + errors::{DatabaseError, SubmissionNotCancellable, SubmissionNotFound, E}, StrategicMetadataMap, }, db::{Connection, True, WriterConnection, WriterPool}, @@ -687,9 +687,34 @@ pub mod db { pub async fn cancel_submission( id: SubmissionId, mut conn: impl WriterConnection, - ) -> Result<(), E> { + ) -> Result<(), E>> { conn.transaction(move |mut tx| { - Box::pin(async move { cancel_submission_notx(id, &mut tx).await }) + Box::pin(async move { + match cancel_submission_notx(id, &mut tx).await { + Ok(()) => Ok(()), + Err(E::L(db_err)) => Err(E::L(db_err)), + Err(E::R(not_found_err)) => { + // Submission could not be found, let's check the status + // in order to return a more informative error. + match submission_status(id, &mut tx).await { + Ok(None) => Err(E::R(E::L(not_found_err))), + Ok(Some(SubmissionStatus::InProgress(submission))) => { + panic!("Failed to cancel in progress submission {:?}", submission) + } + Ok(Some(SubmissionStatus::Completed(submission))) => { + Err(E::R(E::R(SubmissionNotCancellable::Completed(submission)))) + } + Ok(Some(SubmissionStatus::Failed(submission, chunk))) => Err(E::R( + E::R(SubmissionNotCancellable::Failed(submission, chunk)), + )), + Ok(Some(SubmissionStatus::Cancelled(submission))) => { + Err(E::R(E::R(SubmissionNotCancellable::Cancelled(submission)))) + } + Err(_) => Ok(()), + } + } + } + }) }) .await } diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index d422749..12546de 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -114,7 +114,13 @@ impl Client { pub async fn cancel_submission( &self, submission_id: SubmissionId, - ) -> Result<(), E> { + ) -> Result< + (), + E< + errors::SubmissionNotFound, + E, + >, + > { (|| async { let base_url = &self.base_url; let response = self @@ -122,7 +128,7 @@ impl Client { .post(format!("{base_url}/submissions/cancel/{submission_id}")) .send() .await - .map_err(|e| E::R(e.into()))?; + .map_err(|e| E::R(E::R(e.into())))?; let status = response.status(); if status.is_success() { return Ok(()); @@ -131,16 +137,28 @@ impl Client { let not_found_err = response .json::() .await - .map_err(|e| E::R(e.into()))?; - return Err(E::<_, InternalProducerClientError>::L(not_found_err)); + .map_err(|e| E::R(E::R(e.into())))?; + return Err(E::<_, E<_, InternalProducerClientError>>::L(not_found_err)); } - response.error_for_status().map_err(|e| E::R(e.into()))?; + if status == StatusCode::CONFLICT { + let not_cancellable_err = response + .json::() + .await + .map_err(|e| E::R(E::R(e.into())))?; + return Err(E::<_, E<_, InternalProducerClientError>>::R(E::L( + not_cancellable_err, + ))); + } + response + .error_for_status() + .map_err(|e| E::R(E::R(e.into())))?; Ok(()) }) .retry(retry_policy()) .when(|e| match e { E::L(_) => false, - E::R(client_err) => client_err.is_ephemeral(), + E::R(E::L(_)) => false, + E::R(E::R(client_err)) => client_err.is_ephemeral(), }) .notify(|err, dur| { tracing::debug!("retrying error {err:?} with sleeping {dur:?}"); diff --git a/opsqueue/src/producer/server.rs b/opsqueue/src/producer/server.rs index 61165dd..ad144e0 100644 --- a/opsqueue/src/producer/server.rs +++ b/opsqueue/src/producer/server.rs @@ -92,7 +92,8 @@ where // 200 if the submission was successfully cancelled. // 404 if the submission could not be found. -// 500 if a DatabaseError occurred +// 409 if the submission could not be cancelled. +// 500 if a DatabaseError occurred. async fn cancel_submission( State(state): State, Path(submission_id): Path, @@ -105,9 +106,12 @@ async fn cancel_submission( match submission::db::cancel_submission(submission_id, &mut conn).await { Ok(_) => Ok(()), Err(E::L(db_err)) => Err(ServerError(db_err.into()).into_response()), - Err(E::R(not_found_err)) => { + Err(E::R(E::L(not_found_err))) => { Err((StatusCode::NOT_FOUND, Json(not_found_err)).into_response()) } + Err(E::R(E::R(not_cancellable_err))) => { + Err((StatusCode::CONFLICT, Json(not_cancellable_err)).into_response()) + } } } From 807eb0c529edc442675f897700d124b5996a4a2c Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Fri, 27 Feb 2026 15:35:01 +0100 Subject: [PATCH 13/17] Bump version to 0.35.0 --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- libs/opsqueue_python/python/opsqueue/exceptions.py | 2 +- libs/opsqueue_python/src/common.rs | 5 +++-- libs/opsqueue_python/src/errors.rs | 2 +- libs/opsqueue_python/tests/test_roundtrip.py | 13 +++++++++---- 6 files changed, 17 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3e5d698..9c5a8ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,7 +2085,7 @@ dependencies = [ [[package]] name = "opsqueue" -version = "0.34.0" +version = "0.35.0" dependencies = [ "anyhow", "arc-swap", @@ -2139,7 +2139,7 @@ dependencies = [ [[package]] name = "opsqueue_python" -version = "0.34.0" +version = "0.35.0" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 061bc81..804d2cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ ] [workspace.package] -version = "0.34.0" +version = "0.35.0" [workspace.lints.clippy] diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index 02faae4..988f004 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -49,7 +49,7 @@ class SubmissionNotCancellableError(Exception): def __init__( self, submission: opsqueue_internal.SubmissionNotCancellable, - chunk: Optional[opsqueue_internal.ChunkFailed]=None, + chunk: Optional[opsqueue_internal.ChunkFailed] = None, ): super().__init__() self.submission = submission diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 84a50ee..71e1fec 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -498,9 +498,10 @@ impl From for SubmissionNotC Failed(s, c) => { let chunk = ChunkFailed::from_internal(c, &s); SubmissionNotCancellable::Failed { - submission: s.into(), chunk + submission: s.into(), + chunk, } - }, + } Cancelled(s) => SubmissionNotCancellable::Cancelled { submission: s.into(), }, diff --git a/libs/opsqueue_python/src/errors.rs b/libs/opsqueue_python/src/errors.rs index b276e66..488a328 100644 --- a/libs/opsqueue_python/src/errors.rs +++ b/libs/opsqueue_python/src/errors.rs @@ -4,7 +4,7 @@ use std::error::Error; use opsqueue::common::chunk::ChunkId; use opsqueue::common::errors::{ - ChunkNotFound, IncorrectUsage, SubmissionNotFound, SubmissionNotCancellable, + ChunkNotFound, IncorrectUsage, SubmissionNotCancellable, SubmissionNotFound, UnexpectedOpsqueueConsumerServerResponse, E, }; use pyo3::exceptions::PyBaseException; diff --git a/libs/opsqueue_python/tests/test_roundtrip.py b/libs/opsqueue_python/tests/test_roundtrip.py index dcc5ed4..779a3cc 100644 --- a/libs/opsqueue_python/tests/test_roundtrip.py +++ b/libs/opsqueue_python/tests/test_roundtrip.py @@ -11,7 +11,7 @@ ChunkFailed, SubmissionFailedError, SubmissionNotFoundError, - SubmissionNotCancellableError + SubmissionNotCancellableError, ) from opsqueue.consumer import ConsumerClient, Chunk from opsqueue.common import SerializationFormat @@ -358,6 +358,7 @@ def assert_submission_failed_has_metadata(x: SubmissionFailed) -> None: assert submission is not None assert_submission_failed_has_metadata(submission.submission) + def test_cancel_submission_not_found( opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: @@ -372,6 +373,7 @@ def test_cancel_submission_not_found( producer_client.cancel_submission(SubmissionId(0)) assert exc_info.value.submission_id == submission_id + def test_cancel_in_progress_and_already_cancelled_submissions( opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: @@ -381,7 +383,9 @@ def test_cancel_in_progress_and_already_cancelled_submissions( """ - url = "file:///tmp/opsqueue/test_cancel_in_progress_and_already_cancelled_submissions" + url = ( + "file:///tmp/opsqueue/test_cancel_in_progress_and_already_cancelled_submissions" + ) producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) # Sanity check submission is in progress before proceeding to cancel. @@ -400,10 +404,10 @@ def test_cancel_in_progress_and_already_cancelled_submissions( with pytest.raises(SubmissionNotCancellableError) as exc_info: producer_client.cancel_submission(submission_id) assert ( - type(exc_info.value.submission).__name__ - == "SubmissionNotCancellable_Cancelled" + type(exc_info.value.submission).__name__ == "SubmissionNotCancellable_Cancelled" ) + def test_cancel_complete_submission( opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: @@ -433,6 +437,7 @@ def run_consumer() -> None: == "SubmissionNotCancellable_Completed" ) + def test_cancel_failed_submission( opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: From 6ab71edb88e139057eb3066037aef95f5fe01429 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Fri, 27 Feb 2026 16:29:35 +0100 Subject: [PATCH 14/17] fixup! Error handling when cancelling submission PR cleanup --- libs/opsqueue_python/python/opsqueue/exceptions.py | 2 ++ libs/opsqueue_python/python/opsqueue/producer.py | 4 ++-- libs/opsqueue_python/src/common.rs | 8 +++++--- libs/opsqueue_python/src/errors.rs | 9 +++++++-- libs/opsqueue_python/src/producer.rs | 6 +++++- libs/opsqueue_python/tests/test_roundtrip.py | 2 ++ opsqueue/src/common/errors.rs | 7 ++++--- opsqueue/src/producer/client.rs | 13 +++++++++++-- 8 files changed, 38 insertions(+), 13 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index 988f004..0b326ab 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -56,10 +56,12 @@ def __init__( self.chunk = chunk def __str__(self) -> str: + chunk_str = f"\n{self.chunk}" return f""" Submission {self.submission.id} was not cancelled because: {self.submission} + {"" if self.chunk is None else chunk_str} """ def __repr__(self) -> str: diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index 4ea2e69..a2cd331 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -322,14 +322,14 @@ def count_submissions(self) -> int: def cancel_submission(self, submission_id: SubmissionId) -> None: """ - Cancel a specific submission if it's still in progress. + Cancel a specific submission that is in progress. Returns None if the submission was succesfully cancelled. Raises: - `SubmissionNotCancellableError` if the submission could not be cancelled because it was already completed, failed or cancelled. - - `SubmissionNotFoundError` + - `SubmissionNotFoundError` if the submission could not be found. """ return self.inner.cancel_submission(submission_id) diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index 71e1fec..5a1dabb 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -473,12 +473,11 @@ pub struct SubmissionCancelled { pub cancelled_at: DateTime, } +/// Submission could not be cancelled because it was already completed, failed +/// or cancelled. #[pyclass(frozen, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub enum SubmissionNotCancellable { - Cancelled { - submission: SubmissionCancelled, - }, Completed { submission: SubmissionCompleted, }, @@ -486,6 +485,9 @@ pub enum SubmissionNotCancellable { submission: SubmissionFailed, chunk: ChunkFailed, }, + Cancelled { + submission: SubmissionCancelled, + }, } impl From for SubmissionNotCancellable { diff --git a/libs/opsqueue_python/src/errors.rs b/libs/opsqueue_python/src/errors.rs index 488a328..16e9ed8 100644 --- a/libs/opsqueue_python/src/errors.rs +++ b/libs/opsqueue_python/src/errors.rs @@ -128,9 +128,14 @@ impl From>> for PyErr { impl From> for PyErr { fn from(value: CError) -> Self { + let c: Option = match &value.0 { + opsqueue::common::errors::SubmissionNotCancellable::Failed(submission, chunk) => Some( + common::ChunkFailed::from_internal(chunk.clone(), &submission), + ), + _ => None, + }; let s: common::SubmissionNotCancellable = value.0.into(); - // TODO pass the 'ChunkFailed' to the Python exception. - SubmissionNotCancellableError::new_err(s) + SubmissionNotCancellableError::new_err((s, c)) } } diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index a1cdd96..dc1a909 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -119,7 +119,10 @@ impl ProducerClient { }) } - /// TODO docstring + /// Cancel a submission. + /// + /// Will return an error if the submission is already complete, failed, or + /// cancelled, or if the submission could not be found. pub fn cancel_submission( &self, py: Python<'_>, @@ -138,6 +141,7 @@ impl ProducerClient { .await .map_err(|e| CError(R(e))) }) + // TODO ? // .map(|opt| opt.map(Into::into)) // .map_err(|e| ProducerClientError::new_err(e.to_string())) }) diff --git a/libs/opsqueue_python/tests/test_roundtrip.py b/libs/opsqueue_python/tests/test_roundtrip.py index 779a3cc..77b8b89 100644 --- a/libs/opsqueue_python/tests/test_roundtrip.py +++ b/libs/opsqueue_python/tests/test_roundtrip.py @@ -436,6 +436,7 @@ def run_consumer() -> None: type(exc_info.value.submission).__name__ == "SubmissionNotCancellable_Completed" ) + assert exc_info.value.chunk is None def test_cancel_failed_submission( @@ -468,3 +469,4 @@ def consume(x: int) -> None: type(exc_info.value.submission).__name__ == "SubmissionNotCancellable_Failed" ) + assert exc_info.value.chunk is not None diff --git a/opsqueue/src/common/errors.rs b/opsqueue/src/common/errors.rs index f981f88..f30436e 100644 --- a/opsqueue/src/common/errors.rs +++ b/opsqueue/src/common/errors.rs @@ -39,13 +39,14 @@ pub struct ChunkNotFound(pub ChunkId); #[error("Submission not found for ID {0:?}")] pub struct SubmissionNotFound(pub SubmissionId); -/// A submission could not be cancelled due to one of the enumerated reasons. +/// Submission could not be cancelled because it was already completed, failed +/// or cancelled. #[derive(Error, Debug, Deserialize, Serialize)] #[error("Submission could not be cancelled {0:?}")] pub enum SubmissionNotCancellable { - Cancelled(SubmissionCancelled), - Failed(SubmissionFailed, ChunkFailed), Completed(SubmissionCompleted), + Failed(SubmissionFailed, ChunkFailed), + Cancelled(SubmissionCancelled), } #[derive(Error, Debug)] diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index 12546de..80fa61c 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -110,7 +110,10 @@ impl Client { .await } - /// TODO docstring + /// Send a HTTP request to the OpsQueue server to cancel a submission. + /// + /// Will return an error if the submission is already complete, failed, or + /// cancelled, or if the submission could not be found. pub async fn cancel_submission( &self, submission_id: SubmissionId, @@ -130,9 +133,11 @@ impl Client { .await .map_err(|e| E::R(E::R(e.into())))?; let status = response.status(); + // 200, the submission was successfully cancelled. if status.is_success() { return Ok(()); } + // 404, the submission could not be found. if status == StatusCode::NOT_FOUND { let not_found_err = response .json::() @@ -140,6 +145,7 @@ impl Client { .map_err(|e| E::R(E::R(e.into())))?; return Err(E::<_, E<_, InternalProducerClientError>>::L(not_found_err)); } + // 404, the submission could not be cancelled. if status == StatusCode::CONFLICT { let not_cancellable_err = response .json::() @@ -152,7 +158,10 @@ impl Client { response .error_for_status() .map_err(|e| E::R(E::R(e.into())))?; - Ok(()) + panic!( + "Unexpected {:?} from Opsqueue when cancelling a submission", + status + ) }) .retry(retry_policy()) .when(|e| match e { From e4814a29e3877002b37b336c215696b0c04b7d53 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Fri, 27 Feb 2026 16:37:39 +0100 Subject: [PATCH 15/17] fixup! Error handling when cancelling submission Fix checks --- libs/opsqueue_python/python/opsqueue/producer.py | 2 +- libs/opsqueue_python/src/errors.rs | 2 +- libs/opsqueue_python/tests/test_roundtrip.py | 9 +++++---- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index a2cd331..31e7546 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -331,7 +331,7 @@ def cancel_submission(self, submission_id: SubmissionId) -> None: cancelled because it was already completed, failed or cancelled. - `SubmissionNotFoundError` if the submission could not be found. """ - return self.inner.cancel_submission(submission_id) + self.inner.cancel_submission(submission_id) def get_submission_status( self, submission_id: SubmissionId diff --git a/libs/opsqueue_python/src/errors.rs b/libs/opsqueue_python/src/errors.rs index 16e9ed8..f0c6850 100644 --- a/libs/opsqueue_python/src/errors.rs +++ b/libs/opsqueue_python/src/errors.rs @@ -130,7 +130,7 @@ impl From> for PyErr { fn from(value: CError) -> Self { let c: Option = match &value.0 { opsqueue::common::errors::SubmissionNotCancellable::Failed(submission, chunk) => Some( - common::ChunkFailed::from_internal(chunk.clone(), &submission), + common::ChunkFailed::from_internal(chunk.clone(), submission), ), _ => None, }; diff --git a/libs/opsqueue_python/tests/test_roundtrip.py b/libs/opsqueue_python/tests/test_roundtrip.py index 77b8b89..0ec2d22 100644 --- a/libs/opsqueue_python/tests/test_roundtrip.py +++ b/libs/opsqueue_python/tests/test_roundtrip.py @@ -393,9 +393,9 @@ def test_cancel_in_progress_and_already_cancelled_submissions( type(producer_client.get_submission_status(submission_id)).__name__ == "SubmissionStatus_InProgress" ) - # Cancelling an in progress submission should succeed. - assert producer_client.cancel_submission(submission_id) is None - # Submission status should now be cancelled. + # Cancelling an in progress submission should change submission status to + # cancelled. + producer_client.cancel_submission(submission_id) assert ( type(producer_client.get_submission_status(submission_id)).__name__ == "SubmissionStatus_Cancelled" @@ -428,6 +428,7 @@ def run_consumer() -> None: # Wait for the submission to complete. producer_client.blocking_stream_completed_submission(submission_id) submission = producer_client.get_submission_status(submission_id) + assert submission is not None assert isinstance(submission.submission, SubmissionCompleted) # Cancelling the already completed submission should fail. with pytest.raises(SubmissionNotCancellableError) as exc_info: @@ -469,4 +470,4 @@ def consume(x: int) -> None: type(exc_info.value.submission).__name__ == "SubmissionNotCancellable_Failed" ) - assert exc_info.value.chunk is not None + assert isinstance(exc_info.value.chunk, ChunkFailed) From a783c86b085ef46d53d6e6b9c59ef5d1fa2f7783 Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Fri, 27 Feb 2026 17:01:15 +0100 Subject: [PATCH 16/17] fixup! Add SubmissionCancelled to SubmissionStatus PR cleanup --- libs/opsqueue_python/python/opsqueue/exceptions.py | 2 +- libs/opsqueue_python/python/opsqueue/producer.py | 2 +- libs/opsqueue_python/src/lib.rs | 1 + opsqueue/src/common/submission.rs | 8 ++++---- opsqueue/src/producer/client.rs | 2 +- opsqueue/src/prometheus.rs | 7 +++++++ 6 files changed, 15 insertions(+), 7 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index 0b326ab..cbeca92 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -106,7 +106,7 @@ class SubmissionNotFoundError(IncorrectUsageError): but the submission doesn't exist within the Opsqueue. """ - __slots = ["submission_id"] + __slots__ = ["submission_id"] def __init__( self, diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index 31e7546..18bb22c 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -324,7 +324,7 @@ def cancel_submission(self, submission_id: SubmissionId) -> None: """ Cancel a specific submission that is in progress. - Returns None if the submission was succesfully cancelled. + Returns None if the submission was successfully cancelled. Raises: - `SubmissionNotCancellableError` if the submission could not be diff --git a/libs/opsqueue_python/src/lib.rs b/libs/opsqueue_python/src/lib.rs index 34b29ea..2c07889 100644 --- a/libs/opsqueue_python/src/lib.rs +++ b/libs/opsqueue_python/src/lib.rs @@ -23,6 +23,7 @@ fn opsqueue_internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index bc5dcd2..daa79dc 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -710,7 +710,7 @@ pub mod db { Ok(Some(SubmissionStatus::Cancelled(submission))) => { Err(E::R(E::R(SubmissionNotCancellable::Cancelled(submission)))) } - Err(_) => Ok(()), + Err(db_err) => Err(E::L(db_err)), } } } @@ -902,7 +902,7 @@ pub mod db { // Clean up old submissions_metadata query!( "DELETE FROM submissions_metadata - WHERE submission_id = ( + WHERE submission_id IN ( SELECT id FROM submissions_completed WHERE completed_at < julianday($1) );", older_than @@ -911,7 +911,7 @@ pub mod db { .await?; query!( "DELETE FROM submissions_metadata - WHERE submission_id = ( + WHERE submission_id IN ( SELECT id FROM submissions_failed WHERE failed_at < julianday($1) );", older_than @@ -920,7 +920,7 @@ pub mod db { .await?; query!( "DELETE FROM submissions_metadata - WHERE submission_id = ( + WHERE submission_id IN ( SELECT id FROM submissions_cancelled WHERE cancelled_at < julianday($1) );", older_than diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index 80fa61c..70bbe5c 100644 --- a/opsqueue/src/producer/client.rs +++ b/opsqueue/src/producer/client.rs @@ -145,7 +145,7 @@ impl Client { .map_err(|e| E::R(E::R(e.into())))?; return Err(E::<_, E<_, InternalProducerClientError>>::L(not_found_err)); } - // 404, the submission could not be cancelled. + // 409, the submission could not be cancelled. if status == StatusCode::CONFLICT { let not_cancellable_err = response .json::() diff --git a/opsqueue/src/prometheus.rs b/opsqueue/src/prometheus.rs index 77f367e..8340168 100644 --- a/opsqueue/src/prometheus.rs +++ b/opsqueue/src/prometheus.rs @@ -56,7 +56,14 @@ pub fn describe_metrics() { Unit::Count, "Number of submissions failed permanently" ); + describe_counter!( + SUBMISSIONS_CANCELLED_COUNTER, + Unit::Count, + "Number of submissions cancelled permanently" + ); describe_histogram!(SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its final chunk being completed. Does not count failed submissions."); + describe_histogram!(SUBMISSIONS_DURATION_FAIL_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its first chunk being failed."); + describe_histogram!(SUBMISSIONS_DURATION_CANCEL_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its first chunk being canceled."); describe_counter!( CHUNKS_COMPLETED_COUNTER, From 0e59b8138e9e17c20f2f577a490b95dcaad8d88d Mon Sep 17 00:00:00 2001 From: "jeremy.barisch.rooney@channable.com" Date: Fri, 27 Feb 2026 17:06:01 +0100 Subject: [PATCH 17/17] fixup! Error handling when cancelling submission Address comments --- libs/opsqueue_python/python/opsqueue/exceptions.py | 2 +- opsqueue/src/common/submission.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index cbeca92..986f743 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -58,7 +58,7 @@ def __init__( def __str__(self) -> str: chunk_str = f"\n{self.chunk}" return f""" - Submission {self.submission.id} was not cancelled because: + Submission {self.submission.submission.id} was not cancelled because: {self.submission} {"" if self.chunk is None else chunk_str} diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index daa79dc..dc87474 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -963,7 +963,7 @@ pub mod db { tracing::info!("Deleted {n_submissions_completed} completed submissions (with {n_chunks_completed} chunks completed)"); tracing::info!("Deleted {n_submissions_failed} failed submissions (with {n_chunks_failed} chunks failed)"); - tracing::info!("Deleted {n_submissions_cancelled} cancelled submissions (with {n_chunks_completed} chunks completed and {n_chunks_failed} chunks failed)"); + tracing::info!("Deleted {n_submissions_cancelled} cancelled submissions"); Ok(()) }) })