Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions openkb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,18 @@ def _snapshot_add_paths(
final_raw: Path | None,
final_source: Path | None,
) -> list[Path]:
# NOTE: .openkb/files (the PageIndex blob store) is intentionally NOT
# snapshotted here. It is append-only by {doc_id}, and the doc_id is only
# assigned during indexing (after this snapshot). Eagerly snapshotting the
# whole tree cost one os.link per existing blob on every add; instead the
# long-doc add path registers just the new blob via snapshot.track_new()
# once indexing has run.
paths = [
kb_dir / ".openkb" / "hashes.json",
kb_dir / ".openkb" / "pageindex.db",
kb_dir / ".openkb" / "pageindex.db-wal",
kb_dir / ".openkb" / "pageindex.db-shm",
kb_dir / ".openkb" / "pageindex.db-journal",
kb_dir / ".openkb" / "files",
kb_dir / "wiki" / "summaries" / f"{doc_name}.md",
kb_dir / "wiki" / "sources" / f"{doc_name}.json",
kb_dir / "wiki" / "sources" / "images" / doc_name,
Expand Down Expand Up @@ -470,7 +475,6 @@ def _add_single_file_locked(
hardlink_dirs={
kb_dir / "wiki" / "concepts",
kb_dir / "wiki" / "entities",
kb_dir / ".openkb" / "files",
},
)
publish_staged_tree(staging_dir, kb_dir)
Expand All @@ -484,6 +488,16 @@ def _add_single_file_locked(
if result.raw_path is None:
raise RuntimeError(f"Converted long document has no raw artifact: {file_path.name}")
click.echo(" Long document detected — indexing with PageIndex...")
# PageIndex content-dedups: if the same content is already indexed
# (e.g. hashes.json and pageindex.db diverged after a remove whose
# PageIndex cleanup failed), col.add() returns the EXISTING doc_id
# and writes no new blob. Capture the blob set *before* indexing so
# we register only blobs THIS add actually created — otherwise
# rollback would delete a prior document's blob.
files_root = kb_dir / ".openkb" / "files"
blobs_before = (
set(files_root.glob("*/*")) if files_root.exists() else set()
)
try:
from openkb.indexer import index_long_document

Expand All @@ -495,6 +509,20 @@ def _add_single_file_locked(
logger.debug("Indexing traceback:", exc_info=True)
raise

# Register only the blob artifacts this add created (the {doc_id} file and
# its images dir). The store is append-only and the blob name is not known
# until indexing has run, so it cannot be part of the up-front snapshot.
# Tracking it here lets rollback and crash recovery remove exactly this
# add's blob, never a pre-existing one, without snapshotting the whole
# store. The doc_id guard and the blobs_before diff keep a dedup hit (or an
# unexpected empty doc_id) from registering — and later deleting —
# existing blobs.
if index_result.doc_id and files_root.exists():
snapshot.track_new([
p
for p in files_root.glob(f"*/{index_result.doc_id}*")
if p not in blobs_before
])

summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md"
_run_compile_with_retry(
lambda: compile_long_doc(
Expand Down Expand Up @@ -618,10 +646,11 @@ def import_from_pageindex_cloud(
_snapshot_add_paths(kb_dir, doc_name, None, None),
operation="cloud_import",
details={"doc_id": doc_id, "doc_name": doc_name},
# Cloud import reads from PageIndex Cloud and writes no local blob,
# so .openkb/files is never touched — nothing to snapshot there.
hardlink_dirs={
kb_dir / "wiki" / "concepts",
kb_dir / "wiki" / "entities",
kb_dir / ".openkb" / "files",
},
)
summary_path = _write_long_doc_artifacts(
Expand Down
24 changes: 24 additions & 0 deletions openkb/mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,30 @@ def mark_committed(self) -> None:
"""
self.write_journal("committed")

def track_new(self, paths: list[Path]) -> None:
    """Record paths that came into existence after the snapshot was taken.

    Certain artifacts only receive their final name while the mutation is
    running: the PageIndex ``{doc_id}`` blob under ``.openkb/files`` is
    named by indexing, which runs after :func:`snapshot_paths`. Instead of
    snapshotting the entire append-only blob store ahead of time (one
    ``os.link`` per existing blob on *every* add — O(total blobs) rather
    than O(this doc)), the caller reports the new artifacts here once they
    exist.

    Every path is resolved and stored in ``self.entries`` with ``None`` as
    its value (i.e. no backup), so that :meth:`rollback` and a
    crash-recovery replay delete exactly those paths and nothing else. When
    at least one new entry was added, the journal is rewritten in the
    ``"active"`` state so a crash between artifact creation and commit
    still cleans them up. Paths that are already tracked are skipped and
    keep their existing entry; a path that does not exist on disk is still
    recorded, which is harmless because nothing was created there.
    """
    registered_any = False
    for candidate in paths:
        resolved = candidate.resolve()
        if resolved in self.entries:
            # Already tracked (snapshotted or registered earlier): leave
            # the existing entry untouched.
            continue
        self.entries[resolved] = None
        registered_any = True

    if not registered_any:
        # Nothing new to persist; avoid a pointless journal rewrite.
        return
    self.write_journal("active")

def rollback(self) -> None:
# Restore children before parents so directory deletes cannot remove
# paths that still need to be restored from a more specific backup.
Expand Down
83 changes: 83 additions & 0 deletions tests/test_add_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,89 @@ def test_add_single_file_compile_failure_rolls_back_converted_artifacts(self, tm
assert not (kb_dir / "wiki" / "sources" / "notes.md").exists()
assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {}

def _long_doc_conv(self, kb_dir, name, file_hash):
    """Build a ``ConvertResult`` flagged as a long document for *name*.

    The raw artifact path points at ``<kb_dir>/raw/<name>.pdf`` and no
    converted source path is set.
    """
    from openkb.converter import ConvertResult

    fields = {
        "raw_path": kb_dir / "raw" / f"{name}.pdf",
        "source_path": None,
        "is_long_doc": True,
        "file_hash": file_hash,
        "doc_name": name,
    }
    return ConvertResult(**fields)

def test_long_doc_rollback_removes_only_the_new_blob(self, tmp_path):
    """A failing long-doc add rolls back only the blob that add created.

    The fake indexer writes a new ``{doc_id}`` blob plus an images subtree
    under ``.openkb/files``; compilation is then forced to fail. Afterwards
    the new artifacts must be gone while an unrelated, pre-existing blob is
    left intact.
    """
    from openkb.cli import add_single_file
    from openkb.indexer import IndexResult

    kb_dir = self._setup_kb(tmp_path)
    store = kb_dir / ".openkb" / "files" / "default"
    store.mkdir(parents=True)
    survivor = store / "other-doc.pdf"
    survivor.write_bytes(b"another-doc-keep-me")

    fresh_id = "11111111-1111-1111-1111-111111111111"
    fresh_blob = store / f"{fresh_id}.pdf"
    fresh_tree = store / fresh_id

    def fake_index(raw_path, kb_dir_arg, doc_name=None):
        # Stand-in for indexing: drop the blob and its images on disk.
        fresh_blob.write_bytes(b"new-blob")
        images = fresh_tree / "images"
        images.mkdir(parents=True)
        (images / "p1.png").write_bytes(b"img")
        return IndexResult(doc_id=fresh_id, description="", tree={"structure": []})

    doc = tmp_path / "paper.pdf"
    doc.write_bytes(b"%PDF-1.4 fake")
    conv = self._long_doc_conv(kb_dir, "paper", "cafebabe00" * 8)

    convert_patch = patch("openkb.cli.convert_document", return_value=conv)
    index_patch = patch("openkb.indexer.index_long_document", side_effect=fake_index)
    compile_patch = patch(
        "openkb.agent.compiler.compile_long_doc",
        side_effect=RuntimeError("boom"),
    )
    sleep_patch = patch("openkb.cli.time.sleep")
    key_patch = patch("openkb.cli._setup_llm_key")

    with convert_patch, index_patch, compile_patch, sleep_patch, key_patch:
        outcome = add_single_file(doc, kb_dir)

    assert outcome == "failed"
    # New blob and its images subtree are rolled back ...
    assert not fresh_blob.exists()
    assert not fresh_tree.exists()
    # ... while the blob that existed beforehand is untouched.
    assert survivor.read_bytes() == b"another-doc-keep-me"

def test_long_doc_dedup_hit_does_not_delete_existing_blob(self, tmp_path):
    """A failed add that hit content-dedup must leave the existing blob alone.

    Simulates a dedup hit: the fake indexer hands back the doc_id of a blob
    that is already on disk and writes nothing new. Compilation then fails,
    triggering rollback. Regression guard: registering blobs by globbing
    the doc_id alone would track, and then delete, that pre-existing blob.
    """
    from openkb.cli import add_single_file
    from openkb.indexer import IndexResult

    kb_dir = self._setup_kb(tmp_path)
    store = kb_dir / ".openkb" / "files" / "default"
    store.mkdir(parents=True)
    prior_id = "22222222-2222-2222-2222-222222222222"
    prior_blob = store / f"{prior_id}.pdf"
    prior_blob.write_bytes(b"pre-existing-do-not-delete")

    def fake_index_dedup(raw_path, kb_dir_arg, doc_name=None):
        # Dedup hit: report the existing doc_id and create no new blob.
        return IndexResult(doc_id=prior_id, description="", tree={"structure": []})

    doc = tmp_path / "dup.pdf"
    doc.write_bytes(b"%PDF-1.4 dup")
    conv = self._long_doc_conv(kb_dir, "dup", "feedface00" * 8)

    convert_patch = patch("openkb.cli.convert_document", return_value=conv)
    index_patch = patch(
        "openkb.indexer.index_long_document", side_effect=fake_index_dedup
    )
    compile_patch = patch(
        "openkb.agent.compiler.compile_long_doc",
        side_effect=RuntimeError("boom"),
    )
    sleep_patch = patch("openkb.cli.time.sleep")
    key_patch = patch("openkb.cli._setup_llm_key")

    with convert_patch, index_patch, compile_patch, sleep_patch, key_patch:
        outcome = add_single_file(doc, kb_dir)

    assert outcome == "failed"
    assert prior_blob.read_bytes() == b"pre-existing-do-not-delete"

def test_add_directory_calls_helper_for_each_file(self, tmp_path):
kb_dir = self._setup_kb(tmp_path)
docs_dir = tmp_path / "docs"
Expand Down
69 changes: 69 additions & 0 deletions tests/test_mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,3 +518,72 @@ def test_hardlinked_dir_rollback_prunes_new_nested_blob_dirs(tmp_path):
assert existing.stat().st_ino == existing_inode # untouched, not recopied
assert not (files / "col" / "newdoc.pdf").exists()
assert not (files / "col" / "newdoc").exists() # empty new dir pruned


# --- track_new: cheap blob-store rollback without whole-tree snapshot -------

def test_track_new_removes_new_blob_on_rollback(tmp_path):
    """Rollback deletes exactly the paths handed to ``track_new()``.

    The snapshot here deliberately leaves out ``.openkb/files``. A blob and
    its images subtree are created afterwards, mimicking indexing assigning
    the ``{doc_id}`` name only after ``snapshot_paths``, and are registered
    through ``track_new()``. After rollback both must be gone, while the
    blob that existed beforehand keeps its content and its inode.
    """
    kb_dir = tmp_path
    store = kb_dir / ".openkb" / "files" / "col"
    store.mkdir(parents=True)
    old_blob = store / "old-doc.pdf"
    old_blob.write_bytes(b"keep-me")
    inode_before = old_blob.stat().st_ino

    # The blob store is not part of the snapshot at all.
    snap = snapshot_paths(kb_dir, [kb_dir / "wiki"], operation="add", details={})

    # "Indexing" now produces the new blob and its images subtree.
    fresh_blob = store / "new-doc.pdf"
    fresh_blob.write_bytes(b"remove-me")
    fresh_tree = store / "new-doc"
    page_dir = fresh_tree / "images"
    page_dir.mkdir(parents=True)
    (page_dir / "p1.png").write_bytes(b"img")

    snap.track_new([fresh_blob, fresh_tree])
    snap.rollback()
    snap.discard_best_effort()

    # Tracked artifacts are removed ...
    assert not fresh_blob.exists()
    assert not fresh_tree.exists()
    # ... and the pre-existing blob was neither rewritten nor relinked.
    assert old_blob.read_bytes() == b"keep-me"
    assert old_blob.stat().st_ino == inode_before


def test_track_new_persists_to_journal_for_crash_recovery(tmp_path):
    """Paths given to ``track_new()`` survive in the journal for recovery.

    After ``track_new()`` the snapshot is abandoned without ``rollback()``
    or ``mark_committed()``, emulating a crash between indexing and commit.
    ``recover_pending_journals`` must then delete the tracked blob and
    report a rollback.
    """
    kb_dir = tmp_path
    store = kb_dir / ".openkb" / "files" / "col"
    store.mkdir(parents=True)

    snap = snapshot_paths(kb_dir, [kb_dir / "wiki"], operation="add", details={})
    orphan = store / "new-doc.pdf"
    orphan.write_bytes(b"remove-me")
    snap.track_new([orphan])
    # Crash simulation: the journal stays "active"; nothing else is called.

    report = recover_pending_journals(kb_dir)

    assert not orphan.exists()
    assert any("Rolled back" in line for line in report)


def test_snapshot_add_paths_excludes_blob_store(tmp_path):
    """The eager add snapshot list must not contain ``.openkb/files``.

    The blob store is registered lazily through ``track_new()``; listing it
    eagerly was the O(total blobs)-per-add cost being removed.
    ``hashes.json`` must still be part of the eager list.
    """
    from openkb.cli import _snapshot_add_paths

    eager = _snapshot_add_paths(tmp_path, "doc", None, None)
    openkb_dir = tmp_path / ".openkb"

    assert (openkb_dir / "files") not in eager
    # hashes.json / pageindex.db are still snapshotted eagerly.
    assert (openkb_dir / "hashes.json") in eager