Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"

[workspace.package]
version = "0.6.1"
version = "0.7.0"
edition = "2024"
repository = "https://github.com/boringSQL/dryrun"

Expand Down
33 changes: 32 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ dryrun lint

All commands work offline from the schema file. Each project has its own `dryrun.toml` and `.dryrun/`, there is no global state. Add `.dryrun/` to your `.gitignore`.

Snapshots live in `~/.dryrun/history.db`, keyed by `(project_id, database_id)`. The MCP server reads from the history db first and falls back to `.dryrun/schema.json` for first-run or shared snapshots. After `dryrun snapshot take` it will switch to DB.
Snapshots live in `.dryrun/history.db`, keyed by `(project_id, database_id)`. The MCP server reads from the history db first and falls back to `.dryrun/schema.json` for first-run or shared snapshots. After `dryrun snapshot take` it will switch to DB.

The static `schema.json` file will be deprecated in a future release.

### Multi-node: capture activity from replicas

Expand Down Expand Up @@ -193,6 +195,35 @@ Every DB-related command (`init`, `import`, `probe`, `dump-schema`, `lint`, `dri

> **Note:** the MCP server is currently single-database, using the default profile. Alternatively, run one `dryrun mcp-serve` process per database. Native multi-database support inside one MCP process is tracked in [#7](https://github.com/boringSQL/dryrun/issues/7).

### Sharing snapshots across a team

DryRun's value increases in a team setup. Multiple developers can pull snapshots from any POSIX-compliant directory.

To publish snapshots:

```sh
cd project_name

# capture from the live DB (uses the cwd name as the project name)
dryrun init --db "$DATABASE_URL"
dryrun snapshot take --db "$DATABASE_URL"
dryrun snapshot push --to-path ./snapshots --all
```

Developers can then import the snapshots to the local history

```sh
dryrun snapshot pull --from-path ./shared/snapshots --all
```

Snapshots are content-addressed (`{project}/{database}/{ts}-{hash}.json.zst`) and idempotent: pushing the same snapshot twice won't change it.

The simplest deployment is a dedicated git repo. Create the snapshots repo and add `*.json.zst binary` to `.gitattributes` so git stops trying to diff bundles.

Offline tools (`lint`, `check_migration`, `drift`) work immediately after the pull.

No server, no credentials. Same promise as before.

## MCP server

Add `dryrun` to your AI assistant. If you installed via Homebrew, `dryrun` is already on your PATH:
Expand Down
201 changes: 184 additions & 17 deletions crates/dry_run_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::path::PathBuf;

use clap::{Parser, Subcommand};
use dry_run_core::history::{
DatabaseId, PutOutcome, SnapshotKey, SnapshotRef, SnapshotStore, TimeRange,
DatabaseId, FilesystemStore, PutOutcome, SnapshotKey, SnapshotKind, SnapshotRef, SnapshotStore,
TimeRange,
};
use dry_run_core::{DryRun, HistoryStore, ProjectConfig};
use rmcp::ServiceExt;
Expand Down Expand Up @@ -140,6 +141,26 @@ enum SnapshotAction {
#[arg(long)]
history_db: Option<PathBuf>,
},
Push {
#[arg(long)]
to_path: PathBuf,
#[arg(long)]
all: bool,
#[arg(long, env = "DATABASE_URL")]
db: Option<String>,
#[arg(long)]
history_db: Option<PathBuf>,
},
Pull {
#[arg(long)]
from_path: PathBuf,
#[arg(long)]
all: bool,
#[arg(long, env = "DATABASE_URL")]
db: Option<String>,
#[arg(long)]
history_db: Option<PathBuf>,
},
}

#[derive(Subcommand)]
Expand Down Expand Up @@ -342,7 +363,7 @@ schema_file = ".dryrun/schema.json"
.unwrap_or_else(|| ProjectConfig::parse(""))?;
let resolved = config.resolve_profile(Some(db_url), None, None, &cwd)?;
let key = complete_key(&resolved, &snapshot.database);
store.put(&key, &snapshot).await?;
store.put_schema(&key, &snapshot).await?;

let planner = ctx.introspect_planner_stats(&snapshot.content_hash).await?;
store.put_planner_stats(&key, &planner).await?;
Expand Down Expand Up @@ -478,7 +499,7 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()>
let resolved = config.resolve_profile(Some(db_url), None, profile, &cwd)?;
let key = complete_key(&resolved, &snapshot.database);

let schema_outcome = store.put(&key, &snapshot).await?;
let schema_outcome = store.put_schema(&key, &snapshot).await?;
match schema_outcome {
PutOutcome::Inserted => {
println!("Snapshot saved: {}", snapshot.content_hash);
Expand Down Expand Up @@ -603,7 +624,7 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()>
SnapshotAction::List { db, history_db } => {
let store = open_history_store(history_db.as_deref())?;
let key = resolve_read_key(db.as_deref(), profile).await?;
let rows = store.list(&key, TimeRange::default()).await?;
let rows = store.list_schema(&key, TimeRange::default()).await?;

if rows.is_empty() {
println!(
Expand Down Expand Up @@ -642,15 +663,19 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()>
let key = resolve_read_key(Some(db_url), profile).await?;

let from_snapshot = if let Some(hash) = &from {
store.get(&key, SnapshotRef::Hash(hash.clone())).await?
store
.get_schema(&key, SnapshotRef::Hash(hash.clone()))
.await?
} else if *latest {
store.get(&key, SnapshotRef::Latest).await?
store.get_schema(&key, SnapshotRef::Latest).await?
} else {
anyhow::bail!("specify --from <hash> or --latest");
};

let to_snapshot = if let Some(hash) = &to {
store.get(&key, SnapshotRef::Hash(hash.clone())).await?
store
.get_schema(&key, SnapshotRef::Hash(hash.clone()))
.await?
} else {
ctx.introspect_schema().await?
};
Expand All @@ -664,6 +689,52 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()>
println!("{json}");
Ok(())
}
SnapshotAction::Push {
to_path,
all,
db,
history_db,
} => {
let store = open_history_store(history_db.as_deref())?;
let fs = FilesystemStore::new(to_path.clone());

let keys = if *all {
store.list_keys()?
} else {
vec![resolve_read_key(db.as_deref(), profile).await?]
};
if keys.is_empty() {
println!("No snapshots in history.db to push.");
return Ok(());
}

let outcomes = sync_keys(&store, &fs, &keys).await?;
print_sync_outcomes("push", &outcomes, to_path);
Ok(())
}
SnapshotAction::Pull {
from_path,
all,
db,
history_db,
} => {
let fs = FilesystemStore::new(from_path.clone());
let store = open_history_store(history_db.as_deref())?;

let keys = if *all {
fs.list_keys()?
} else {
vec![resolve_read_key(db.as_deref(), profile).await?]
};
if keys.is_empty() {
println!("No snapshots at {} to pull.", from_path.display());
return Ok(());
}

let outcomes = sync_keys(&fs, &store, &keys).await?;
print_sync_outcomes("pull", &outcomes, from_path);
Ok(())
}
SnapshotAction::Export { out, history_db } => {
let store = open_history_store(history_db.as_deref())?;
let out_root = out.clone().unwrap_or_else(|| {
Expand All @@ -675,10 +746,10 @@ async fn cmd_snapshot(cli: &Cli, action: &SnapshotAction) -> anyhow::Result<()>
let keys = store.list_keys()?;
let mut written = 0usize;
for key in &keys {
let summaries = store.list(key, TimeRange::default()).await?;
let summaries = store.list_schema(key, TimeRange::default()).await?;
for s in &summaries {
let snap = store
.get(key, SnapshotRef::Hash(s.content_hash.clone()))
.get_schema(key, SnapshotRef::Hash(s.content_hash.clone()))
.await?;
write_snapshot_export(&out_root, key, &snap)?;
written += 1;
Expand Down Expand Up @@ -836,6 +907,108 @@ async fn cmd_drift(
Ok(())
}

/// Tally for one snapshot kind during a push/pull sync: how many snapshots
/// were actually transferred vs. skipped because the destination already
/// held the same content.
#[derive(Debug, Default)]
struct KindCount {
    /// Snapshots newly written to the destination store.
    copied: usize,
    /// Snapshots skipped: the destination already had this content hash
    /// (or the store deduplicated the write).
    up_to_date: usize,
}

/// Result of syncing a single `(project, database)` key between two
/// snapshot stores, broken down by snapshot kind for reporting.
#[derive(Debug)]
struct SyncOutcome {
    /// The `(project_id, database_id)` key that was synced.
    key: SnapshotKey,
    /// Counts for schema snapshots.
    schema: KindCount,
    /// Counts for planner-stats snapshots.
    planner: KindCount,
    /// Counts for activity snapshots (all nodes combined).
    activity: KindCount,
}

fn kind_order(k: &SnapshotKind) -> u8 {
match k {
SnapshotKind::Schema => 0,
SnapshotKind::Planner => 1,
SnapshotKind::Activity { .. } => 2,
}
}

/// Copy every snapshot under each key in `keys` from `src` to `dst`,
/// skipping snapshots whose content hash the destination already holds.
///
/// Used for both `snapshot push` (history db -> filesystem) and
/// `snapshot pull` (filesystem -> history db); the two stores are
/// interchangeable behind the `SnapshotStore` trait.
///
/// Returns one `SyncOutcome` per key with copied / up-to-date counts
/// broken down by snapshot kind. Fails fast on the first store error.
async fn sync_keys(
    src: &dyn SnapshotStore,
    dst: &dyn SnapshotStore,
    keys: &[SnapshotKey],
) -> anyhow::Result<Vec<SyncOutcome>> {
    let mut outcomes = Vec::with_capacity(keys.len());
    for key in keys {
        let mut outcome = SyncOutcome {
            key: key.clone(),
            schema: KindCount::default(),
            planner: KindCount::default(),
            activity: KindCount::default(),
        };

        // Only sync kinds the source actually has for this key.
        let mut kinds = src.list_kinds(key).await?;
        // schema first so FilesystemStore's orphan rule is satisfied
        // (planner/activity snapshots reference a schema snapshot's hash).
        kinds.sort_by_key(kind_order);

        for kind in &kinds {
            let src_summaries = src.list(key, kind, TimeRange::default()).await?;
            // Hash set of what the destination already holds, so we can
            // skip transfers without fetching full snapshot bodies.
            let dst_hashes: std::collections::HashSet<String> = dst
                .list(key, kind, TimeRange::default())
                .await?
                .into_iter()
                .map(|s| s.content_hash)
                .collect();

            // Route counts into the per-kind bucket of this key's outcome.
            let counter = match kind {
                SnapshotKind::Schema => &mut outcome.schema,
                SnapshotKind::Planner => &mut outcome.planner,
                SnapshotKind::Activity { .. } => &mut outcome.activity,
            };

            for s in src_summaries {
                if dst_hashes.contains(&s.content_hash) {
                    counter.up_to_date += 1;
                    continue;
                }
                // Fetch the full snapshot only for hashes the destination lacks.
                let stored = src
                    .get(key, kind, SnapshotRef::Hash(s.content_hash.clone()))
                    .await?;
                // put() may still dedupe (e.g. concurrent writer); count
                // that as up-to-date rather than copied.
                match dst.put(key, &stored).await? {
                    PutOutcome::Inserted => counter.copied += 1,
                    PutOutcome::Deduped => counter.up_to_date += 1,
                }
            }
        }

        outcomes.push(outcome);
    }
    Ok(outcomes)
}

/// Print one line per synced `(project, database)` key plus a grand-total
/// summary line. `verb` is "push" or "pull"; `path` is the filesystem side
/// of the transfer, shown in the summary.
fn print_sync_outcomes(verb: &str, outcomes: &[SyncOutcome], path: &std::path::Path) {
    let mut schema_total = 0usize;
    let mut planner_total = 0usize;
    let mut activity_total = 0usize;
    let mut skipped_total = 0usize;

    for o in outcomes {
        // Up-to-date counts are reported as one combined number per key.
        let skipped = o.schema.up_to_date + o.planner.up_to_date + o.activity.up_to_date;
        println!(
            " project={} database={}: {} schema, {} planner, {} activity copied ({} up-to-date)",
            o.key.project_id.0,
            o.key.database_id.0,
            o.schema.copied,
            o.planner.copied,
            o.activity.copied,
            skipped,
        );
        schema_total += o.schema.copied;
        planner_total += o.planner.copied;
        activity_total += o.activity.copied;
        skipped_total += skipped;
    }

    println!(
        "{verb}: {} schema, {} planner, {} activity copied / {} up-to-date ({})",
        schema_total,
        planner_total,
        activity_total,
        skipped_total,
        path.display(),
    );
}

// helpers

fn require_db_url(db: Option<&str>) -> anyhow::Result<&str> {
Expand Down Expand Up @@ -934,14 +1107,8 @@ fn write_snapshot_export(
key: &SnapshotKey,
snap: &dry_run_core::SchemaSnapshot,
) -> anyhow::Result<PathBuf> {
let path = out_root
.join(&key.project_id.0)
.join(&key.database_id.0)
.join(format!(
"{}-{}.json.zst",
snap.timestamp.format("%Y%m%dT%H%M%SZ"),
snap.content_hash,
));
let path =
dry_run_core::history::snapshot_path(out_root, key, snap.timestamp, &snap.content_hash);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
Expand Down
8 changes: 4 additions & 4 deletions crates/dry_run_cli/src/mcp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn persist_refresh(
planner: Option<&dry_run_core::PlannerStatsSnapshot>,
activity_by_node: &std::collections::BTreeMap<String, dry_run_core::ActivityStatsSnapshot>,
) {
if let Err(e) = store.put(key, schema).await {
if let Err(e) = store.put_schema(key, schema).await {
tracing::warn!(error = %e, "failed to persist schema");
}
if let Some(p) = planner
Expand Down Expand Up @@ -834,18 +834,18 @@ impl DryRunServer {

let from_snapshot = match &params.from {
Some(hash) => store
.get(key, SnapshotRef::Hash(hash.clone()))
.get_schema(key, SnapshotRef::Hash(hash.clone()))
.await
.map_err(to_mcp_err)?,
None => store
.get(key, SnapshotRef::Latest)
.get_schema(key, SnapshotRef::Latest)
.await
.map_err(to_mcp_err)?,
};

let to_snapshot = match &params.to {
Some(hash) => store
.get(key, SnapshotRef::Hash(hash.clone()))
.get_schema(key, SnapshotRef::Hash(hash.clone()))
.await
.map_err(to_mcp_err)?,
None => self.get_schema().await?,
Expand Down
4 changes: 2 additions & 2 deletions crates/dry_run_cli/src/mcp/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ async fn rebuild_after_refresh_preserves_replica_activity() {
let schema = test_snapshot();
let schema_hash = schema.content_hash.clone();

SnapshotStore::put(&store, &key, &schema)
SnapshotStore::put_schema(&store, &key, &schema)
.await
.expect("seed schema");
let replica = make_activity_row(&schema_hash, "replica1", "replica-h1");
Expand Down Expand Up @@ -436,7 +436,7 @@ async fn reload_schema_prefers_history_over_json() {

let schema = test_snapshot();
let schema_hash = schema.content_hash.clone();
SnapshotStore::put(&store, &key, &schema)
SnapshotStore::put_schema(&store, &key, &schema)
.await
.expect("seed schema");
store
Expand Down
5 changes: 4 additions & 1 deletion crates/dry_run_cli/tests/init_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ async fn init_full_capture_writes_schema_planner_and_activity() {
database_id: DatabaseId("postgres".into()),
};

let summaries = store.list(&key, TimeRange::default()).await.expect("list");
let summaries = store
.list_schema(&key, TimeRange::default())
.await
.expect("list");
assert_eq!(
summaries.len(),
1,
Expand Down
Loading