diff --git a/openkb/cli.py b/openkb/cli.py index f4297469..4d850f04 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -362,13 +362,18 @@ def _snapshot_add_paths( final_raw: Path | None, final_source: Path | None, ) -> list[Path]: + # NOTE: .openkb/files (the PageIndex blob store) is intentionally NOT + # snapshotted here. It is append-only by {doc_id}, and the doc_id is only + # assigned during indexing (after this snapshot). Eagerly snapshotting the + # whole tree cost one os.link per existing blob on every add; instead the + # long-doc add path registers just the new blob via snapshot.track_new() + # once indexing has run. paths = [ kb_dir / ".openkb" / "hashes.json", kb_dir / ".openkb" / "pageindex.db", kb_dir / ".openkb" / "pageindex.db-wal", kb_dir / ".openkb" / "pageindex.db-shm", kb_dir / ".openkb" / "pageindex.db-journal", - kb_dir / ".openkb" / "files", kb_dir / "wiki" / "summaries" / f"{doc_name}.md", kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir / "wiki" / "sources" / "images" / doc_name, @@ -470,7 +475,6 @@ def _add_single_file_locked( hardlink_dirs={ kb_dir / "wiki" / "concepts", kb_dir / "wiki" / "entities", - kb_dir / ".openkb" / "files", }, ) publish_staged_tree(staging_dir, kb_dir) @@ -484,6 +488,16 @@ def _add_single_file_locked( if result.raw_path is None: raise RuntimeError(f"Converted long document has no raw artifact: {file_path.name}") click.echo(" Long document detected — indexing with PageIndex...") + # PageIndex content-dedups: if the same content is already indexed + # (e.g. hashes.json and pageindex.db diverged after a remove whose + # PageIndex cleanup failed), col.add() returns the EXISTING doc_id + # and writes no new blob. Capture the blob set *before* indexing so + # we register only blobs THIS add actually created — otherwise + # rollback would delete a prior document's blob. + files_root = kb_dir / ".openkb" / "files" + blobs_before = ( + set(files_root.glob("*/*")) if files_root.exists() else set() + ) try: from openkb.indexer import index_long_document @@ -495,6 +509,20 @@ def _add_single_file_locked( logger.debug("Indexing traceback:", exc_info=True) raise + # Register only the newly-created blob artifacts for this doc (the + # {doc_id} file + its images dir) — the append-only store means the + # name isn't known until now — so rollback + crash recovery remove + # exactly this add's blob, never a pre-existing one, instead of + # snapshotting the whole store up front. The doc_id guard + the + # blobs_before diff keep a dedup hit (or an unexpected empty doc_id) + # from registering — and later deleting — existing blobs. + if index_result.doc_id and files_root.exists(): + snapshot.track_new([ + p + for p in files_root.glob(f"*/{index_result.doc_id}*") + if p not in blobs_before + ]) + summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" _run_compile_with_retry( lambda: compile_long_doc( @@ -618,10 +646,11 @@ def import_from_pageindex_cloud( _snapshot_add_paths(kb_dir, doc_name, None, None), operation="cloud_import", details={"doc_id": doc_id, "doc_name": doc_name}, + # Cloud import reads from PageIndex Cloud and writes no local blob, + # so .openkb/files is never touched — nothing to snapshot there. hardlink_dirs={ kb_dir / "wiki" / "concepts", kb_dir / "wiki" / "entities", - kb_dir / ".openkb" / "files", }, ) summary_path = _write_long_doc_artifacts( diff --git a/openkb/mutation.py b/openkb/mutation.py index cb1353b5..8b190ebb 100644 --- a/openkb/mutation.py +++ b/openkb/mutation.py @@ -175,6 +175,30 @@ def mark_committed(self) -> None: """ self.write_journal("committed") + def track_new(self, paths: list[Path]) -> None: + """Register paths created *after* the snapshot for removal on rollback. + + Some artifacts get their final name only once the mutation runs — the + PageIndex ``{doc_id}`` blob under ``.openkb/files`` is named by indexing, + which happens after :func:`snapshot_paths`. Rather than eagerly + snapshotting the whole append-only blob store up front (an ``os.link`` + per existing blob on *every* add — O(total blobs), not O(this doc)), + the caller invokes this once the new artifacts exist. Each is recorded + with no backup, so both :meth:`rollback` and a crash-recovery replay + delete exactly the new paths and nothing else. The active journal is + rewritten so a crash after the artifacts land but before commit still + cleans them up. Paths already tracked are ignored; missing paths are a + no-op (nothing was created). + """ + changed = False + for path in paths: + target = path.resolve() + if target not in self.entries: + self.entries[target] = None + changed = True + if changed: + self.write_journal("active") + def rollback(self) -> None: # Restore children before parents so directory deletes cannot remove # paths that still need to be restored from a more specific backup. diff --git a/tests/test_add_command.py b/tests/test_add_command.py index a73ed4ac..e0cc1e5f 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -90,6 +90,89 @@ def test_add_single_file_compile_failure_rolls_back_converted_artifacts(self, tm assert not (kb_dir / "wiki" / "sources" / "notes.md").exists() assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {} + def _long_doc_conv(self, kb_dir, name, file_hash): + from openkb.converter import ConvertResult + + return ConvertResult( + raw_path=kb_dir / "raw" / f"{name}.pdf", + source_path=None, + is_long_doc=True, + file_hash=file_hash, + doc_name=name, + ) + + def test_long_doc_rollback_removes_only_the_new_blob(self, tmp_path): + """A failed long-doc add must roll back the blob IT created under + .openkb/files, while a pre-existing blob (another document) survives — + the targeted track_new must not touch blobs this add didn't create.""" + from openkb.cli import add_single_file + from openkb.indexer import IndexResult + + kb_dir = self._setup_kb(tmp_path) + files = kb_dir / ".openkb" / "files" / "default" + files.mkdir(parents=True) + other = files / "other-doc.pdf" + other.write_bytes(b"another-doc-keep-me") + + new_id = "11111111-1111-1111-1111-111111111111" + + def fake_index(raw_path, kb_dir_arg, doc_name=None): + (files / f"{new_id}.pdf").write_bytes(b"new-blob") + (files / new_id / "images").mkdir(parents=True) + (files / new_id / "images" / "p1.png").write_bytes(b"img") + return IndexResult(doc_id=new_id, description="", tree={"structure": []}) + + doc = tmp_path / "paper.pdf" + doc.write_bytes(b"%PDF-1.4 fake") + conv = self._long_doc_conv(kb_dir, "paper", "cafebabe00" * 8) + + with patch("openkb.cli.convert_document", return_value=conv), \ + patch("openkb.indexer.index_long_document", side_effect=fake_index), \ + patch("openkb.agent.compiler.compile_long_doc", + side_effect=RuntimeError("boom")), \ + patch("openkb.cli.time.sleep"), \ + patch("openkb.cli._setup_llm_key"): + outcome = add_single_file(doc, kb_dir) + + assert outcome == "failed" + assert not (files / f"{new_id}.pdf").exists() # new blob rolled back + assert not (files / new_id).exists() # new images subtree rolled back + assert other.read_bytes() == b"another-doc-keep-me" # pre-existing survives + + def test_long_doc_dedup_hit_does_not_delete_existing_blob(self, tmp_path): + """PageIndex content-dedup can return an EXISTING doc_id and write no new + blob (diverged hashes.json/pageindex.db). A failed add must NOT delete + that pre-existing blob on rollback (regression: track_new globbing the + doc_id would otherwise register and delete it).""" + from openkb.cli import add_single_file + from openkb.indexer import IndexResult + + kb_dir = self._setup_kb(tmp_path) + files = kb_dir / ".openkb" / "files" / "default" + files.mkdir(parents=True) + existing_id = "22222222-2222-2222-2222-222222222222" + existing_blob = files / f"{existing_id}.pdf" + existing_blob.write_bytes(b"pre-existing-do-not-delete") + + def fake_index_dedup(raw_path, kb_dir_arg, doc_name=None): + # Dedup hit: return the existing doc_id, create NO new blob. + return IndexResult(doc_id=existing_id, description="", tree={"structure": []}) + + doc = tmp_path / "dup.pdf" + doc.write_bytes(b"%PDF-1.4 dup") + conv = self._long_doc_conv(kb_dir, "dup", "feedface00" * 8) + + with patch("openkb.cli.convert_document", return_value=conv), \ + patch("openkb.indexer.index_long_document", side_effect=fake_index_dedup), \ + patch("openkb.agent.compiler.compile_long_doc", + side_effect=RuntimeError("boom")), \ + patch("openkb.cli.time.sleep"), \ + patch("openkb.cli._setup_llm_key"): + outcome = add_single_file(doc, kb_dir) + + assert outcome == "failed" + assert existing_blob.read_bytes() == b"pre-existing-do-not-delete" + def test_add_directory_calls_helper_for_each_file(self, tmp_path): kb_dir = self._setup_kb(tmp_path) docs_dir = tmp_path / "docs" diff --git a/tests/test_mutation.py b/tests/test_mutation.py index f3dbf614..968179f7 100644 --- a/tests/test_mutation.py +++ b/tests/test_mutation.py @@ -518,3 +518,72 @@ def test_hardlinked_dir_rollback_prunes_new_nested_blob_dirs(tmp_path): assert existing.stat().st_ino == existing_inode # untouched, not recopied assert not (files / "col" / "newdoc.pdf").exists() assert not (files / "col" / "newdoc").exists() # empty new dir pruned + + +# --- track_new: cheap blob-store rollback without whole-tree snapshot ------- + +def test_track_new_removes_new_blob_on_rollback(tmp_path): + """The PageIndex blob under .openkb/files gets its {doc_id} name only once + indexing runs — after snapshot_paths. Instead of snapshotting the whole + (append-only) blob store up front, the add path calls track_new() with the + new artifacts; rollback must then remove exactly those and leave every + pre-existing blob untouched. + """ + kb_dir = tmp_path + blobs = kb_dir / ".openkb" / "files" / "col" + blobs.mkdir(parents=True) + existing = blobs / "old-doc.pdf" + existing.write_bytes(b"keep-me") + existing_inode = existing.stat().st_ino + + # Snapshot does NOT include .openkb/files at all. + snapshot = snapshot_paths(kb_dir, [kb_dir / "wiki"], operation="add", details={}) + + # Indexing creates the new blob + its images subtree (doc_id now known). + new_blob = blobs / "new-doc.pdf" + new_blob.write_bytes(b"remove-me") + new_images = blobs / "new-doc" + (new_images / "images").mkdir(parents=True) + (new_images / "images" / "p1.png").write_bytes(b"img") + + snapshot.track_new([new_blob, new_images]) + snapshot.rollback() + snapshot.discard_best_effort() + + assert not new_blob.exists() # new blob removed + assert not new_images.exists() # new images subtree removed + assert existing.read_bytes() == b"keep-me" # pre-existing untouched + assert existing.stat().st_ino == existing_inode # not recopied/relinked + + +def test_track_new_persists_to_journal_for_crash_recovery(tmp_path): + """track_new() must rewrite the active journal so a crash *after* indexing + but before commit still rolls back the new blob on the next exclusive-lock + acquisition (recover_pending_journals), not just via in-process rollback. + """ + kb_dir = tmp_path + blobs = kb_dir / ".openkb" / "files" / "col" + blobs.mkdir(parents=True) + + snapshot = snapshot_paths(kb_dir, [kb_dir / "wiki"], operation="add", details={}) + new_blob = blobs / "new-doc.pdf" + new_blob.write_bytes(b"remove-me") + snapshot.track_new([new_blob]) + # Simulate a crash: journal left "active", no rollback()/mark_committed(). + + messages = recover_pending_journals(kb_dir) + + assert not new_blob.exists() + assert any("Rolled back" in m for m in messages) + + +def test_snapshot_add_paths_excludes_blob_store(tmp_path): + """The blob store is registered lazily via track_new(), so it must NOT be + in the eager add snapshot path list (that was the O(total blobs)-per-add + cost this change removes).""" + from openkb.cli import _snapshot_add_paths + + paths = _snapshot_add_paths(tmp_path, "doc", None, None) + assert (tmp_path / ".openkb" / "files") not in paths + # hashes.json / pageindex.db are still snapshotted eagerly. + assert (tmp_path / ".openkb" / "hashes.json") in paths