diff --git a/Cargo.lock b/Cargo.lock index 3e5d698..9c5a8ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,7 +2085,7 @@ dependencies = [ [[package]] name = "opsqueue" -version = "0.34.0" +version = "0.35.0" dependencies = [ "anyhow", "arc-swap", @@ -2139,7 +2139,7 @@ dependencies = [ [[package]] name = "opsqueue_python" -version = "0.34.0" +version = "0.35.0" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 061bc81..804d2cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ ] [workspace.package] -version = "0.34.0" +version = "0.35.0" [workspace.lints.clippy] diff --git a/libs/opsqueue_python/python/opsqueue/exceptions.py b/libs/opsqueue_python/python/opsqueue/exceptions.py index 14603b3..986f743 100644 --- a/libs/opsqueue_python/python/opsqueue/exceptions.py +++ b/libs/opsqueue_python/python/opsqueue/exceptions.py @@ -1,6 +1,7 @@ ## Expected errors: from . import opsqueue_internal +from typing import Optional class SubmissionFailedError(Exception): @@ -38,6 +39,35 @@ def __repr__(self) -> str: return str(self) +class SubmissionNotCancellableError(Exception): + __slots__ = ["submission", "chunk"] + """Raised when a submission could not be cancelled due to already being + completed, failed or cancelled. + + """ + + def __init__( + self, + submission: opsqueue_internal.SubmissionNotCancellable, + chunk: Optional[opsqueue_internal.ChunkFailed] = None, + ): + super().__init__() + self.submission = submission + self.chunk = chunk + + def __str__(self) -> str: + chunk_str = f"\n{self.chunk}" + return f""" + Submission {self.submission.submission.id} was not cancelled because: + + {self.submission} + {"" if self.chunk is None else chunk_str} + """ + + def __repr__(self) -> str: + return str(self) + + ## Usage errors: @@ -76,7 +106,20 @@ class SubmissionNotFoundError(IncorrectUsageError): but the submission doesn't exist within the Opsqueue. 
""" - pass + __slots__ = ["submission_id"] + + def __init__( + self, + submission_id: int, + ): + super().__init__() + self.submission_id = submission_id + + def __str__(self) -> str: + return f"Submission {self.submission_id} could not be found" + + def __repr__(self) -> str: + return str(self) class ChunkCountIsZeroError(IncorrectUsageError): diff --git a/libs/opsqueue_python/python/opsqueue/producer.py b/libs/opsqueue_python/python/opsqueue/producer.py index 5b8eabb..18bb22c 100644 --- a/libs/opsqueue_python/python/opsqueue/producer.py +++ b/libs/opsqueue_python/python/opsqueue/producer.py @@ -14,7 +14,11 @@ ) from . import opsqueue_internal from . import tracing -from opsqueue.exceptions import SubmissionFailedError +from opsqueue.exceptions import ( + SubmissionFailedError, + SubmissionNotCancellableError, + SubmissionNotFoundError, +) from .opsqueue_internal import ( # type: ignore[import-not-found] SubmissionId, SubmissionStatus, @@ -30,6 +34,8 @@ "SubmissionCompleted", "SubmissionFailedError", "SubmissionFailed", + "SubmissionNotCancellableError", + "SubmissionNotFoundError", "ChunkFailed", ] @@ -314,6 +320,19 @@ def count_submissions(self) -> int: """ return self.inner.count_submissions() # type: ignore[no-any-return] + def cancel_submission(self, submission_id: SubmissionId) -> None: + """ + Cancel a specific submission that is in progress. + + Returns None if the submission was successfully cancelled. + + Raises: + - `SubmissionNotCancellableError` if the submission could not be + cancelled because it was already completed, failed or cancelled. + - `SubmissionNotFoundError` if the submission could not be found. 
+ """ + self.inner.cancel_submission(submission_id) + def get_submission_status( self, submission_id: SubmissionId ) -> SubmissionStatus | None: diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index caa5128..5a1dabb 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -318,6 +318,19 @@ impl From for SubmissionFailed { } } +impl From for SubmissionCancelled { + fn from(value: opsqueue::common::submission::SubmissionCancelled) -> Self { + Self { + id: value.id.into(), + chunks_total: value.chunks_total.into(), + chunks_done: value.chunks_done.into(), + metadata: value.metadata, + strategic_metadata: value.strategic_metadata, + cancelled_at: value.cancelled_at, + } + } +} + #[pyclass(frozen, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub enum SubmissionStatus { @@ -331,6 +344,9 @@ pub enum SubmissionStatus { submission: SubmissionFailed, chunk: ChunkFailed, }, + Cancelled { + submission: SubmissionCancelled, + }, } impl From for SubmissionStatus { @@ -348,10 +364,20 @@ impl From for SubmissionStatus { let submission = s.into(); SubmissionStatus::Failed { submission, chunk } } + Cancelled(s) => SubmissionStatus::Cancelled { + submission: s.into(), + }, } } } +#[pymethods] +impl SubmissionStatus { + fn __repr__(&self) -> String { + format!("{self:?}") + } +} + #[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct Submission { @@ -385,13 +411,6 @@ impl Submission { } } -#[pymethods] -impl SubmissionStatus { - fn __repr__(&self) -> String { - format!("{self:?}") - } -} - #[pymethods] impl SubmissionCompleted { fn __repr__(&self) -> String { @@ -414,6 +433,14 @@ impl SubmissionFailed { } } +#[pymethods] +impl SubmissionCancelled { + fn __repr__(&self) -> String { + format!("SubmissionCancelled(id={0}, chunks_total={1}, chunks_done={2}, metadata={3:?}, strategic_metadata={4:?}, cancelled_at={5})", + self.id.__repr__(), self.chunks_total, 
self.chunks_done, self.metadata, self.strategic_metadata, self.cancelled_at) + } +} + #[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct SubmissionCompleted { @@ -435,6 +462,62 @@ pub struct SubmissionFailed { pub failed_chunk_id: u64, } +#[pyclass(frozen, get_all, module = "opsqueue")] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SubmissionCancelled { + pub id: SubmissionId, + pub chunks_total: u64, + pub chunks_done: u64, + pub metadata: Option, + pub strategic_metadata: Option, + pub cancelled_at: DateTime, +} + +/// Submission could not be cancelled because it was already completed, failed +/// or cancelled. +#[pyclass(frozen, module = "opsqueue")] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SubmissionNotCancellable { + Completed { + submission: SubmissionCompleted, + }, + Failed { + submission: SubmissionFailed, + chunk: ChunkFailed, + }, + Cancelled { + submission: SubmissionCancelled, + }, +} + +impl From for SubmissionNotCancellable { + fn from(value: opsqueue::common::errors::SubmissionNotCancellable) -> Self { + use opsqueue::common::errors::SubmissionNotCancellable::*; + match value { + Completed(s) => SubmissionNotCancellable::Completed { + submission: s.into(), + }, + Failed(s, c) => { + let chunk = ChunkFailed::from_internal(c, &s); + SubmissionNotCancellable::Failed { + submission: s.into(), + chunk, + } + } + Cancelled(s) => SubmissionNotCancellable::Cancelled { + submission: s.into(), + }, + } + } +} + +#[pymethods] +impl SubmissionNotCancellable { + fn __repr__(&self) -> String { + format!("{self:?}") + } +} + pub async fn run_unless_interrupted( future: impl IntoFuture>, ) -> Result diff --git a/libs/opsqueue_python/src/errors.rs b/libs/opsqueue_python/src/errors.rs index 0c99f63..f0c6850 100644 --- a/libs/opsqueue_python/src/errors.rs +++ b/libs/opsqueue_python/src/errors.rs @@ -1,14 +1,16 @@ -/// NOTE: We defne the potentially raisable errors/exceptions in Python +/// NOTE: We define 
the potentially raisable errors/exceptions in Python /// so we have nice IDE support for docs-on-hover and for 'go to definition'. use std::error::Error; use opsqueue::common::chunk::ChunkId; use opsqueue::common::errors::{ - ChunkNotFound, IncorrectUsage, SubmissionNotFound, UnexpectedOpsqueueConsumerServerResponse, E, + ChunkNotFound, IncorrectUsage, SubmissionNotCancellable, SubmissionNotFound, + UnexpectedOpsqueueConsumerServerResponse, E, }; use pyo3::exceptions::PyBaseException; use pyo3::{import_exception, Bound, PyErr, Python}; +use crate::common; use crate::common::{ChunkIndex, SubmissionId}; // Expected errors: @@ -19,6 +21,7 @@ import_exception!(opsqueue.exceptions, IncorrectUsageError); import_exception!(opsqueue.exceptions, TryFromIntError); import_exception!(opsqueue.exceptions, ChunkNotFoundError); import_exception!(opsqueue.exceptions, SubmissionNotFoundError); +import_exception!(opsqueue.exceptions, SubmissionNotCancellableError); import_exception!(opsqueue.exceptions, NewObjectStoreClientError); import_exception!(opsqueue.exceptions, SubmissionNotCompletedYetError); @@ -123,10 +126,23 @@ impl From>> for PyErr { } } +impl From> for PyErr { + fn from(value: CError) -> Self { + let c: Option = match &value.0 { + opsqueue::common::errors::SubmissionNotCancellable::Failed(submission, chunk) => Some( + common::ChunkFailed::from_internal(chunk.clone(), submission), + ), + _ => None, + }; + let s: common::SubmissionNotCancellable = value.0.into(); + SubmissionNotCancellableError::new_err((s, c)) + } +} + impl From> for PyErr { fn from(value: CError) -> Self { let submission_id = value.0 .0; - SubmissionNotFoundError::new_err((value.0.to_string(), SubmissionId::from(submission_id))) + SubmissionNotFoundError::new_err(u64::from(submission_id)) } } diff --git a/libs/opsqueue_python/src/lib.rs b/libs/opsqueue_python/src/lib.rs index 8800502..2c07889 100644 --- a/libs/opsqueue_python/src/lib.rs +++ b/libs/opsqueue_python/src/lib.rs @@ -23,6 +23,8 @@ fn 
opsqueue_internal(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index 2ea6078..dc1a909 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -10,6 +10,7 @@ use pyo3::{ use futures::{stream::BoxStream, StreamExt, TryStreamExt}; use opsqueue::{ common::errors::E::{self, L, R}, + common::errors::{SubmissionNotCancellable, SubmissionNotFound}, object_store::{ChunksStorageError, NewObjectStoreClientError}, producer::client::{Client as ActualClient, InternalProducerClientError}, }; @@ -118,6 +119,34 @@ impl ProducerClient { }) } + /// Cancel a submission. + /// + /// Will return an error if the submission is already complete, failed, or + /// cancelled, or if the submission could not be found. + pub fn cancel_submission( + &self, + py: Python<'_>, + id: SubmissionId, + ) -> CPyResult< + (), + E< + FatalPythonException, + E>, + >, + > { + py.allow_threads(|| { + self.block_unless_interrupted(async { + self.producer_client + .cancel_submission(id.into()) + .await + .map_err(|e| CError(R(e))) + }) + // TODO ? + // .map(|opt| opt.map(Into::into)) + // .map_err(|e| ProducerClientError::new_err(e.to_string())) + }) + } + /// Retrieve the status (in progress, completed or failed) of a specific submission. 
/// /// The returned SubmissionStatus object also includes the number of chunks finished so far, diff --git a/libs/opsqueue_python/tests/test_roundtrip.py b/libs/opsqueue_python/tests/test_roundtrip.py index e13bcac..0ec2d22 100644 --- a/libs/opsqueue_python/tests/test_roundtrip.py +++ b/libs/opsqueue_python/tests/test_roundtrip.py @@ -4,11 +4,14 @@ from collections.abc import Iterator, Sequence from opsqueue.producer import ( + SubmissionId, ProducerClient, SubmissionCompleted, SubmissionFailed, ChunkFailed, SubmissionFailedError, + SubmissionNotFoundError, + SubmissionNotCancellableError, ) from opsqueue.consumer import ConsumerClient, Chunk from opsqueue.common import SerializationFormat @@ -302,7 +305,7 @@ def run_consumer() -> None: strategy = strategy_from_description(any_consumer_strategy) consumer_client.run_each_op(lambda x: x, strategy=strategy) - with background_process(run_consumer) as _consumer: + with background_process(run_consumer): # Wait for the submission to complete. producer_client.blocking_stream_completed_submission(submission_id) submission = producer_client.get_submission_status(submission_id) @@ -339,7 +342,7 @@ def consume(x: int) -> None: strategy = strategy_from_description(any_consumer_strategy) consumer_client.run_each_op(consume, strategy=strategy) - with background_process(run_consumer) as _consumer: + with background_process(run_consumer): def assert_submission_failed_has_metadata(x: SubmissionFailed) -> None: assert isinstance(x, SubmissionFailed) @@ -354,3 +357,117 @@ def assert_submission_failed_has_metadata(x: SubmissionFailed) -> None: submission = producer_client.get_submission_status(submission_id) assert submission is not None assert_submission_failed_has_metadata(submission.submission) + + +def test_cancel_submission_not_found( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Attempting to cancel a submission that doesn't exist raises a + SubmissionNotFoundError. 
+ + """ + url = "file:///tmp/opsqueue/test_cancel_submission_not_found" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = 0 + with pytest.raises(SubmissionNotFoundError) as exc_info: + producer_client.cancel_submission(SubmissionId(0)) + assert exc_info.value.submission_id == submission_id + + +def test_cancel_in_progress_and_already_cancelled_submissions( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Cancelling a submission that is in progress succeeds and returns None. + Attempting to cancel a submission that is already cancelled should raise a + SubmissionNotCancellableError. + + """ + + url = ( + "file:///tmp/opsqueue/test_cancel_in_progress_and_already_cancelled_submissions" + ) + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) + # Sanity check submission is in progress before proceeding to cancel. + assert ( + type(producer_client.get_submission_status(submission_id)).__name__ + == "SubmissionStatus_InProgress" + ) + # Cancelling an in progress submission should change submission status to + # cancelled. + producer_client.cancel_submission(submission_id) + assert ( + type(producer_client.get_submission_status(submission_id)).__name__ + == "SubmissionStatus_Cancelled" + ) + # Cancelling an already cancelled submission should fail. + with pytest.raises(SubmissionNotCancellableError) as exc_info: + producer_client.cancel_submission(submission_id) + assert ( + type(exc_info.value.submission).__name__ == "SubmissionNotCancellable_Cancelled" + ) + + +def test_cancel_complete_submission( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Attempting to cancel a submission that has already completed should raise + a SubmissionNotCancellableError. 
+ + """ + url = "file:///tmp/opsqueue/test_cancel_complete_submission" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) + + def run_consumer() -> None: + consumer_client = ConsumerClient(f"localhost:{opsqueue.port}", url) + strategy = strategy_from_description(any_consumer_strategy) + consumer_client.run_each_op(lambda x: x, strategy=strategy) + + with background_process(run_consumer): + # Wait for the submission to complete. + producer_client.blocking_stream_completed_submission(submission_id) + submission = producer_client.get_submission_status(submission_id) + assert submission is not None + assert isinstance(submission.submission, SubmissionCompleted) + # Cancelling the already completed submission should fail. + with pytest.raises(SubmissionNotCancellableError) as exc_info: + producer_client.cancel_submission(submission_id) + assert ( + type(exc_info.value.submission).__name__ + == "SubmissionNotCancellable_Completed" + ) + assert exc_info.value.chunk is None + + +def test_cancel_failed_submission( + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription +) -> None: + """Attempting to cancel a submission that has failed should raise a + SubmissionNotCancellableError. 
+ + """ + url = "file:///tmp/opsqueue/test_cancel_failed_submission" + producer_client = ProducerClient(f"localhost:{opsqueue.port}", url) + submission_id = producer_client.insert_submission((1, 2, 3), chunk_size=1) + + def run_consumer() -> None: + consumer_client = ConsumerClient(f"localhost:{opsqueue.port}", url) + + def consume(x: int) -> None: + raise ValueError(f"Couldn't consume {x}") + + strategy = strategy_from_description(any_consumer_strategy) + consumer_client.run_each_op(consume, strategy=strategy) + + with background_process(run_consumer): + with pytest.raises(SubmissionFailedError): + producer_client.blocking_stream_completed_submission(submission_id) + # Cancelling the failed submission should fail. + with pytest.raises(SubmissionNotCancellableError) as exc_info: + producer_client.cancel_submission(submission_id) + assert ( + type(exc_info.value.submission).__name__ + == "SubmissionNotCancellable_Failed" + ) + assert isinstance(exc_info.value.chunk, ChunkFailed) diff --git a/opsqueue/migrations/20260225164600_submissions_cancelled.sql b/opsqueue/migrations/20260225164600_submissions_cancelled.sql new file mode 100644 index 0000000..34de8a7 --- /dev/null +++ b/opsqueue/migrations/20260225164600_submissions_cancelled.sql @@ -0,0 +1,9 @@ +CREATE TABLE submissions_cancelled +( + id BIGINT PRIMARY KEY NOT NULL, + prefix TEXT, + chunks_total INTEGER NOT NULL DEFAULT 0, + chunks_done INTEGER NOT NULL DEFAULT 0, + metadata BLOB, + cancelled_at DATETIME NOT NULL -- Unix Timestamp +); diff --git a/opsqueue/opsqueue_example_database_schema.db b/opsqueue/opsqueue_example_database_schema.db index bced270..4d6ad39 100644 Binary files a/opsqueue/opsqueue_example_database_schema.db and b/opsqueue/opsqueue_example_database_schema.db differ diff --git a/opsqueue/src/common/errors.rs b/opsqueue/src/common/errors.rs index 38ee007..f30436e 100644 --- a/opsqueue/src/common/errors.rs +++ b/opsqueue/src/common/errors.rs @@ -11,7 +11,10 @@ use thiserror::Error; use 
crate::consumer::common::SyncServerToClientResponse; -use super::{chunk::ChunkId, submission::SubmissionId}; +use super::{ + chunk::{ChunkFailed, ChunkId}, + submission::{SubmissionCancelled, SubmissionCompleted, SubmissionFailed, SubmissionId}, +}; // #[derive(Error, Debug, Clone, Serialize, Deserialize)] // #[error("Low-level database error: {0:?}")] @@ -32,10 +35,20 @@ impl From for E { #[error("Chunk not found for ID {0:?}")] pub struct ChunkNotFound(pub ChunkId); -#[derive(Error, Debug)] +#[derive(Error, Debug, Deserialize, Serialize)] #[error("Submission not found for ID {0:?}")] pub struct SubmissionNotFound(pub SubmissionId); +/// Submission could not be cancelled because it was already completed, failed +/// or cancelled. +#[derive(Error, Debug, Deserialize, Serialize)] +#[error("Submission could not be cancelled {0:?}")] +pub enum SubmissionNotCancellable { + Completed(SubmissionCompleted), + Failed(SubmissionFailed, ChunkFailed), + Cancelled(SubmissionCancelled), +} + #[derive(Error, Debug)] #[error("Unexpected opsqueue consumer server response. This indicates an error inside Opsqueue itself: {0:?}")] pub struct UnexpectedOpsqueueConsumerServerResponse(pub SyncServerToClientResponse); diff --git a/opsqueue/src/common/submission.rs b/opsqueue/src/common/submission.rs index 497d77d..dc87474 100644 --- a/opsqueue/src/common/submission.rs +++ b/opsqueue/src/common/submission.rs @@ -183,11 +183,27 @@ pub struct SubmissionFailed { pub otel_trace_carrier: String, } +/// A submission that has been cancelled. +/// +/// Once a submission is cancelled, it gets moved to the `submissions_cancelled` +/// table, and its old `submissions` record gets deleted. 
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct SubmissionCancelled { + pub id: SubmissionId, + pub prefix: Option, + pub chunks_total: ChunkCount, + pub chunks_done: ChunkCount, + pub metadata: Option, + pub strategic_metadata: Option, + pub cancelled_at: DateTime, +} + #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub enum SubmissionStatus { InProgress(Submission), Completed(SubmissionCompleted), Failed(SubmissionFailed, ChunkFailed), + Cancelled(SubmissionCancelled), } impl Default for Submission { @@ -244,7 +260,7 @@ impl Submission { pub mod db { use crate::{ common::{ - errors::{DatabaseError, SubmissionNotFound, E}, + errors::{DatabaseError, SubmissionNotCancellable, SubmissionNotFound, E}, StrategicMetadataMap, }, db::{Connection, True, WriterConnection, WriterPool}, @@ -607,6 +623,39 @@ pub mod db { failed_chunk, ))); } + + let cancelled_row_opt = query!( + r#" + SELECT + id AS "id: SubmissionId" + , prefix + , chunks_total AS "chunks_total: ChunkCount" + , chunks_done AS "chunks_done: ChunkCount" + , metadata + , ( SELECT json_group_object(metadata_key, metadata_value) + FROM submissions_metadata + WHERE submission_id = submissions_cancelled.id + ) AS "strategic_metadata: sqlx::types::Json" + , cancelled_at AS "cancelled_at: DateTime" + FROM submissions_cancelled WHERE id = $1 + "#, + id + ) + .fetch_optional(conn.get_inner()) + .await?; + if let Some(row) = cancelled_row_opt { + let cancelled_submission = SubmissionCancelled { + id: row.id, + prefix: row.prefix, + chunks_total: row.chunks_total, + chunks_done: row.chunks_done, + metadata: row.metadata, + strategic_metadata: row.strategic_metadata.map(|json| json.0), + cancelled_at: row.cancelled_at, + }; + return Ok(Some(SubmissionStatus::Cancelled(cancelled_submission))); + } + Ok(None) } @@ -634,6 +683,85 @@ pub mod db { .await } + #[tracing::instrument(skip(conn))] + pub async fn cancel_submission( + id: SubmissionId, + mut conn: 
impl WriterConnection, + ) -> Result<(), E>> { + conn.transaction(move |mut tx| { + Box::pin(async move { + match cancel_submission_notx(id, &mut tx).await { + Ok(()) => Ok(()), + Err(E::L(db_err)) => Err(E::L(db_err)), + Err(E::R(not_found_err)) => { + // Submission could not be found, let's check the status + // in order to return a more informative error. + match submission_status(id, &mut tx).await { + Ok(None) => Err(E::R(E::L(not_found_err))), + Ok(Some(SubmissionStatus::InProgress(submission))) => { + panic!("Failed to cancel in progress submission {:?}", submission) + } + Ok(Some(SubmissionStatus::Completed(submission))) => { + Err(E::R(E::R(SubmissionNotCancellable::Completed(submission)))) + } + Ok(Some(SubmissionStatus::Failed(submission, chunk))) => Err(E::R( + E::R(SubmissionNotCancellable::Failed(submission, chunk)), + )), + Ok(Some(SubmissionStatus::Cancelled(submission))) => { + Err(E::R(E::R(SubmissionNotCancellable::Cancelled(submission)))) + } + Err(db_err) => Err(E::L(db_err)), + } + } + } + }) + }) + .await + } + + /// Do not call directly! Must be called inside a transaction. 
+ pub async fn cancel_submission_notx( + id: SubmissionId, + mut conn: impl WriterConnection, + ) -> Result<(), E> { + cancel_submission_raw(id, &mut conn).await?; + super::chunk::db::skip_remaining_chunks(id, conn).await?; + Ok(()) + } + + #[tracing::instrument(skip(conn))] + pub(super) async fn cancel_submission_raw( + id: SubmissionId, + mut conn: impl WriterConnection, + ) -> Result<(), E> { + let now = chrono::prelude::Utc::now(); + + let submission_opt = query!( + " + INSERT INTO submissions_cancelled + (id, chunks_total, prefix, metadata, cancelled_at, chunks_done) + SELECT id, chunks_total, prefix, metadata, julianday($1), chunks_done FROM submissions WHERE id = $2; + + DELETE FROM submissions WHERE id = $3 RETURNING *; + ", + now, + id, + id, + ) + .fetch_optional(conn.get_inner()) + .await?; + histogram!(crate::prometheus::SUBMISSIONS_DURATION_CANCEL_HISTOGRAM).record( + crate::prometheus::time_delta_as_f64(Utc::now() - id.timestamp()), + ); + match submission_opt { + None => Err(E::R(SubmissionNotFound(id))), + Some(_) => { + counter!(crate::prometheus::SUBMISSIONS_CANCELLED_COUNTER).increment(1); + Ok(()) + } + } + } + #[tracing::instrument(skip(conn))] /// Do not call directly! MUST be called inside a transaction. 
pub(super) async fn complete_submission_raw( @@ -774,7 +902,7 @@ pub mod db { // Clean up old submissions_metadata query!( "DELETE FROM submissions_metadata - WHERE submission_id = ( + WHERE submission_id IN ( SELECT id FROM submissions_completed WHERE completed_at < julianday($1) );", older_than @@ -783,13 +911,22 @@ pub mod db { .await?; query!( "DELETE FROM submissions_metadata - WHERE submission_id = ( + WHERE submission_id IN ( SELECT id FROM submissions_failed WHERE failed_at < julianday($1) );", older_than ) .execute(tx.get_inner()) .await?; + query!( + "DELETE FROM submissions_metadata + WHERE submission_id IN ( + SELECT id FROM submissions_cancelled WHERE cancelled_at < julianday($1) + );", + older_than + ) + .execute(tx.get_inner()) + .await?; // Clean up old submissions: let n_submissions_completed = query!( @@ -804,6 +941,12 @@ pub mod db { ) .execute(tx.get_inner()) .await?.rows_affected(); + let n_submissions_cancelled = query!( + "DELETE FROM submissions_cancelled WHERE cancelled_at < julianday($1);", + older_than + ) + .execute(tx.get_inner()) + .await?.rows_affected(); let n_chunks_completed = query!( "DELETE FROM chunks_completed WHERE completed_at < julianday($1);", @@ -818,8 +961,9 @@ pub mod db { .execute(tx.get_inner()) .await?.rows_affected(); - tracing::info!("Deleted {n_submissions_completed} completed submissions (with {n_chunks_completed} chunks)"); - tracing::info!("Deleted {n_submissions_failed} failed submissions (with {n_chunks_failed} chunks)"); + tracing::info!("Deleted {n_submissions_completed} completed submissions (with {n_chunks_completed} chunks completed)"); + tracing::info!("Deleted {n_submissions_failed} failed submissions (with {n_chunks_failed} chunks failed)"); + tracing::info!("Deleted {n_submissions_cancelled} cancelled submissions"); Ok(()) }) }) diff --git a/opsqueue/src/producer/client.rs b/opsqueue/src/producer/client.rs index 0b94b66..70bbe5c 100644 --- a/opsqueue/src/producer/client.rs +++ 
b/opsqueue/src/producer/client.rs @@ -3,7 +3,10 @@ use std::time::Duration; use backon::BackoffBuilder; use backon::FibonacciBuilder; use backon::Retryable; +use http::StatusCode; +use crate::common::errors; +use crate::common::errors::E; use crate::common::submission::{SubmissionId, SubmissionStatus}; use crate::tracing::CarrierMap; @@ -107,6 +110,71 @@ impl Client { .await } + /// Send an HTTP request to the OpsQueue server to cancel a submission. + /// + /// Will return an error if the submission is already complete, failed, or + /// cancelled, or if the submission could not be found. + pub async fn cancel_submission( + &self, + submission_id: SubmissionId, + ) -> Result< + (), + E< + errors::SubmissionNotFound, + E, + >, + > { + (|| async { + let base_url = &self.base_url; + let response = self + .http_client + .post(format!("{base_url}/submissions/cancel/{submission_id}")) + .send() + .await + .map_err(|e| E::R(E::R(e.into())))?; + let status = response.status(); + // 200, the submission was successfully cancelled. + if status.is_success() { + return Ok(()); + } + // 404, the submission could not be found. + if status == StatusCode::NOT_FOUND { + let not_found_err = response + .json::() + .await + .map_err(|e| E::R(E::R(e.into())))?; + return Err(E::<_, E<_, InternalProducerClientError>>::L(not_found_err)); + } + // 409, the submission could not be cancelled. 
+ if status == StatusCode::CONFLICT { + let not_cancellable_err = response + .json::() + .await + .map_err(|e| E::R(E::R(e.into())))?; + return Err(E::<_, E<_, InternalProducerClientError>>::R(E::L( + not_cancellable_err, + ))); + } + response + .error_for_status() + .map_err(|e| E::R(E::R(e.into())))?; + panic!( + "Unexpected {:?} from Opsqueue when cancelling a submission", + status + ) + }) + .retry(retry_policy()) + .when(|e| match e { + E::L(_) => false, + E::R(E::L(_)) => false, + E::R(E::R(client_err)) => client_err.is_ephemeral(), + }) + .notify(|err, dur| { + tracing::debug!("retrying error {err:?} with sleeping {dur:?}"); + }) + .await + } + /// Get the status of an existing submission identified by its `submission_id`. /// /// This uses the GET `/producer/submissions` endpoint. @@ -343,7 +411,9 @@ mod tests { .expect("Should be OK") .expect("Should be Some"); match status { - SubmissionStatus::Completed(_) | SubmissionStatus::Failed(_, _) => { + SubmissionStatus::Completed(_) + | SubmissionStatus::Failed(_, _) + | SubmissionStatus::Cancelled(_) => { panic!("Expected a SubmissionStatus that is still Inprogress, got: {status:?}"); } SubmissionStatus::InProgress(submission) => { diff --git a/opsqueue/src/producer/server.rs b/opsqueue/src/producer/server.rs index d32fee1..ad144e0 100644 --- a/opsqueue/src/producer/server.rs +++ b/opsqueue/src/producer/server.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::common::errors::E; use crate::common::submission::{self, SubmissionId}; use crate::db::DBPools; use axum::extract::{Path, State}; @@ -46,6 +47,10 @@ impl ServerState { pub fn build_router(self) -> Router<()> { Router::new() .route("/submissions", post(insert_submission)) + .route( + "/submissions/cancel/{submission_id}", + post(cancel_submission), + ) .route( "/submissions/count_completed", get(submissions_count_completed), @@ -85,6 +90,31 @@ where } } +// 200 if the submission was successfully cancelled. +// 404 if the submission could not be found. 
+// 409 if the submission could not be cancelled. +// 500 if a DatabaseError occurred. +async fn cancel_submission( + State(state): State, + Path(submission_id): Path, +) -> Result<(), Response> { + let mut conn = state + .pool + .writer_conn() + .await + .map_err(|e| ServerError(e.into()).into_response())?; + match submission::db::cancel_submission(submission_id, &mut conn).await { + Ok(_) => Ok(()), + Err(E::L(db_err)) => Err(ServerError(db_err.into()).into_response()), + Err(E::R(E::L(not_found_err))) => { + Err((StatusCode::NOT_FOUND, Json(not_found_err)).into_response()) + } + Err(E::R(E::R(not_cancellable_err))) => { + Err((StatusCode::CONFLICT, Json(not_cancellable_err)).into_response()) + } + } +} + async fn submission_status( State(state): State, Path(submission_id): Path, diff --git a/opsqueue/src/prometheus.rs b/opsqueue/src/prometheus.rs index 74d7514..8340168 100644 --- a/opsqueue/src/prometheus.rs +++ b/opsqueue/src/prometheus.rs @@ -17,8 +17,10 @@ use crate::db::DBPools; pub const SUBMISSIONS_TOTAL_COUNTER: &str = "submissions_total_count"; pub const SUBMISSIONS_COMPLETED_COUNTER: &str = "submissions_completed_count"; pub const SUBMISSIONS_FAILED_COUNTER: &str = "submissions_failed_count"; +pub const SUBMISSIONS_CANCELLED_COUNTER: &str = "submissions_cancelled_count"; pub const SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM: &str = "submissions_complete_duration_seconds"; pub const SUBMISSIONS_DURATION_FAIL_HISTOGRAM: &str = "submissions_fail_duration_seconds"; +pub const SUBMISSIONS_DURATION_CANCEL_HISTOGRAM: &str = "submissions_cancel_duration_seconds"; pub const CHUNKS_COMPLETED_COUNTER: &str = "chunks_completed_count"; pub const CHUNKS_FAILED_COUNTER: &str = "chunks_failed_count"; @@ -54,7 +56,14 @@ pub fn describe_metrics() { Unit::Count, "Number of submissions failed permanently" ); + describe_counter!( + SUBMISSIONS_CANCELLED_COUNTER, + Unit::Count, + "Number of submissions cancelled permanently" + ); 
describe_histogram!(SUBMISSIONS_DURATION_COMPLETE_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its final chunk being completed. Does not count failed submissions."); + describe_histogram!(SUBMISSIONS_DURATION_FAIL_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its first chunk being failed."); + describe_histogram!(SUBMISSIONS_DURATION_CANCEL_HISTOGRAM, Unit::Seconds, "Time between a submission entering the system and its first chunk being canceled."); describe_counter!( CHUNKS_COMPLETED_COUNTER,