db.merge([src1, src2], pk=None, alter=True, replace=False, ignore=False, tables=None) - sources can be Database objects or file paths
code-block:: bash + + sqlite-utils merge combined.db one.db two.db --alter + +To replace rows that have conflicting primary keys, use ``--replace``: + +.. code-block:: bash + + sqlite-utils merge combined.db one.db two.db --replace + +To skip rows that have conflicting primary keys, use ``--ignore``: + +.. code-block:: bash + + sqlite-utils merge combined.db one.db two.db --ignore + +To merge only specific tables, use ``--table`` (can be specified multiple times): + +.. code-block:: bash + + sqlite-utils merge combined.db one.db two.db --table mytable + +Virtual tables (such as FTS indexes) and their shadow tables are automatically skipped. + .. _cli_inserting_data: Inserting JSON data diff --git a/sqlite_utils/cli.py b/sqlite_utils/cli.py index 9b9ee20e..2d28b559 100644 --- a/sqlite_utils/cli.py +++ b/sqlite_utils/cli.py @@ -1541,6 +1541,63 @@ def create_database(path, enable_wal, init_spatialite, load_extension): db.vacuum() +@cli.command(name="merge") +@click.argument( + "path", + type=click.Path(file_okay=True, dir_okay=False, allow_dash=False), + required=True, +) +@click.argument( + "sources", + type=click.Path(file_okay=True, dir_okay=False, allow_dash=False, exists=True), + nargs=-1, + required=True, +) +@click.option("pks", "--pk", help="Column to use as primary key", multiple=True) +@click.option("--alter", is_flag=True, help="Alter destination tables to add any missing columns") +@click.option( + "--replace", is_flag=True, help="Replace rows with matching primary keys" +) +@click.option( + "--ignore", is_flag=True, help="Ignore rows with conflicting primary keys" +) +@click.option( + "tables", + "--table", + help="Specific tables to merge (can be specified multiple times)", + multiple=True, +) +@load_extension_option +def merge_cmd(path, sources, pks, alter, replace, ignore, tables, load_extension): + """ + Merge tables from one or more SOURCE databases into a DEST database. + + Tables that do not exist in DEST are created. 
Virtual tables (e.g. FTS indexes) in source databases are skipped, along with their shadow tables.
+ :param replace: Replace rows whose primary key already exists in the + destination table. + :param ignore: Skip rows whose primary key already exists in the + destination table. + :param tables: If provided, only merge these named tables. Tables + listed here that do not exist in a particular source are silently + skipped. + :return: ``self`` (the destination database). + """ + for source in sources: + if isinstance(source, (str, pathlib.Path)): + source = Database(source) + source_table_names = source.table_names() + # Collect virtual table names so their shadow tables can be skipped too. + virtual_table_names = { + name + for name in source_table_names + if source.table(name).virtual_table_using is not None + } + names_to_merge = list(tables) if tables is not None else source_table_names + for table_name in names_to_merge: + if table_name not in source_table_names: + continue + source_table = source.table(table_name) + # Skip virtual tables (e.g. FTS indexes). + if source_table.virtual_table_using is not None: + continue + # Skip shadow tables created by virtual tables (e.g. docs_fts_data). 
+ if any(table_name.startswith(vt + "_") for vt in virtual_table_names): + continue + if pk is not None: + effective_pk: Any = pk[0] if len(pk) == 1 else list(pk) + elif source_table.use_rowid: + effective_pk = None + else: + source_pks = source_table.pks + effective_pk = source_pks[0] if len(source_pks) == 1 else source_pks + self[table_name].insert_all( + source_table.rows, + pk=effective_pk, + alter=alter, + replace=replace, + ignore=ignore, + ) + return self + def query( self, sql: str, params: Optional[Union[Sequence, Dict[str, Any]]] = None ) -> Generator[dict, None, None]: diff --git a/tests/test_merge.py b/tests/test_merge.py new file mode 100644 index 00000000..d5099e4d --- /dev/null +++ b/tests/test_merge.py @@ -0,0 +1,251 @@ +import pytest +from click.testing import CliRunner +from sqlite_utils import Database, cli + + +# --------------------------------------------------------------------------- +# Python API tests +# --------------------------------------------------------------------------- + + +def test_merge_basic(tmpdir): + """Tables from source databases are created in the destination.""" + dest = Database(str(tmpdir / "dest.db")) + src1 = Database(str(tmpdir / "src1.db")) + src2 = Database(str(tmpdir / "src2.db")) + + src1["cats"].insert_all([{"id": 1, "name": "Socks"}, {"id": 2, "name": "Mittens"}], pk="id") + src2["dogs"].insert_all([{"id": 1, "name": "Rex"}], pk="id") + + dest.merge([str(tmpdir / "src1.db"), str(tmpdir / "src2.db")]) + + assert set(dest.table_names()) == {"cats", "dogs"} + assert list(dest["cats"].rows) == [{"id": 1, "name": "Socks"}, {"id": 2, "name": "Mittens"}] + assert list(dest["dogs"].rows) == [{"id": 1, "name": "Rex"}] + + +def test_merge_appends_rows_to_existing_table(tmpdir): + """Rows from source are appended to existing destination table.""" + dest = Database(str(tmpdir / "dest.db")) + src = Database(str(tmpdir / "src.db")) + + dest["items"].insert_all([{"id": 1, "name": "a"}], pk="id") + 
src["items"].insert_all([{"id": 2, "name": "b"}, {"id": 3, "name": "c"}], pk="id") + + dest.merge([src]) + + rows = list(dest["items"].rows) + assert len(rows) == 3 + assert {"id": 2, "name": "b"} in rows + + +def test_merge_replace(tmpdir): + """--replace causes conflicting rows to be overwritten.""" + dest = Database(str(tmpdir / "dest.db")) + src = Database(str(tmpdir / "src.db")) + + dest["items"].insert_all([{"id": 1, "val": "original"}], pk="id") + src["items"].insert_all([{"id": 1, "val": "updated"}], pk="id") + + dest.merge([src], replace=True) + + assert list(dest["items"].rows) == [{"id": 1, "val": "updated"}] + + +def test_merge_ignore(tmpdir): + """--ignore causes conflicting rows to be silently skipped.""" + dest = Database(str(tmpdir / "dest.db")) + src = Database(str(tmpdir / "src.db")) + + dest["items"].insert_all([{"id": 1, "val": "original"}], pk="id") + src["items"].insert_all([{"id": 1, "val": "updated"}, {"id": 2, "val": "new"}], pk="id") + + dest.merge([src], ignore=True) + + rows = {r["id"]: r["val"] for r in dest["items"].rows} + assert rows[1] == "original" # not overwritten + assert rows[2] == "new" # new row inserted + + +def test_merge_alter_adds_missing_columns(tmpdir): + """alter=True adds columns that exist in source but not in destination.""" + dest = Database(str(tmpdir / "dest.db")) + src = Database(str(tmpdir / "src.db")) + + dest["items"].insert_all([{"id": 1, "name": "a"}], pk="id") + src["items"].insert_all([{"id": 2, "name": "b", "extra": "bonus"}], pk="id") + + dest.merge([src], alter=True) + + assert "extra" in dest["items"].columns_dict + row = next(r for r in dest["items"].rows if r["id"] == 2) + assert row["extra"] == "bonus" + + +def test_merge_specific_tables(tmpdir): + """tables= parameter limits which tables are merged.""" + dest = Database(str(tmpdir / "dest.db")) + src = Database(str(tmpdir / "src.db")) + + src["wanted"].insert_all([{"id": 1}], pk="id") + src["unwanted"].insert_all([{"id": 99}], pk="id") + + 
dest.merge([src], tables=["wanted"]) + + assert "wanted" in dest.table_names() + assert "unwanted" not in dest.table_names() + + +def test_merge_table_not_in_source_is_skipped(tmpdir): + """Tables listed in tables= that don't exist in a source are silently skipped.""" + dest = Database(str(tmpdir / "dest.db")) + src = Database(str(tmpdir / "src.db")) + src["existing"].insert({"id": 1}) + + # Should not raise even though "missing" doesn't exist in src + dest.merge([src], tables=["existing", "missing"]) + + assert "existing" in dest.table_names() + + +def test_merge_multiple_sources(tmpdir): + """Rows from multiple source DBs are all merged into destination.""" + dest = Database(str(tmpdir / "dest.db")) + srcs = [] + for i in range(3): + path = str(tmpdir / f"src{i}.db") + db = Database(path) + db["nums"].insert({"id": i, "val": i * 10}, pk="id") + srcs.append(path) + + dest.merge(srcs) + + assert list(sorted(dest["nums"].rows, key=lambda r: r["id"])) == [ + {"id": 0, "val": 0}, + {"id": 1, "val": 10}, + {"id": 2, "val": 20}, + ] + + +def test_merge_skips_virtual_tables(tmpdir): + """Virtual tables (e.g. 
FTS) in source are silently skipped.""" + dest = Database(str(tmpdir / "dest.db")) + src = Database(str(tmpdir / "src.db")) + + src["docs"].insert_all([{"id": 1, "body": "hello world"}], pk="id") + src["docs"].enable_fts(["body"]) + + dest.merge([src]) + + # Normal table merged, FTS virtual table skipped + assert "docs" in dest.table_names() + fts_tables = [t for t in dest.table_names() if "fts" in t.lower()] + assert fts_tables == [] + + +def test_merge_accepts_database_objects(tmpdir): + """Source can be a Database object instead of a file path.""" + dest = Database(str(tmpdir / "dest.db")) + src = Database(str(tmpdir / "src.db")) + src["items"].insert({"id": 1, "val": "x"}, pk="id") + + dest.merge([src]) + + assert list(dest["items"].rows) == [{"id": 1, "val": "x"}] + + +def test_merge_returns_self(tmpdir): + """merge() returns the destination Database for chaining.""" + dest = Database(str(tmpdir / "dest.db")) + src = Database(str(tmpdir / "src.db")) + src["t"].insert({"x": 1}) + + result = dest.merge([src]) + + assert result is dest + + +def test_merge_no_pk_table(tmpdir): + """Tables without an explicit primary key are merged without conflicts.""" + dest = Database(str(tmpdir / "dest.db")) + src = Database(str(tmpdir / "src.db")) + + src["log"].insert_all([{"msg": "a"}, {"msg": "b"}]) # no pk + + dest.merge([src]) + + assert len(list(dest["log"].rows)) == 2 + + +# --------------------------------------------------------------------------- +# CLI tests +# --------------------------------------------------------------------------- + + +def test_cli_merge_basic(tmpdir): + """CLI merge creates destination and copies tables from sources.""" + src1_path = str(tmpdir / "src1.db") + src2_path = str(tmpdir / "src2.db") + dest_path = str(tmpdir / "dest.db") + + Database(src1_path)["cats"].insert_all([{"id": 1, "name": "Socks"}], pk="id") + Database(src2_path)["dogs"].insert_all([{"id": 1, "name": "Rex"}], pk="id") + + result = CliRunner().invoke(cli.cli, ["merge", 
dest_path, src1_path, src2_path]) + assert result.exit_code == 0, result.output + + dest = Database(dest_path) + assert set(dest.table_names()) == {"cats", "dogs"} + + +def test_cli_merge_alter(tmpdir): + """CLI merge --alter adds missing columns.""" + src_path = str(tmpdir / "src.db") + dest_path = str(tmpdir / "dest.db") + + Database(dest_path)["items"].insert({"id": 1, "name": "a"}, pk="id") + Database(src_path)["items"].insert({"id": 2, "name": "b", "extra": "x"}, pk="id") + + result = CliRunner().invoke(cli.cli, ["merge", dest_path, src_path, "--alter"]) + assert result.exit_code == 0, result.output + assert "extra" in Database(dest_path)["items"].columns_dict + + +def test_cli_merge_replace(tmpdir): + """CLI merge --replace overwrites conflicting rows.""" + src_path = str(tmpdir / "src.db") + dest_path = str(tmpdir / "dest.db") + + Database(dest_path)["items"].insert({"id": 1, "val": "old"}, pk="id") + Database(src_path)["items"].insert({"id": 1, "val": "new"}, pk="id") + + CliRunner().invoke(cli.cli, ["merge", dest_path, src_path, "--replace"]) + assert list(Database(dest_path)["items"].rows) == [{"id": 1, "val": "new"}] + + +def test_cli_merge_ignore(tmpdir): + """CLI merge --ignore skips conflicting rows.""" + src_path = str(tmpdir / "src.db") + dest_path = str(tmpdir / "dest.db") + + Database(dest_path)["items"].insert({"id": 1, "val": "original"}, pk="id") + Database(src_path)["items"].insert({"id": 1, "val": "new"}, pk="id") + + CliRunner().invoke(cli.cli, ["merge", dest_path, src_path, "--ignore"]) + assert list(Database(dest_path)["items"].rows) == [{"id": 1, "val": "original"}] + + +def test_cli_merge_table_filter(tmpdir): + """CLI merge --table limits which tables are merged.""" + src_path = str(tmpdir / "src.db") + dest_path = str(tmpdir / "dest.db") + + src = Database(src_path) + src["wanted"].insert({"id": 1}) + src["unwanted"].insert({"id": 2}) + + CliRunner().invoke(cli.cli, ["merge", dest_path, src_path, "--table", "wanted"]) + + dest = 
Database(dest_path) + assert "wanted" in dest.table_names() + assert "unwanted" not in dest.table_names()