Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 16 additions & 31 deletions adapter/rest/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use base::{
runner::{ServerContext, ServerRunner},
store::FlowIdentifyResult,
store::{FlowExecutionResult, FlowIdentifyResult},
traits::Server as ServerTrait,
};
use http_body_util::{BodyExt, Full};
Expand All @@ -18,8 +18,7 @@ use tokio::net::TcpListener;
use tonic::async_trait;
use tucana::shared::{
AdapterConfiguration, RuntimeFeature, Struct, Translation, ValidationFlow, Value,
helper::{path::get_string, value::ToValue},
value::Kind,
helper::value::ToValue, value::Kind,
};

use crate::response::{error_to_http_response, value_to_http_response};
Expand Down Expand Up @@ -79,36 +78,22 @@ async fn execute_flow_to_hyper_response(
body: Value,
store: Arc<base::store::AdapterStore>,
) -> Response<Full<Bytes>> {
match store.validate_and_execute_flow(flow, Some(body)).await {
Some(result) => {
log::debug!("Received Result: {:?}", result);

if let Value {
kind:
Some(Kind::StructValue(Struct {
fields: result_fields,
})),
} = &result
&& result_fields.contains_key("name")
&& result_fields.contains_key("message")
&& !result_fields.contains_key("payload")
&& !result_fields.contains_key("headers")
{
log::debug!("Detected a RuntimeError");
let name = get_string("name", &result);
let message = get_string("message", &result);

return error_to_http_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("{}: {}", name.unwrap(), message.unwrap()).as_str(),
);
}

match store.execute_flow_with_emitter(flow, Some(body)).await {
FlowExecutionResult::Ongoing(result) => {
log::debug!("Received first ongoing response from emitter");
value_to_http_response(result)
}
None => {
log::error!("flow execution failed");
error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed")
FlowExecutionResult::Failed => {
log::error!("Flow execution failed event received from emitter");
error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error")
}
FlowExecutionResult::FinishedWithoutOngoing => Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Full::new(Bytes::new()))
.unwrap(),
FlowExecutionResult::TransportError => {
log::error!("Flow execution transport error");
error_to_http_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal server error")
}
}
}
Expand Down
158 changes: 141 additions & 17 deletions crates/base/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ use crate::traits::IdentifiableFlow;
use async_nats::jetstream::kv::Config;
use futures_lite::StreamExt;
use prost::Message;
use tucana::shared::{ExecutionFlow, ValidationFlow, Value};
use tucana::shared::{
ExecutionFlow, Struct, ValidationFlow, Value,
value::Kind::{self, StructValue},
};

const EMITTER_TOPIC_PREFIX: &str = "runtime.emitter";
const EMITTER_WAIT_TIMEOUT_SECONDS: u64 = 30;

pub struct AdapterStore {
client: async_nats::Client,
Expand All @@ -15,6 +21,13 @@ pub enum FlowIdentifyResult {
Multiple(Vec<ValidationFlow>),
}

pub enum FlowExecutionResult {
Ongoing(Value),
Failed,
FinishedWithoutOngoing,
TransportError,
}

impl AdapterStore {
pub async fn from_url(url: String, bucket: String) -> Self {
let client = match async_nats::connect(url).await {
Expand Down Expand Up @@ -118,32 +131,109 @@ impl AdapterStore {
flow: ValidationFlow,
input_value: Option<Value>,
) -> Option<Value> {
match self.execute_flow_with_emitter(flow, input_value).await {
FlowExecutionResult::Ongoing(value) => Some(value),
FlowExecutionResult::Failed
| FlowExecutionResult::FinishedWithoutOngoing
| FlowExecutionResult::TransportError => None,
}
}

pub async fn execute_flow_with_emitter(
&self,
flow: ValidationFlow,
input_value: Option<Value>,
) -> FlowExecutionResult {
// TODO: Replace body vaidation with triangulus when its ready
Comment thread
raphael-goetz marked this conversation as resolved.
let uuid = uuid::Uuid::new_v4().to_string();
let execution_id = uuid::Uuid::new_v4().to_string();
let flow_id = flow.flow_id;
let execution_flow: ExecutionFlow =
Self::convert_validation_flow(flow, input_value.clone());
let bytes = execution_flow.encode_to_vec();
let topic = format!("execution.{}", uuid);
let execution_topic = format!("execution.{}", execution_id);
let emitter_topic = format!("{}.{}", EMITTER_TOPIC_PREFIX, execution_id);

log::info!(
"Requesting execution of flow {} with execution id {}",
flow_id,
uuid
execution_id
);
log::debug!("Flow Input for Execution ({}) is {:?}", uuid, input_value);
let result = self.client.request(topic, bytes.into()).await;

match result {
Ok(message) => match Value::decode(message.payload) {
Ok(value) => Some(value),
Err(err) => {
log::error!("Failed to decode response from NATS server: {:?}", err);
None
}
},
log::debug!(
"Flow Input for Execution ({}) is {:?}",
execution_id,
input_value
);

let mut subscriber = match self.client.subscribe(emitter_topic.clone()).await {
Ok(subscriber) => subscriber,
Err(err) => {
log::error!("Failed to send request to NATS server: {:?}", err);
None
log::error!(
"Failed to subscribe to emitter topic '{}' for flow {}: {:?}",
emitter_topic,
flow_id,
err
);
return FlowExecutionResult::TransportError;
}
};

if let Err(err) = self
.client
.publish(execution_topic.clone(), bytes.into())
.await
{
log::error!(
"Failed to publish flow {} to execution topic '{}': {:?}",
flow_id,
execution_topic,
err
);
return FlowExecutionResult::TransportError;
}

loop {
let next_message = tokio::time::timeout(
std::time::Duration::from_secs(EMITTER_WAIT_TIMEOUT_SECONDS),
subscriber.next(),
)
.await;

let message = match next_message {
Ok(Some(message)) => message,
Ok(None) => {
log::error!(
"Emitter subscription '{}' closed before execution completed",
emitter_topic
);
return FlowExecutionResult::TransportError;
}
Err(_) => {
log::error!(
"Timed out waiting for emitter events on '{}' after {}s",
emitter_topic,
EMITTER_WAIT_TIMEOUT_SECONDS
);
return FlowExecutionResult::TransportError;
Comment thread
raphael-goetz marked this conversation as resolved.
}
};

let Some((emit_type, payload)) = Self::decode_emit_message(message.payload.as_ref())
else {
continue;
};

match emit_type.as_str() {
"starting" => {}
"ongoing" => return FlowExecutionResult::Ongoing(payload),
"failed" => return FlowExecutionResult::Failed,
"finished" => return FlowExecutionResult::FinishedWithoutOngoing,
other => {
log::warn!(
"Received unknown emitter event '{}' on '{}'",
other,
emitter_topic
);
}
}
}
}
Expand Down Expand Up @@ -174,4 +264,38 @@ impl AdapterStore {
}
true
}

fn decode_emit_message(bytes: &[u8]) -> Option<(String, Value)> {
let decoded = match Value::decode(bytes) {
Ok(value) => value,
Err(err) => {
log::error!("Failed to decode emitter payload: {:?}", err);
return None;
}
};

let Value {
kind: Some(StructValue(Struct { fields })),
} = decoded
else {
log::warn!("Emitter payload was not a struct value");
return None;
};

let Some(emit_type) = fields.get("emit_type") else {
log::warn!("Emitter payload is missing 'emit_type'");
return None;
};
let Some(payload) = fields.get("payload") else {
log::warn!("Emitter payload is missing 'payload'");
return None;
};

let Some(Kind::StringValue(emit_type_str)) = emit_type.kind.as_ref() else {
log::warn!("Emitter payload field 'emit_type' was not a string");
return None;
};

Some((emit_type_str.clone(), payload.clone()))
Comment thread
raphael-goetz marked this conversation as resolved.
}
}