Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions src/mcp/mcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ enum {
#include "foundation/log.h"
#include "foundation/str_util.h"
#include "foundation/dump_verify.h"
#include "foundation/hash_table.h"
#include "foundation/compat_regex.h"
#include "pipeline/artifact.h"

Expand Down Expand Up @@ -271,6 +272,12 @@ typedef struct {
} tool_def_t;

static const tool_def_t TOOLS[] = {
{"ingest_dbt_manifest",
"Ingest a dbt target/manifest.json into the indexed project: adds Model/Source nodes and "
"DEPENDS_ON lineage edges from each node's ref()/source() dependencies. Index the "
"repository first, then call with the project name.",
"{\"type\":\"object\",\"properties\":{\"project\":{\"type\":\"string\"},\"manifest_path\":{"
"\"type\":\"string\"}},\"required\":[\"project\",\"manifest_path\"]}"},
{"index_repository",
"Index a repository into the knowledge graph. "
"Special mode 'cross-repo-intelligence': skip extraction, only match Routes/Channels "
Expand Down Expand Up @@ -4488,6 +4495,241 @@ static char *handle_ingest_traces(cbm_mcp_server_t *srv, const char *args) {
return result;
}

/* Read an entire file into a heap buffer (NUL-terminated). Caller frees.
 *
 * path:    file to read; NULL is rejected.
 * out_len: optional; receives the number of bytes read (excluding the NUL).
 *
 * Returns NULL if the file cannot be opened, its size cannot be determined,
 * the buffer cannot be allocated, or the stream reports a read error. A
 * partially-read file is never returned as if it were complete. */
static char *dbt_read_file(const char *path, size_t *out_len) {
    if (!path) {
        return NULL;
    }
    FILE *f = fopen(path, "rb");
    if (!f) {
        return NULL;
    }

    /* Size the file by seeking to the end, then seek back with a checked
     * fseek (rewind() silently discards errors). */
    long sz = -1;
    if (fseek(f, 0, SEEK_END) == 0) {
        sz = ftell(f);
    }
    if (sz < 0 || fseek(f, 0, SEEK_SET) != 0) {
        fclose(f);
        return NULL;
    }

    char *buf = malloc((size_t)sz + 1);
    if (!buf) {
        fclose(f);
        return NULL;
    }

    size_t rd = fread(buf, 1, (size_t)sz, f);
    int read_failed = ferror(f); /* must be sampled before fclose */
    fclose(f);
    if (read_failed) {
        free(buf);
        return NULL;
    }

    buf[rd] = '\0';
    if (out_len) {
        *out_len = rd;
    }
    return buf;
}

/* Translate a dbt resource_type string into the graph node label used for it.
 * Returns NULL for a NULL input and for every resource type that is not in
 * the table below (tests, analyses, operations, ...), meaning "skip". */
static const char *dbt_label_for_resource(const char *rtype) {
    static const struct {
        const char *resource_type;
        const char *label;
    } mapping[] = {
        {"model", "Model"},
        {"seed", "Seed"},
        {"snapshot", "Snapshot"},
    };

    if (rtype == NULL) {
        return NULL;
    }
    for (size_t i = 0; i < sizeof mapping / sizeof mapping[0]; i++) {
        if (strcmp(rtype, mapping[i].resource_type) == 0) {
            return mapping[i].label;
        }
    }
    return NULL;
}

/* Upsert one dbt manifest entry as a graph node. Returns node id (>0) or <=0.
 *
 * store:   open project store to write into.
 * project: project name stored on the node.
 * label:   graph label for the node (e.g. "Model", "Source").
 * nv:      the manifest entry object; "name", "path", "package_name" and
 *          "config.materialized" are read from it when present.
 * uid:     dbt unique_id, used as the qualified name and as the fallback
 *          display name when the entry has no "name".
 *
 * The properties JSON is produced with the yyjson writer rather than string
 * formatting so that quotes, backslashes and control characters in the
 * package / materialization values are escaped and the document can never be
 * truncated mid-string. Returns 0 if the properties document cannot be
 * built. */
static int64_t dbt_upsert_node(cbm_store_t *store, const char *project, const char *label,
                               yyjson_val *nv, const char *uid) {
    const char *name = yyjson_get_str(yyjson_obj_get(nv, "name"));
    const char *path = yyjson_get_str(yyjson_obj_get(nv, "path"));
    const char *pkg = yyjson_get_str(yyjson_obj_get(nv, "package_name"));
    const char *mat = NULL;
    yyjson_val *cfg = yyjson_obj_get(nv, "config");
    if (cfg) {
        mat = yyjson_get_str(yyjson_obj_get(cfg, "materialized"));
    }

    /* Same keys, same order as before: source, package, materialized. */
    yyjson_mut_doc *pdoc = yyjson_mut_doc_new(NULL);
    if (!pdoc) {
        return 0;
    }
    yyjson_mut_val *pobj = yyjson_mut_obj(pdoc);
    yyjson_mut_doc_set_root(pdoc, pobj);
    yyjson_mut_obj_add_str(pdoc, pobj, "source", "dbt");
    yyjson_mut_obj_add_str(pdoc, pobj, "package", pkg ? pkg : "");
    yyjson_mut_obj_add_str(pdoc, pobj, "materialized", mat ? mat : "");
    char *props = yyjson_mut_write(pdoc, 0, NULL); /* compact output, malloc'd */
    yyjson_mut_doc_free(pdoc);
    if (!props) {
        return 0;
    }

    cbm_node_t n;
    memset(&n, 0, sizeof(n));
    n.project = project;
    n.label = label;
    n.name = name ? name : uid;
    n.qualified_name = uid; /* dbt unique_id, globally stable */
    n.file_path = path;
    n.start_line = 1;
    n.end_line = 0;
    n.properties_json = props;
    int64_t id = cbm_store_upsert_node(store, &n);

    /* The string only needs to outlive the upsert call, exactly like the
     * stack buffer it replaces. */
    free(props);
    return id;
}

/* Ingest a dbt target/manifest.json into an already-indexed project: add
* Model/Source/Seed/Snapshot nodes and DEPENDS_ON edges from depends_on.nodes
* (the compiled ref()/source() lineage). Authoritative dbt lineage without
* parsing Jinja-templated SQL. */
static char *handle_ingest_dbt_manifest(cbm_mcp_server_t *srv, const char *args) {
(void)srv;
char *project = cbm_mcp_get_string_arg(args, "project");
char *manifest_path = cbm_mcp_get_string_arg(args, "manifest_path");
if (!project || !project[0] || !manifest_path || !manifest_path[0]) {
free(project);
free(manifest_path);
return cbm_mcp_text_result("ingest_dbt_manifest requires 'project' and 'manifest_path'",
true);
}

size_t buf_len = 0;
char *buf = dbt_read_file(manifest_path, &buf_len);
if (!buf) {
char msg[CBM_SZ_512];
snprintf(msg, sizeof(msg), "could not read manifest file: %s", manifest_path);
free(project);
free(manifest_path);
return cbm_mcp_text_result(msg, true);
}

yyjson_doc *doc = yyjson_read(buf, buf_len, 0);
if (!doc) {
free(buf);
free(project);
free(manifest_path);
return cbm_mcp_text_result("manifest.json is not valid JSON", true);
}
yyjson_val *root = yyjson_doc_get_root(doc);
yyjson_val *nodes = root ? yyjson_obj_get(root, "nodes") : NULL;
yyjson_val *sources = root ? yyjson_obj_get(root, "sources") : NULL;

char db_path[CBM_SZ_1K];
project_db_path(project, db_path, sizeof(db_path));
cbm_store_t *store = cbm_store_open_path(db_path);
if (!store) {
yyjson_doc_free(doc);
free(buf);
free(project);
free(manifest_path);
return cbm_mcp_text_result("could not open project store (index the repo first)", true);
}

CBMHashTable *idmap = cbm_ht_create(1024); /* dbt unique_id -> node id */
int n_nodes = 0, n_sources = 0, n_edges = 0;
cbm_store_begin(store);

/* Pass 1: model/seed/snapshot nodes. */
if (nodes && yyjson_is_obj(nodes)) {
yyjson_obj_iter it;
yyjson_obj_iter_init(nodes, &it);
yyjson_val *key;
while ((key = yyjson_obj_iter_next(&it))) {
const char *uid = yyjson_get_str(key);
yyjson_val *nv = yyjson_obj_iter_get_val(key);
const char *label =
dbt_label_for_resource(yyjson_get_str(yyjson_obj_get(nv, "resource_type")));
if (!uid || !label) {
continue;
}
int64_t id = dbt_upsert_node(store, project, label, nv, uid);
if (id > 0) {
cbm_ht_set(idmap, uid, (void *)(intptr_t)id);
n_nodes++;
}
}
}

/* Pass 1b: source nodes. */
if (sources && yyjson_is_obj(sources)) {
yyjson_obj_iter it;
yyjson_obj_iter_init(sources, &it);
yyjson_val *key;
while ((key = yyjson_obj_iter_next(&it))) {
const char *uid = yyjson_get_str(key);
yyjson_val *nv = yyjson_obj_iter_get_val(key);
if (!uid) {
continue;
}
int64_t id = dbt_upsert_node(store, project, "Source", nv, uid);
if (id > 0) {
cbm_ht_set(idmap, uid, (void *)(intptr_t)id);
n_sources++;
}
}
}

/* Pass 2: depends_on.nodes -> DEPENDS_ON edges. */
if (nodes && yyjson_is_obj(nodes)) {
yyjson_obj_iter it;
yyjson_obj_iter_init(nodes, &it);
yyjson_val *key;
while ((key = yyjson_obj_iter_next(&it))) {
const char *uid = yyjson_get_str(key);
yyjson_val *nv = yyjson_obj_iter_get_val(key);
if (!uid) {
continue;
}
void *src_v = cbm_ht_get(idmap, uid);
if (!src_v) {
continue;
}
yyjson_val *dep = yyjson_obj_get(nv, "depends_on");
yyjson_val *deps = dep ? yyjson_obj_get(dep, "nodes") : NULL;
if (!deps || !yyjson_is_arr(deps)) {
continue;
}
yyjson_arr_iter ait;
yyjson_arr_iter_init(deps, &ait);
yyjson_val *d;
while ((d = yyjson_arr_iter_next(&ait))) {
const char *duid = yyjson_get_str(d);
if (!duid) {
continue;
}
void *tgt_v = cbm_ht_get(idmap, duid);
if (!tgt_v || src_v == tgt_v) {
continue;
}
cbm_edge_t e;
memset(&e, 0, sizeof(e));
e.project = project;
e.source_id = (int64_t)(intptr_t)src_v;
e.target_id = (int64_t)(intptr_t)tgt_v;
e.type = "DEPENDS_ON";
e.properties_json = "{\"via\":\"dbt\"}";
if (cbm_store_insert_edge(store, &e) > 0) {
n_edges++;
}
}
}
}

cbm_store_commit(store);
cbm_store_close(store);
cbm_ht_free(idmap);
yyjson_doc_free(doc);
free(buf);

yyjson_mut_doc *out = yyjson_mut_doc_new(NULL);
yyjson_mut_val *oroot = yyjson_mut_obj(out);
yyjson_mut_doc_set_root(out, oroot);
yyjson_mut_obj_add_str(out, oroot, "status", "ingested");
yyjson_mut_obj_add_str(out, oroot, "project", project);
yyjson_mut_obj_add_int(out, oroot, "models", n_nodes);
yyjson_mut_obj_add_int(out, oroot, "sources", n_sources);
yyjson_mut_obj_add_int(out, oroot, "edges", n_edges);
char *ojson = yy_doc_to_str(out);
yyjson_mut_doc_free(out);
char *result = cbm_mcp_text_result(ojson, false);
free(ojson);
free(project);
free(manifest_path);
return result;
}

/* ── Tool dispatch ────────────────────────────────────────────── */

char *cbm_mcp_handle_tool(cbm_mcp_server_t *srv, const char *tool_name, const char *args_json) {
Expand Down Expand Up @@ -4539,6 +4781,9 @@ char *cbm_mcp_handle_tool(cbm_mcp_server_t *srv, const char *tool_name, const ch
if (strcmp(tool_name, "ingest_traces") == 0) {
return handle_ingest_traces(srv, args_json);
}
if (strcmp(tool_name, "ingest_dbt_manifest") == 0) {
return handle_ingest_dbt_manifest(srv, args_json);
}
char msg[CBM_SZ_256];
snprintf(msg, sizeof(msg), "unknown tool: %s", tool_name);
return cbm_mcp_text_result(msg, true);
Expand Down
Loading
Loading