From 5bca43a73b867ae80ebaa6dd39f1e98111a62806 Mon Sep 17 00:00:00 2001 From: alexisperinger-ux Date: Tue, 23 Jun 2026 13:35:00 +0200 Subject: [PATCH 1/3] feat(dbt): add ingest_dbt_manifest tool for Model/Source nodes and DEPENDS_ON lineage Signed-off-by: alexisperinger-ux (cherry picked from commit e6a7493cb14306533f0a15f44b868e92145ca3e5) Signed-off-by: alexisperinger-ux --- src/mcp/mcp.c | 244 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 244 insertions(+) diff --git a/src/mcp/mcp.c b/src/mcp/mcp.c index 7016a0d2..d7617169 100644 --- a/src/mcp/mcp.c +++ b/src/mcp/mcp.c @@ -55,6 +55,7 @@ enum { #include "foundation/log.h" #include "foundation/str_util.h" #include "foundation/dump_verify.h" +#include "foundation/hash_table.h" #include "foundation/compat_regex.h" #include "pipeline/artifact.h" @@ -271,6 +272,12 @@ typedef struct { } tool_def_t; static const tool_def_t TOOLS[] = { + {"ingest_dbt_manifest", + "Ingest a dbt target/manifest.json into the indexed project: adds Model/Source nodes and " + "DEPENDS_ON lineage edges from each node's ref()/source() dependencies. Index the " + "repository first, then call with the project name.", + "{\"type\":\"object\",\"properties\":{\"project\":{\"type\":\"string\"},\"manifest_path\":{" + "\"type\":\"string\"}},\"required\":[\"project\",\"manifest_path\"]}"}, {"index_repository", "Index a repository into the knowledge graph. " "Special mode 'cross-repo-intelligence': skip extraction, only match Routes/Channels " @@ -4488,6 +4495,240 @@ static char *handle_ingest_traces(cbm_mcp_server_t *srv, const char *args) { return result; } +/* Read an entire file into a heap buffer (NUL-terminated). Caller frees. 
*/ +static char *dbt_read_file(const char *path, size_t *out_len) { + FILE *f = fopen(path, "rb"); + if (!f) { + return NULL; + } + if (fseek(f, 0, SEEK_END) != 0) { + fclose(f); + return NULL; + } + long sz = ftell(f); + if (sz < 0) { + fclose(f); + return NULL; + } + rewind(f); + char *buf = malloc((size_t)sz + 1); + if (!buf) { + fclose(f); + return NULL; + } + size_t rd = fread(buf, 1, (size_t)sz, f); + fclose(f); + buf[rd] = '\0'; + if (out_len) { + *out_len = rd; + } + return buf; +} + +/* Map a dbt resource_type to a graph node label, or NULL to skip. */ +static const char *dbt_label_for_resource(const char *rtype) { + if (!rtype) { + return NULL; + } + if (strcmp(rtype, "model") == 0) { + return "Model"; + } + if (strcmp(rtype, "seed") == 0) { + return "Seed"; + } + if (strcmp(rtype, "snapshot") == 0) { + return "Snapshot"; + } + return NULL; /* tests / analyses / operations skipped */ +} + +/* Upsert one dbt manifest entry as a graph node. Returns node id (>0) or <=0. */ +static int64_t dbt_upsert_node(cbm_store_t *store, const char *project, const char *label, + yyjson_val *nv, const char *uid) { + const char *name = yyjson_get_str(yyjson_obj_get(nv, "name")); + const char *path = yyjson_get_str(yyjson_obj_get(nv, "path")); + const char *pkg = yyjson_get_str(yyjson_obj_get(nv, "package_name")); + const char *mat = NULL; + yyjson_val *cfg = yyjson_obj_get(nv, "config"); + if (cfg) { + mat = yyjson_get_str(yyjson_obj_get(cfg, "materialized")); + } + char props[CBM_SZ_512]; + snprintf(props, sizeof(props), "{\"source\":\"dbt\",\"package\":\"%s\",\"materialized\":\"%s\"}", + pkg ? pkg : "", mat ? mat : ""); + cbm_node_t n; + memset(&n, 0, sizeof(n)); + n.project = project; + n.label = label; + n.name = name ? 
name : uid; + n.qualified_name = uid; /* dbt unique_id, globally stable */ + n.file_path = path; + n.start_line = 1; + n.end_line = 0; + n.properties_json = props; + return cbm_store_upsert_node(store, &n); +} + +/* Ingest a dbt target/manifest.json into an already-indexed project: add + * Model/Source/Seed/Snapshot nodes and DEPENDS_ON edges from depends_on.nodes + * (the compiled ref()/source() lineage). Authoritative dbt lineage without + * parsing Jinja-templated SQL. */ +static char *handle_ingest_dbt_manifest(cbm_mcp_server_t *srv, const char *args) { + (void)srv; + char *project = cbm_mcp_get_string_arg(args, "project"); + char *manifest_path = cbm_mcp_get_string_arg(args, "manifest_path"); + if (!project || !project[0] || !manifest_path || !manifest_path[0]) { + free(project); + free(manifest_path); + return cbm_mcp_text_result("ingest_dbt_manifest requires 'project' and 'manifest_path'", + true); + } + + size_t buf_len = 0; + char *buf = dbt_read_file(manifest_path, &buf_len); + if (!buf) { + char msg[CBM_SZ_512]; + snprintf(msg, sizeof(msg), "could not read manifest file: %s", manifest_path); + free(project); + free(manifest_path); + return cbm_mcp_text_result(msg, true); + } + + yyjson_doc *doc = yyjson_read(buf, buf_len, 0); + if (!doc) { + free(buf); + free(project); + free(manifest_path); + return cbm_mcp_text_result("manifest.json is not valid JSON", true); + } + yyjson_val *root = yyjson_doc_get_root(doc); + yyjson_val *nodes = root ? yyjson_obj_get(root, "nodes") : NULL; + yyjson_val *sources = root ? 
yyjson_obj_get(root, "sources") : NULL; + + char db_path[CBM_SZ_1K]; + project_db_path(project, db_path, sizeof(db_path)); + cbm_store_t *store = cbm_store_open_path(db_path); + if (!store) { + yyjson_doc_free(doc); + free(buf); + free(project); + free(manifest_path); + return cbm_mcp_text_result("could not open project store (index the repo first)", true); + } + + CBMHashTable *idmap = cbm_ht_create(1024); /* dbt unique_id -> node id */ + int n_nodes = 0, n_sources = 0, n_edges = 0; + cbm_store_begin(store); + + /* Pass 1: model/seed/snapshot nodes. */ + if (nodes && yyjson_is_obj(nodes)) { + yyjson_obj_iter it; + yyjson_obj_iter_init(nodes, &it); + yyjson_val *key; + while ((key = yyjson_obj_iter_next(&it))) { + const char *uid = yyjson_get_str(key); + yyjson_val *nv = yyjson_obj_iter_get_val(key); + const char *label = + dbt_label_for_resource(yyjson_get_str(yyjson_obj_get(nv, "resource_type"))); + if (!uid || !label) { + continue; + } + int64_t id = dbt_upsert_node(store, project, label, nv, uid); + if (id > 0) { + cbm_ht_set(idmap, uid, (void *)(intptr_t)id); + n_nodes++; + } + } + } + + /* Pass 1b: source nodes. */ + if (sources && yyjson_is_obj(sources)) { + yyjson_obj_iter it; + yyjson_obj_iter_init(sources, &it); + yyjson_val *key; + while ((key = yyjson_obj_iter_next(&it))) { + const char *uid = yyjson_get_str(key); + yyjson_val *nv = yyjson_obj_iter_get_val(key); + if (!uid) { + continue; + } + int64_t id = dbt_upsert_node(store, project, "Source", nv, uid); + if (id > 0) { + cbm_ht_set(idmap, uid, (void *)(intptr_t)id); + n_sources++; + } + } + } + + /* Pass 2: depends_on.nodes -> DEPENDS_ON edges. 
*/ + if (nodes && yyjson_is_obj(nodes)) { + yyjson_obj_iter it; + yyjson_obj_iter_init(nodes, &it); + yyjson_val *key; + while ((key = yyjson_obj_iter_next(&it))) { + const char *uid = yyjson_get_str(key); + yyjson_val *nv = yyjson_obj_iter_get_val(key); + if (!uid) { + continue; + } + void *src_v = cbm_ht_get(idmap, uid); + if (!src_v) { + continue; + } + yyjson_val *dep = yyjson_obj_get(nv, "depends_on"); + yyjson_val *deps = dep ? yyjson_obj_get(dep, "nodes") : NULL; + if (!deps || !yyjson_is_arr(deps)) { + continue; + } + yyjson_arr_iter ait; + yyjson_arr_iter_init(deps, &ait); + yyjson_val *d; + while ((d = yyjson_arr_iter_next(&ait))) { + const char *duid = yyjson_get_str(d); + if (!duid) { + continue; + } + void *tgt_v = cbm_ht_get(idmap, duid); + if (!tgt_v || src_v == tgt_v) { + continue; + } + cbm_edge_t e; + memset(&e, 0, sizeof(e)); + e.project = project; + e.source_id = (int64_t)(intptr_t)src_v; + e.target_id = (int64_t)(intptr_t)tgt_v; + e.type = "DEPENDS_ON"; + e.properties_json = "{\"via\":\"dbt\"}"; + if (cbm_store_insert_edge(store, &e) > 0) { + n_edges++; + } + } + } + } + + cbm_store_commit(store); + cbm_store_close(store); + cbm_ht_free(idmap); + yyjson_doc_free(doc); + free(buf); + + yyjson_mut_doc *out = yyjson_mut_doc_new(NULL); + yyjson_mut_val *oroot = yyjson_mut_obj(out); + yyjson_mut_doc_set_root(out, oroot); + yyjson_mut_obj_add_str(out, oroot, "status", "ingested"); + yyjson_mut_obj_add_str(out, oroot, "project", project); + yyjson_mut_obj_add_int(out, oroot, "models", n_nodes); + yyjson_mut_obj_add_int(out, oroot, "sources", n_sources); + yyjson_mut_obj_add_int(out, oroot, "edges", n_edges); + char *ojson = yy_doc_to_str(out); + yyjson_mut_doc_free(out); + char *result = cbm_mcp_text_result(ojson, false); + free(ojson); + free(project); + free(manifest_path); + return result; +} + /* ── Tool dispatch ────────────────────────────────────────────── */ char *cbm_mcp_handle_tool(cbm_mcp_server_t *srv, const char *tool_name, const char 
*args_json) { @@ -4539,6 +4780,9 @@ char *cbm_mcp_handle_tool(cbm_mcp_server_t *srv, const char *tool_name, const ch if (strcmp(tool_name, "ingest_traces") == 0) { return handle_ingest_traces(srv, args_json); } + if (strcmp(tool_name, "ingest_dbt_manifest") == 0) { + return handle_ingest_dbt_manifest(srv, args_json); + } char msg[CBM_SZ_256]; snprintf(msg, sizeof(msg), "unknown tool: %s", tool_name); return cbm_mcp_text_result(msg, true); From 701436e9a983353be931249749bb624be4c925fd Mon Sep 17 00:00:00 2001 From: alexisperinger-ux Date: Tue, 23 Jun 2026 17:15:01 +0200 Subject: [PATCH 2/3] style(dbt): clang-format-20 reflow of manifest ingestion Signed-off-by: alexisperinger-ux (cherry picked from commit f96cc3966f30d135ef389d59c8ff16c59d656e0d) Signed-off-by: alexisperinger-ux --- src/mcp/mcp.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/mcp/mcp.c b/src/mcp/mcp.c index d7617169..437f14f0 100644 --- a/src/mcp/mcp.c +++ b/src/mcp/mcp.c @@ -4554,8 +4554,9 @@ static int64_t dbt_upsert_node(cbm_store_t *store, const char *project, const ch mat = yyjson_get_str(yyjson_obj_get(cfg, "materialized")); } char props[CBM_SZ_512]; - snprintf(props, sizeof(props), "{\"source\":\"dbt\",\"package\":\"%s\",\"materialized\":\"%s\"}", - pkg ? pkg : "", mat ? mat : ""); + snprintf(props, sizeof(props), + "{\"source\":\"dbt\",\"package\":\"%s\",\"materialized\":\"%s\"}", pkg ? pkg : "", + mat ? 
mat : ""); cbm_node_t n; memset(&n, 0, sizeof(n)); n.project = project; From 1c00136393b9893e0f2e1f7945b9a219b6ea7adb Mon Sep 17 00:00:00 2001 From: alexisperinger-ux Date: Tue, 23 Jun 2026 18:02:29 +0200 Subject: [PATCH 3/3] test(dbt): regression fixture for ingest_dbt_manifest (#576) Signed-off-by: alexisperinger-ux --- tests/test_mcp.c | 124 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/tests/test_mcp.c b/tests/test_mcp.c index 68edc0e9..a75a3c48 100644 --- a/tests/test_mcp.c +++ b/tests/test_mcp.c @@ -2209,6 +2209,129 @@ TEST(tool_bad_project_name_no_overflow_issue235) { } #undef ISSUE235_DBNAME +/* Issue #576: ingest_dbt_manifest parses a compiled dbt manifest.json and + * upserts model/seed/snapshot as graph nodes, source as Source nodes, and each + * node's depends_on.nodes as DEPENDS_ON edges, keyed by dbt unique_id. The + * handler opens (and creates) the project's on-disk store at + * <cache_dir>/<project>.db, so an isolated cache dir plus a small fixture + * manifest exercises the whole path. The fixture carries 3 models + 1 seed + + * 1 snapshot + 2 sources and one `test` node that must be skipped, so the + * reported counts pin both the resource-type filter and the lineage edges. 
*/ +static const char *DBT_MANIFEST_FIXTURE = + "{\"nodes\":{" + "\"model.shop.stg_orders\":{\"resource_type\":\"model\",\"name\":\"stg_orders\"," + "\"depends_on\":{\"nodes\":[\"source.shop.raw.orders\"]}}," + "\"model.shop.stg_customers\":{\"resource_type\":\"model\",\"name\":\"stg_customers\"," + "\"depends_on\":{\"nodes\":[\"source.shop.raw.customers\"]}}," + "\"model.shop.fct_orders\":{\"resource_type\":\"model\",\"name\":\"fct_orders\"," + "\"depends_on\":{\"nodes\":[\"model.shop.stg_orders\",\"model.shop.stg_customers\"]}}," + "\"seed.shop.country_codes\":{\"resource_type\":\"seed\",\"name\":\"country_codes\"," + "\"depends_on\":{\"nodes\":[]}}," + "\"snapshot.shop.orders_snap\":{\"resource_type\":\"snapshot\",\"name\":\"orders_snap\"," + "\"depends_on\":{\"nodes\":[\"model.shop.fct_orders\"]}}," + "\"test.shop.not_null_fct_orders\":{\"resource_type\":\"test\",\"name\":\"not_null\"," + "\"depends_on\":{\"nodes\":[\"model.shop.fct_orders\"]}}" + "},\"sources\":{" + "\"source.shop.raw.orders\":{\"resource_type\":\"source\",\"name\":\"orders\"}," + "\"source.shop.raw.customers\":{\"resource_type\":\"source\",\"name\":\"customers\"}" + "}}"; + +TEST(tool_ingest_dbt_manifest_issue576) { + char cache[256]; + snprintf(cache, sizeof(cache), "/tmp/cbm-dbt-manifest-XXXXXX"); + if (!cbm_mkdtemp(cache)) { + PASS(); /* skip if mkdtemp fails */ + } + + const char *saved = getenv("CBM_CACHE_DIR"); + char *saved_copy = saved ? strdup(saved) : NULL; + cbm_setenv("CBM_CACHE_DIR", cache, 1); + + const char *project = "test-dbt-manifest"; + + /* Write the fixture manifest into the isolated cache dir. */ + char manifest_path[512]; + snprintf(manifest_path, sizeof(manifest_path), "%s/manifest.json", cache); + FILE *fp = fopen(manifest_path, "w"); + int wrote = 0; + if (fp) { + fputs(DBT_MANIFEST_FIXTURE, fp); + fclose(fp); + wrote = 1; + } + + /* The tool augments an existing project store (index_repository normally + * creates it). 
Pre-create the store at the path the tool reopens and + * register the project row, otherwise the FK-checked node upserts are + * rejected and nothing is ingested. */ + char dbpath[512]; + snprintf(dbpath, sizeof(dbpath), "%s/%s.db", cache, project); + cbm_store_t *seed = cbm_store_open_path(dbpath); + int seeded = 0; + if (seed) { + cbm_store_upsert_project(seed, project, cache); + cbm_store_close(seed); + seeded = 1; + } + + /* Call the tool through the MCP server and capture the reported counts. */ + int ok_status = 0, ok_models = 0, ok_sources = 0, ok_edges = 0; + if (wrote && seeded) { + char req[2048]; + snprintf(req, sizeof(req), + "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/call\",\"params\":{\"name\":" + "\"ingest_dbt_manifest\",\"arguments\":{\"project\":\"%s\"," + "\"manifest_path\":\"%s\"}}}", + project, manifest_path); + cbm_mcp_server_t *srv = cbm_mcp_server_new(NULL); + if (srv) { + char *resp = cbm_mcp_server_handle(srv, req); + if (resp) { + char *inner = extract_text_content(resp); + if (inner) { + ok_status = strstr(inner, "\"status\":\"ingested\"") != NULL; + ok_models = strstr(inner, "\"models\":5") != NULL; + ok_sources = strstr(inner, "\"sources\":2") != NULL; + ok_edges = strstr(inner, "\"edges\":5") != NULL; + free(inner); + } + free(resp); + } + cbm_mcp_server_free(srv); + } + } + + /* Restore env and clean up BEFORE asserting, so a failed assertion never + * leaks the overridden CBM_CACHE_DIR into the rest of the suite. 
*/ + if (saved_copy) { + cbm_setenv("CBM_CACHE_DIR", saved_copy, 1); + free(saved_copy); + } else { + cbm_unsetenv("CBM_CACHE_DIR"); + } + char dbwal[576]; + char dbshm[576]; + snprintf(dbwal, sizeof(dbwal), "%s-wal", dbpath); + snprintf(dbshm, sizeof(dbshm), "%s-shm", dbpath); + cbm_unlink(dbwal); + cbm_unlink(dbshm); + cbm_unlink(dbpath); + cbm_unlink(manifest_path); + cbm_rmdir(cache); + + ASSERT(wrote); + ASSERT(seeded); + /* 3 models + 1 seed + 1 snapshot = 5 lineage-bearing nodes; the `test` + * node is skipped (so models is 5, not 6). */ + ASSERT(ok_status); + ASSERT(ok_models); + ASSERT(ok_sources); + /* stg_orders/stg_customers -> source, fct_orders -> 2 stg, snap -> fct = 5 + * DEPENDS_ON edges; the skipped test node contributes none. */ + ASSERT(ok_edges); + PASS(); +} + /* ══════════════════════════════════════════════════════════════════ * SUITE * ══════════════════════════════════════════════════════════════════ */ @@ -2283,6 +2406,7 @@ SUITE(mcp) { RUN_TEST(tool_search_graph_includes_node_properties); RUN_TEST(tool_search_graph_query_honors_file_pattern_issue552); RUN_TEST(tool_query_graph_basic); + RUN_TEST(tool_ingest_dbt_manifest_issue576); RUN_TEST(tool_index_status_no_project); RUN_TEST(tool_index_status_includes_git_metadata);