Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ rust-version.workspace = true
workspace = true

[dependencies]
tokio = { workspace = true, optional = true }
tokio.workspace = true
async-task = { version = "4.4", default-features = false, optional = true }
spin = { version = "0.9", default-features = false, features = ["mutex", "spin_mutex"], optional = true }
libc = { version = "0.2", optional = true }
Expand All @@ -20,5 +20,5 @@ futures.workspace = true

[features]
default = ["tokio"]
tokio = ["dep:tokio"]
tokio = []
simulation = ["dep:async-task", "dep:spin", "dep:libc"]
104 changes: 76 additions & 28 deletions crates/runtime/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
#[cfg(all(feature = "tokio", feature = "simulation"))]
Comment thread
Shubham8287 marked this conversation as resolved.
compile_error!(
"spacetimedb-runtime requires exactly one runtime backend: enable either `tokio` or `simulation`, not both"
);

#[cfg(not(any(feature = "tokio", feature = "simulation")))]
compile_error!("spacetimedb-runtime requires exactly one runtime backend: enable either `tokio` or `simulation`");

#[cfg(feature = "simulation")]
extern crate alloc;

Expand All @@ -15,8 +23,39 @@ pub mod sim;
#[cfg(feature = "simulation")]
pub mod sim_std;

#[cfg(feature = "tokio")]
pub type TokioHandle = tokio::runtime::Handle;
pub type TokioRuntime = tokio::runtime::Runtime;
pub type TokioRuntimeBuilder = tokio::runtime::Builder;

// We intentionally expose a small subset of `tokio::sync` under the simulation
// backend. Tokio's async synchronization primitives are runtime-agnostic: they
// can be polled by this executor instead of a Tokio runtime.
//
// Runtime-agnostic does not translate to deterministic by itself. For
// deterministic simulation, `Waker`s must be invoked by a task running on the
// deterministic executor. For the exports below, that means sends, receives,
// closes, drops of senders/receivers, and watch updates must be driven by
// simulated tasks.
//
// Anything outside the simulated runtime that invokes a stored `Waker`
// bypasses the deterministic executor. This includes Tokio timers,
// OS/kernel readiness routed through another runtime, and blocking threads.
//
// Tokio documents `*_timeout` methods as non-runtime-agnostic because they
// require Tokio's timer; in this subset, that includes
// `mpsc::Sender::send_timeout`.
//
// Also avoid blocking methods. The blocking methods currently reachable from
// this subset are `mpsc::Sender::blocking_send`,
// `mpsc::Receiver::blocking_recv`, `mpsc::Receiver::blocking_recv_many`,
// `mpsc::UnboundedReceiver::blocking_recv`, and
// `mpsc::UnboundedReceiver::blocking_recv_many`. These block or park the
// calling OS thread, which is outside the simulation runtime.
pub mod sync {
// TODO: Remove unbounded channels as resources should be bounded.
pub use tokio::sync::mpsc;
pub use tokio::sync::watch;
}

#[derive(Clone)]
pub enum Handle {
Expand Down Expand Up @@ -74,15 +113,22 @@ enum JoinErrorInner {
Simulation(sim::JoinError),
}

#[cfg(feature = "tokio")]
impl From<tokio::task::AbortHandle> for AbortHandle {
fn from(handle: tokio::task::AbortHandle) -> Self {
Self {
inner: AbortHandleInner::Tokio(handle),
}
}
}

impl AbortHandle {
pub fn abort(&self) {
match &self.inner {
#[cfg(feature = "tokio")]
AbortHandleInner::Tokio(handle) => handle.abort(),
#[cfg(feature = "simulation")]
AbortHandleInner::Simulation(handle) => handle.abort(),
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
_ => unreachable!("runtime abort handle has no enabled backend"),
}
}
}
Expand All @@ -100,16 +146,10 @@ impl JoinErrorInner {

impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
let _ = f;
#[cfg(any(feature = "tokio", feature = "simulation"))]
return self.inner.fmt(f);
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
unreachable!("runtime join error has no enabled backend")
self.inner.fmt(f)
}
}

#[cfg(any(feature = "tokio", feature = "simulation"))]
impl std::error::Error for JoinError {}

impl<T> JoinHandleInner<T> {
Expand Down Expand Up @@ -160,8 +200,6 @@ impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
let _ = cx;
match self.inner.poll_result(cx) {
Poll::Ready(Ok(output)) => {
self.inner = JoinHandleInner::Detached(PhantomData);
Expand Down Expand Up @@ -197,17 +235,30 @@ impl fmt::Display for RuntimeTimeout {
}
}

#[cfg(any(feature = "tokio", feature = "simulation"))]
impl std::error::Error for RuntimeTimeout {}

#[cfg(feature = "tokio")]
impl Handle {
pub fn tokio(handle: TokioHandle) -> Self {
Self::Tokio(handle)
#[cfg(feature = "tokio")]
{
Self::Tokio(handle)
}
#[cfg(not(feature = "tokio"))]
{
let _ = handle;
panic!("spacetimedb-runtime tokio handle requested without the `tokio` backend enabled")
}
}

pub fn tokio_current() -> Self {
Self::tokio(TokioHandle::current())
#[cfg(feature = "tokio")]
{
Self::tokio(TokioHandle::current())
}
#[cfg(not(feature = "tokio"))]
{
panic!("spacetimedb-runtime current tokio handle requested without the `tokio` backend enabled")
}
}
}

Expand All @@ -220,8 +271,6 @@ impl Handle {

impl Handle {
pub fn spawn<T: Send + 'static>(&self, future: impl Future<Output = T> + Send + 'static) -> JoinHandle<T> {
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
let _ = future;
match self {
#[cfg(feature = "tokio")]
Self::Tokio(handle) => JoinHandle {
Expand All @@ -231,8 +280,6 @@ impl Handle {
Self::Simulation(handle) => JoinHandle {
inner: JoinHandleInner::Simulation(handle.spawn_on(sim::NodeId::MAIN, future)),
},
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
_ => unreachable!("runtime dispatch has no enabled backend"),
}
}

Expand All @@ -241,8 +288,6 @@ impl Handle {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
let _ = &f;
match self {
#[cfg(feature = "tokio")]
Self::Tokio(_) => tokio::task::spawn_blocking(f)
Expand All @@ -261,8 +306,6 @@ impl Handle {
.spawn_on(sim::NodeId::MAIN, async move { f() })
.await
.expect("simulation spawn_blocking task should not be cancelled"),
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
_ => unreachable!("runtime dispatch has no enabled backend"),
}
}

Expand All @@ -271,17 +314,22 @@ impl Handle {
timeout_after: Duration,
future: impl Future<Output = T>,
) -> Result<T, RuntimeTimeout> {
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
let _ = (timeout_after, future);
match self {
#[cfg(feature = "tokio")]
Self::Tokio(_) => tokio::time::timeout(timeout_after, future)
.await
.map_err(|_| RuntimeTimeout),
#[cfg(feature = "simulation")]
Self::Simulation(handle) => handle.timeout(timeout_after, future).await.map_err(|_| RuntimeTimeout),
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
_ => unreachable!("runtime dispatch has no enabled backend"),
}
}

pub async fn sleep(&self, duration: Duration) {
match self {
#[cfg(feature = "tokio")]
Self::Tokio(_) => tokio::time::sleep(duration).await,
#[cfg(feature = "simulation")]
Self::Simulation(handle) => handle.sleep(duration).await,
}
}
}
Expand Down
Loading