diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 47cb20b6893..33181c68884 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -261,6 +261,9 @@ jobs: wasm-bindgen --version + - name: Check engine simulation build + run: cargo check -p spacetimedb-engine --no-default-features --features simulation + # Source emsdk environment to make emcc (Emscripten compiler) available in PATH. - name: Run tests run: | diff --git a/Cargo.lock b/Cargo.lock index efc58dc007b..c48331b3b4c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8107,6 +8107,7 @@ dependencies = [ "spacetimedb-data-structures", "spacetimedb-datastore", "spacetimedb-durability", + "spacetimedb-engine", "spacetimedb-execution", "spacetimedb-expr", "spacetimedb-fs-utils", @@ -8225,6 +8226,48 @@ dependencies = [ "tracing", ] +[[package]] +name = "spacetimedb-engine" +version = "2.6.0" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "enum-map", + "env_logger 0.10.2", + "fs_extra", + "futures", + "hex", + "itertools 0.12.1", + "log", + "once_cell", + "parking_lot 0.12.5", + "pretty_assertions", + "prometheus", + "serde", + "sled", + "spacetimedb-commitlog", + "spacetimedb-data-structures", + "spacetimedb-datastore", + "spacetimedb-durability", + "spacetimedb-expr", + "spacetimedb-fs-utils", + "spacetimedb-lib", + "spacetimedb-metrics", + "spacetimedb-paths", + "spacetimedb-primitives", + "spacetimedb-runtime", + "spacetimedb-sats", + "spacetimedb-schema", + "spacetimedb-snapshot", + "spacetimedb-table", + "sqlparser", + "tempfile", + "thiserror 1.0.69", + "tokio", + "tracing", +] + [[package]] name = "spacetimedb-execution" version = "2.6.0" diff --git a/Cargo.toml b/Cargo.toml index 6f62c05e53a..3f877c82330 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "crates/data-structures", "crates/datastore", "crates/durability", + "crates/engine", "crates/execution", "crates/expr", "crates/guard", @@ -133,8 +134,10 @@ spacetimedb-core = { path = "crates/core", version = "=2.6.0" } spacetimedb-data-structures = { path = "crates/data-structures", version = "=2.6.0" } spacetimedb-datastore = { path = "crates/datastore", version = "=2.6.0" } spacetimedb-durability = { path = "crates/durability", version = "=2.6.0" } +spacetimedb-engine = { path = "crates/engine", version = "=2.6.0" } spacetimedb-execution = { path = "crates/execution", version = "=2.6.0" } spacetimedb-expr = { path = "crates/expr", version = "=2.6.0" } +spacetimedb-fs-utils = { path = "crates/fs-utils", version = "=2.6.0" } spacetimedb-guard = { path = "crates/guard", version = "=2.6.0" } spacetimedb-lib = { path = "crates/lib", default-features = false, version = "=2.6.0" } spacetimedb-memory-usage = { path = "crates/memory-usage", version = "=2.6.0", default-features = false } @@ -144,16 +147,15 @@ spacetimedb-pg = { path = "crates/pg", version = "=2.6.0" } spacetimedb-physical-plan = { path = "crates/physical-plan", version = "=2.6.0" } spacetimedb-primitives = { path = "crates/primitives", version = "=2.6.0" } spacetimedb-query = { path = "crates/query", version = "=2.6.0" } +spacetimedb-query-builder = { path = "crates/query-builder", version = "=2.6.0" } +spacetimedb-runtime = { path = "crates/runtime", version = "=2.6.0" } spacetimedb-sats = { path = "crates/sats", version = "=2.6.0" } spacetimedb-schema = { path = "crates/schema", version = "=2.6.0" } +spacetimedb-snapshot = { path = "crates/snapshot", version = "=2.6.0" } spacetimedb-standalone = { path = "crates/standalone", version = "=2.6.0" } +spacetimedb-subscription = { path = "crates/subscription", version = "=2.6.0" } spacetimedb-sql-parser = { path = "crates/sql-parser", version = "=2.6.0" } spacetimedb-table = { path = "crates/table", version = "=2.6.0" } -spacetimedb-fs-utils = { path = "crates/fs-utils", version = "=2.6.0" } -spacetimedb-snapshot = { path = "crates/snapshot", version = "=2.6.0" } -spacetimedb-subscription = { path = "crates/subscription", version = "=2.6.0" } -spacetimedb-query-builder = { path = "crates/query-builder", version = "=2.6.0" } -spacetimedb-runtime = { path = "crates/runtime", version = "=2.6.0" } # Prevent `ahash` from pulling in `getrandom` by disabling default features. # Modules use `getrandom02` and we need to prevent an incompatible version diff --git a/crates/bench/benches/subscription.rs b/crates/bench/benches/subscription.rs index 49ce21489e2..f2756567d59 100644 --- a/crates/bench/benches/subscription.rs +++ b/crates/bench/benches/subscription.rs @@ -1,9 +1,9 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use spacetimedb::client::consume_each_list::ConsumeEachBuffer; use spacetimedb::db::relational_db::RelationalDB; +use spacetimedb::db::sql::ast::SchemaViewer; use spacetimedb::error::DBError; use spacetimedb::identity::AuthCtx; -use spacetimedb::sql::ast::SchemaViewer; use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use spacetimedb::subscription::tx::DeltaTx; use spacetimedb::subscription::{collect_table_update, TableUpdateType}; diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 6e7075536c2..b9c21843e59 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -22,13 +22,14 @@ spacetimedb-client-api-messages.workspace = true spacetimedb-commitlog.workspace = true spacetimedb-datastore.workspace = true spacetimedb-durability.workspace = true +spacetimedb-engine.workspace = true spacetimedb-memory-usage.workspace = true spacetimedb-metrics.workspace = true spacetimedb-primitives.workspace = true spacetimedb-paths.workspace = true spacetimedb-physical-plan.workspace = true spacetimedb-query.workspace = true -spacetimedb-runtime = { workspace = true, features = ["tokio"] } +spacetimedb-runtime.workspace = true spacetimedb-sats = { workspace = true, features = ["serde"] } spacetimedb-schema.workspace = true spacetimedb-table.workspace = true @@ -144,7 +145,7 @@ allow_loopback_http_for_tests = [] # Enable timing for wasm ABI calls spacetimedb-wasm-instance-env-times = [] # Enable test helpers and utils -test = ["spacetimedb-commitlog/test", "spacetimedb-datastore/test"] +test = ["spacetimedb-commitlog/test", "spacetimedb-datastore/test", "spacetimedb-engine/test"] # Perfmaps for profiling modules perfmap = [] # Enables core pinning. @@ -155,6 +156,7 @@ spacetimedb-lib = { path = "../lib", features = ["proptest", "test"] } spacetimedb-sats = { path = "../sats", features = ["proptest"] } spacetimedb-commitlog = { path = "../commitlog", features = ["test"] } spacetimedb-datastore = { path = "../datastore/", features = ["test"] } +spacetimedb-engine = { workspace = true, features = ["test"] } criterion.workspace = true # Also as dev-dependencies for use in _this_ crate's tests. diff --git a/crates/core/src/database_logger.rs b/crates/core/src/database_logger.rs index 0e202229dea..9a2891955f8 100644 --- a/crates/core/src/database_logger.rs +++ b/crates/core/src/database_logger.rs @@ -674,6 +674,12 @@ impl SystemLogger { } } +impl spacetimedb_engine::update::UpdateLogger for SystemLogger { + fn info(&self, msg: &str) { + self.info(msg); + } +} + #[cfg(test)] mod tests { use std::{ops::Range, sync::Arc}; diff --git a/crates/core/src/db/mod.rs b/crates/core/src/db/mod.rs index 45777fbd612..6b1d2f6700b 100644 --- a/crates/core/src/db/mod.rs +++ b/crates/core/src/db/mod.rs @@ -1,19 +1,28 @@ -use std::sync::Arc; +pub mod persistence { + pub use spacetimedb_engine::persistence::*; +} -use enum_map::EnumMap; -use spacetimedb_schema::reducer_name::ReducerName; -use tokio::sync::mpsc; -use tokio::time::MissedTickBehavior; +pub mod relational_db { + pub use spacetimedb_engine::relational_db::*; +} -use crate::subscription::ExecutionCounters; -use spacetimedb_datastore::execution_context::WorkloadType; -use spacetimedb_datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData}; +pub mod sql { + pub mod ast { + pub use spacetimedb_engine::sql::ast::*; + } -mod durability; -pub mod persistence; -pub mod relational_db; -pub mod snapshot; -pub mod update; + pub mod rls { + pub use spacetimedb_engine::sql::rls::*; + } +} + +pub mod snapshot { + pub use spacetimedb_engine::snapshot::*; +} + +pub mod update { + pub use spacetimedb_engine::update::*; +} /// Whether SpacetimeDB is run in memory, or persists objects and /// a message log to disk. @@ -35,111 +44,10 @@ pub struct Config { pub page_pool_max_size: Option, } -/// A message that is processed by the [`spawn_metrics_recorder`] actor. -/// We use a separate task to record metrics to avoid blocking transactions. -pub struct MetricsMessage { - /// The reducer the produced these metrics. - reducer: Option, - /// Metrics from a mutable transaction. - metrics_for_writer: Option, - /// Metrics from a read-only transaction. - /// A message may have metrics for both types of transactions, - /// because metrics for a reducer and its subscription updates are recorded together. - metrics_for_reader: Option, - /// The row updates for an immutable transaction. - /// Needed for insert and delete counters. - tx_data: Option>, - /// Cached metrics counters for each workload type. - counters: Arc>, -} - -/// The handle used to send work to the tx metrics recorder. -#[derive(Clone)] -pub struct MetricsRecorderQueue { - tx: mpsc::UnboundedSender, -} - -impl MetricsRecorderQueue { - pub fn send_metrics( - &self, - reducer: Option, - metrics_for_writer: Option, - metrics_for_reader: Option, - tx_data: Option>, - counters: Arc>, - ) { - if let Err(err) = self.tx.send(MetricsMessage { - reducer, - metrics_for_writer, - metrics_for_reader, - tx_data, - counters, - }) { - log::warn!("failed to send metrics: {err}"); - } - } -} - -fn record_metrics( - MetricsMessage { - reducer, - metrics_for_writer, - metrics_for_reader, - tx_data, - counters, - }: MetricsMessage, -) { - if let Some(tx_metrics) = metrics_for_writer { - tx_metrics.report( - // If row updates are present, - // they will always belong to the writer transaction. - tx_data.as_deref(), - reducer.as_ref(), - |wl| &counters[wl], - ); - } - if let Some(tx_metrics) = metrics_for_reader { - tx_metrics.report( - // If row updates are present, - // they will never belong to the reader transaction. - // Passing row updates here will most likely panic. - None, - reducer.as_ref(), - |wl| &counters[wl], - ); - } -} - -/// The metrics recorder is a side channel that the main database thread forwards metrics to. -/// While we want to avoid unnecessary compute on the critical path, communicating with other -/// threads is not free, and for this case in particular waking a parked task is not free. -/// -/// Previously, each tx would send its metrics to the recorder task. As soon as the recorder -/// task `recv`d a message, it would update the counters and gauges, and immediately wait for -/// the next tx's message. This meant that the tx would need to be more expensive than the -/// recording of its metrics in order for the recorder task not to be parked on `recv` when -/// the tx would `send` its metrics. This would obviously never be the case, and so each `send` -/// would incur the overhead of waking the task. -/// -/// To mitigate this, we now record metrics, for potentially many transactions, periodically -/// every 5ms. -const TX_METRICS_RECORDING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5); - -/// Spawns a task for recording transaction metrics. -/// Returns the handle for pushing metrics to the recorder. -pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) { - let (tx, mut rx) = mpsc::unbounded_channel(); - let abort_handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(TX_METRICS_RECORDING_INTERVAL); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); +pub type MetricsRecorderQueue = spacetimedb_engine::MetricsRecorderQueue; - loop { - interval.tick().await; - while let Ok(metrics) = rx.try_recv() { - record_metrics(metrics); - } - } - }) - .abort_handle(); - (MetricsRecorderQueue { tx }, abort_handle) +pub fn spawn_tx_metrics_recorder( + handle: &spacetimedb_runtime::Handle, +) -> (MetricsRecorderQueue, spacetimedb_runtime::AbortHandle) { + spacetimedb_engine::spawn_tx_metrics_recorder(handle) } diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 4ba103979ec..efee537dbbe 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -1,240 +1,33 @@ -use std::io; -use std::num::ParseIntError; -use std::path::PathBuf; -use std::sync::{MutexGuard, PoisonError}; +pub use spacetimedb_engine::error::*; -use hex::FromHexError; -use spacetimedb_commitlog::repo::TxOffset; -use spacetimedb_durability::DurabilityExited; -use spacetimedb_expr::errors::TypingError; -use spacetimedb_fs_utils::lockfile::advisory::LockError; -use spacetimedb_lib::Identity; -use spacetimedb_schema::error::ValidationErrors; -use spacetimedb_schema::table_name::TableName; -use spacetimedb_snapshot::SnapshotError; -use spacetimedb_table::table::ReadViaBsatnError; -use thiserror::Error; - -use crate::client::ClientActorId; use crate::host::module_host::ViewCallError; use crate::host::scheduler::ScheduleError; use crate::host::AbiCall; use spacetimedb_lib::buffer::DecodeError; -use spacetimedb_primitives::*; -use spacetimedb_sats::hash::Hash; -use spacetimedb_sats::product_value::InvalidFieldError; -use spacetimedb_schema::def::error::{LibError, RelationError, SchemaErrors}; -use spacetimedb_schema::relation::FieldName; - -pub use spacetimedb_datastore::error::{DatastoreError, IndexError, SequenceError, TableError}; - -#[derive(Error, Debug)] -pub enum ClientError { - #[error("Client not found: {0}")] - NotFound(ClientActorId), -} - -#[derive(Error, Debug, PartialEq, Eq)] -pub enum SubscriptionError { - #[error("Index not found: {0:?}")] - NotFound(IndexId), - #[error("Empty string")] - Empty, - #[error("Unsupported query on subscription: {0:?}")] - Unsupported(String), - #[error("Subscribing to queries in one call is not supported")] - Multiple, -} - -#[derive(Error, Debug)] -pub enum PlanError { - #[error("Unsupported feature: `{feature}`")] - Unsupported { feature: String }, - #[error("Unknown table: `{table}`")] - UnknownTable { table: Box }, - #[error("Qualified Table `{expect}` not found")] - TableNotFoundQualified { expect: String }, - #[error("Unknown field: `{field}` not found in the table(s): `{tables:?}`")] - UnknownField { field: String, tables: Vec }, - #[error("Unknown field name: `{field}` not found in the table(s): `{tables:?}`")] - UnknownFieldName { field: FieldName, tables: Vec }, - #[error("Field(s): `{fields:?}` not found in the table(s): `{tables:?}`")] - UnknownFields { - fields: Vec, - tables: Vec, - }, - #[error("Ambiguous field: `{field}`. Also found in {found:?}")] - AmbiguousField { field: String, found: Vec }, - #[error("Plan error: `{0}`")] - Unstructured(String), - #[error("Internal DBError: `{0}`")] - DatabaseInternal(Box), - #[error("Relation Error: `{0}`")] - Relation(#[from] RelationError), -} - -#[derive(Error, Debug)] -pub enum DatabaseError { - #[error("Replica not found: {0}")] - NotFound(u64), - #[error("Database is already opened. Path: `{0}`. Error: {1}")] - DatabasedOpened(PathBuf, anyhow::Error), -} - -impl From for DatabaseError { - fn from(LockError { path, source, .. }: LockError) -> Self { - Self::DatabasedOpened(path, source.into()) - } -} - -#[derive(Error, Debug)] -pub enum DBError { - #[error("LibError: {0}")] - Lib(#[from] LibError), - #[error("BufferError: {0}")] - Buffer(#[from] DecodeError), - #[error("DatastoreError: {0}")] - Datastore(#[from] DatastoreError), - #[error("SequenceError: {0}")] - Sequence2(#[from] SequenceError), - #[error("SchemaError: {0}")] - Schema(SchemaErrors), - #[error("IOError: {0}.")] - IoError(#[from] std::io::Error), - #[error("ParseIntError: {0}.")] - ParseInt(#[from] ParseIntError), - #[error("Hex representation of hash decoded to incorrect number of bytes: {0}.")] - DecodeHexHash(usize), - #[error("DecodeHexError: {0}.")] - DecodeHex(#[from] FromHexError), - #[error("DatabaseError: {0}.")] - Database(#[from] DatabaseError), - #[error("SledError: {0}.")] - SledDbError(#[from] sled::Error), - #[error("Mutex was poisoned acquiring lock on MessageLog: {0}")] - MessageLogPoisoned(String), - #[error("SubscriptionError: {0}")] - Subscription(#[from] SubscriptionError), - #[error("ClientError: {0}")] - Client(#[from] ClientError), - #[error("SqlParserError: {error}, executing: `{sql}`")] - SqlParser { - sql: String, - error: sqlparser::parser::ParserError, - }, - #[error("SqlError: {error}, executing: `{sql}`")] - Plan { sql: String, error: PlanError }, - #[error("Error replaying the commit log: {0}")] - LogReplay(#[from] LogReplayError), - #[error(transparent)] - // Box the inner [`SnapshotError`] to keep Clippy quiet about large `Err` variants. - Snapshot(#[from] Box), - #[error("Error reading a value from a table through BSATN: {0}")] - ReadViaBsatnError(#[from] ReadViaBsatnError), - #[error("Module validation errors: {0}")] - ModuleValidationErrors(#[from] ValidationErrors), - #[error(transparent)] - Other(#[from] anyhow::Error), - #[error(transparent)] - TypeError(#[from] TypingError), - #[error("{error}, executing: `{sql}`")] - WithSql { - #[source] - error: Box, - sql: Box, - }, - #[error(transparent)] - RestoreSnapshot(#[from] RestoreSnapshotError), - #[error(transparent)] - DurabilityGone(#[from] DurabilityExited), - #[error(transparent)] - View(#[from] ViewCallError), -} - -impl From for DBError { - fn from(value: InvalidFieldError) -> Self { - LibError::from(value).into() - } -} - -impl From for DBError { - fn from(err: spacetimedb_table::read_column::TypeError) -> Self { - DatastoreError::Table(TableError::from(err)).into() - } -} - -impl From for PlanError { - fn from(err: DBError) -> Self { - PlanError::DatabaseInternal(Box::new(err)) - } -} - -impl<'a, T: ?Sized + 'a> From>> for DBError { - fn from(err: PoisonError>) -> Self { - DBError::MessageLogPoisoned(err.to_string()) - } -} - -impl From for DBError { - fn from(e: spacetimedb_durability::local::OpenError) -> Self { - use spacetimedb_durability::local::OpenError::*; +use spacetimedb_schema::def::error::RelationError; +use spacetimedb_schema::table_name::TableName; +use thiserror::Error; - match e { - Lock(e) => Self::from(DatabaseError::from(e)), - Commitlog(e) => Self::Other(e.into()), +impl From for DBError { + fn from(err: ViewCallError) -> Self { + match err { + ViewCallError::Args(err) => spacetimedb_engine::error::ViewError::Args(err.to_string()).into(), + ViewCallError::NoSuchModule(err) => { + spacetimedb_engine::error::ViewError::NoSuchModule(err.to_string()).into() + } + ViewCallError::NoSuchView => spacetimedb_engine::error::ViewError::NoSuchView.into(), + ViewCallError::TableDoesNotExist(view_id) => { + spacetimedb_engine::error::ViewError::TableDoesNotExist(view_id).into() + } + ViewCallError::MissingClientConnection => { + spacetimedb_engine::error::ViewError::MissingClientConnection.into() + } + ViewCallError::DatastoreError(err) => spacetimedb_engine::error::ViewError::DatastoreError(err).into(), + ViewCallError::InternalError(err) => spacetimedb_engine::error::ViewError::InternalError(err).into(), } } } -#[derive(Debug, Error)] -pub enum LogReplayError { - #[error( - "Out-of-order commit detected: {} in segment {} after offset {}", - .commit_offset, - .segment_offset, - .last_commit_offset - )] - OutOfOrderCommit { - commit_offset: u64, - segment_offset: usize, - last_commit_offset: u64, - }, - #[error( - "Error reading segment {}/{} at commit {}: {}", - .segment_offset, - .total_segments, - .commit_offset, - .source - )] - TrailingSegments { - segment_offset: usize, - total_segments: usize, - commit_offset: u64, - #[source] - source: io::Error, - }, - #[error("Could not reset log to offset {}: {}", .offset, .source)] - Reset { - offset: u64, - #[source] - source: io::Error, - }, - #[error("Missing object {} referenced from commit {}", .hash, .commit_offset)] - MissingObject { hash: Hash, commit_offset: u64 }, - #[error( - "Unexpected I/O error reading commit {} from segment {}: {}", - .commit_offset, - .segment_offset, - .source - )] - Io { - segment_offset: usize, - commit_offset: u64, - #[source] - source: io::Error, - }, -} - #[derive(Error, Debug)] pub enum NodesError { #[error("Failed to decode row: {0}")] @@ -294,23 +87,3 @@ impl From for NodesError { } } } - -#[derive(Debug, Error)] -pub enum RestoreSnapshotError { - #[error("Snapshot has incorrect database_identity: expected {expected} but found {actual}")] - IdentityMismatch { expected: Identity, actual: Identity }, - #[error("Failed to restore datastore from snapshot")] - Datastore(#[source] Box), - #[error("Failed to read snapshot")] - Snapshot(#[from] Box), - #[error("Failed to bootstrap datastore without snapshot")] - Bootstrap(#[source] Box), - #[error("No connected snapshot found, commitlog starts at {min_commitlog_offset}")] - NoConnectedSnapshot { min_commitlog_offset: TxOffset }, - #[error("Failed to invalidate snapshots at or newer than {offset}")] - Invalidate { - offset: TxOffset, - #[source] - source: Box, - }, -} diff --git a/crates/core/src/estimation.rs b/crates/core/src/estimation.rs index c7d3a29d148..77948ad09a3 100644 --- a/crates/core/src/estimation.rs +++ b/crates/core/src/estimation.rs @@ -133,8 +133,8 @@ mod tests { use super::{estimate_rows_scanned, row_estimate}; use crate::db::relational_db::tests_utils::{begin_tx, insert, with_auto_commit}; use crate::db::relational_db::{tests_utils::TestDB, RelationalDB}; + use crate::db::sql::ast::SchemaViewer; use crate::error::DBError; - use crate::sql::ast::SchemaViewer; use spacetimedb_lib::{identity::AuthCtx, AlgebraicType}; use spacetimedb_query::compile_subscription; use spacetimedb_sats::product; diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 9cfb6077045..425cbd7de07 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -38,6 +38,7 @@ use spacetimedb_datastore::traits::Program; use spacetimedb_durability::{self as durability}; use spacetimedb_lib::{AlgebraicValue, Identity, Timestamp}; use spacetimedb_paths::server::{ModuleLogsDir, ServerDataDir}; +use spacetimedb_runtime::AbortHandle; use spacetimedb_sats::hash::Hash; use spacetimedb_schema::auto_migrate::{ponder_migrate, AutoMigrateError, MigrationPolicy, PrettyPrintStyle}; use spacetimedb_schema::def::{ModuleDef, RawModuleDefVersion}; @@ -47,7 +48,6 @@ use std::ops::Deref; use std::sync::Arc; use std::time::Duration; use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock}; -use tokio::task::AbortHandle; use tokio::time::error::Elapsed; use tokio::time::{interval_at, timeout, Instant}; @@ -889,7 +889,8 @@ impl Host { .. } = host_controller; let replica_dir = data_dir.replica(replica_id); - let (tx_metrics_queue, tx_metrics_recorder_task) = spawn_tx_metrics_recorder(); + let runtime = spacetimedb_runtime::Handle::tokio_current(); + let (tx_metrics_queue, tx_metrics_recorder_task) = spawn_tx_metrics_recorder(&runtime); let (db, connected_clients) = match config.storage { db::Storage::Memory => RelationalDB::open( @@ -902,8 +903,13 @@ impl Host { )?, db::Storage::Disk => { // Replay from the local state. - let history = relational_db::local_history(&replica_dir).await?; - let persistence = persistence.persistence(&database, replica_id).await?; + let history = relational_db::local_history(&replica_dir, &runtime).await?; + let persistence_db = db::persistence::Database { + id: database.id, + database_identity: database.database_identity, + owner_identity: database.owner_identity, + }; + let persistence = persistence.persistence(&persistence_db, replica_id).await?; // Loading a database from persistent storage involves heavy // blocking I/O. `asyncify` to avoid blocking the async worker. let (db, clients) = asyncify({ @@ -1084,8 +1090,9 @@ impl Host { module_host.clear_all_clients().await?; scheduler_starter.start(&module_host)?; - let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle(); - let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone()); + let disk_metrics_recorder_task: spacetimedb_runtime::AbortHandle = + tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle().into(); + let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone(), &runtime); let module = watch::Sender::new(module_host); diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index bda490dab74..331aff0ca06 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -6,6 +6,7 @@ use crate::client::messages::{OneOffQueryResponseMessage, ProcedureResultMessage use crate::client::{ClientActorId, ClientConnectionSender, WsVersion}; use crate::database_logger::{DatabaseLogger, LogLevel, Record}; use crate::db::relational_db::{RelationalDB, Tx}; +use crate::db::sql::ast::SchemaViewer; use crate::error::DBError; use crate::estimation::{check_row_limit, estimate_rows_scanned}; use crate::hash::Hash; @@ -18,9 +19,7 @@ use crate::host::{InvalidFunctionArguments, InvalidViewArguments}; use crate::identity::Identity; use crate::messages::control_db::{Database, HostType}; use crate::replica_context::ReplicaContext; -use crate::sql::ast::SchemaViewer; use crate::sql::execute::SqlResult; -use crate::sql::parser::RowLevelExpr; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::BroadcastError; pub use crate::subscription::module_subscription_manager::TransactionOffset; @@ -51,6 +50,7 @@ use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; pub use spacetimedb_durability::{DurabilityExited, DurableOffset}; +use spacetimedb_engine::sql::rls::RowLevelExpr; use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_execution::ExecutionParams; use spacetimedb_execution::RelValue; @@ -65,10 +65,9 @@ use spacetimedb_query::compile_subscription; use spacetimedb_sats::raw_identifier::RawIdentifier; use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, ProductValue}; use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy}; -use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef}; +use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, ViewDef}; use spacetimedb_schema::identifier::Identifier; use spacetimedb_schema::reducer_name::ReducerName; -use spacetimedb_schema::schema::{Schema, TableSchema}; use spacetimedb_schema::table_name::TableName; use std::collections::VecDeque; use std::fmt; @@ -550,19 +549,6 @@ impl GenericModuleInstance for super::v8::JsProcedureInstance { } } -/// Creates the table for `table_def` in `stdb`. -pub fn create_table_from_def( - stdb: &RelationalDB, - tx: &mut MutTxId, - module_def: &ModuleDef, - table_def: &TableDef, -) -> anyhow::Result<()> { - let schema = TableSchema::from_module_def(module_def, table_def, (), TableId::SENTINEL); - stdb.create_table(tx, schema) - .with_context(|| format!("failed to create table {}", &table_def.name))?; - Ok(()) -} - /// Creates the table for `view_def` in `stdb`. pub fn create_table_from_view_def( stdb: &RelationalDB, @@ -615,7 +601,7 @@ fn init_database_inner( table_defs.sort_by_key(|x| &x.name); for def in table_defs { logger.info(&format!("Creating table `{}`", &def.name)); - create_table_from_def(stdb, tx, module_def, def)?; + spacetimedb_engine::update::create_table_from_def(stdb, tx, module_def, def)?; } // Create all in-memory views defined by the module. diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index e502e0e1efc..a800a3654b5 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -2,6 +2,7 @@ use super::instrumentation::CallTimes; use super::*; use crate::client::ClientActorId; use crate::database_logger; +use crate::db::sql::ast::SchemaViewer; use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint}; use crate::error::DBError; use crate::host::host_controller::CallProcedureReturn; @@ -22,7 +23,6 @@ use crate::identity::Identity; use crate::messages::control_db::HostType; use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; -use crate::sql::ast::SchemaViewer; use crate::sql::execute::run_with_instance; use crate::subscription::module_subscription_actor::{commit_and_broadcast_event, CommitAndBroadcastEventSuccess}; use crate::subscription::module_subscription_manager::TransactionOffset; diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 26b35230b1f..d630faab09a 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -5,6 +5,7 @@ pub mod sql; pub mod auth; pub mod db; +pub use spacetimedb_engine::metrics; pub mod messages; pub use spacetimedb_lib::Identity; pub mod error; diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 6ff2cd05566..7fe2fe229f8 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -1,14 +1,13 @@ use std::sync::Arc; use std::time::Duration; -use super::ast::SchemaViewer; -use crate::db::relational_db::RelationalDB; +use crate::db::sql::ast::SchemaViewer; use crate::energy::FunctionBudget; use crate::error::DBError; use crate::estimation::{check_row_limit, estimate_rows_scanned}; use crate::host::module_host::{ - DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, RefInstance, ViewCallError, ViewCallResult, - ViewOutcome, WasmInstance, + DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, RefInstance, ViewCallResult, ViewOutcome, + WasmInstance, }; use crate::host::{ArgsTuple, ModuleHost}; use crate::subscription::module_subscription_actor::{commit_and_broadcast_event, ModuleSubscriptions}; @@ -17,6 +16,7 @@ use crate::subscription::tx::DeltaTx; use anyhow::anyhow; use spacetimedb_datastore::execution_context::Workload; use spacetimedb_datastore::traits::IsolationLevel; +use spacetimedb_engine::relational_db::RelationalDB; use spacetimedb_expr::statement::Statement; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; @@ -163,7 +163,7 @@ fn run_inner( if let ViewOutcome::Failed(err) = result.outcome { let (_, metrics, reducer) = db.rollback_mut_tx(result.tx); db.report_mut_tx_metrics(reducer, metrics, None); - return Err(DBError::View(ViewCallError::InternalError(err))); + return Err(DBError::View(spacetimedb_engine::error::ViewError::InternalError(err))); } let tx = result.tx; diff --git a/crates/core/src/sql/mod.rs b/crates/core/src/sql/mod.rs index 741105f0e75..2e8bdddf980 100644 --- a/crates/core/src/sql/mod.rs +++ b/crates/core/src/sql/mod.rs @@ -1,3 +1 @@ -pub mod ast; pub mod execute; -pub mod parser; diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index bd296b6f9f9..67881a456a5 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -1,17 +1,14 @@ +use crate::error::DBError; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilder as _, RowListBuilderSource}; -use crate::{error::DBError, worker_metrics::WORKER_METRICS}; use anyhow::Result; use metrics::QueryMetrics; use module_subscription_manager::Plan; -use prometheus::IntCounter; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use spacetimedb_client_api_messages::websocket::common::ByteListLen as _; use spacetimedb_client_api_messages::websocket::v1::{self as ws_v1}; -use spacetimedb_datastore::{ - db_metrics::DB_METRICS, execution_context::WorkloadType, locking_tx_datastore::datastore::MetricsRecorder, -}; +pub use spacetimedb_engine::metrics::ExecutionCounters; use spacetimedb_execution::{pipelined::PipelinedProject, Datastore, DeltaStore, Row}; -use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; +use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_physical_plan::plan::ParamResolver; use spacetimedb_primitives::{ColList, TableId}; use spacetimedb_sats::bsatn::ToBsatn; @@ -31,72 +28,6 @@ pub mod subscription; pub mod tx; pub mod websocket_building; -#[derive(Debug)] -pub struct ExecutionCounters { - rdb_num_index_seeks: IntCounter, - rdb_num_rows_scanned: IntCounter, - rdb_num_bytes_scanned: IntCounter, - rdb_num_bytes_written: IntCounter, - bytes_sent_to_clients: IntCounter, - delta_queries_matched: IntCounter, - delta_queries_evaluated: IntCounter, - duplicate_rows_evaluated: IntCounter, - duplicate_rows_sent: IntCounter, -} - -impl ExecutionCounters { - pub fn new(workload: &WorkloadType, db: &Identity) -> Self { - Self { - rdb_num_index_seeks: DB_METRICS.rdb_num_index_seeks.with_label_values(workload, db), - rdb_num_rows_scanned: DB_METRICS.rdb_num_rows_scanned.with_label_values(workload, db), - rdb_num_bytes_scanned: DB_METRICS.rdb_num_bytes_scanned.with_label_values(workload, db), - rdb_num_bytes_written: DB_METRICS.rdb_num_bytes_written.with_label_values(workload, db), - bytes_sent_to_clients: WORKER_METRICS.bytes_sent_to_clients.with_label_values(workload, db), - delta_queries_matched: DB_METRICS.delta_queries_matched.with_label_values(db), - delta_queries_evaluated: DB_METRICS.delta_queries_evaluated.with_label_values(db), - duplicate_rows_evaluated: DB_METRICS.duplicate_rows_evaluated.with_label_values(db), - duplicate_rows_sent: DB_METRICS.duplicate_rows_sent.with_label_values(db), - } - } - - /// Update the global system metrics with transaction-level execution metrics. - pub(crate) fn record(&self, metrics: &ExecutionMetrics) { - if metrics.index_seeks > 0 { - self.rdb_num_index_seeks.inc_by(metrics.index_seeks as u64); - } - if metrics.rows_scanned > 0 { - self.rdb_num_rows_scanned.inc_by(metrics.rows_scanned as u64); - } - if metrics.bytes_scanned > 0 { - self.rdb_num_bytes_scanned.inc_by(metrics.bytes_scanned as u64); - } - if metrics.bytes_written > 0 { - self.rdb_num_bytes_written.inc_by(metrics.bytes_written as u64); - } - if metrics.bytes_sent_to_clients > 0 { - self.bytes_sent_to_clients.inc_by(metrics.bytes_sent_to_clients as u64); - } - if metrics.delta_queries_matched > 0 { - self.delta_queries_matched.inc_by(metrics.delta_queries_matched); - } - if metrics.delta_queries_evaluated > 0 { - self.delta_queries_evaluated.inc_by(metrics.delta_queries_evaluated); - } - if metrics.duplicate_rows_evaluated > 0 { - self.duplicate_rows_evaluated.inc_by(metrics.duplicate_rows_evaluated); - } - if metrics.duplicate_rows_sent > 0 { - self.duplicate_rows_sent.inc_by(metrics.duplicate_rows_sent); - } - } -} - -impl MetricsRecorder for ExecutionCounters { - fn record(&self, metrics: &ExecutionMetrics) { - self.record(metrics); - } -} - /// Execute subscription query fragments over a view. pub fn execute_plan_for_view<'p, F>( plan_fragments: impl IntoIterator, diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 3159b7e7b1e..0fe707a5582 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -2199,8 +2199,8 @@ mod tests { use super::{Plan, SubscriptionManager}; use crate::db::relational_db::tests_utils::with_read_only; + use crate::db::sql::ast::SchemaViewer; use crate::host::module_host::DatabaseTableUpdate; - use crate::sql::ast::SchemaViewer; use crate::subscription::module_subscription_manager::ClientQueryId; use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use crate::subscription::tx::DeltaTx; diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 012088bd313..bed0487081b 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -1,8 +1,8 @@ use super::execution_unit::QueryHash; use super::module_subscription_manager::Plan; use crate::db::relational_db::Tx; +use crate::db::sql::ast::SchemaViewer; use crate::error::{DBError, SubscriptionError}; -use crate::sql::ast::SchemaViewer; use once_cell::sync::Lazy; use regex::Regex; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index b4198276dd5..75bf9b75405 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -1,8 +1,8 @@ use super::execution_unit::QueryHash; use super::query::CompiledQuery; use crate::db::relational_db::RelationalDB; +use crate::db::sql::ast::SchemaViewer; use crate::error::DBError; -use crate::sql::ast::SchemaViewer; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; use spacetimedb_lib::db::auth::StTableType; use spacetimedb_lib::identity::AuthCtx; diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 690afdd8c94..c8919b8f10b 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -355,11 +355,6 @@ metrics_group!( #[labels(db: Identity, reducer: str)] pub reducer_plus_query_duration: HistogramVec, - #[name = spacetime_num_bytes_sent_to_clients_total] - #[help = "The cumulative number of bytes sent to clients"] - #[labels(txn_type: WorkloadType, db: Identity)] - pub bytes_sent_to_clients: IntCounterVec, - #[name = spacetime_subscription_send_queue_length] #[help = "The number of `ComputedQueries` waiting in the queue to be aggregated and broadcast by the `send_worker`"] #[labels(database_identity: Identity)] @@ -375,34 +370,6 @@ metrics_group!( #[labels(db: Identity)] pub total_outgoing_queue_length: IntGaugeVec, - #[name = spacetime_replay_total_time_seconds] - #[help = "Total time spent replaying a database upon restart, including snapshot read, snapshot restore and commitlog replay"] - #[labels(db: Identity)] - // We expect a small number of observations per label - // (exactly one, for non-replicated databases, and one per leader change for replicated databases) - // so we'll just store a `Gauge` with the most recent observation for each database. - pub replay_total_time_seconds: GaugeVec, - - #[name = spacetime_replay_snapshot_read_time_seconds] - #[help = "Time spent reading a snapshot from disk before restoring the snapshot upon restart"] - #[labels(db: Identity)] - pub replay_snapshot_read_time_seconds: GaugeVec, - - #[name = spacetime_replay_snapshot_restore_time_seconds] - #[help = "Time spent restoring a database from a snapshot after reading the snapshot and before commitlog replay upon restart"] - #[labels(db: Identity)] - pub replay_snapshot_restore_time_seconds: GaugeVec, - - #[name = spacetime_replay_commitlog_time_seconds] - #[help = "Time spent replaying the commitlog after restoring from a snapshot upon restart"] - #[labels(db: Identity)] - pub replay_commitlog_time_seconds: GaugeVec, - - #[name = spacetime_replay_commitlog_num_commits] - #[help = "Number of commits replayed after restoring from a snapshot upon restart"] - #[labels(db: Identity)] - pub replay_commitlog_num_commits: IntGaugeVec, - #[name = spacetime_module_create_instance_time_seconds] #[help = "Time taken to construct a WASM instance or V8 isolate to run module code"] #[labels(db: Identity, module_type: HostType)] @@ -414,75 +381,6 @@ metrics_group!( #[buckets(0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10, 50, 100)] pub module_create_instance_time_seconds: HistogramVec, - #[name = spacetime_snapshot_creation_time_total_sec] - #[help = "The time (in seconds) it took to take and store a database snapshot, including scheduling overhead"] - #[labels(db: Identity)] - // Snapshot creation should take in the order of milliseconds, - // but log data suggests that there are outliers. - // So let's track a wide range of buckets to get a better picture. - // - // We also track the timing without `asyncify` scheduling overhead - // (`snapshot_creation_time_inner`), and the snapshot compression - // timing with / without scheduling overhead (`snapshot_compression_time_total` - // and `snapshot_compression_time_inner`, respectively). - // - // Compression may have contributed to observed outliers, but is no - // longer included in the snapshot creation timing. - #[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)] - pub snapshot_creation_time_total: HistogramVec, - - #[name = spacetime_snapshot_creation_time_inner_sec] - #[help = "The time (in seconds) it took to take and store a database snapshot, excluding scheduling overhead"] - #[labels(db: Identity)] - #[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)] - pub snapshot_creation_time_inner: HistogramVec, - - #[name = spacetime_snapshot_creation_time_fsync_sec] - #[help = "The time (in seconds) it took to fsync a database snapshot, excluding scheduling overhead"] - #[labels(db: Identity)] - #[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)] - pub snapshot_creation_time_fsync: HistogramVec, - - #[name = spacetime_snapshot_compression_time_total_sec] - #[help = "The time (in seconds) it took to do a compression pass on the snapshot repository, including scheduling overhead"] - #[labels(db: Identity)] - // Not sure what range to expect, but certainly slower than snapshot - // creation. - #[buckets(0.001, 0.01, 0.1, 1.0, 5.0, 10.0)] - pub snapshot_compression_time_total: HistogramVec, - - #[name = spacetime_snapshot_compression_time_inner_sec] - #[help = "The time (in seconds) it took to do a compression pass on the snapshot repository, excluding scheduling overhead"] - #[labels(db: Identity)] - #[buckets(0.001, 0.01, 0.1, 1.0, 5.0, 10.0)] - pub snapshot_compression_time_inner: HistogramVec, - - #[name = spacetime_snapshot_compression_time_per_snapshot_sec] - #[help = "The time (in seconds) it took to compress a single snapshot"] - #[labels(db: Identity)] - #[buckets(0.001, 0.01, 0.1, 1.0, 5.0, 10.0)] - pub snapshot_compression_time_single: HistogramVec, - - #[name = spacetime_snapshot_compression_skipped] - #[help = "The number of snapshots skipped in a single compression pass because they were already compressed"] - #[labels(db: Identity)] - pub snapshot_compression_skipped: IntGaugeVec, - - #[name = spacetime_snapshot_compression_compressed] - #[help = "The number of snapshots compressed in a single compression pass"] - #[labels(db: Identity)] - pub snapshot_compression_compressed: IntGaugeVec, - - #[name = spacetime_snapshot_compression_objects_compressed] - #[help = "The number of snapshot objects compressed in a single compression pass"] - #[labels(db: Identity)] - pub snapshot_compression_objects_compressed: IntGaugeVec, - - #[name = spacetime_snapshot_compression_objects_hardlinked] - #[help = "The number of snapshot objects hardlinked in a single compression pass"] - #[labels(db: Identity)] - pub snapshot_compression_objects_hardlinked: IntGaugeVec, - #[name = spacetime_subscription_rows_examined] #[help = "Distribution of rows examined per subscription query"] #[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)] @@ -499,12 +397,6 @@ metrics_group!( #[help = "Total number of subscription queries by scan strategy"] #[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)] pub subscription_queries_total: IntCounterVec, - - #[name = spacetime_durability_blocking_send_duration_sec] - #[help = "Latency of blocking sends in request_durability (seconds); _count gives the number of times the channel was full"] - #[labels(database_identity: Identity)] - #[buckets(0.001, 0.01, 0.1, 1.0, 10.0)] - pub durability_blocking_send_duration: HistogramVec, } ); diff --git a/crates/datastore/Cargo.toml b/crates/datastore/Cargo.toml index 86cb2aca0b6..ad31b5cd931 100644 --- a/crates/datastore/Cargo.toml +++ b/crates/datastore/Cargo.toml @@ -10,14 +10,14 @@ rust-version.workspace = true spacetimedb-data-structures.workspace = true spacetimedb-lib = { workspace = true, features = ["serde", "metrics_impls"] } spacetimedb-commitlog.workspace = true -spacetimedb-durability.workspace = true +spacetimedb-durability = { path = "../durability", default-features = false } spacetimedb-metrics.workspace = true spacetimedb-primitives.workspace = true spacetimedb-paths.workspace = true spacetimedb-sats = { workspace = true, features = ["serde"] } spacetimedb-schema.workspace = true spacetimedb-table.workspace = true -spacetimedb-snapshot.workspace = true +spacetimedb-snapshot = { path = "../snapshot", default-features = false } spacetimedb-execution.workspace = true anyhow = { workspace = true, features = ["backtrace"] } @@ -39,7 +39,9 @@ thin-vec.workspace = true [features] # Print a warning when doing an unindexed `iter_by_col_range` on a large table. unindexed_iter_by_col_range_warn = [] -default = ["unindexed_iter_by_col_range_warn"] +default = ["unindexed_iter_by_col_range_warn", "tokio"] +tokio = ["spacetimedb-durability/tokio", "spacetimedb-snapshot/tokio"] +simulation = ["spacetimedb-durability/simulation", "spacetimedb-snapshot/simulation"] # Enable test helpers and utils test = ["spacetimedb-commitlog/test", "spacetimedb-schema/test"] diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml index 4eaa3870001..198dd4461fc 100644 --- a/crates/durability/Cargo.toml +++ b/crates/durability/Cargo.toml @@ -8,6 +8,9 @@ license-file = "LICENSE" description = "Traits and single-node implementation of durability for SpacetimeDB." [features] +default = ["tokio"] +tokio = ["spacetimedb-runtime/tokio"] +simulation = ["spacetimedb-runtime/simulation"] test = [] fallocate = ["spacetimedb-commitlog/fallocate"] @@ -21,7 +24,7 @@ scopeguard.workspace = true spacetimedb-commitlog.workspace = true spacetimedb-fs-utils.workspace = true spacetimedb-paths.workspace = true -spacetimedb-runtime = { workspace = true, features = ["tokio"] } +spacetimedb-runtime = { path = "../runtime", default-features = false } spacetimedb-sats.workspace = true thiserror.workspace = true tokio.workspace = true diff --git a/crates/engine/Cargo.toml b/crates/engine/Cargo.toml new file mode 100644 index 00000000000..7eafb2914a6 --- /dev/null +++ b/crates/engine/Cargo.toml @@ -0,0 +1,59 @@ +[package] +name = "spacetimedb-engine" +version.workspace = true +edition.workspace = true +license-file = "LICENSE" +description = "Database engine and local persistence runtime for SpacetimeDB" +rust-version.workspace = true + +[lints] +workspace = true + +[features] +default = ["tokio"] +tokio = ["spacetimedb-runtime/tokio", "spacetimedb-datastore/tokio", "spacetimedb-durability/tokio", "spacetimedb-snapshot/tokio"] +simulation = ["spacetimedb-runtime/simulation", "spacetimedb-datastore/simulation", "spacetimedb-durability/simulation", "spacetimedb-snapshot/simulation"] +test = ["spacetimedb-commitlog/test", "spacetimedb-datastore/test"] + +[dependencies] +anyhow = { workspace = true, features = ["backtrace"] } +async-trait.workspace = true +enum-map.workspace = true +futures.workspace = true +hex.workspace = true +itertools.workspace = true +log.workspace = true +once_cell.workspace = true +parking_lot.workspace = true +prometheus.workspace = true +serde.workspace = true +sled.workspace = true +spacetimedb-commitlog.workspace = true +spacetimedb-data-structures.workspace = true +spacetimedb-datastore = { path = "../datastore", default-features = false } +spacetimedb-durability = { path = "../durability", default-features = false } +spacetimedb-expr.workspace = true +spacetimedb-fs-utils.workspace = true +spacetimedb-lib = { workspace = true, features = ["serde", "metrics_impls"] } +spacetimedb-metrics.workspace = true +spacetimedb-paths.workspace = true +spacetimedb-primitives.workspace = true +spacetimedb-runtime = { path = "../runtime", default-features = false } +spacetimedb-sats = { workspace = true, features = ["serde"] } +spacetimedb-schema.workspace = true +spacetimedb-snapshot = { path = "../snapshot", default-features = false } +spacetimedb-table.workspace = true +sqlparser.workspace = true +tempfile.workspace = true +thiserror.workspace = true +tracing.workspace = true + +[dev-dependencies] +bytes.workspace = true +env_logger.workspace = true +fs_extra.workspace = true +pretty_assertions.workspace = true +spacetimedb-commitlog = { workspace = true, features = ["test"] } +spacetimedb-datastore = { path = "../datastore", default-features = false, features = ["test"] } +spacetimedb-schema = { workspace = true, features = ["test"] } +tokio.workspace = true diff --git a/crates/engine/LICENSE b/crates/engine/LICENSE new file mode 120000 index 00000000000..8540cf8a991 --- /dev/null +++ b/crates/engine/LICENSE @@ -0,0 +1 @@ +../../licenses/BSL.txt \ No newline at end of file diff --git a/crates/core/src/db/durability.rs b/crates/engine/src/durability.rs similarity index 98% rename from crates/core/src/db/durability.rs rename to crates/engine/src/durability.rs index f749f72850a..9c938a6401b 100644 --- a/crates/core/src/db/durability.rs +++ b/crates/engine/src/durability.rs @@ -10,7 +10,7 @@ use spacetimedb_durability::Transaction; use spacetimedb_lib::Identity; use spacetimedb_sats::ProductValue; -use crate::db::persistence::Durability; +use crate::persistence::Durability; use spacetimedb_runtime::Handle; pub(super) fn request_durability( diff --git a/crates/engine/src/error.rs b/crates/engine/src/error.rs new file mode 100644 index 00000000000..7c1a3e11129 --- /dev/null +++ b/crates/engine/src/error.rs @@ -0,0 +1,262 @@ +use std::io; +use std::num::ParseIntError; +use std::path::PathBuf; +use std::sync::{MutexGuard, PoisonError}; + +use hex::FromHexError; +use spacetimedb_commitlog::repo::TxOffset; +use spacetimedb_durability::DurabilityExited; +use spacetimedb_expr::errors::TypingError; +use spacetimedb_fs_utils::lockfile::advisory::LockError; +use spacetimedb_lib::Identity; +use spacetimedb_schema::error::ValidationErrors; +use spacetimedb_schema::table_name::TableName; +use spacetimedb_snapshot::SnapshotError; +use spacetimedb_table::table::ReadViaBsatnError; +use thiserror::Error; + +use spacetimedb_lib::buffer::DecodeError; +use spacetimedb_primitives::*; +use spacetimedb_sats::hash::Hash; +use spacetimedb_sats::product_value::InvalidFieldError; +use spacetimedb_schema::def::error::{LibError, RelationError, SchemaErrors}; +use spacetimedb_schema::relation::FieldName; + +pub use spacetimedb_datastore::error::{DatastoreError, IndexError, SequenceError, TableError}; + +#[derive(Error, Debug, PartialEq, Eq)] +pub enum SubscriptionError { + #[error("Index not found: {0:?}")] + NotFound(IndexId), + #[error("Empty string")] + Empty, + #[error("Unsupported query on subscription: {0:?}")] + Unsupported(String), + #[error("Subscribing to queries in one call is not supported")] + Multiple, +} + +#[derive(Error, Debug)] +pub enum PlanError { + #[error("Unsupported feature: `{feature}`")] + Unsupported { feature: String }, + #[error("Unknown table: `{table}`")] + UnknownTable { table: Box }, + #[error("Qualified Table `{expect}` not found")] + TableNotFoundQualified { expect: String }, + #[error("Unknown field: `{field}` not found in the table(s): `{tables:?}`")] + UnknownField { field: String, tables: Vec }, + #[error("Unknown field name: `{field}` not found in the table(s): `{tables:?}`")] + UnknownFieldName { field: FieldName, tables: Vec }, + #[error("Field(s): `{fields:?}` not found in the table(s): `{tables:?}`")] + UnknownFields { + fields: Vec, + tables: Vec, + }, + #[error("Ambiguous field: `{field}`. Also found in {found:?}")] + AmbiguousField { field: String, found: Vec }, + #[error("Plan error: `{0}`")] + Unstructured(String), + #[error("Internal DBError: `{0}`")] + DatabaseInternal(Box), + #[error("Relation Error: `{0}`")] + Relation(#[from] RelationError), +} + +#[derive(Error, Debug)] +pub enum DatabaseError { + #[error("Replica not found: {0}")] + NotFound(u64), + #[error("Database is already opened. Path: `{0}`. Error: {1}")] + DatabasedOpened(PathBuf, anyhow::Error), +} + +impl From for DatabaseError { + fn from(LockError { path, source, .. }: LockError) -> Self { + Self::DatabasedOpened(path, source.into()) + } +} + +#[derive(Error, Debug)] +pub enum ViewError { + #[error("{0}")] + Args(String), + #[error("{0}")] + NoSuchModule(String), + #[error("no such view")] + NoSuchView, + #[error("Table does not exist for view `{0}`")] + TableDoesNotExist(ViewId), + #[error("missing client connection for view call trigged by subscription")] + MissingClientConnection, + #[error("DB error during view call: {0}")] + DatastoreError(#[from] DatastoreError), + #[error("The module instance encountered a fatal error: {0}")] + InternalError(String), +} + +#[derive(Error, Debug)] +pub enum DBError { + #[error("LibError: {0}")] + Lib(#[from] LibError), + #[error("BufferError: {0}")] + Buffer(#[from] DecodeError), + #[error("DatastoreError: {0}")] + Datastore(#[from] DatastoreError), + #[error("SequenceError: {0}")] + Sequence2(#[from] SequenceError), + #[error("SchemaError: {0}")] + Schema(SchemaErrors), + #[error("IOError: {0}.")] + IoError(#[from] std::io::Error), + #[error("ParseIntError: {0}.")] + ParseInt(#[from] ParseIntError), + #[error("Hex representation of hash decoded to incorrect number of bytes: {0}.")] + DecodeHexHash(usize), + #[error("DecodeHexError: {0}.")] + DecodeHex(#[from] FromHexError), + #[error("DatabaseError: {0}.")] + Database(#[from] DatabaseError), + #[error("SledError: {0}.")] + SledDbError(#[from] sled::Error), + #[error("Mutex was poisoned acquiring lock on MessageLog: {0}")] + MessageLogPoisoned(String), + #[error("SubscriptionError: {0}")] + Subscription(#[from] SubscriptionError), + #[error("SqlParserError: {error}, executing: `{sql}`")] + SqlParser { + sql: String, + error: sqlparser::parser::ParserError, + }, + #[error("SqlError: {error}, executing: `{sql}`")] + Plan { sql: String, error: PlanError }, + #[error("Error replaying the commit log: {0}")] + LogReplay(#[from] LogReplayError), + #[error(transparent)] + // Box the inner [`SnapshotError`] to keep Clippy quiet about large `Err` variants. + Snapshot(#[from] Box), + #[error("Error reading a value from a table through BSATN: {0}")] + ReadViaBsatnError(#[from] ReadViaBsatnError), + #[error("Module validation errors: {0}")] + ModuleValidationErrors(#[from] ValidationErrors), + #[error(transparent)] + Other(#[from] anyhow::Error), + #[error(transparent)] + TypeError(#[from] TypingError), + #[error("{error}, executing: `{sql}`")] + WithSql { + #[source] + error: Box, + sql: Box, + }, + #[error(transparent)] + RestoreSnapshot(#[from] RestoreSnapshotError), + #[error(transparent)] + DurabilityGone(#[from] DurabilityExited), + #[error(transparent)] + View(#[from] ViewError), +} + +impl From for DBError { + fn from(value: InvalidFieldError) -> Self { + LibError::from(value).into() + } +} + +impl From for DBError { + fn from(err: spacetimedb_table::read_column::TypeError) -> Self { + DatastoreError::Table(TableError::from(err)).into() + } +} + +impl From for PlanError { + fn from(err: DBError) -> Self { + PlanError::DatabaseInternal(Box::new(err)) + } +} + +impl<'a, T: ?Sized + 'a> From>> for DBError { + fn from(err: PoisonError>) -> Self { + DBError::MessageLogPoisoned(err.to_string()) + } +} + +impl From for DBError { + fn from(e: spacetimedb_durability::local::OpenError) -> Self { + use spacetimedb_durability::local::OpenError::*; + + match e { + Lock(e) => Self::from(DatabaseError::from(e)), + Commitlog(e) => Self::Other(e.into()), + } + } +} + +#[derive(Debug, Error)] +pub enum LogReplayError { + #[error( + "Out-of-order commit detected: {} in segment {} after offset {}", + .commit_offset, + .segment_offset, + .last_commit_offset + )] + OutOfOrderCommit { + commit_offset: u64, + segment_offset: usize, + last_commit_offset: u64, + }, + #[error( + "Error reading segment {}/{} at commit {}: {}", + .segment_offset, + .total_segments, + .commit_offset, + .source + )] + TrailingSegments { + segment_offset: usize, + total_segments: usize, + commit_offset: u64, + #[source] + source: io::Error, + }, + #[error("Could not reset log to offset {}: {}", .offset, .source)] + Reset { + offset: u64, + #[source] + source: io::Error, + }, + #[error("Missing object {} referenced from commit {}", .hash, .commit_offset)] + MissingObject { hash: Hash, commit_offset: u64 }, + #[error( + "Unexpected I/O error reading commit {} from segment {}: {}", + .commit_offset, + .segment_offset, + .source + )] + Io { + segment_offset: usize, + commit_offset: u64, + #[source] + source: io::Error, + }, +} + +#[derive(Debug, Error)] +pub enum RestoreSnapshotError { + #[error("Snapshot has incorrect database_identity: expected {expected} but found {actual}")] + IdentityMismatch { expected: Identity, actual: Identity }, + #[error("Failed to restore datastore from snapshot")] + Datastore(#[source] Box), + #[error("Failed to read snapshot")] + Snapshot(#[from] Box), + #[error("Failed to bootstrap datastore without snapshot")] + Bootstrap(#[source] Box), + #[error("No connected snapshot found, commitlog starts at {min_commitlog_offset}")] + NoConnectedSnapshot { min_commitlog_offset: TxOffset }, + #[error("Failed to invalidate snapshots at or newer than {offset}")] + Invalidate { + offset: TxOffset, + #[source] + source: Box, + }, +} diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs new file mode 100644 index 00000000000..09229362bd7 --- /dev/null +++ b/crates/engine/src/lib.rs @@ -0,0 +1,132 @@ +pub(crate) mod durability; +pub mod error; +pub mod metrics; +pub mod persistence; +pub mod relational_db; +pub mod snapshot; +pub mod sql; +pub mod update; +pub mod util; + +use std::sync::Arc; + +use enum_map::EnumMap; +use spacetimedb_datastore::execution_context::WorkloadType; +use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; +use spacetimedb_datastore::traits::TxData; +pub use spacetimedb_lib::identity; +pub use spacetimedb_lib::Identity; +pub use spacetimedb_sats::hash; +use spacetimedb_schema::reducer_name::ReducerName; + +use crate::metrics::ExecutionCounters; + +/// A message that is processed by the [`spawn_metrics_recorder`] actor. +/// We use a separate task to record metrics to avoid blocking transactions. +pub struct MetricsMessage { + /// The reducer the produced these metrics. + reducer: Option, + /// Metrics from a mutable transaction. + metrics_for_writer: Option, + /// Metrics from a read-only transaction. + /// A message may have metrics for both types of transactions, + /// because metrics for a reducer and its subscription updates are recorded together. + metrics_for_reader: Option, + /// The row updates for an immutable transaction. + /// Needed for insert and delete counters. + tx_data: Option>, + /// Cached metrics counters for each workload type. + counters: Arc>, +} + +/// The handle used to send work to the tx metrics recorder. +#[derive(Clone)] +pub struct MetricsRecorderQueue { + tx: spacetimedb_runtime::sync::mpsc::UnboundedSender, +} + +impl MetricsRecorderQueue { + pub fn send_metrics( + &self, + reducer: Option, + metrics_for_writer: Option, + metrics_for_reader: Option, + tx_data: Option>, + counters: Arc>, + ) { + if let Err(err) = self.tx.send(MetricsMessage { + reducer, + metrics_for_writer, + metrics_for_reader, + tx_data, + counters, + }) { + log::warn!("failed to send metrics: {err}"); + } + } +} + +fn record_metrics( + MetricsMessage { + reducer, + metrics_for_writer, + metrics_for_reader, + tx_data, + counters, + }: MetricsMessage, +) { + if let Some(tx_metrics) = metrics_for_writer { + tx_metrics.report( + // If row updates are present, + // they will always belong to the writer transaction. + tx_data.as_deref(), + reducer.as_ref(), + |wl| &counters[wl], + ); + } + if let Some(tx_metrics) = metrics_for_reader { + tx_metrics.report( + // If row updates are present, + // they will never belong to the reader transaction. + // Passing row updates here will most likely panic. + None, + reducer.as_ref(), + |wl| &counters[wl], + ); + } +} + +/// The metrics recorder is a side channel that the main database thread forwards metrics to. +/// While we want to avoid unnecessary compute on the critical path, communicating with other +/// threads is not free, and for this case in particular waking a parked task is not free. +/// +/// Previously, each tx would send its metrics to the recorder task. As soon as the recorder +/// task `recv`d a message, it would update the counters and gauges, and immediately wait for +/// the next tx's message. This meant that the tx would need to be more expensive than the +/// recording of its metrics in order for the recorder task not to be parked on `recv` when +/// the tx would `send` its metrics. This would obviously never be the case, and so each `send` +/// would incur the overhead of waking the task. +/// +/// To mitigate this, we now record metrics, for potentially many transactions, periodically +/// every 5ms. +const TX_METRICS_RECORDING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5); + +/// Spawns a task for recording transaction metrics. +/// Returns the handle for pushing metrics to the recorder. +pub fn spawn_tx_metrics_recorder( + handle: &spacetimedb_runtime::Handle, +) -> (MetricsRecorderQueue, spacetimedb_runtime::AbortHandle) { + let handle_clone = handle.clone(); + let (tx, mut rx) = spacetimedb_runtime::sync::mpsc::unbounded_channel(); + let abort_handle = handle + .spawn(async move { + loop { + handle_clone.sleep(TX_METRICS_RECORDING_INTERVAL).await; + while let Ok(metrics) = rx.try_recv() { + record_metrics(metrics); + } + } + }) + .abort_handle(); + (MetricsRecorderQueue { tx }, abort_handle) +} diff --git a/crates/engine/src/metrics.rs b/crates/engine/src/metrics.rs new file mode 100644 index 00000000000..f5ee0011b23 --- /dev/null +++ b/crates/engine/src/metrics.rs @@ -0,0 +1,181 @@ +use once_cell::sync::Lazy; +use prometheus::{GaugeVec, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec}; +use spacetimedb_datastore::{ + db_metrics::DB_METRICS, execution_context::WorkloadType, locking_tx_datastore::datastore::MetricsRecorder, +}; +use spacetimedb_lib::{metrics::ExecutionMetrics, Identity}; +use spacetimedb_metrics::metrics_group; + +metrics_group!( + pub struct EngineMetrics { + #[name = spacetime_num_bytes_sent_to_clients_total] + #[help = "The cumulative number of bytes sent to clients"] + #[labels(txn_type: WorkloadType, db: Identity)] + pub bytes_sent_to_clients: IntCounterVec, + + #[name = spacetime_replay_total_time_seconds] + #[help = "Total time spent replaying a database upon restart, including snapshot read, snapshot restore and commitlog replay"] + #[labels(db: Identity)] + pub replay_total_time_seconds: GaugeVec, + + #[name = spacetime_replay_snapshot_read_time_seconds] + #[help = "Time spent reading a snapshot from disk before restoring the snapshot upon restart"] + #[labels(db: Identity)] + pub replay_snapshot_read_time_seconds: GaugeVec, + + #[name = spacetime_replay_snapshot_restore_time_seconds] + #[help = "Time spent restoring a database from a snapshot after reading the snapshot and before commitlog replay upon restart"] + #[labels(db: Identity)] + pub replay_snapshot_restore_time_seconds: GaugeVec, + + #[name = spacetime_replay_commitlog_time_seconds] + #[help = "Time spent replaying the commitlog after restoring from a snapshot upon restart"] + #[labels(db: Identity)] + pub replay_commitlog_time_seconds: GaugeVec, + + #[name = spacetime_replay_commitlog_num_commits] + #[help = "Number of commits replayed after restoring from a snapshot upon restart"] + #[labels(db: Identity)] + pub replay_commitlog_num_commits: IntGaugeVec, + + // Snapshot creation should take in the order of milliseconds, + // but log data suggests that there are outliers. + // So let's track a wide range of buckets to get a better picture. + // + // We also track the timing without `asyncify` scheduling overhead + // (`snapshot_creation_time_inner`), and the snapshot compression + // timing with / without scheduling overhead (`snapshot_compression_time_total` + // and `snapshot_compression_time_inner`, respectively). + // + // Compression may have contributed to observed outliers, but is no + // longer included in the snapshot creation timing. + #[name = spacetime_snapshot_creation_time_total_sec] + #[help = "The time (in seconds) it took to take and store a database snapshot, including scheduling overhead"] + #[labels(db: Identity)] + #[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)] + pub snapshot_creation_time_total: HistogramVec, + + #[name = spacetime_snapshot_creation_time_inner_sec] + #[help = "The time (in seconds) it took to take and store a database snapshot, excluding scheduling overhead"] + #[labels(db: Identity)] + #[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)] + pub snapshot_creation_time_inner: HistogramVec, + + #[name = spacetime_snapshot_creation_time_fsync_sec] + #[help = "The time (in seconds) it took to fsync a database snapshot, excluding scheduling overhead"] + #[labels(db: Identity)] + #[buckets(0.0005, 0.001, 0.005, 0.01, 0.1, 1.0, 5.0, 10.0)] + pub snapshot_creation_time_fsync: HistogramVec, + + #[name = spacetime_snapshot_compression_time_total_sec] + #[help = "The time (in seconds) it took to do a compression pass on the snapshot repository, including scheduling overhead"] + #[labels(db: Identity)] + #[buckets(0.001, 0.01, 0.1, 1.0, 5.0, 10.0)] + pub snapshot_compression_time_total: HistogramVec, + + #[name = spacetime_snapshot_compression_time_inner_sec] + #[help = "The time (in seconds) it took to do a compression pass on the snapshot repository, excluding scheduling overhead"] + #[labels(db: Identity)] + #[buckets(0.001, 0.01, 0.1, 1.0, 5.0, 10.0)] + pub snapshot_compression_time_inner: HistogramVec, + + #[name = spacetime_snapshot_compression_time_per_snapshot_sec] + #[help = "The time (in seconds) it took to compress a single snapshot"] + #[labels(db: Identity)] + #[buckets(0.001, 0.01, 0.1, 1.0, 5.0, 10.0)] + pub snapshot_compression_time_single: HistogramVec, + + #[name = spacetime_snapshot_compression_skipped] + #[help = "The number of snapshots skipped in a single compression pass because they were already compressed"] + #[labels(db: Identity)] + pub snapshot_compression_skipped: IntGaugeVec, + + #[name = spacetime_snapshot_compression_compressed] + #[help = "The number of snapshots compressed in a single compression pass"] + #[labels(db: Identity)] + pub snapshot_compression_compressed: IntGaugeVec, + + #[name = spacetime_snapshot_compression_objects_compressed] + #[help = "The number of snapshot objects compressed in a single compression pass"] + #[labels(db: Identity)] + pub snapshot_compression_objects_compressed: IntGaugeVec, + + #[name = spacetime_snapshot_compression_objects_hardlinked] + #[help = "The number of snapshot objects hardlinked in a single compression pass"] + #[labels(db: Identity)] + pub snapshot_compression_objects_hardlinked: IntGaugeVec, + + #[name = spacetime_durability_blocking_send_duration_sec] + #[help = "Latency of blocking sends in request_durability (seconds); _count gives the number of times the channel was full"] + #[labels(database_identity: Identity)] + #[buckets(0.001, 0.01, 0.1, 1.0, 10.0)] + pub durability_blocking_send_duration: HistogramVec, + } +); + +pub static ENGINE_METRICS: Lazy = Lazy::new(EngineMetrics::new); + +#[derive(Debug)] +pub struct ExecutionCounters { + rdb_num_index_seeks: IntCounter, + rdb_num_rows_scanned: IntCounter, + rdb_num_bytes_scanned: IntCounter, + rdb_num_bytes_written: IntCounter, + bytes_sent_to_clients: IntCounter, + delta_queries_matched: IntCounter, + delta_queries_evaluated: IntCounter, + duplicate_rows_evaluated: IntCounter, + duplicate_rows_sent: IntCounter, +} + +impl ExecutionCounters { + pub fn new(workload: &WorkloadType, db: &Identity) -> Self { + Self { + rdb_num_index_seeks: DB_METRICS.rdb_num_index_seeks.with_label_values(workload, db), + rdb_num_rows_scanned: DB_METRICS.rdb_num_rows_scanned.with_label_values(workload, db), + rdb_num_bytes_scanned: DB_METRICS.rdb_num_bytes_scanned.with_label_values(workload, db), + rdb_num_bytes_written: DB_METRICS.rdb_num_bytes_written.with_label_values(workload, db), + bytes_sent_to_clients: ENGINE_METRICS.bytes_sent_to_clients.with_label_values(workload, db), + delta_queries_matched: DB_METRICS.delta_queries_matched.with_label_values(db), + delta_queries_evaluated: DB_METRICS.delta_queries_evaluated.with_label_values(db), + duplicate_rows_evaluated: DB_METRICS.duplicate_rows_evaluated.with_label_values(db), + duplicate_rows_sent: DB_METRICS.duplicate_rows_sent.with_label_values(db), + } + } + + pub fn record(&self, metrics: &ExecutionMetrics) { + if metrics.index_seeks > 0 { + self.rdb_num_index_seeks.inc_by(metrics.index_seeks as u64); + } + if metrics.rows_scanned > 0 { + self.rdb_num_rows_scanned.inc_by(metrics.rows_scanned as u64); + } + if metrics.bytes_scanned > 0 { + self.rdb_num_bytes_scanned.inc_by(metrics.bytes_scanned as u64); + } + if metrics.bytes_written > 0 { + self.rdb_num_bytes_written.inc_by(metrics.bytes_written as u64); + } + if metrics.bytes_sent_to_clients > 0 { + self.bytes_sent_to_clients.inc_by(metrics.bytes_sent_to_clients as u64); + } + if metrics.delta_queries_matched > 0 { + self.delta_queries_matched.inc_by(metrics.delta_queries_matched); + } + if metrics.delta_queries_evaluated > 0 { + self.delta_queries_evaluated.inc_by(metrics.delta_queries_evaluated); + } + if metrics.duplicate_rows_evaluated > 0 { + self.duplicate_rows_evaluated.inc_by(metrics.duplicate_rows_evaluated); + } + if metrics.duplicate_rows_sent > 0 { + self.duplicate_rows_sent.inc_by(metrics.duplicate_rows_sent); + } + } +} + +impl MetricsRecorder for ExecutionCounters { + fn record(&self, metrics: &ExecutionMetrics) { + self.record(metrics); + } +} diff --git a/crates/core/src/db/persistence.rs b/crates/engine/src/persistence.rs similarity index 92% rename from crates/core/src/db/persistence.rs rename to crates/engine/src/persistence.rs index 0edef7ab4b8..de152b0c6f7 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/engine/src/persistence.rs @@ -10,7 +10,8 @@ use spacetimedb_durability::{DurabilityExited, TxOffset}; use spacetimedb_paths::server::ServerDataDir; use spacetimedb_snapshot::DynSnapshotRepo; -use crate::{messages::control_db::Database, util::asyncify}; +use crate::util::asyncify; +use spacetimedb_lib::Identity; use spacetimedb_runtime::Handle; use super::{ @@ -84,6 +85,13 @@ pub type Durability = dyn spacetimedb_durability::Durability; /// configured or the database is in follower state. pub type DiskSizeFn = Arc io::Result + Send + Sync>; +#[derive(Clone, Copy, Debug)] +pub struct Database { + pub id: u64, + pub database_identity: Identity, + pub owner_identity: Identity, +} + /// Persistence services for a database. pub struct Persistence { /// The [Durability] to use, for persisting transactions. @@ -109,9 +117,9 @@ impl Persistence { durability: impl spacetimedb_durability::Durability + 'static, disk_size: impl Fn() -> io::Result + Send + Sync + 'static, snapshots: Option, - runtime: tokio::runtime::Handle, + runtime: Handle, ) -> Self { - Self::new_with_runtime(durability, disk_size, snapshots, Handle::tokio(runtime)) + Self::new_with_runtime(durability, disk_size, snapshots, runtime) } pub fn new_with_runtime( @@ -175,7 +183,7 @@ impl Persistence { /// A persistence provider is a "factory" of sorts that can produce [Persistence] /// services for a given replica. /// -/// The [crate::host::HostController] uses this to obtain [Persistence]s from +/// The host controller uses this to obtain [Persistence]s from /// an external source, and construct [relational_db::RelationalDB]s with it. /// /// This is an `async_trait` to allow it to be used as a trait object. @@ -215,15 +223,16 @@ impl LocalPersistenceProvider { #[async_trait] impl PersistenceProvider for LocalPersistenceProvider { async fn persistence(&self, database: &Database, replica_id: u64) -> anyhow::Result { + let database_identity = database.database_identity; let replica_dir = self.data_dir.replica(replica_id); let snapshot_dir = replica_dir.snapshots(); let runtime = Handle::tokio_current(); - let database_identity = database.database_identity; - let snapshot_worker = - asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id)) - .await - .map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled, runtime.clone()))?; + let snapshot_worker = asyncify(&runtime, move || { + relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id) + }) + .await + .map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled, runtime.clone()))?; let (durability, disk_size) = relational_db::local_durability_with_options( replica_dir, runtime.clone(), @@ -232,11 +241,12 @@ impl PersistenceProvider for LocalPersistenceProvider { ) .await?; - tokio::spawn(relational_db::snapshot_watching_commitlog_compressor( + runtime.spawn(relational_db::snapshot_watching_commitlog_compressor( snapshot_worker.subscribe(), None, None, durability.clone(), + runtime.clone(), )); Ok(Persistence { diff --git a/crates/core/src/db/relational_db.rs b/crates/engine/src/relational_db.rs similarity index 98% rename from crates/core/src/db/relational_db.rs rename to crates/engine/src/relational_db.rs index 2e2914fb670..edcaac618f4 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/engine/src/relational_db.rs @@ -1,9 +1,9 @@ -use crate::db::durability::{request_durability, spawn_close as spawn_durability_close}; -use crate::db::MetricsRecorderQueue; +use crate::durability::{request_durability, spawn_close as spawn_durability_close}; use crate::error::{DBError, RestoreSnapshotError}; -use crate::subscription::ExecutionCounters; +use crate::metrics::ExecutionCounters; +use crate::metrics::ENGINE_METRICS; use crate::util::asyncify; -use crate::worker_metrics::WORKER_METRICS; +use crate::MetricsRecorderQueue; use anyhow::{anyhow, Context}; use enum_map::EnumMap; use spacetimedb_commitlog::repo::OnNewSegmentFn; @@ -43,6 +43,7 @@ use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Identity; use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath}; use spacetimedb_primitives::*; +use spacetimedb_runtime::sync::watch; use spacetimedb_runtime::Handle; use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::raw_identifier::RawIdentifier; @@ -62,7 +63,6 @@ use std::borrow::Cow; use std::io; use std::ops::RangeBounds; use std::sync::Arc; -use tokio::sync::watch; pub use super::persistence::{DiskSizeFn, Durability, Persistence}; pub use super::snapshot::SnapshotWorker; @@ -303,7 +303,7 @@ impl RelationalDB { apply_history(&inner, database_identity, history)?; let elapsed_time = start_time.elapsed(); - WORKER_METRICS + ENGINE_METRICS .replay_total_time_seconds .with_label_values(&database_identity) .set(elapsed_time.as_secs_f64()); @@ -495,7 +495,7 @@ impl RelationalDB { let elapsed_time = start.elapsed(); - WORKER_METRICS + ENGINE_METRICS .replay_snapshot_read_time_seconds .with_label_values(database_identity) .set(elapsed_time.as_secs_f64()); @@ -519,7 +519,7 @@ impl RelationalDB { .inspect(|_| { let elapsed_time = start.elapsed(); - WORKER_METRICS.replay_snapshot_restore_time_seconds.with_label_values(database_identity).set(elapsed_time.as_secs_f64()); + ENGINE_METRICS.replay_snapshot_restore_time_seconds.with_label_values(database_identity).set(elapsed_time.as_secs_f64()); log::info!( "[{database_identity}] DATABASE: restored from snapshot of tx_offset {snapshot_offset} in {elapsed_time:?}", @@ -1061,7 +1061,7 @@ impl RelationalDB { /// Reports the `TxMetrics`s passed. /// /// Should only be called after the tx lock has been fully released. - pub(crate) fn report_tx_metrics( + pub fn report_tx_metrics( &self, reducer: Option, tx_data: Option>, @@ -1090,7 +1090,10 @@ const VIEWS_EXPIRATION: std::time::Duration = std::time::Duration::from_secs(10 const VIEW_CLEANUP_BUDGET: std::time::Duration = std::time::Duration::from_millis(10); /// Spawn a background task that periodically cleans up expired views -pub fn spawn_view_cleanup_loop(db: Arc) -> tokio::task::AbortHandle { +pub fn spawn_view_cleanup_loop( + db: Arc, + handle: &spacetimedb_runtime::Handle, +) -> spacetimedb_runtime::AbortHandle { fn run_view_cleanup(db: &RelationalDB) { match db.with_auto_commit(Workload::Internal, |tx| { tx.clear_expired_views(VIEWS_EXPIRATION, VIEW_CLEANUP_BUDGET) @@ -1117,23 +1120,19 @@ pub fn spawn_view_cleanup_loop(db: Arc) -> tokio::task::AbortHandl } } - tokio::spawn(async move { - loop { - // Offload actual cleanup to blocking thread pool, as `VIEW_CLEANUP_BUDGET` is defined - // in milliseconds, which may be too long for async tasks. - let db = db.clone(); - let db_identity = db.database_identity(); - tokio::task::spawn_blocking(move || run_view_cleanup(&db)) - .await - .inspect_err(|e| { - log::error!("[{}] DATABASE: failed to run view cleanup task: {}", db_identity, e); - }) - .ok(); + let handle_clone = handle.clone(); + handle + .spawn(async move { + loop { + // Offload actual cleanup to blocking thread pool, as `VIEW_CLEANUP_BUDGET` is defined + // in milliseconds, which may be too long for async tasks. + let db = db.clone(); + handle_clone.spawn_blocking(move || run_view_cleanup(&db)).await; - tokio::time::sleep(VIEWS_EXPIRATION).await; - } - }) - .abort_handle() + handle_clone.sleep(VIEWS_EXPIRATION).await; + } + }) + .abort_handle() } impl RelationalDB { pub fn create_table(&self, tx: &mut MutTx, schema: TableSchema) -> Result { @@ -1539,7 +1538,7 @@ impl RelationalDB { } /// Read the value of [ST_VARNAME_ROW_LIMIT] from `st_var` - pub(crate) fn row_limit(&self, tx: &Tx) -> Result, DBError> { + pub fn row_limit(&self, tx: &Tx) -> Result, DBError> { let data = self.read_var(tx, StVarName::RowLimit); if let Some(StVarValue::U64(limit)) = data? { @@ -1650,10 +1649,10 @@ fn apply_history( history: impl durability::History, ) -> Result<(), DBError> { let counters = ApplyHistoryCounters { - replay_commitlog_time_seconds: WORKER_METRICS + replay_commitlog_time_seconds: ENGINE_METRICS .replay_commitlog_time_seconds .with_label_values(&database_identity), - replay_commitlog_num_commits: WORKER_METRICS + replay_commitlog_num_commits: ENGINE_METRICS .replay_commitlog_num_commits .with_label_values(&database_identity), }; @@ -1696,10 +1695,11 @@ pub async fn local_durability_with_options( snapshot_worker.request_snapshot_ignore_closed(); }) as Arc }); - let local = asyncify(move || { + let durability_runtime = runtime.clone(); + let local = asyncify(&runtime, move || { durability::Local::open( replica_dir.clone(), - runtime, + durability_runtime, opts, // Give the durability a handle to request a new snapshot run, // which it will send down whenever we rotate commitlog segments. @@ -1719,9 +1719,12 @@ pub async fn local_durability_with_options( /// Open a [History] for replay from the local durable state. /// /// Currently, this is simply a read-only copy of the commitlog. -pub async fn local_history(replica_dir: &ReplicaDir) -> io::Result + use<>> { +pub async fn local_history( + replica_dir: &ReplicaDir, + runtime: &Handle, +) -> io::Result + use<>> { let commitlog_dir = replica_dir.commit_log(); - asyncify(move || Commitlog::open(commitlog_dir, <_>::default(), None)).await + asyncify(runtime, move || Commitlog::open(commitlog_dir, <_>::default(), None)).await } /// Watches snapshot creation events and compresses all commitlog segments older @@ -1730,9 +1733,10 @@ pub async fn local_history(replica_dir: &ReplicaDir) -> io::Result, - mut clog_tx: Option>, - mut snap_tx: Option>, + mut clog_tx: Option>, + mut snap_tx: Option>, durability: LocalDurability, + runtime: Handle, ) { let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update(); while snapshot_rx.changed().await.is_ok() { @@ -1745,7 +1749,7 @@ pub async fn snapshot_watching_commitlog_compressor( tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}"); } - let res: io::Result<_> = asyncify(move || { + let res: io::Result<_> = asyncify(&runtime, move || { let segment_offsets = durability.existing_segment_offsets()?; let start_idx = segment_offsets .binary_search(&prev_snapshot_offset) @@ -1807,19 +1811,21 @@ fn default_row_count_fn(db: Identity) -> RowCountFn { #[cfg(any(test, feature = "test"))] pub mod tests_utils { - use crate::db::snapshot; - use crate::db::snapshot::SnapshotWorker; - use crate::messages::control_db::HostType; + use crate::snapshot; + use crate::snapshot::SnapshotWorker; + use crate::MetricsRecorderQueue; use super::*; use core::ops::Deref; use durability::{Durability, EmptyHistory}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::locking_tx_datastore::TxId; + use spacetimedb_datastore::system_tables::ModuleKind; use spacetimedb_fs_utils::compression::CompressType; use spacetimedb_lib::{bsatn::to_vec, ser::Serialize}; use spacetimedb_paths::server::ReplicaDir; use spacetimedb_paths::FromPathUnchecked; + use spacetimedb_runtime::{TokioHandle, TokioRuntime, TokioRuntimeBuilder}; use tempfile::TempDir; pub enum TestDBDir { @@ -1848,7 +1854,7 @@ pub mod tests_utils { pub type TestDBParts = ( Arc, Option>>, - Option, + Option, Option, ); @@ -1893,7 +1899,7 @@ pub mod tests_utils { struct DurableState { durability: Arc>, - rt: tokio::runtime::Runtime, + rt: TokioRuntime, replica_dir: TestDBDir, } @@ -1919,7 +1925,7 @@ pub mod tests_utils { /// database. pub fn durable() -> Result { let dir = TempReplicaDir::new()?; - let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?; + let rt = TokioRuntimeBuilder::new_multi_thread().enable_all().build()?; // Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`. let _rt = rt.enter(); let (db, durability) = Self::durable_internal(&dir, rt.handle().clone(), true)?; @@ -1938,7 +1944,7 @@ pub mod tests_utils { pub fn durable_without_snapshot_repo() -> Result { let dir = TempReplicaDir::new()?; - let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?; + let rt = TokioRuntimeBuilder::new_multi_thread().enable_all().build()?; // Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`. let _rt = rt.enter(); let (db, durability) = Self::durable_internal(&dir, rt.handle().clone(), false)?; @@ -1957,7 +1963,7 @@ pub mod tests_utils { pub fn open_existing_durable( root: &ReplicaDir, - rt: tokio::runtime::Handle, + rt: TokioHandle, replica_id: u64, db_identity: Identity, owner_identity: Identity, @@ -1972,9 +1978,8 @@ pub mod tests_utils { .transpose()?; let runtime = Handle::tokio(rt.clone()); - let (local, disk_size_fn) = + let (local, disk_size_fn): (LocalDurability, DiskSizeFn) = rt.block_on(local_durability(root.clone(), runtime.clone(), snapshots.as_ref()))?; - let history = local.as_history(); let persistence = Persistence { durability: local.clone(), @@ -1986,7 +1991,7 @@ pub mod tests_utils { let (db, _) = RelationalDB::open( db_identity, owner_identity, - history, + local.as_history(), Some(persistence), None, PagePool::new_for_test(), @@ -2064,7 +2069,7 @@ pub mod tests_utils { /// Handle to the tokio runtime, available if [`Self::durable`] was used /// to create the [`TestDB`]. - pub fn runtime(&self) -> Option<&tokio::runtime::Handle> { + pub fn runtime(&self) -> Option<&TokioHandle> { self.durable.as_ref().map(|ds| ds.rt.handle()) } @@ -2088,7 +2093,7 @@ pub mod tests_utils { fn durable_internal( root: &ReplicaDir, - rt: tokio::runtime::Handle, + rt: TokioHandle, want_snapshot_repo: bool, ) -> Result<(RelationalDB, Arc>), DBError> { let snapshots = want_snapshot_repo @@ -2099,16 +2104,15 @@ pub mod tests_utils { }) .transpose()?; let runtime = Handle::tokio(rt.clone()); - let (local, disk_size_fn) = + let (local, disk_size_fn): (LocalDurability, DiskSizeFn) = rt.block_on(local_durability(root.clone(), runtime.clone(), snapshots.as_ref()))?; - let history = local.as_history(); let persistence = Persistence { durability: local.clone(), disk_size: disk_size_fn, snapshots, runtime, }; - let db = Self::open_db(history, Some(persistence), None, 0)?; + let db = Self::open_db(local.as_history(), Some(persistence), None, 0)?; Ok((db, local)) } @@ -2130,7 +2134,7 @@ pub mod tests_utils { assert_eq!(connected_clients.len(), expected_num_clients); let db = db.with_row_count(Self::row_count_fn()); db.with_auto_commit(Workload::Internal, |tx| { - db.set_initialized(tx, Program::empty(HostType::Wasm.into())) + db.set_initialized(tx, Program::empty(ModuleKind::WASM)) })?; Ok(db) } @@ -2309,7 +2313,7 @@ mod tests { use super::tests_utils::begin_mut_tx; use super::*; - use crate::db::relational_db::tests_utils::{begin_tx, create_view_for_test, insert, make_snapshot, TestDB}; + use crate::relational_db::tests_utils::{begin_tx, create_view_for_test, insert, make_snapshot, TestDB}; use anyhow::bail; use bytes::Bytes; use commitlog::payload::txdata; diff --git a/crates/core/src/db/snapshot.rs b/crates/engine/src/snapshot.rs similarity index 95% rename from crates/core/src/db/snapshot.rs rename to crates/engine/src/snapshot.rs index 7f2474e0d74..11b08727b7d 100644 --- a/crates/core/src/db/snapshot.rs +++ b/crates/engine/src/snapshot.rs @@ -14,10 +14,10 @@ use prometheus::{Histogram, IntGauge}; use spacetimedb_datastore::locking_tx_datastore::{committed_state::CommittedState, datastore::Locking}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::Identity; +use spacetimedb_runtime::sync::watch; use spacetimedb_snapshot::{CompressionStats, DynSnapshotRepo}; -use tokio::sync::watch; -use crate::worker_metrics::WORKER_METRICS; +use crate::metrics::ENGINE_METRICS; use spacetimedb_runtime::Handle; pub type SnapshotDatabaseState = Arc>; @@ -158,9 +158,9 @@ struct SnapshotMetrics { impl SnapshotMetrics { fn new(db: Identity) -> Self { Self { - snapshot_timing_total: WORKER_METRICS.snapshot_creation_time_total.with_label_values(&db), - snapshot_timing_inner: WORKER_METRICS.snapshot_creation_time_inner.with_label_values(&db), - snapshot_timing_fsync: WORKER_METRICS.snapshot_creation_time_fsync.with_label_values(&db), + snapshot_timing_total: ENGINE_METRICS.snapshot_creation_time_total.with_label_values(&db), + snapshot_timing_inner: ENGINE_METRICS.snapshot_creation_time_inner.with_label_values(&db), + snapshot_timing_fsync: ENGINE_METRICS.snapshot_creation_time_fsync.with_label_values(&db), } } } @@ -280,15 +280,15 @@ struct CompressionMetrics { impl CompressionMetrics { fn new(db: Identity) -> Self { Self { - timing_total: WORKER_METRICS.snapshot_compression_time_total.with_label_values(&db), - timing_inner: WORKER_METRICS.snapshot_compression_time_inner.with_label_values(&db), - timing_single: WORKER_METRICS.snapshot_compression_time_single.with_label_values(&db), - skipped: WORKER_METRICS.snapshot_compression_skipped.with_label_values(&db), - compressed: WORKER_METRICS.snapshot_compression_compressed.with_label_values(&db), - objects_compressed: WORKER_METRICS + timing_total: ENGINE_METRICS.snapshot_compression_time_total.with_label_values(&db), + timing_inner: ENGINE_METRICS.snapshot_compression_time_inner.with_label_values(&db), + timing_single: ENGINE_METRICS.snapshot_compression_time_single.with_label_values(&db), + skipped: ENGINE_METRICS.snapshot_compression_skipped.with_label_values(&db), + compressed: ENGINE_METRICS.snapshot_compression_compressed.with_label_values(&db), + objects_compressed: ENGINE_METRICS .snapshot_compression_objects_compressed .with_label_values(&db), - objects_hardlinked: WORKER_METRICS + objects_hardlinked: ENGINE_METRICS .snapshot_compression_objects_hardlinked .with_label_values(&db), } diff --git a/crates/core/src/sql/ast.rs b/crates/engine/src/sql/ast.rs similarity index 100% rename from crates/core/src/sql/ast.rs rename to crates/engine/src/sql/ast.rs diff --git a/crates/engine/src/sql/mod.rs b/crates/engine/src/sql/mod.rs new file mode 100644 index 00000000000..70aae2831d7 --- /dev/null +++ b/crates/engine/src/sql/mod.rs @@ -0,0 +1,2 @@ +pub mod ast; +pub mod rls; diff --git a/crates/core/src/sql/parser.rs b/crates/engine/src/sql/rls.rs similarity index 97% rename from crates/core/src/sql/parser.rs rename to crates/engine/src/sql/rls.rs index 66d216a5590..df063a6a63e 100644 --- a/crates/core/src/sql/parser.rs +++ b/crates/engine/src/sql/rls.rs @@ -1,4 +1,4 @@ -use crate::sql::ast::SchemaViewer; +use super::ast::SchemaViewer; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_expr::check::parse_and_type_sub; diff --git a/crates/core/src/db/update.rs b/crates/engine/src/update.rs similarity index 98% rename from crates/core/src/db/update.rs rename to crates/engine/src/update.rs index 32e3a219536..6d420714a88 100644 --- a/crates/core/src/db/update.rs +++ b/crates/engine/src/update.rs @@ -1,13 +1,13 @@ use super::relational_db::RelationalDB; -use crate::database_logger::SystemLogger; -use crate::sql::parser::RowLevelExpr; +use crate::sql::rls::RowLevelExpr; +use anyhow::Context; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_lib::db::auth::StTableType; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::AlgebraicValue; use spacetimedb_primitives::{ColSet, TableId}; use spacetimedb_schema::auto_migrate::{AutoMigratePlan, ManualMigratePlan, MigratePlan}; -use spacetimedb_schema::def::{TableDef, ViewDef}; +use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef}; use spacetimedb_schema::schema::{column_schemas_from_defs, IndexSchema, Schema, SequenceSchema, TableSchema}; /// The logger used for by [`update_database`] and friends. @@ -15,12 +15,6 @@ pub trait UpdateLogger { fn info(&self, msg: &str); } -impl UpdateLogger for SystemLogger { - fn info(&self, msg: &str) { - self.info(msg); - } -} - /// The result of a database update. /// Indicates whether clients should be disconnected when the update is complete. #[must_use] @@ -341,15 +335,25 @@ fn auto_migrate_database( Ok(res) } +/// Creates the table for `table_def` in `stdb`. +pub fn create_table_from_def( + stdb: &RelationalDB, + tx: &mut MutTxId, + module_def: &ModuleDef, + table_def: &TableDef, +) -> anyhow::Result<()> { + let schema = TableSchema::from_module_def(module_def, table_def, (), TableId::SENTINEL); + stdb.create_table(tx, schema) + .with_context(|| format!("failed to create table {}", &table_def.name))?; + Ok(()) +} + #[cfg(test)] mod test { use super::*; - use crate::{ - db::relational_db::{ - open_snapshot_repo, - tests_utils::{begin_mut_tx, insert, TestDB}, - }, - host::module_host::create_table_from_def, + use crate::relational_db::{ + open_snapshot_repo, + tests_utils::{begin_mut_tx, insert, TestDB}, }; use spacetimedb_datastore::locking_tx_datastore::PendingSchemaChange; use spacetimedb_datastore::system_tables::ST_EVENT_TABLE_ID; diff --git a/crates/engine/src/util.rs b/crates/engine/src/util.rs new file mode 100644 index 00000000000..4d07d7d3c06 --- /dev/null +++ b/crates/engine/src/util.rs @@ -0,0 +1,21 @@ +use spacetimedb_runtime::Handle; +use tracing::Span; + +/// Ergonomic wrapper for `runtime.spawn_blocking(f).await`. +/// +/// If `f` panics, it will be bubbled up to the calling task. +pub async fn asyncify(runtime: &Handle, f: F) -> R +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + // Ensure that `f` executes in the current span context. + // If there is no current span, or it is disabled, `span` is disabled. + let span = Span::current(); + runtime + .spawn_blocking(move || { + let _enter = span.enter(); + f() + }) + .await +} diff --git a/crates/core/testdata/README.md b/crates/engine/testdata/README.md similarity index 100% rename from crates/core/testdata/README.md rename to crates/engine/testdata/README.md diff --git a/crates/core/testdata/v1.2/replicas/22000001/clog/00000000000000000000.stdb.log b/crates/engine/testdata/v1.2/replicas/22000001/clog/00000000000000000000.stdb.log similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/clog/00000000000000000000.stdb.log rename to crates/engine/testdata/v1.2/replicas/22000001/clog/00000000000000000000.stdb.log diff --git a/crates/core/testdata/v1.2/replicas/22000001/clog/00000000000000000000.stdb.ofs b/crates/engine/testdata/v1.2/replicas/22000001/clog/00000000000000000000.stdb.ofs similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/clog/00000000000000000000.stdb.ofs rename to crates/engine/testdata/v1.2/replicas/22000001/clog/00000000000000000000.stdb.ofs diff --git a/crates/core/testdata/v1.2/replicas/22000001/db.lock b/crates/engine/testdata/v1.2/replicas/22000001/db.lock similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/db.lock rename to crates/engine/testdata/v1.2/replicas/22000001/db.lock diff --git a/crates/core/testdata/v1.2/replicas/22000001/module_logs/2025-08-18.log b/crates/engine/testdata/v1.2/replicas/22000001/module_logs/2025-08-18.log similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/module_logs/2025-08-18.log rename to crates/engine/testdata/v1.2/replicas/22000001/module_logs/2025-08-18.log diff --git a/crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/00000000000000000000.snapshot_bsatn b/crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/00000000000000000000.snapshot_bsatn similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/00000000000000000000.snapshot_bsatn rename to crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/00000000000000000000.snapshot_bsatn diff --git a/crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/19/30ce81246a4cdc25e9024ae0065d053adb2efbe1b5b7af457331d330e481e8 b/crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/19/30ce81246a4cdc25e9024ae0065d053adb2efbe1b5b7af457331d330e481e8 similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/19/30ce81246a4cdc25e9024ae0065d053adb2efbe1b5b7af457331d330e481e8 rename to crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/19/30ce81246a4cdc25e9024ae0065d053adb2efbe1b5b7af457331d330e481e8 diff --git a/crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/41/bb11b6d2cdc488192ee70d8175307d6f205756ed163f4237c6cba2936798dc b/crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/41/bb11b6d2cdc488192ee70d8175307d6f205756ed163f4237c6cba2936798dc similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/41/bb11b6d2cdc488192ee70d8175307d6f205756ed163f4237c6cba2936798dc rename to crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/41/bb11b6d2cdc488192ee70d8175307d6f205756ed163f4237c6cba2936798dc diff --git a/crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/45/4d2e2c62ff5d46c5b3e6de72d6277eb285fc2d6b0a5ac6f92498e08a9e5ecc b/crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/45/4d2e2c62ff5d46c5b3e6de72d6277eb285fc2d6b0a5ac6f92498e08a9e5ecc similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/45/4d2e2c62ff5d46c5b3e6de72d6277eb285fc2d6b0a5ac6f92498e08a9e5ecc rename to crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/45/4d2e2c62ff5d46c5b3e6de72d6277eb285fc2d6b0a5ac6f92498e08a9e5ecc diff --git a/crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/62/22df0e5ca93d3fb22762e12161246a1d5917c61ada5d81b8dcce12fd5780b3 b/crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/62/22df0e5ca93d3fb22762e12161246a1d5917c61ada5d81b8dcce12fd5780b3 similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/62/22df0e5ca93d3fb22762e12161246a1d5917c61ada5d81b8dcce12fd5780b3 rename to crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/62/22df0e5ca93d3fb22762e12161246a1d5917c61ada5d81b8dcce12fd5780b3 diff --git a/crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/79/4dced5633eca2ffee784d471f5203209169321083ef99de254ad24af0f6d5a b/crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/79/4dced5633eca2ffee784d471f5203209169321083ef99de254ad24af0f6d5a similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/79/4dced5633eca2ffee784d471f5203209169321083ef99de254ad24af0f6d5a rename to crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/79/4dced5633eca2ffee784d471f5203209169321083ef99de254ad24af0f6d5a diff --git a/crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/95/74dd6d2857fa771a1cd16be31fdef38f83c2fd3bcc05f4934e53bdbfa21f10 b/crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/95/74dd6d2857fa771a1cd16be31fdef38f83c2fd3bcc05f4934e53bdbfa21f10 similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/95/74dd6d2857fa771a1cd16be31fdef38f83c2fd3bcc05f4934e53bdbfa21f10 rename to crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/95/74dd6d2857fa771a1cd16be31fdef38f83c2fd3bcc05f4934e53bdbfa21f10 diff --git a/crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/9a/b95f5aaed7541289faa8bc4de886ce0281f11037c3424494e58fee92411241 b/crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/9a/b95f5aaed7541289faa8bc4de886ce0281f11037c3424494e58fee92411241 similarity index 100% rename from crates/core/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/9a/b95f5aaed7541289faa8bc4de886ce0281f11037c3424494e58fee92411241 rename to crates/engine/testdata/v1.2/replicas/22000001/snapshots/00000000000000000000.snapshot_dir/objects/9a/b95f5aaed7541289faa8bc4de886ce0281f11037c3424494e58fee92411241 diff --git a/crates/snapshot/Cargo.toml b/crates/snapshot/Cargo.toml index aa51c4e3bd8..ebf4d577670 100644 --- a/crates/snapshot/Cargo.toml +++ b/crates/snapshot/Cargo.toml @@ -6,10 +6,15 @@ rust-version.workspace = true license-file = "LICENSE" description = "Low-level interfaces for capturing and restoring snapshots of database states" +[features] +default = ["tokio"] +tokio = ["spacetimedb-durability/tokio"] +simulation = ["spacetimedb-durability/simulation"] + [dependencies] spacetimedb-table.workspace = true spacetimedb-data-structures.workspace = true -spacetimedb-durability.workspace = true +spacetimedb-durability = { path = "../durability", default-features = false } spacetimedb-lib.workspace = true spacetimedb-sats = { workspace = true, features = ["blake3"] } spacetimedb-primitives.workspace = true diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 189bafa11bd..90597c29611 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -17,6 +17,7 @@ use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor}; use spacetimedb::host::{DiskStorage, HostController, HostRuntimeConfig, MigratePlanResult, UpdateDatabaseResult}; use spacetimedb::identity::{AuthCtx, Identity}; use spacetimedb::messages::control_db::{Database, Node, Replica}; +use spacetimedb::metrics::ENGINE_METRICS; use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool; use spacetimedb::util::jobs::JobCores; use spacetimedb::worker_metrics::WORKER_METRICS; @@ -95,6 +96,7 @@ impl StandaloneEnv { let metrics_registry = prometheus::Registry::new(); metrics_registry.register(Box::new(&*WORKER_METRICS)).unwrap(); + metrics_registry.register(Box::new(&*ENGINE_METRICS)).unwrap(); metrics_registry.register(Box::new(&*DB_METRICS)).unwrap(); metrics_registry.register(Box::new(&*DATA_SIZE_METRICS)).unwrap();