From 5ef1842ba0318f28ea47e3d9b7aadfb1872e0294 Mon Sep 17 00:00:00 2001 From: Raphael Date: Thu, 23 Apr 2026 18:58:42 +0200 Subject: [PATCH 1/2] feat: implemented taurus emitter pattern --- adapter/rest/src/main.rs | 47 ++++-------- crates/base/src/store.rs | 158 ++++++++++++++++++++++++++++++++++----- 2 files changed, 157 insertions(+), 48 deletions(-) diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 286cba7..297351e 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -1,6 +1,6 @@ use base::{ runner::{ServerContext, ServerRunner}, - store::FlowIdentifyResult, + store::{FlowExecutionResult, FlowIdentifyResult}, traits::Server as ServerTrait, }; use http_body_util::{BodyExt, Full}; @@ -18,8 +18,7 @@ use tokio::net::TcpListener; use tonic::async_trait; use tucana::shared::{ AdapterConfiguration, RuntimeFeature, Struct, Translation, ValidationFlow, Value, - helper::{path::get_string, value::ToValue}, - value::Kind, + helper::value::ToValue, value::Kind, }; use crate::response::{error_to_http_response, value_to_http_response}; @@ -79,36 +78,22 @@ async fn execute_flow_to_hyper_response( body: Value, store: Arc, ) -> Response> { - match store.validate_and_execute_flow(flow, Some(body)).await { - Some(result) => { - log::debug!("Received Result: {:?}", result); - - if let Value { - kind: - Some(Kind::StructValue(Struct { - fields: result_fields, - })), - } = &result - && result_fields.contains_key("name") - && result_fields.contains_key("message") - && !result_fields.contains_key("payload") - && !result_fields.contains_key("headers") - { - log::debug!("Detected a RuntimeError"); - let name = get_string("name", &result); - let message = get_string("message", &result); - - return error_to_http_response( - StatusCode::INTERNAL_SERVER_ERROR, - format!("{}: {}", name.unwrap(), message.unwrap()).as_str(), - ); - } - + match store.execute_flow_with_emitter(flow, Some(body)).await { + FlowExecutionResult::Ongoing(result) => { + log::debug!("Received first ongoing response from emitter"); value_to_http_response(result) } - None => { - log::error!("flow execution failed"); - error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed") + FlowExecutionResult::Failed => { + log::error!("Flow execution failed event received from emitter"); + error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error") + } + FlowExecutionResult::FinishedWithoutOngoing => Response::builder() + .status(StatusCode::CREATED) + .body(Full::new(Bytes::new())) + .unwrap(), + FlowExecutionResult::TransportError => { + log::error!("Flow execution transport error"); + error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error") } } } diff --git a/crates/base/src/store.rs b/crates/base/src/store.rs index a057e0f..5d63244 100644 --- a/crates/base/src/store.rs +++ b/crates/base/src/store.rs @@ -2,7 +2,13 @@ use crate::traits::IdentifiableFlow; use async_nats::jetstream::kv::Config; use futures_lite::StreamExt; use prost::Message; -use tucana::shared::{ExecutionFlow, ValidationFlow, Value}; +use tucana::shared::{ + ExecutionFlow, Struct, ValidationFlow, Value, + value::Kind::{self, StructValue}, +}; + +const EMITTER_TOPIC_PREFIX: &str = "runtime.emitter"; +const EMITTER_WAIT_TIMEOUT_SECONDS: u64 = 30; pub struct AdapterStore { client: async_nats::Client, @@ -15,6 +21,13 @@ pub enum FlowIdentifyResult { Multiple(Vec), } +pub enum FlowExecutionResult { + Ongoing(Value), + Failed, + FinishedWithoutOngoing, + TransportError, +} + impl AdapterStore { pub async fn from_url(url: String, bucket: String) -> Self { let client = match async_nats::connect(url).await { @@ -118,32 +131,109 @@ impl AdapterStore { flow: ValidationFlow, input_value: Option, ) -> Option { + match self.execute_flow_with_emitter(flow, input_value).await { + FlowExecutionResult::Ongoing(value) => Some(value), + FlowExecutionResult::Failed + | FlowExecutionResult::FinishedWithoutOngoing + | FlowExecutionResult::TransportError => None, + } + } + + pub async fn execute_flow_with_emitter( + &self, + flow: ValidationFlow, + input_value: Option, + ) -> FlowExecutionResult { // TODO: Replace body vaidation with triangulus when its ready - let uuid = uuid::Uuid::new_v4().to_string(); + let execution_id = uuid::Uuid::new_v4().to_string(); let flow_id = flow.flow_id; let execution_flow: ExecutionFlow = Self::convert_validation_flow(flow, input_value.clone()); let bytes = execution_flow.encode_to_vec(); - let topic = format!("execution.{}", uuid); + let execution_topic = format!("execution.{}", execution_id); + let emitter_topic = format!("{}.{}", EMITTER_TOPIC_PREFIX, execution_id); + log::info!( "Requesting execution of flow {} with execution id {}", flow_id, - uuid + execution_id ); - log::debug!("Flow Input for Execution ({}) is {:?}", uuid, input_value); - let result = self.client.request(topic, bytes.into()).await; - - match result { - Ok(message) => match Value::decode(message.payload) { - Ok(value) => Some(value), - Err(err) => { - log::error!("Failed to decode response from NATS server: {:?}", err); - None - } - }, + log::debug!( + "Flow Input for Execution ({}) is {:?}", + execution_id, + input_value + ); + + let mut subscriber = match self.client.subscribe(emitter_topic.clone()).await { + Ok(subscriber) => subscriber, Err(err) => { - log::error!("Failed to send request to NATS server: {:?}", err); - None + log::error!( + "Failed to subscribe to emitter topic '{}' for flow {}: {:?}", + emitter_topic, + flow_id, + err + ); + return FlowExecutionResult::TransportError; + } + }; + + if let Err(err) = self + .client + .publish(execution_topic.clone(), bytes.into()) + .await + { + log::error!( + "Failed to publish flow {} to execution topic '{}': {:?}", + flow_id, + execution_topic, + err + ); + return FlowExecutionResult::TransportError; + } + + loop { + let next_message = tokio::time::timeout( + std::time::Duration::from_secs(EMITTER_WAIT_TIMEOUT_SECONDS), + subscriber.next(), + ) + .await; + + let message = match next_message { + Ok(Some(message)) => message, + Ok(None) => { + log::error!( + "Emitter subscription '{}' closed before execution completed", + emitter_topic + ); + return FlowExecutionResult::TransportError; + } + Err(_) => { + log::error!( + "Timed out waiting for emitter events on '{}' after {}s", + emitter_topic, + EMITTER_WAIT_TIMEOUT_SECONDS + ); + return FlowExecutionResult::TransportError; + } + }; + + let Some((emit_type, payload)) = Self::decode_emit_message(message.payload.as_ref()) + else { + continue; + }; + + match emit_type.as_str() { + "starting" => {} + "ongoing" => return FlowExecutionResult::Ongoing(payload), + "failed" => return FlowExecutionResult::Failed, + "finished" => return FlowExecutionResult::FinishedWithoutOngoing, + other => { + log::warn!( + "Received unknown emitter event '{}' on '{}'", + other, + emitter_topic + ); + } } } } @@ -174,4 +264,38 @@ impl AdapterStore { } true } + + fn decode_emit_message(bytes: &[u8]) -> Option<(String, Value)> { + let decoded = match Value::decode(bytes) { + Ok(value) => value, + Err(err) => { + log::error!("Failed to decode emitter payload: {:?}", err); + return None; + } + }; + + let Value { + kind: Some(StructValue(Struct { fields })), + } = decoded + else { + log::warn!("Emitter payload was not a struct value"); + return None; + }; + + let Some(emit_type) = fields.get("emit_type") else { + log::warn!("Emitter payload is missing 'emit_type'"); + return None; + }; + let Some(payload) = fields.get("payload") else { + log::warn!("Emitter payload is missing 'payload'"); + return None; + }; + + let Some(Kind::StringValue(emit_type_str)) = emit_type.kind.as_ref() else { + log::warn!("Emitter payload field 'emit_type' was not a string"); + return None; + }; + + Some((emit_type_str.clone(), payload.clone())) + } } From b2e4ba9d728d28eeeaa7de5bf917a6a0ce1875b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Thu, 23 Apr 2026 19:11:24 +0200 Subject: [PATCH 2/2] Update adapter/rest/src/main.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- adapter/rest/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 297351e..9cd333c 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -88,7 +88,7 @@ async fn execute_flow_to_hyper_response( error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error") } FlowExecutionResult::FinishedWithoutOngoing => Response::builder() - .status(StatusCode::CREATED) + .status(StatusCode::NO_CONTENT) .body(Full::new(Bytes::new())) .unwrap(), FlowExecutionResult::TransportError => {