From 723c8e911ec55f68129984fe0b7f02a46a10b1ad Mon Sep 17 00:00:00 2001 From: mountain Date: Wed, 1 Jul 2026 10:31:59 +0800 Subject: [PATCH 1/2] perf(mutation): don't snapshot the whole blob store on every add MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The serial crash-safe add path (#142) listed `.openkb/files` (the PageIndex blob store) in both the snapshot path set and `hardlink_dirs`, so every add hardlinked the entire store into the rollback backup — one `os.link` per existing blob, plus the matching unlink on commit — a cost that scales with total KB size, not with the document being added. On a filesystem without hardlink support (cross-device staging, some Windows / cloud-sync folders) `_hardlink_or_copy` fell back to a full byte copy of the whole store on *every* add. The blob store is append-only by `{doc_id}`: an add only ever creates the new doc's blob, and that name is assigned during indexing — after the snapshot is taken. So instead of snapshotting the whole tree up front, register just the new blob once indexing has run, via `MutationSnapshot.track_new()`, which records it with no backup and rewrites the active journal so both in-process rollback and crash recovery remove exactly this doc's artifacts. Cloud import never writes a local blob, so `.openkb/files` is dropped from its snapshot entirely (it was pure waste there). - `MutationSnapshot.track_new(paths)`: register post-snapshot-created paths for removal on rollback; persists to the journal for crash recovery. - add: drop `.openkb/files` from the eager snapshot + `hardlink_dirs`; call `track_new()` with the `.openkb/files/*/{doc_id}*` matches right after `index_long_document`. - cloud import: drop `.openkb/files` from `hardlink_dirs`. Tests: new-blob rollback removes exactly the doc's blob + images subtree and leaves existing blobs untouched (same inode); track_new persists so recover_pending_journals cleans a crashed add; `_snapshot_add_paths` no longer lists `.openkb/files`. 
Full suite green (pre-existing trafilatura-missing url_ingest failures aside). Claude-Session: https://claude.ai/code/session_018WiFnTo1YW9mtw47Fzir9K --- openkb/cli.py | 22 ++++++++++++-- openkb/mutation.py | 24 +++++++++++++++ tests/test_mutation.py | 69 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+), 3 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index f4297469..96e539db 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -362,13 +362,18 @@ def _snapshot_add_paths( final_raw: Path | None, final_source: Path | None, ) -> list[Path]: + # NOTE: .openkb/files (the PageIndex blob store) is intentionally NOT + # snapshotted here. It is append-only by {doc_id}, and the doc_id is only + # assigned during indexing (after this snapshot). Eagerly snapshotting the + # whole tree cost one os.link per existing blob on every add; instead the + # long-doc add path registers just the new blob via snapshot.track_new() + # once indexing has run. paths = [ kb_dir / ".openkb" / "hashes.json", kb_dir / ".openkb" / "pageindex.db", kb_dir / ".openkb" / "pageindex.db-wal", kb_dir / ".openkb" / "pageindex.db-shm", kb_dir / ".openkb" / "pageindex.db-journal", - kb_dir / ".openkb" / "files", kb_dir / "wiki" / "summaries" / f"{doc_name}.md", kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir / "wiki" / "sources" / "images" / doc_name, @@ -470,7 +475,6 @@ def _add_single_file_locked( hardlink_dirs={ kb_dir / "wiki" / "concepts", kb_dir / "wiki" / "entities", - kb_dir / ".openkb" / "files", }, ) publish_staged_tree(staging_dir, kb_dir) @@ -495,6 +499,17 @@ def _add_single_file_locked( logger.debug("Indexing traceback:", exc_info=True) raise + # Indexing just created the append-only blob(s) for this doc_id + # under .openkb/files//. Register them now (their names + # weren't known at snapshot time) so rollback + crash recovery + # remove exactly this doc's blob instead of us snapshotting the + # whole store up front. 
+ files_root = kb_dir / ".openkb" / "files" + if files_root.exists(): + snapshot.track_new( + sorted(files_root.glob(f"*/{index_result.doc_id}*")) + ) + summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" _run_compile_with_retry( lambda: compile_long_doc( @@ -618,10 +633,11 @@ def import_from_pageindex_cloud( _snapshot_add_paths(kb_dir, doc_name, None, None), operation="cloud_import", details={"doc_id": doc_id, "doc_name": doc_name}, + # Cloud import reads from PageIndex Cloud and writes no local blob, + # so .openkb/files is never touched — nothing to snapshot there. hardlink_dirs={ kb_dir / "wiki" / "concepts", kb_dir / "wiki" / "entities", - kb_dir / ".openkb" / "files", }, ) summary_path = _write_long_doc_artifacts( diff --git a/openkb/mutation.py b/openkb/mutation.py index cb1353b5..8b190ebb 100644 --- a/openkb/mutation.py +++ b/openkb/mutation.py @@ -175,6 +175,30 @@ def mark_committed(self) -> None: """ self.write_journal("committed") + def track_new(self, paths: list[Path]) -> None: + """Register paths created *after* the snapshot for removal on rollback. + + Some artifacts get their final name only once the mutation runs — the + PageIndex ``{doc_id}`` blob under ``.openkb/files`` is named by indexing, + which happens after :func:`snapshot_paths`. Rather than eagerly + snapshotting the whole append-only blob store up front (an ``os.link`` + per existing blob on *every* add — O(total blobs), not O(this doc)), + the caller invokes this once the new artifacts exist. Each is recorded + with no backup, so both :meth:`rollback` and a crash-recovery replay + delete exactly the new paths and nothing else. The active journal is + rewritten so a crash after the artifacts land but before commit still + cleans them up. Paths already tracked are ignored; missing paths are a + no-op (nothing was created). 
+ """ + changed = False + for path in paths: + target = path.resolve() + if target not in self.entries: + self.entries[target] = None + changed = True + if changed: + self.write_journal("active") + def rollback(self) -> None: # Restore children before parents so directory deletes cannot remove # paths that still need to be restored from a more specific backup. diff --git a/tests/test_mutation.py b/tests/test_mutation.py index f3dbf614..968179f7 100644 --- a/tests/test_mutation.py +++ b/tests/test_mutation.py @@ -518,3 +518,72 @@ def test_hardlinked_dir_rollback_prunes_new_nested_blob_dirs(tmp_path): assert existing.stat().st_ino == existing_inode # untouched, not recopied assert not (files / "col" / "newdoc.pdf").exists() assert not (files / "col" / "newdoc").exists() # empty new dir pruned + + +# --- track_new: cheap blob-store rollback without whole-tree snapshot ------- + +def test_track_new_removes_new_blob_on_rollback(tmp_path): + """The PageIndex blob under .openkb/files gets its {doc_id} name only once + indexing runs — after snapshot_paths. Instead of snapshotting the whole + (append-only) blob store up front, the add path calls track_new() with the + new artifacts; rollback must then remove exactly those and leave every + pre-existing blob untouched. + """ + kb_dir = tmp_path + blobs = kb_dir / ".openkb" / "files" / "col" + blobs.mkdir(parents=True) + existing = blobs / "old-doc.pdf" + existing.write_bytes(b"keep-me") + existing_inode = existing.stat().st_ino + + # Snapshot does NOT include .openkb/files at all. + snapshot = snapshot_paths(kb_dir, [kb_dir / "wiki"], operation="add", details={}) + + # Indexing creates the new blob + its images subtree (doc_id now known). 
+ new_blob = blobs / "new-doc.pdf" + new_blob.write_bytes(b"remove-me") + new_images = blobs / "new-doc" + (new_images / "images").mkdir(parents=True) + (new_images / "images" / "p1.png").write_bytes(b"img") + + snapshot.track_new([new_blob, new_images]) + snapshot.rollback() + snapshot.discard_best_effort() + + assert not new_blob.exists() # new blob removed + assert not new_images.exists() # new images subtree removed + assert existing.read_bytes() == b"keep-me" # pre-existing untouched + assert existing.stat().st_ino == existing_inode # not recopied/relinked + + +def test_track_new_persists_to_journal_for_crash_recovery(tmp_path): + """track_new() must rewrite the active journal so a crash *after* indexing + but before commit still rolls back the new blob on the next exclusive-lock + acquisition (recover_pending_journals), not just via in-process rollback. + """ + kb_dir = tmp_path + blobs = kb_dir / ".openkb" / "files" / "col" + blobs.mkdir(parents=True) + + snapshot = snapshot_paths(kb_dir, [kb_dir / "wiki"], operation="add", details={}) + new_blob = blobs / "new-doc.pdf" + new_blob.write_bytes(b"remove-me") + snapshot.track_new([new_blob]) + # Simulate a crash: journal left "active", no rollback()/mark_committed(). + + messages = recover_pending_journals(kb_dir) + + assert not new_blob.exists() + assert any("Rolled back" in m for m in messages) + + +def test_snapshot_add_paths_excludes_blob_store(tmp_path): + """The blob store is registered lazily via track_new(), so it must NOT be + in the eager add snapshot path list (that was the O(total blobs)-per-add + cost this change removes).""" + from openkb.cli import _snapshot_add_paths + + paths = _snapshot_add_paths(tmp_path, "doc", None, None) + assert (tmp_path / ".openkb" / "files") not in paths + # hashes.json / pageindex.db are still snapshotted eagerly. 
+ assert (tmp_path / ".openkb" / "hashes.json") in paths From 37496483b8e6aba1605f4129d730b6c2c528d688 Mon Sep 17 00:00:00 2001 From: mountain Date: Wed, 1 Jul 2026 11:43:46 +0800 Subject: [PATCH 2/2] fix(mutation): only track blobs this add created (dedup-hit rollback bug) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Self-review of the previous commit found a regression it introduced: the add path globbed `.openkb/files/*/{doc_id}*` and registered whatever matched, via track_new, for removal on rollback. But PageIndex content-dedups — `add_document` returns an EXISTING doc_id and writes no new blob when the same content is already indexed. If hashes.json and pageindex.db diverge (e.g. a prior `remove` whose PageIndex cleanup failed left the row + blob but dropped the hashes.json entry), re-adding that content makes col.add() return the OLD doc_id, so a subsequent compile failure would roll back and DELETE that prior document's blob. The old whole-store hardlink snapshot did not have this bug (a dedup-hit blob shares the backup inode and is left in place on rollback). Fix: capture the blob set *before* indexing and register only the paths this add actually created (set difference), guarded by `if index_result.doc_id`. That also neutralizes an unexpected empty/falsy doc_id, which would otherwise glob `*/*` and register — then delete on rollback — the entire blob store. Tests (tests/test_add_command.py): - test_long_doc_rollback_removes_only_the_new_blob: a failed long-doc add rolls back its own new blob + images subtree while a pre-existing blob survives. - test_long_doc_dedup_hit_does_not_delete_existing_blob: a dedup hit (existing doc_id, no new blob) must not delete the pre-existing blob on rollback — verified this test FAILS on the pre-fix code. 
Claude-Session: https://claude.ai/code/session_018WiFnTo1YW9mtw47Fzir9K --- openkb/cli.py | 33 +++++++++++----- tests/test_add_command.py | 83 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 10 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index 96e539db..4d850f04 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -488,6 +488,16 @@ def _add_single_file_locked( if result.raw_path is None: raise RuntimeError(f"Converted long document has no raw artifact: {file_path.name}") click.echo(" Long document detected — indexing with PageIndex...") + # PageIndex content-dedups: if the same content is already indexed + # (e.g. hashes.json and pageindex.db diverged after a remove whose + # PageIndex cleanup failed), col.add() returns the EXISTING doc_id + # and writes no new blob. Capture the blob set *before* indexing so + # we register only blobs THIS add actually created — otherwise + # rollback would delete a prior document's blob. + files_root = kb_dir / ".openkb" / "files" + blobs_before = ( + set(files_root.glob("*/*")) if files_root.exists() else set() + ) try: from openkb.indexer import index_long_document @@ -499,16 +509,19 @@ def _add_single_file_locked( logger.debug("Indexing traceback:", exc_info=True) raise - # Indexing just created the append-only blob(s) for this doc_id - # under .openkb/files//. Register them now (their names - # weren't known at snapshot time) so rollback + crash recovery - # remove exactly this doc's blob instead of us snapshotting the - # whole store up front. 
- files_root = kb_dir / ".openkb" / "files" - if files_root.exists(): - snapshot.track_new( - sorted(files_root.glob(f"*/{index_result.doc_id}*")) - ) + # Register only the newly-created blob artifacts for this doc (the + # {doc_id} file + its images dir) — the append-only store means the + # name isn't known until now — so rollback + crash recovery remove + # exactly this add's blob, never a pre-existing one, instead of + # snapshotting the whole store up front. The doc_id guard + the + # blobs_before diff keep a dedup hit (or an unexpected empty doc_id) + # from registering — and later deleting — existing blobs. + if index_result.doc_id and files_root.exists(): + snapshot.track_new([ + p + for p in files_root.glob(f"*/{index_result.doc_id}*") + if p not in blobs_before + ]) summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" _run_compile_with_retry( diff --git a/tests/test_add_command.py b/tests/test_add_command.py index a73ed4ac..e0cc1e5f 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -90,6 +90,89 @@ def test_add_single_file_compile_failure_rolls_back_converted_artifacts(self, tm assert not (kb_dir / "wiki" / "sources" / "notes.md").exists() assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {} + def _long_doc_conv(self, kb_dir, name, file_hash): + from openkb.converter import ConvertResult + + return ConvertResult( + raw_path=kb_dir / "raw" / f"{name}.pdf", + source_path=None, + is_long_doc=True, + file_hash=file_hash, + doc_name=name, + ) + + def test_long_doc_rollback_removes_only_the_new_blob(self, tmp_path): + """A failed long-doc add must roll back the blob IT created under + .openkb/files, while a pre-existing blob (another document) survives — + the targeted track_new must not touch blobs this add didn't create.""" + from openkb.cli import add_single_file + from openkb.indexer import IndexResult + + kb_dir = self._setup_kb(tmp_path) + files = kb_dir / ".openkb" / "files" / "default" + 
files.mkdir(parents=True) + other = files / "other-doc.pdf" + other.write_bytes(b"another-doc-keep-me") + + new_id = "11111111-1111-1111-1111-111111111111" + + def fake_index(raw_path, kb_dir_arg, doc_name=None): + (files / f"{new_id}.pdf").write_bytes(b"new-blob") + (files / new_id / "images").mkdir(parents=True) + (files / new_id / "images" / "p1.png").write_bytes(b"img") + return IndexResult(doc_id=new_id, description="", tree={"structure": []}) + + doc = tmp_path / "paper.pdf" + doc.write_bytes(b"%PDF-1.4 fake") + conv = self._long_doc_conv(kb_dir, "paper", "cafebabe00" * 8) + + with patch("openkb.cli.convert_document", return_value=conv), \ + patch("openkb.indexer.index_long_document", side_effect=fake_index), \ + patch("openkb.agent.compiler.compile_long_doc", + side_effect=RuntimeError("boom")), \ + patch("openkb.cli.time.sleep"), \ + patch("openkb.cli._setup_llm_key"): + outcome = add_single_file(doc, kb_dir) + + assert outcome == "failed" + assert not (files / f"{new_id}.pdf").exists() # new blob rolled back + assert not (files / new_id).exists() # new images subtree rolled back + assert other.read_bytes() == b"another-doc-keep-me" # pre-existing survives + + def test_long_doc_dedup_hit_does_not_delete_existing_blob(self, tmp_path): + """PageIndex content-dedup can return an EXISTING doc_id and write no new + blob (diverged hashes.json/pageindex.db). 
A failed add must NOT delete + that pre-existing blob on rollback (regression: track_new globbing the + doc_id would otherwise register and delete it).""" + from openkb.cli import add_single_file + from openkb.indexer import IndexResult + + kb_dir = self._setup_kb(tmp_path) + files = kb_dir / ".openkb" / "files" / "default" + files.mkdir(parents=True) + existing_id = "22222222-2222-2222-2222-222222222222" + existing_blob = files / f"{existing_id}.pdf" + existing_blob.write_bytes(b"pre-existing-do-not-delete") + + def fake_index_dedup(raw_path, kb_dir_arg, doc_name=None): + # Dedup hit: return the existing doc_id, create NO new blob. + return IndexResult(doc_id=existing_id, description="", tree={"structure": []}) + + doc = tmp_path / "dup.pdf" + doc.write_bytes(b"%PDF-1.4 dup") + conv = self._long_doc_conv(kb_dir, "dup", "feedface00" * 8) + + with patch("openkb.cli.convert_document", return_value=conv), \ + patch("openkb.indexer.index_long_document", side_effect=fake_index_dedup), \ + patch("openkb.agent.compiler.compile_long_doc", + side_effect=RuntimeError("boom")), \ + patch("openkb.cli.time.sleep"), \ + patch("openkb.cli._setup_llm_key"): + outcome = add_single_file(doc, kb_dir) + + assert outcome == "failed" + assert existing_blob.read_bytes() == b"pre-existing-do-not-delete" + def test_add_directory_calls_helper_for_each_file(self, tmp_path): kb_dir = self._setup_kb(tmp_path) docs_dir = tmp_path / "docs"