From 4ca33c784ffbdff24c4572ae9b40be166b906408 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Fri, 26 Jun 2026 18:12:47 +0800 Subject: [PATCH 1/2] feat: add serial mutation recovery --- openkb/agent/compiler.py | 29 ++- openkb/cli.py | 428 +++++++++++++++++++------------ openkb/converter.py | 188 ++++++++------ openkb/indexer.py | 70 ++++- openkb/lint.py | 3 +- openkb/locks.py | 29 +++ openkb/mutation.py | 431 +++++++++++++++++++++++++++++++ tests/test_add_command.py | 72 ++++-- tests/test_mutation.py | 520 ++++++++++++++++++++++++++++++++++++++ 9 files changed, 1479 insertions(+), 291 deletions(-) create mode 100644 openkb/mutation.py create mode 100644 tests/test_mutation.py diff --git a/openkb/agent/compiler.py b/openkb/agent/compiler.py index af3ac1bd..29a47066 100644 --- a/openkb/agent/compiler.py +++ b/openkb/agent/compiler.py @@ -36,6 +36,7 @@ resolve_entity_types, ) from openkb.lint import list_existing_wiki_targets, strip_ghost_wikilinks +from openkb.locks import atomic_write_text from openkb.schema import INDEX_SEED, get_agents_md logger = logging.getLogger(__name__) @@ -779,7 +780,7 @@ def _write_summary(wiki_dir: Path, doc_name: str, summary: str, fm_lines.append(f"doc_type: {doc_type}") fm_lines.append(_yaml_kv_line("full_text", f"sources/{doc_name}.{ext}")) fm_block = "---\n" + "\n".join(fm_lines) + "\n---\n\n" - (summaries_dir / f"{doc_name}.md").write_text(fm_block + summary, encoding="utf-8") + atomic_write_text(summaries_dir / f"{doc_name}.md", fm_block + summary) _SAFE_NAME_RE = re.compile(r'[^\w\-]') @@ -839,7 +840,7 @@ def _write_concept(wiki_dir: Path, name: str, content: str, source_file: str, is if brief: fm_lines.append(_yaml_kv_line("description", brief)) existing = frontmatter.block(fm_lines) + clean - path.write_text(existing, encoding="utf-8") + atomic_write_text(path, existing) return # Guarantee type + refresh description on update; remove legacy brief:. ex_parts2 = frontmatter.split(existing) @@ -851,7 +852,7 @@ def _write_concept(wiki_dir: Path, name: str, content: str, source_file: str, is # Drop legacy brief: lines (migrated to description:). fm_block = frontmatter.drop_line(fm_block, "brief") existing = fm_block + body - path.write_text(existing, encoding="utf-8") + atomic_write_text(path, existing) else: clean_parts = frontmatter.split(content) if clean_parts is not None: @@ -863,7 +864,7 @@ def _write_concept(wiki_dir: Path, name: str, content: str, source_file: str, is if brief: fm_lines.append(_yaml_kv_line("description", brief)) fm_block = "---\n" + "\n".join(fm_lines) + "\n---\n\n" - path.write_text(fm_block + content, encoding="utf-8") + atomic_write_text(path, fm_block + content) def _write_entity( @@ -927,10 +928,10 @@ def _build_entity_frontmatter(sources: list[str]) -> str: break merged = [source_file] + [s for s in recovered if s != source_file] existing = _build_entity_frontmatter(merged) + clean - path.write_text(existing, encoding="utf-8") + atomic_write_text(path, existing) return - path.write_text(_build_entity_frontmatter([source_file]) + clean, encoding="utf-8") + atomic_write_text(path, _build_entity_frontmatter([source_file]) + clean) _set_fm_line = frontmatter.set_line @@ -1041,7 +1042,7 @@ def _add_related_link( text = _prepend_source_to_frontmatter(text, source_file) text += f"\n\nSee also: {link}" - path.write_text(text, encoding="utf-8") + atomic_write_text(path, text) return True @@ -1068,7 +1069,7 @@ def _backlink_summary_pages( _ensure_h2_section(lines, section, quiet=True) for slug in reversed(missing): _insert_section_entry(lines, section, f"- [[{page_dir}/{slug}]]") - summary_path.write_text("\n".join(lines), encoding="utf-8") + atomic_write_text(summary_path, "\n".join(lines)) def _backlink_pages( @@ -1089,7 +1090,7 @@ def _backlink_pages( lines = text.split("\n") _ensure_h2_section(lines, "## Related Documents", quiet=True) _insert_section_entry(lines, "## Related Documents", f"- {link}") - path.write_text("\n".join(lines), encoding="utf-8") + atomic_write_text(path, "\n".join(lines)) def _backlink_summary(wiki_dir: Path, doc_name: str, concept_slugs: list[str]) -> None: @@ -1195,7 +1196,7 @@ def _remove_doc_from_pages( path.unlink() deleted.append(path.stem) elif new_text != text: - path.write_text(new_text, encoding="utf-8") + atomic_write_text(path, new_text) modified.append(path.stem) return {"modified": modified, "deleted": deleted} @@ -1291,7 +1292,7 @@ def remove_doc_from_index(wiki_dir: Path, doc_name: str, concept_slugs_deleted: while _remove_section_entry(lines, "## Entities", entity_link): pass - index_path.write_text("\n".join(lines), encoding="utf-8") + atomic_write_text(index_path, "\n".join(lines)) def _update_index( @@ -1315,7 +1316,7 @@ def _update_index( index_path = wiki_dir / "index.md" if not index_path.exists(): - index_path.write_text(INDEX_SEED, encoding="utf-8") + atomic_write_text(index_path, INDEX_SEED) lines = index_path.read_text(encoding="utf-8").split("\n") @@ -1361,7 +1362,7 @@ def _update_index( else: _insert_section_entry(lines, "## Entities", entry) - index_path.write_text("\n".join(lines), encoding="utf-8") + atomic_write_text(index_path, "\n".join(lines)) # --------------------------------------------------------------------------- @@ -2035,7 +2036,7 @@ async def compile_long_doc( updated = fm_block + body if updated != summary_content: summary_content = updated - summary_path.write_text(summary_content, encoding="utf-8") + atomic_write_text(summary_path, summary_content) # Base context A. cache_control marker on the doc message creates a # cache breakpoint covering (system + doc) for every concept call. diff --git a/openkb/cli.py b/openkb/cli.py index 28694987..ff4e2228 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -13,6 +13,7 @@ import shutil import sys import time +import uuid from functools import wraps from pathlib import Path from typing import Literal @@ -47,10 +48,11 @@ def filter(self, record: logging.LogRecord) -> bool: resolve_extra_headers, set_extra_headers, resolve_timeout, set_timeout, resolve_litellm_settings, ) -from openkb.converter import _registry_path, convert_document -from openkb.indexer import import_cloud_document +from openkb.converter import _registry_path, _sanitize_stem, convert_document +from openkb.indexer import _write_long_doc_artifacts, prepare_cloud_import from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock from openkb.log import append_log +from openkb.mutation import MutationSnapshot, publish_staged_tree, snapshot_paths from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS # Suppress warnings after all imports — markitdown overrides filters at import time @@ -332,13 +334,83 @@ def _clear_existing_skill_dir(kb_dir: Path, name: str) -> None: shutil.rmtree(target) -def add_single_file(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]: +def _staging_dir_for(kb_dir: Path, file_path: Path) -> Path: + safe = _sanitize_stem(file_path.stem) + path = kb_dir / ".openkb" / "staging" / f"add-{safe}-{uuid.uuid4().hex[:8]}" + path.mkdir(parents=True, exist_ok=False) + return path + + +def _cleanup_staging(path: Path | None) -> None: + if path is not None: + shutil.rmtree(path, ignore_errors=True) + + +def _final_artifact_paths(result, kb_dir: Path) -> tuple[Path | None, Path | None]: + final_raw = None + final_source = None + if result.raw_path is not None: + final_raw = kb_dir / "raw" / result.raw_path.name + if result.source_path is not None: + final_source = kb_dir / "wiki" / "sources" / result.source_path.name + return final_raw, final_source + + +def _snapshot_add_paths( + kb_dir: Path, + doc_name: str, + final_raw: Path | None, + final_source: Path | None, +) -> list[Path]: + paths = [ + kb_dir / ".openkb" / "hashes.json", + kb_dir / ".openkb" / "pageindex.db", + kb_dir / ".openkb" / "pageindex.db-wal", + kb_dir / ".openkb" / "pageindex.db-shm", + kb_dir / ".openkb" / "pageindex.db-journal", + kb_dir / ".openkb" / "files", + kb_dir / "wiki" / "summaries" / f"{doc_name}.md", + kb_dir / "wiki" / "sources" / f"{doc_name}.json", + kb_dir / "wiki" / "sources" / "images" / doc_name, + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + kb_dir / "wiki" / "index.md", + kb_dir / "wiki" / "log.md", + ] + if final_raw is not None: + paths.append(final_raw) + if final_source is not None: + paths.append(final_source) + return paths + + +def _run_compile_with_retry(coro_factory, label: str) -> None: + click.echo(f" {label}...") + for attempt in range(2): + try: + asyncio.run(coro_factory()) + return + except Exception as exc: + if attempt == 0: + click.echo(" Retrying compilation in 2s...") + time.sleep(2) + else: + click.echo(f" [ERROR] Compilation failed: {exc}") + logger.debug("Compilation traceback:", exc_info=True) + raise + + +def add_single_file( + file_path: Path, kb_dir: Path, *, stage: bool = True +) -> Literal["added", "skipped", "failed"]: """Convert, index, and compile a single document under the KB mutation lock.""" with kb_ingest_lock(kb_dir / ".openkb"): - return _add_single_file_locked(file_path, kb_dir) + return _add_single_file_locked(file_path, kb_dir, stage=stage) -def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]: +def _add_single_file_locked( + file_path: Path, kb_dir: Path, *, stage: bool = True +) -> Literal["added", "skipped", "failed"]: """Convert, index, and compile a single document into the knowledge base. Steps: @@ -363,129 +435,145 @@ def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", " _setup_llm_key(kb_dir) model: str = config.get("model", DEFAULT_CONFIG["model"]) - # 2. Convert document + staging_dir = _staging_dir_for(kb_dir, file_path) if stage else None + snapshot: MutationSnapshot | None = None + + # 2. Convert document into staging when possible. click.echo(f"Adding: {file_path.name}") try: - result = convert_document(file_path, kb_dir) + result = convert_document(file_path, kb_dir, staging_dir=staging_dir) except Exception as exc: click.echo(f" [ERROR] Conversion failed: {exc}") logger.debug("Conversion traceback:", exc_info=True) + _cleanup_staging(staging_dir) return "failed" if result.skipped: click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") + _cleanup_staging(staging_dir) return "skipped" doc_name = result.doc_name or file_path.stem index_result = None # populated only on the long-doc branch - # 3/4. Index and compile - if result.is_long_doc: - click.echo(f" Long document detected — indexing with PageIndex...") - try: - from openkb.indexer import index_long_document - index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name) - except Exception as exc: - click.echo(f" [ERROR] Indexing failed: {exc}") - logger.debug("Indexing traceback:", exc_info=True) - return "failed" - - summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" - click.echo(f" Compiling long doc (doc_id={index_result.doc_id})...") - for attempt in range(2): + final_raw, final_source = _final_artifact_paths(result, kb_dir) + try: + snapshot = snapshot_paths( + kb_dir, + _snapshot_add_paths(kb_dir, doc_name, final_raw, final_source), + operation="add", + details={ + "file_hash": result.file_hash, + "name": file_path.name, + "doc_name": doc_name, + }, + hardlink_dirs={ + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + kb_dir / ".openkb" / "files", + }, + ) + publish_staged_tree(staging_dir, kb_dir) + if final_raw is not None: + result.raw_path = final_raw + if final_source is not None: + result.source_path = final_source + + # 3/4. Index and compile + if result.is_long_doc: + if result.raw_path is None: + raise RuntimeError(f"Converted long document has no raw artifact: {file_path.name}") + click.echo(" Long document detected — indexing with PageIndex...") try: - asyncio.run( - compile_long_doc(doc_name, summary_path, index_result.doc_id, kb_dir, model, - doc_description=index_result.description) + from openkb.indexer import index_long_document + + index_result = index_long_document( + result.raw_path, kb_dir, doc_name=doc_name ) - break except Exception as exc: - if attempt == 0: - click.echo(f" Retrying compilation in 2s...") - time.sleep(2) - else: - click.echo(f" [ERROR] Compilation failed: {exc}") - logger.debug("Compilation traceback:", exc_info=True) - return "failed" - else: - click.echo(f" Compiling short doc...") - for attempt in range(2): - try: - asyncio.run(compile_short_doc(doc_name, result.source_path, kb_dir, model)) - break - except Exception as exc: - if attempt == 0: - click.echo(f" Retrying compilation in 2s...") - time.sleep(2) - else: - click.echo(f" [ERROR] Compilation failed: {exc}") - logger.debug("Compilation traceback:", exc_info=True) - return "failed" - - # Register hash only after successful compilation - if result.file_hash: - # Construct the registry NOW, not earlier: convert_document may have - # backfilled a legacy entry (doc_name/path) on disk via its own - # instance, and an earlier snapshot would clobber that backfill on - # the full rewrite in add(). - registry = HashRegistry(openkb_dir / "hashes.json") - doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") - meta = { - "name": file_path.name, - "doc_name": doc_name, - "type": doc_type, - "path": _registry_path(file_path, kb_dir), - } - if result.raw_path is not None: - meta["raw_path"] = _registry_path(result.raw_path, kb_dir) - if result.source_path is not None: - meta["source_path"] = _registry_path(result.source_path, kb_dir) - # For long PDFs we also persist the PageIndex doc_id so `openkb - # remove` can later call ``Collection.delete_document(doc_id)`` - # to free the managed PDF copy + SQLite row. - if index_result is not None: - meta["doc_id"] = index_result.doc_id - # An edited document arrives with a new content hash; drop the - # stale entry for the same doc_name so the registry keeps exactly - # one entry per document. - registry.remove_by_doc_name(doc_name) - registry.add(result.file_hash, meta) + click.echo(f" [ERROR] Indexing failed: {exc}") + logger.debug("Indexing traceback:", exc_info=True) + raise + + summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" + _run_compile_with_retry( + lambda: compile_long_doc( + doc_name, + summary_path, + index_result.doc_id, + kb_dir, + model, + doc_description=index_result.description, + ), + label=f"Compiling long doc (doc_id={index_result.doc_id})", + ) + else: + if result.source_path is None: + raise RuntimeError(f"Converted document has no source artifact: {file_path.name}") + source_path = result.source_path + _run_compile_with_retry( + lambda: compile_short_doc(doc_name, source_path, kb_dir, model), + label="Compiling short doc", + ) + + # Register hash only after successful compilation. + if result.file_hash: + registry = HashRegistry(openkb_dir / "hashes.json") + doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") + meta = { + "name": file_path.name, + "doc_name": doc_name, + "type": doc_type, + "path": _registry_path(file_path, kb_dir), + } + if result.raw_path is not None: + meta["raw_path"] = _registry_path(result.raw_path, kb_dir) + if result.source_path is not None: + meta["source_path"] = _registry_path(result.source_path, kb_dir) + if index_result is not None: + meta["doc_id"] = index_result.doc_id + registry.remove_by_doc_name(doc_name) + for existing_hash, existing_meta in list(registry.all_entries().items()): + if ( + existing_hash != result.file_hash + and not existing_meta.get("doc_name") + and existing_meta.get("name") == file_path.name + ): + registry.remove_by_hash(existing_hash) + registry.add(result.file_hash, meta) + + snapshot.mark_committed() + except Exception: + if snapshot is None: + click.echo(f" [ERROR] Failed to prepare mutation snapshot for {file_path.name}.") + _cleanup_staging(staging_dir) + return "failed" + rollback_error = snapshot.rollback_best_effort() + if rollback_error is None: + snapshot.discard_best_effort() + else: + click.echo( + " [ERROR] Rollback failed; mutation journal retained for recovery: " + f"{snapshot.journal_path}" + ) + _cleanup_staging(staging_dir) + return "failed" + finally: + _cleanup_staging(staging_dir) - append_log(kb_dir / "wiki", "ingest", file_path.name) + try: + append_log(kb_dir / "wiki", "ingest", file_path.name) + except Exception as exc: + logger.warning("Failed to append ingest log for %s: %s", file_path.name, exc) + cleanup_error = snapshot.discard_best_effort() + if cleanup_error is not None: + click.echo( + f" [WARN] {file_path.name} added, but mutation journal cleanup failed: {cleanup_error}" + ) click.echo(f" [OK] {file_path.name} added to knowledge base.") return "added" -def _cleanup_failed_cloud_import(kb_dir: Path, doc_name: str) -> None: - """Best-effort wiki cleanup after a cloud import whose compilation failed. - - import_cloud_document writes the summary + per-page JSON source before - compile, and compile_long_doc writes concept/entity pages incrementally — so - a compile failure (which happens before the registry entry is added) would - otherwise strand wiki artifacts that ``openkb remove`` cannot reach. Mirror - remove's wiki cleanup (by doc_name, idempotent) but touch neither the - registry (no entry was added) nor PageIndex (the cloud doc is the user's). - """ - from openkb.agent.compiler import ( - remove_doc_from_concept_pages, - remove_doc_from_entity_pages, - remove_doc_from_index, - ) - - wiki_dir = kb_dir / "wiki" - (wiki_dir / "summaries" / f"{doc_name}.md").unlink(missing_ok=True) - (wiki_dir / "sources" / f"{doc_name}.json").unlink(missing_ok=True) - images_dir = wiki_dir / "sources" / "images" / doc_name - if images_dir.is_dir(): - shutil.rmtree(images_dir, ignore_errors=True) - concept_result = remove_doc_from_concept_pages(wiki_dir, doc_name, keep_empty=False) - entity_result = remove_doc_from_entity_pages(wiki_dir, doc_name, keep_empty=False) - remove_doc_from_index( - wiki_dir, doc_name, concept_result["deleted"], - entity_slugs_deleted=entity_result["deleted"], - ) - - def import_from_pageindex_cloud( doc_id: str, kb_dir: Path ) -> Literal["added", "skipped", "failed"]: @@ -514,61 +602,87 @@ def import_from_pageindex_cloud( return "skipped" click.echo(f"Importing from PageIndex Cloud: {doc_id}") + snapshot: MutationSnapshot | None = None + doc_name = "" try: - import_result = import_cloud_document(doc_id, kb_dir, path_key) - except Exception as exc: - click.echo(f" [ERROR] Import failed: {exc}") - logger.debug("Cloud import traceback:", exc_info=True) - return "failed" - - doc_name = import_result.doc_name - summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" - click.echo(f" Compiling imported doc (doc_id={doc_id})...") - compiled = False - for attempt in range(2): try: - asyncio.run( - compile_long_doc( - doc_name, summary_path, doc_id, kb_dir, model, - doc_description=import_result.description, - ) - ) - compiled = True - break + cloud = prepare_cloud_import(doc_id, kb_dir, path_key) except Exception as exc: - if attempt == 0: - click.echo(" Retrying compilation in 2s...") - time.sleep(2) - else: - click.echo(f" [ERROR] Compilation failed: {exc}") - logger.debug("Compilation traceback:", exc_info=True) - if not compiled: - # No registry entry exists yet, so `openkb remove` can't reach the - # summary/source/concept/entity artifacts already written; clean them - # best-effort so a failed import leaves no orphans and a retry is clean. - try: - _cleanup_failed_cloud_import(kb_dir, doc_name) - except Exception: - logger.debug("Cleanup after failed cloud import errored:", exc_info=True) - return "failed" + click.echo(f" [ERROR] Import failed: {exc}") + logger.debug("Cloud import traceback:", exc_info=True) + return "failed" - # Register the raw-less cloud entry only after successful compilation. - registry = HashRegistry(openkb_dir / "hashes.json") - meta = { - "name": import_result.name, - "doc_name": doc_name, - "type": "pageindex_cloud", - "origin": "cloud", - "path": path_key, - "source_path": _registry_path( - kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir - ), - "doc_id": doc_id, - } - registry.remove_by_doc_name(doc_name) - registry.add(synthetic_hash, meta) + doc_name = cloud.doc_name + snapshot = snapshot_paths( + kb_dir, + _snapshot_add_paths(kb_dir, doc_name, None, None), + operation="cloud_import", + details={"doc_id": doc_id, "doc_name": doc_name}, + hardlink_dirs={ + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + kb_dir / ".openkb" / "files", + }, + ) + summary_path = _write_long_doc_artifacts( + cloud.tree, + cloud.all_pages, + doc_name, + doc_id, + kb_dir, + description=cloud.description, + ) + _run_compile_with_retry( + lambda: compile_long_doc( + doc_name, + summary_path, + doc_id, + kb_dir, + model, + doc_description=cloud.description, + ), + label=f"Compiling imported doc (doc_id={doc_id})", + ) + + # Register the raw-less cloud entry only after successful compilation. + registry = HashRegistry(openkb_dir / "hashes.json") + meta = { + "name": cloud.cloud_name, + "doc_name": doc_name, + "type": "pageindex_cloud", + "origin": "cloud", + "path": path_key, + "source_path": _registry_path( + kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir + ), + "doc_id": doc_id, + } + registry.remove_by_doc_name(doc_name) + registry.add(synthetic_hash, meta) + snapshot.mark_committed() + except Exception: + if snapshot is None: + click.echo(f" [ERROR] Failed to prepare mutation snapshot for cloud import {doc_id}.") + return "failed" + rollback_error = snapshot.rollback_best_effort() + if rollback_error is None: + snapshot.discard_best_effort() + else: + click.echo( + " [ERROR] Rollback failed; mutation journal retained for recovery: " + f"{snapshot.journal_path}" + ) + return "failed" - append_log(kb_dir / "wiki", "ingest", doc_name) + try: + append_log(kb_dir / "wiki", "ingest", doc_name) + except Exception as exc: + logger.warning("Failed to append ingest log for cloud import %s: %s", doc_id, exc) + cleanup_error = snapshot.discard_best_effort() + if cleanup_error is not None: + click.echo( + f" [WARN] {doc_name} imported, but mutation journal cleanup failed: {cleanup_error}" + ) click.echo(f" [OK] {doc_name} imported from PageIndex Cloud.") return "added" @@ -847,7 +961,7 @@ def add(ctx, path, from_pageindex_cloud): fetched = fetch_url_to_raw(path, kb_dir) if fetched is None: return - outcome = add_single_file(fetched, kb_dir) + outcome = add_single_file(fetched, kb_dir, stage=False) # Only clean up on dedup-skip. On "failed" we keep the file so # the user can retry (e.g. transient LLM error during compile) # without re-downloading — and so they don't lose data when diff --git a/openkb/converter.py b/openkb/converter.py index 4b9246ec..fc3c96b5 100644 --- a/openkb/converter.py +++ b/openkb/converter.py @@ -14,6 +14,7 @@ from openkb.config import load_config from openkb.images import copy_relative_images, extract_base64_images, convert_pdf_with_images +from openkb.locks import atomic_write_text, kb_ingest_lock from openkb.state import HashRegistry logger = logging.getLogger(__name__) @@ -70,7 +71,13 @@ def _name_taken(candidate: str, registry: HashRegistry) -> bool: return False -def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str: +def resolve_doc_name( + src: Path, + kb_dir: Path, + registry: HashRegistry, + *, + persist_legacy: bool = True, +) -> str: """Resolve the stable wiki name for ``src`` (Scheme A). Identity is keyed by path: a source we've seen before (same path, even @@ -93,9 +100,10 @@ def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str: file_hash, meta = legacy meta = dict(meta) name = meta.get("doc_name") or Path(meta.get("name", "")).stem - meta["doc_name"] = name - meta["path"] = path_key - registry.add(file_hash, meta) # backfill + persist + if persist_legacy: + meta["doc_name"] = name + meta["path"] = path_key + registry.add(file_hash, meta) # backfill + persist return name return resolve_doc_name_from_key(src.stem, path_key, registry) @@ -130,7 +138,12 @@ def get_pdf_page_count(path: Path) -> int: return doc.page_count -def convert_document(src: Path, kb_dir: Path) -> ConvertResult: +def convert_document( + src: Path, + kb_dir: Path, + *, + staging_dir: Path | None = None, +) -> ConvertResult: """Convert a document and integrate it into the knowledge base. Steps: @@ -141,86 +154,93 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult: 5. Otherwise — run MarkItDown, extract base64 images, save to ``wiki/sources/``. 6. Register hash in the registry. """ - # ------------------------------------------------------------------ - # Load config & state - # ------------------------------------------------------------------ - openkb_dir = kb_dir / ".openkb" - config = load_config(openkb_dir / "config.yaml") - threshold: int = config.get("pageindex_threshold", 20) - registry = HashRegistry(openkb_dir / "hashes.json") - - # ------------------------------------------------------------------ - # 1. Hash check + identity resolution - # ------------------------------------------------------------------ - file_hash = HashRegistry.hash_file(src) - if registry.is_known(file_hash): - logger.info("Skipping already-known file: %s", src.name) - stored = registry.get(file_hash) or {} - return ConvertResult( - skipped=True, - file_hash=file_hash, - doc_name=stored.get("doc_name") or Path(stored.get("name", src.name)).stem, - ) - doc_name = resolve_doc_name(src, kb_dir, registry) - - # ------------------------------------------------------------------ - # 2. Copy to raw/ - # ------------------------------------------------------------------ - raw_dir = kb_dir / "raw" - raw_dir.mkdir(parents=True, exist_ok=True) - if src.resolve().is_relative_to(raw_dir.resolve()): - # Watch mode: the file already lives in raw/ — don't copy/rename. - raw_dest = src - else: - raw_dest = raw_dir / f"{doc_name}{src.suffix.lower()}" - shutil.copy2(src, raw_dest) - - # ------------------------------------------------------------------ - # 3. PDF long-doc detection - # ------------------------------------------------------------------ - if src.suffix.lower() == ".pdf": - page_count = get_pdf_page_count(src) - if page_count >= threshold: - logger.info( - "Long PDF detected (%d pages >= %d threshold): %s", - page_count, - threshold, - src.name, - ) + with kb_ingest_lock(kb_dir / ".openkb"): + # ------------------------------------------------------------------ + # Load config & state + # ------------------------------------------------------------------ + openkb_dir = kb_dir / ".openkb" + config = load_config(openkb_dir / "config.yaml") + threshold: int = config.get("pageindex_threshold", 20) + artifact_root = staging_dir if staging_dir is not None else kb_dir + registry = HashRegistry(openkb_dir / "hashes.json") + + # ------------------------------------------------------------------ + # 1. Hash check + identity resolution + # ------------------------------------------------------------------ + file_hash = HashRegistry.hash_file(src) + if registry.is_known(file_hash): + logger.info("Skipping already-known file: %s", src.name) + stored = registry.get(file_hash) or {} return ConvertResult( - raw_path=raw_dest, - is_long_doc=True, + skipped=True, file_hash=file_hash, - doc_name=doc_name, + doc_name=stored.get("doc_name") or Path(stored.get("name", src.name)).stem, ) + doc_name = resolve_doc_name( + src, + kb_dir, + registry, + persist_legacy=staging_dir is None, + ) - # ------------------------------------------------------------------ - # 4/5. Convert to Markdown - # ------------------------------------------------------------------ - sources_dir = kb_dir / "wiki" / "sources" - sources_dir.mkdir(parents=True, exist_ok=True) - images_dir = kb_dir / "wiki" / "sources" / "images" / doc_name - images_dir.mkdir(parents=True, exist_ok=True) - - if src.suffix.lower() == ".md": - markdown = src.read_text(encoding="utf-8") - markdown = copy_relative_images(markdown, src.parent, doc_name, images_dir) - elif src.suffix.lower() == ".pdf": - # Use pymupdf dict-mode for PDFs: text + images inline at correct positions - markdown = convert_pdf_with_images(src, doc_name, images_dir) - else: - # Non-PDF, non-MD: use markitdown (docx, pptx, html, etc.) - mid = MarkItDown() - result = mid.convert(str(src)) - markdown = result.text_content - markdown = extract_base64_images(markdown, doc_name, images_dir) - - dest_md = sources_dir / f"{doc_name}.md" - dest_md.write_text(markdown, encoding="utf-8") - - return ConvertResult( - raw_path=raw_dest, - source_path=dest_md, - file_hash=file_hash, - doc_name=doc_name, - ) + # ------------------------------------------------------------------ + # 2. Copy to raw/ + # ------------------------------------------------------------------ + raw_dir = artifact_root / "raw" + raw_dir.mkdir(parents=True, exist_ok=True) + if staging_dir is None and src.resolve().is_relative_to(raw_dir.resolve()): + # Watch mode: the file already lives in raw/ — don't copy/rename. + raw_dest = src + else: + raw_dest = raw_dir / f"{doc_name}{src.suffix.lower()}" + shutil.copy2(src, raw_dest) + + # ------------------------------------------------------------------ + # 3. PDF long-doc detection + # ------------------------------------------------------------------ + if src.suffix.lower() == ".pdf": + page_count = get_pdf_page_count(src) + if page_count >= threshold: + logger.info( + "Long PDF detected (%d pages >= %d threshold): %s", + page_count, + threshold, + src.name, + ) + return ConvertResult( + raw_path=raw_dest, + is_long_doc=True, + file_hash=file_hash, + doc_name=doc_name, + ) + + # ------------------------------------------------------------------ + # 4/5. Convert to Markdown + # ------------------------------------------------------------------ + sources_dir = artifact_root / "wiki" / "sources" + sources_dir.mkdir(parents=True, exist_ok=True) + images_dir = artifact_root / "wiki" / "sources" / "images" / doc_name + images_dir.mkdir(parents=True, exist_ok=True) + + if src.suffix.lower() == ".md": + markdown = src.read_text(encoding="utf-8") + markdown = copy_relative_images(markdown, src.parent, doc_name, images_dir) + elif src.suffix.lower() == ".pdf": + # Use pymupdf dict-mode for PDFs: text + images inline at correct positions + markdown = convert_pdf_with_images(src, doc_name, images_dir) + else: + # Non-PDF, non-MD: use markitdown (docx, pptx, html, etc.) + mid = MarkItDown() + result = mid.convert(str(src)) + markdown = result.text_content + markdown = extract_base64_images(markdown, doc_name, images_dir) + + dest_md = sources_dir / f"{doc_name}.md" + atomic_write_text(dest_md, markdown) + + return ConvertResult( + raw_path=raw_dest, + source_path=dest_md, + file_hash=file_hash, + doc_name=doc_name, + ) diff --git a/openkb/indexer.py b/openkb/indexer.py index 691a126c..fd50a38c 100644 --- a/openkb/indexer.py +++ b/openkb/indexer.py @@ -5,7 +5,7 @@ import logging from dataclasses import dataclass -from pathlib import Path +from pathlib import Path, PurePosixPath from typing import Any import os @@ -37,6 +37,30 @@ class CloudImportResult: description: str +@dataclass +class CloudImportData: + """A fetched cloud doc + its resolved wiki name, before any KB write. + + Returned by :func:`prepare_cloud_import` so the caller can snapshot this + doc's specific paths (O(1)) before :func:`_write_long_doc_artifacts` writes + them — instead of copying the whole summaries/sources trees on every import. + """ + + doc_id: str + doc_name: str # collision-resistant wiki slug (resolved, not yet written) + cloud_name: str # cloud display name (original filename in the cloud) + description: str + tree: dict + all_pages: list + + +def _cloud_display_stem(cloud_name: str, fallback: str) -> str: + """Return a platform-independent stem for a PageIndex Cloud display name.""" + normalized = cloud_name.replace("\\", "/").rstrip("/") + leaf = normalized.rsplit("/", 1)[-1] if normalized else "" + return PurePosixPath(leaf).stem or fallback + + def _normalize_page_content(raw_pages: Any) -> list[dict[str, Any]]: """Normalize PageIndex/local PDF page content into OpenKB's JSON shape.""" if not isinstance(raw_pages, list): @@ -246,14 +270,13 @@ def _fetch_cloud_pages(col, doc_id: str) -> list[dict[str, Any]]: return pages -def import_cloud_document(doc_id: str, kb_dir: Path, path_key: str) -> CloudImportResult: - """Import an already-indexed PageIndex Cloud document by ``doc_id``. +def prepare_cloud_import(doc_id: str, kb_dir: Path, path_key: str) -> CloudImportData: + """Fetch a PageIndex Cloud doc and resolve its wiki name WITHOUT writing. - Fetches structure + OCR'd page content from the cloud (no local PDF) and - writes the same wiki artifacts as :func:`index_long_document`. Requires - ``PAGEINDEX_API_KEY``. ``path_key`` is the synthetic identity key - (``pageindex-cloud:``) used to resolve a collision-resistant - wiki name. + Cloud fetch + collision-resistant name resolution only — no KB mutation — + so the caller knows ``doc_name`` before writing and can snapshot just this + doc's paths instead of copying the whole summaries/sources trees. Name + resolution reads the registry but does not mutate it. """ from openkb.converter import resolve_doc_name_from_key from openkb.state import HashRegistry @@ -274,7 +297,7 @@ def import_cloud_document(doc_id: str, kb_dir: Path, path_key: str) -> CloudImpo structure: list = doc.get("structure", []) registry = HashRegistry(kb_dir / ".openkb" / "hashes.json") - stem = Path(cloud_name).stem or doc_id + stem = _cloud_display_stem(cloud_name, doc_id) doc_name = resolve_doc_name_from_key(stem, path_key, registry) tree = { @@ -289,7 +312,32 @@ def import_cloud_document(doc_id: str, kb_dir: Path, path_key: str) -> CloudImpo f"No page content returned from PageIndex Cloud for doc_id={doc_id}" ) - _write_long_doc_artifacts(tree, all_pages, doc_name, doc_id, kb_dir, description=description) + return CloudImportData( + doc_id=doc_id, doc_name=doc_name, cloud_name=cloud_name, + description=description, tree=tree, all_pages=all_pages, + ) + + +def import_cloud_document(doc_id: str, kb_dir: Path, path_key: str) -> CloudImportResult: + """Import an already-indexed PageIndex Cloud document by ``doc_id``. + + Fetches structure + OCR'd page content from the cloud (no local PDF) and + writes the same wiki artifacts as :func:`index_long_document`. Requires + ``PAGEINDEX_API_KEY``. ``path_key`` is the synthetic identity key + (``pageindex-cloud:``) used to resolve a collision-resistant + wiki name. + + Writes immediately. Callers that need to snapshot before writing (e.g. the + crash-safe CLI path) should call :func:`prepare_cloud_import` then + :func:`_write_long_doc_artifacts`, so the snapshot can cover only this + doc's paths. + """ + cloud = prepare_cloud_import(doc_id, kb_dir, path_key) + _write_long_doc_artifacts( + cloud.tree, cloud.all_pages, cloud.doc_name, cloud.doc_id, kb_dir, + description=cloud.description, + ) return CloudImportResult( - doc_id=doc_id, doc_name=doc_name, name=cloud_name, description=description, + doc_id=cloud.doc_id, doc_name=cloud.doc_name, + name=cloud.cloud_name, description=cloud.description, ) diff --git a/openkb/lint.py b/openkb/lint.py index 8b7674b7..ce695ecc 100644 --- a/openkb/lint.py +++ b/openkb/lint.py @@ -16,6 +16,7 @@ import yaml from openkb import frontmatter +from openkb.locks import atomic_write_text from openkb.schema import PAGE_CONTENT_DIRS # Matches [[wikilink]] or [[subdir/link]] @@ -249,7 +250,7 @@ def fix_broken_links( text, known_targets, norm_index=norm_index, ) if cleaned != text: - md.write_text(cleaned, encoding="utf-8") + atomic_write_text(md, cleaned) files_changed += 1 ghosts_stripped += len(ghosts) return files_changed, ghosts_stripped diff --git a/openkb/locks.py b/openkb/locks.py index 2fa2815a..72966fc1 100644 --- a/openkb/locks.py +++ b/openkb/locks.py @@ -9,6 +9,7 @@ import contextlib import json +import logging import os import tempfile import threading @@ -97,6 +98,32 @@ def _local_lock(lock_path: Path) -> _LocalRwLock: return lock +def _drain_pending_journals(openkb_dir: Path) -> None: + """Roll back any mutation journals an interrupted process left behind. + + Draining recovery is part of *taking* the mutation lock, not part of any + one command: a process that acquires the exclusive lock must restore the + KB to a known state before mutating it. Wiring this into ``kb_lock`` means + every exclusive-lock holder — ``add``, ``remove``, ``recompile``, ``lint``, + ``chat`` — drains on first acquisition, so an ``add`` that crashed mid- + commit cannot leave an active journal on disk that a later ``add`` rolls + back over the top of an intervening ``remove``/``recompile`` (clobbering + those edits). ``openkb_dir`` is the ``kb_dir/.openkb`` directory callers + pass to ``kb_lock``; the journal lives at ``openkb_dir/journal``, so the + KB root is ``openkb_dir.parent``. + + The delayed import breaks the ``locks`` ↔ ``mutation`` cycle (``mutation`` + imports atomic-write helpers from this module at top level). Called only + on first OS-lock acquisition (the reentrant branch above returns early), + never on a read lock, so queries pay nothing. + """ + from openkb.mutation import recover_pending_journals + + log = logging.getLogger(__name__) + for message in recover_pending_journals(openkb_dir.parent): + log.warning(message) + + @contextlib.contextmanager def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]: """Hold a KB-level advisory lock.""" @@ -134,6 +161,8 @@ def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]: flock(fh, exclusive=exclusive) held[resolved] = (1, 0) if exclusive else (0, 1) try: + if exclusive: + _drain_pending_journals(openkb_dir) yield finally: held.pop(resolved, None) diff --git a/openkb/mutation.py b/openkb/mutation.py new file mode 100644 index 00000000..cb1353b5 --- /dev/null +++ b/openkb/mutation.py @@ -0,0 +1,431 @@ +"""Transactional helpers for KB mutation paths.""" +from __future__ import annotations + +import errno +import json +import logging +import os +import shutil +import tempfile +import uuid +from dataclasses import dataclass, field +from pathlib import Path + +from openkb.locks import _fsync_directory, _target_mode, atomic_write_json + +logger = logging.getLogger(__name__) + +# Cap how many times recover_pending_journals retries an active journal whose +# rollback keeps failing. Without a cap, a deterministically-failing rollback +# (e.g. persistent ENOSPC) is retried on every lock acquisition forever, +# re-doing the failed work and never releasing the backup dir + journal. +MAX_ROLLBACK_ATTEMPTS = 5 + + +def _apply_mode(path: Path, mode: int) -> None: + """Set ``path``'s permission bits (no-op where ``os.chmod`` is absent).""" + if hasattr(os, "chmod"): + os.chmod(path, mode) + + +def _fsync_file(path: Path) -> None: + """Best-effort fsync of a file's data, for durability after a rename. + + Opens read+write so ``FlushFileBuffers`` works on Windows (a read-only + handle can be denied). Best-effort: a failure here only weakens durability + of already-written bytes (the OS write-back still flushes them); it must + not fail the publish. + """ + try: + with open(path, "r+b") as fh: + os.fsync(fh.fileno()) + except OSError: + pass + + +def _hardlink_or_copy(src: str, dst: str) -> None: + """``copytree`` copy_function that hardlinks (O(1), shares the inode). + + Used for directory backups the caller has marked hardlink-safe — trees + whose writers all go through atomic temp+replace (so the live file moves + to a new inode) or that are append-only across documents. The hardlink + backup then keeps pointing at the old inode while the live tree is + mutated, so rollback restores the pre-mutation bytes without copying them + up front. Falls back to a real copy on EXDEV/EPERM/EACCES — cross-device, + a filesystem that forbids hardlinks, or (Windows) an ACL / cloud-sync + folder (OneDrive/Dropbox) that blocks CREATE_HARD_LINK. If the copy also + fails it surfaces the real error. + """ + src_path = Path(src) + dst_path = Path(dst) + try: + os.link(src_path, dst_path) + except OSError as exc: + if exc.errno not in (errno.EXDEV, errno.EPERM, errno.EACCES): + raise + shutil.copy2(src_path, dst_path) + + +def _copy_file_atomic(src: Path, dest: Path) -> None: + """Stream ``src`` to ``dest`` through a temp file, then atomically replace. + + Streams (never buffers the whole file) so copying a large raw PDF does + not spike peak memory. The temp-file + ``os.replace`` means a torn + intermediate state can never be observed at ``dest``. Used by snapshot + backup creation, rollback restore, and the cross-filesystem fallback of + :func:`_publish_staged_file` — so every byte copy in this module shares + one atomic, streaming, durable semantic: the parent directory is fsynced + and the result carries the umask mode (not ``mkstemp``'s 0600). + """ + dest.parent.mkdir(parents=True, exist_ok=True) + # Capture the destination mode before the temp file shadows it: a brand- + # new file gets the process umask mode (0o666 & ~umask), an existing file + # keeps its current mode — the same rule ``atomic_write_bytes`` applies. + mode = _target_mode(dest) + fd, tmp_name = tempfile.mkstemp(prefix=f".{dest.name}.", suffix=".tmp", dir=dest.parent) + tmp_path = Path(tmp_name) + try: + with os.fdopen(fd, "wb") as out, src.open("rb") as inp: + shutil.copyfileobj(inp, out) + out.flush() + os.fsync(out.fileno()) + os.replace(tmp_path, dest) + _apply_mode(dest, mode) + _fsync_directory(dest.parent) + finally: + tmp_path.unlink(missing_ok=True) + + +def _publish_staged_file(src: Path, dest: Path) -> None: + """Publish one staged file into its live-KB location. + + Staging sits on the same filesystem as ``raw/`` and ``wiki/sources/``, so + an O(1) atomic ``os.replace`` (rename) is used instead of streaming the + bytes — a full copy + fsync per published file was the old per-file cost. + Only on ``EXDEV`` (staging and the live KB genuinely on different devices) + does it fall back to :func:`_copy_file_atomic`. Both branches leave the + result durable (file data + parent dir fsynced) and at the umask mode. + """ + dest.parent.mkdir(parents=True, exist_ok=True) + mode = _target_mode(dest) + try: + os.replace(src, dest) + except OSError as exc: + if exc.errno != errno.EXDEV: + raise + _copy_file_atomic(src, dest) # already fsyncs data + dir + sets mode + return + _apply_mode(dest, mode) + # Parity with _copy_file_atomic: the renamed inode's data may still be in + # the page cache. Without this, a crash right after publish can leave a + # 0-byte / stale raw or source file that committed metadata points at, + # even though the directory entry (fsynced below) survived. + _fsync_file(dest) + _fsync_directory(dest.parent) + + +@dataclass +class MutationSnapshot: + """Snapshot of final KB paths touched by a mutation attempt.""" + + kb_dir: Path + backup_dir: Path + journal_path: Path + operation: str + details: dict = field(default_factory=dict) + entries: dict[Path, Path | None] = field(default_factory=dict) + attempts: int = 0 + # Dirs whose backup was hardlinked (in-process only; not persisted, so a + # crash-rebuilt snapshot leaves this empty and rollback falls back to the + # safe full-copy path). Drives O(touched) rollback via inode-diff restore. + hardlinked_dirs: set[Path] = field(default_factory=set) + + def _journal_data(self, status: str) -> dict: + return { + "version": 1, + "operation": self.operation, + "status": status, + "kb_dir": str(self.kb_dir), + "backup_dir": str(self.backup_dir), + "details": self.details, + "attempts": self.attempts, + "entries": [ + { + "target": str(target), + "backup": str(backup) if backup is not None else None, + } + for target, backup in self.entries.items() + ], + } + + def write_journal(self, status: str) -> None: + self.journal_path.parent.mkdir(parents=True, exist_ok=True) + atomic_write_json(self.journal_path, self._journal_data(status)) + + def mark_committed(self) -> None: + """Mark the journal committed without removing the backup. + + Call this the instant the mutation is durably applied (e.g. the + registry write has landed) so a subsequent + :func:`recover_pending_journals` discards the journal instead of + rolling it back. This is the commit signal; :meth:`discard` is the + post-commit cleanup that also removes the backup dir and journal + file and must itself be best-effort — it runs *after* the commit + point and its failure must never trigger a rollback. + """ + self.write_journal("committed") + + def rollback(self) -> None: + # Restore children before parents so directory deletes cannot remove + # paths that still need to be restored from a more specific backup. + for target, backup in sorted( + self.entries.items(), + key=lambda item: len(item[0].parts), + reverse=True, + ): + # A hardlinked dir backup supports an O(touched) inode-diff restore + # (leave untouched shared-inode files, only touch changed ones) — + # do NOT rmtree it first, which would discard those shared inodes. + if target.is_dir() and target in self.hardlinked_dirs: + if backup is not None and backup.is_dir(): + _restore_hardlinked_dir(backup, target) + else: + shutil.rmtree(target, ignore_errors=True) # new dir, no backup + continue + # Non-hardlinked (file, or copied dir): unconditional remove + restore. + if target.is_dir(): + shutil.rmtree(target, ignore_errors=True) + else: + target.unlink(missing_ok=True) + if backup is None: + continue + target.parent.mkdir(parents=True, exist_ok=True) + if backup.is_dir(): + shutil.copytree(backup, target) + else: + _copy_file_atomic(backup, target) + self.write_journal("rolled_back") + + def rollback_best_effort(self) -> Exception | None: + try: + self.rollback() + except Exception as exc: + logger.warning("Mutation rollback failed: %s", exc) + return exc + return None + + def discard(self) -> None: + # Best-effort post-commit/post-rollback cleanup: callers have already + # written a terminal status (mark_committed or rollback), so there is + # nothing to re-write here — doing so would be dead work and would + # silently downgrade a "rolled_back" journal to "committed" moments + # before it is deleted. + shutil.rmtree(self.backup_dir, ignore_errors=True) + self.journal_path.unlink(missing_ok=True) + + def discard_best_effort(self) -> Exception | None: + try: + self.discard() + except Exception as exc: + logger.warning("Mutation journal cleanup failed: %s", exc) + return exc + return None + + +def _restore_hardlinked_dir(backup: Path, target: Path) -> None: + """O(touched) restore for a hardlinked directory backup. + + The backup was built with ``os.link``, so live files the mutation never + touched still share the backup's inode — leave them. Only files the + mutation changed need work: new files (no backup counterpart) are removed, + modified files (atomic temp+replace → new inode) and deleted files are + restored from the backup's pre-mutation bytes. This avoids recopying the + whole tree on rollback — the cost that bit ``.openkb/files`` (the blob + store) and large concept/entity trees on every failed add. + + Degrades gracefully to a full copy if the backup isn't actually hardlinked + (e.g. the EXDEV/EACCES fallback fired at snapshot time): every file then has + a different inode, so every file is treated as modified and recopied. + """ + def _file_key(path: Path) -> tuple[int, int]: + st = path.stat() # follows symlinks; these trees hold regular files only + return (st.st_dev, st.st_ino) + + backup_files = {p.relative_to(backup): p for p in backup.rglob("*") if p.is_file()} + + # Pass 1: remove new + modified live regular files; leave untouched ones + # (they share the backup inode) in place. + if target.exists(): + for live in list(target.rglob("*")): + if not live.is_file(): + continue + counterpart = backup_files.get(live.relative_to(target)) + if counterpart is None or _file_key(live) != _file_key(counterpart): + live.unlink() + + # Pass 2: restore modified + deleted files from backup. + for rel, src in backup_files.items(): + dest = target / rel + if not dest.exists() or _file_key(dest) != _file_key(src): + dest.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(src, dest) + + # Pass 3: prune directories the mutation created that are now empty. + if target.exists(): + for d in sorted((p for p in target.rglob("*") if p.is_dir()), + key=lambda p: len(p.parts), reverse=True): + if not (backup / d.relative_to(target)).exists() and not any(d.iterdir()): + d.rmdir() + + +def snapshot_paths( + kb_dir: Path, + paths: list[Path], + *, + operation: str, + details: dict | None = None, + hardlink_dirs: set[Path] | None = None, +) -> MutationSnapshot: + """Snapshot final KB paths before a mutation starts. + + ``hardlink_dirs`` marks directories whose backup may be hardlinks instead + of copies (O(1), no per-file byte copy). A directory is only safe to list + here if every writer into it is either atomic temp+replace (new inode, so + the hardlink backup keeps the old bytes) or append-only. This is the + required caller contract for hardlinked dirs; any in-place writer into one + of those trees would silently corrupt the backup and make rollback a no-op + for that file. + """ + kb_dir = kb_dir.resolve() + hardlink_resolved = {p.resolve() for p in (hardlink_dirs or ())} + journal_id = uuid.uuid4().hex + backup_dir = kb_dir / ".openkb" / "staging" / f"rollback-{journal_id}" + backup_dir.mkdir(parents=True, exist_ok=False) + snapshot = MutationSnapshot( + kb_dir=kb_dir, + backup_dir=backup_dir, + journal_path=kb_dir / ".openkb" / "journal" / f"{journal_id}.json", + operation=operation, + details=details or {}, + ) + try: + for path in paths: + target = path.resolve() + if target in snapshot.entries: + continue + if not target.exists(): + snapshot.entries[target] = None + continue + rel = target.relative_to(kb_dir) + backup = backup_dir / rel + backup.parent.mkdir(parents=True, exist_ok=True) + if target.is_dir(): + if target in hardlink_resolved: + shutil.copytree(target, backup, copy_function=_hardlink_or_copy) + snapshot.hardlinked_dirs.add(target) + else: + shutil.copytree(target, backup) + else: + _copy_file_atomic(target, backup) + snapshot.entries[target] = backup + # The active journal is the recovery signal: once this exists, a future + # process can restore every recorded target even if the current one exits. + snapshot.write_journal("active") + except Exception: + # Partial snapshot: backup_dir exists on disk but no journal was + # written. recover_pending_journals only scans journals, so remove the + # orphan backup here — otherwise it leaks forever with nothing able to + # reach or clean it. + shutil.rmtree(backup_dir, ignore_errors=True) + raise + return snapshot + + +def _snapshot_from_journal(path: Path, data: dict) -> MutationSnapshot: + snapshot = MutationSnapshot( + kb_dir=Path(data["kb_dir"]), + backup_dir=Path(data["backup_dir"]), + journal_path=path, + operation=data.get("operation", "mutation"), + details=data.get("details") or {}, + ) + snapshot.entries = { + Path(item["target"]): Path(item["backup"]) if item.get("backup") else None + for item in data.get("entries", []) + } + snapshot.attempts = int(data.get("attempts", 0) or 0) + return snapshot + + +def recover_pending_journals(kb_dir: Path) -> list[str]: + """Rollback active journals left by an interrupted process.""" + journal_dir = kb_dir / ".openkb" / "journal" + if not journal_dir.is_dir(): + return [] + messages: list[str] = [] + for journal_path in sorted(journal_dir.glob("*.json")): + snapshot: MutationSnapshot | None = None + try: + data = json.loads(journal_path.read_text(encoding="utf-8")) + snapshot = _snapshot_from_journal(journal_path, data) + status = data.get("status", "active") + if status in {"committed", "rolled_back"}: + snapshot.discard() + messages.append(f"Cleaned terminal mutation journal {journal_path.name}.") + continue + snapshot.rollback() + snapshot.discard() + messages.append( + f"Rolled back interrupted {snapshot.operation} journal {journal_path.name}." + ) + except Exception as exc: + if snapshot is None: + # The journal couldn't be read or reconstructed (corrupt/empty/ + # stray .json, or missing the kb_dir/backup_dir keys recovery + # needs). There is nothing to roll back or retry — and leaving + # it in place would re-trigger this failure on every future lock + # acquisition (draining runs on first exclusive acquisition), + # bricking add/remove/recompile/chat for the whole KB. Best-effort + # remove the unrecoverable journal and log loudly; any backup_dir + # it referenced is unreachable now and may leak. + journal_path.unlink(missing_ok=True) + messages.append( + f"Unrecoverable mutation journal {journal_path.name} " + f"({type(exc).__name__}: {exc}); removed so it can't block " + f"recovery. The KB may need manual review." + ) + continue + # Rollback failed. Retry a bounded number of times across recovery + # runs (a later attempt may succeed once the cause clears, e.g. disk + # space freed), then give up: discard the journal + backup and log + # loudly so it can't leak forever re-doing the same failing rollback. + snapshot.attempts += 1 + if snapshot.attempts >= MAX_ROLLBACK_ATTEMPTS: + snapshot.discard() + messages.append( + f"GAVE UP on {snapshot.operation} journal {journal_path.name} after " + f"{snapshot.attempts} failed rollback(s): {type(exc).__name__}: {exc}. " + f"The KB may be in a partially-rolled-back state — manual review needed." + ) + else: + snapshot.write_journal("active") # persist incremented attempts + messages.append( + f"Rollback of {snapshot.operation} journal {journal_path.name} failed " + f"(attempt {snapshot.attempts}/{MAX_ROLLBACK_ATTEMPTS}): " + f"{type(exc).__name__}: {exc}; retained for retry." + ) + return messages + + +def publish_staged_tree(staging_dir: Path | None, kb_dir: Path) -> None: + """Move staged raw/source artifacts into their final KB locations.""" + if staging_dir is None or not staging_dir.exists(): + return + for rel in ("raw", "wiki/sources"): + src_root = staging_dir / rel + if not src_root.exists(): + continue + for src in src_root.rglob("*"): + if not src.is_file(): + continue + _publish_staged_file(src, kb_dir / rel / src.relative_to(src_root)) diff --git a/tests/test_add_command.py b/tests/test_add_command.py index f819900f..8dacec26 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -72,6 +72,24 @@ def test_add_single_file_calls_helper(self, tmp_path): runner.invoke(cli, ["add", str(doc)]) mock_add.assert_called_once_with(doc, kb_dir) + def test_add_single_file_compile_failure_rolls_back_converted_artifacts(self, tmp_path): + from openkb.cli import add_single_file + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + doc = tmp_path / "notes.md" + doc.write_text("# Notes\n\nBody", encoding="utf-8") + + with patch("openkb.agent.compiler.compile_short_doc", side_effect=RuntimeError("boom")), \ + patch("openkb.cli.time.sleep"), \ + patch("openkb.cli._setup_llm_key"): + outcome = add_single_file(doc, kb_dir) + + assert outcome == "failed" + assert not (kb_dir / "raw" / "notes.md").exists() + assert not (kb_dir / "wiki" / "sources" / "notes.md").exists() + assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {} + def test_add_directory_calls_helper_for_each_file(self, tmp_path): kb_dir = self._setup_kb(tmp_path) docs_dir = tmp_path / "docs" @@ -188,11 +206,15 @@ def test_add_oldest_legacy_entry_converges_to_single_entry(self, tmp_path): doc = tmp_path / "notes.md" doc.write_text("# Notes, edited") # new content hash != "old-hash" - # Compilation mocked out (asyncio.run), but convert_document REAL so + # Compilation mocked out, but convert_document REAL so # the legacy backfill actually happens on disk mid-pipeline. + def close_coro(coro): + if hasattr(coro, "close"): + coro.close() + runner = CliRunner() with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ - patch("openkb.cli.asyncio.run"): + patch("openkb.cli.asyncio.run", side_effect=close_coro): result = runner.invoke(cli, ["add", str(doc)]) assert "OK" in result.output @@ -255,19 +277,31 @@ def _setup_kb(self, tmp_path): (openkb_dir / "hashes.json").write_text(json.dumps({})) return tmp_path + def _cloud_data(self, doc_name="Cloud-Paper"): + from openkb.indexer import CloudImportData + + return CloudImportData( + doc_id="cloud-1", + doc_name=doc_name, + cloud_name="Cloud Paper.pdf", + description="desc", + tree={ + "doc_name": "Cloud Paper.pdf", + "doc_description": "desc", + "structure": [], + }, + all_pages=[{"page": 1, "content": "Cloud page", "images": []}], + ) + def test_registers_rawless_cloud_entry(self, tmp_path): import hashlib from openkb.cli import import_from_pageindex_cloud - from openkb.indexer import CloudImportResult from openkb.state import HashRegistry kb_dir = self._setup_kb(tmp_path) - result = CloudImportResult( - doc_id="cloud-1", doc_name="Cloud-Paper", name="Cloud Paper.pdf", - description="desc", - ) + cloud = self._cloud_data() - with patch("openkb.cli.import_cloud_document", return_value=result), \ + with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ patch("openkb.cli.compile_long_doc", return_value=None) as mock_compile, \ patch("openkb.cli._setup_llm_key"): outcome = import_from_pageindex_cloud("cloud-1", kb_dir) @@ -286,29 +320,25 @@ def test_registers_rawless_cloud_entry(self, tmp_path): def test_second_import_is_skipped(self, tmp_path): from openkb.cli import import_from_pageindex_cloud - from openkb.indexer import CloudImportResult kb_dir = self._setup_kb(tmp_path) - result = CloudImportResult( - doc_id="cloud-1", doc_name="Cloud-Paper", name="Cloud Paper.pdf", - description="desc", - ) + cloud = self._cloud_data() - with patch("openkb.cli.import_cloud_document", return_value=result) as mock_import, \ + with patch("openkb.cli.prepare_cloud_import", return_value=cloud) as mock_prepare, \ patch("openkb.cli.compile_long_doc", return_value=None), \ patch("openkb.cli._setup_llm_key"): import_from_pageindex_cloud("cloud-1", kb_dir) second = import_from_pageindex_cloud("cloud-1", kb_dir) assert second == "skipped" - assert mock_import.call_count == 1 # not fetched again + assert mock_prepare.call_count == 1 # not fetched again def test_import_failure_returns_failed_and_registers_nothing(self, tmp_path): from openkb.cli import import_from_pageindex_cloud from openkb.state import HashRegistry kb_dir = self._setup_kb(tmp_path) - with patch("openkb.cli.import_cloud_document", side_effect=RuntimeError("boom")), \ + with patch("openkb.cli.prepare_cloud_import", side_effect=RuntimeError("boom")), \ patch("openkb.cli._setup_llm_key"): outcome = import_from_pageindex_cloud("cloud-9", kb_dir) @@ -322,21 +352,15 @@ def test_compile_failure_cleans_up_orphan_artifacts(self, tmp_path): `openkb remove` couldn't reach them otherwise — and nothing is registered (so a retry isn't skipped).""" from openkb.cli import import_from_pageindex_cloud - from openkb.indexer import CloudImportResult from openkb.state import HashRegistry kb_dir = self._setup_kb(tmp_path) (kb_dir / "wiki" / "entities").mkdir(parents=True, exist_ok=True) (kb_dir / "wiki" / "index.md").write_text("# Index\n", encoding="utf-8") doc_name = "Cloud-Paper" - # Simulate the artifacts import_cloud_document writes before compile. - (kb_dir / "wiki" / "summaries" / f"{doc_name}.md").write_text("---\n---\n# s\n") - (kb_dir / "wiki" / "sources" / f"{doc_name}.json").write_text("[]") - result = CloudImportResult( - doc_id="cloud-1", doc_name=doc_name, name="Cloud Paper.pdf", description="d", - ) + cloud = self._cloud_data(doc_name=doc_name) - with patch("openkb.cli.import_cloud_document", return_value=result), \ + with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ patch("openkb.cli.compile_long_doc", side_effect=RuntimeError("boom")), \ patch("openkb.cli.time.sleep"), \ patch("openkb.cli._setup_llm_key"): diff --git a/tests/test_mutation.py b/tests/test_mutation.py new file mode 100644 index 00000000..f3dbf614 --- /dev/null +++ b/tests/test_mutation.py @@ -0,0 +1,520 @@ +from __future__ import annotations + +import errno +import os +from pathlib import Path + +import pytest + +from openkb.mutation import publish_staged_tree, recover_pending_journals, snapshot_paths + + +def test_recover_pending_add_journal_rolls_back_files(tmp_path): + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + new_file = kb_dir / "wiki" / "sources" / "doc.md" + + snapshot_paths( + kb_dir, + [target, new_file], + operation="add", + details={"doc_name": "doc"}, + ) + target.write_text("after", encoding="utf-8") + new_file.parent.mkdir(parents=True) + new_file.write_text("new", encoding="utf-8") + + messages = recover_pending_journals(kb_dir) + + assert any("Rolled back interrupted add journal" in message for message in messages) + assert target.read_text(encoding="utf-8") == "before" + assert not new_file.exists() + assert not any((openkb_dir / "journal").glob("*.json")) + + +def test_mark_committed_prevents_recovery_rollback(tmp_path): + """A snapshot marked committed must be discarded (not rolled back) by + recovery — the commit signal that protects a completed mutation from + being undone when post-commit cleanup fails. + """ + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [target], operation="add", details={"doc_name": "doc"} + ) + target.write_text("after", encoding="utf-8") # the "committed" mutation + snapshot.mark_committed() + + messages = recover_pending_journals(kb_dir) + + assert any("Cleaned terminal mutation journal" in m for m in messages) + assert target.read_text(encoding="utf-8") == "after" # NOT rolled back + assert not any((openkb_dir / "journal").glob("*.json")) + + +def test_snapshot_paths_cleans_backup_dir_on_failure(tmp_path): + """A partially-created snapshot must not leak its backup dir: on any + failure before the journal is written, snapshot_paths removes the + rollback dir it created (recover_pending_journals only scans journals + and could never reach it otherwise). + """ + kb_dir = tmp_path / "kb" + kb_dir.mkdir() + # A target that resolves OUTSIDE kb_dir makes relative_to(kb_dir) raise + # mid-loop, after backup_dir was already mkdir'd. + outside = tmp_path / "outside.txt" + outside.write_text("hi", encoding="utf-8") + + with pytest.raises(ValueError): + snapshot_paths(kb_dir, [outside], operation="add", details={}) + + staging = kb_dir / ".openkb" / "staging" + if staging.exists(): + assert not any(staging.iterdir()) # no orphan rollback- dir + + +def test_exclusive_lock_drains_active_journal_before_yielding(tmp_path): + """Recovery runs on every exclusive-lock acquisition, not just the add path. + + ``recover_pending_journals`` is wired into ``kb_lock``'s first exclusive + acquisition, so any mutation command — ``remove``/``recompile``/``lint``/ + ``chat``, all of which take ``kb_ingest_lock`` directly — drains a crashed + predecessor's active journal before it mutates. This is the regression + guard for the bug where an ``add`` crash left an active journal that an + intervening ``remove`` ignored and a later ``add`` then rolled back over + the remove's edits. + """ + from openkb.locks import kb_ingest_lock + + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + + # Simulate a crashed add: snapshot taken, file mutated, but mark_committed + # never ran — an ACTIVE journal is left on disk. + snapshot_paths(kb_dir, [target], operation="add", details={"doc_name": "doc"}) + target.write_text("after", encoding="utf-8") + + # Any exclusive-lock holder drains before its body runs. + with kb_ingest_lock(openkb_dir): + assert target.read_text(encoding="utf-8") == "before" + + assert target.read_text(encoding="utf-8") == "before" + assert not any((openkb_dir / "journal").glob("*.json")) + + +# --- publish_staged_tree: O(1) rename + durability (review #2) ------------- + +def _staged_raw(staging: Path, name: str, payload: bytes) -> Path: + src = staging / "raw" / name + src.parent.mkdir(parents=True, exist_ok=True) + src.write_bytes(payload) + return src + + +def test_publish_moves_staged_files_on_same_filesystem(tmp_path): + """Publish must rename staged files into place (O(1)) when staging and + the live KB share a filesystem, not stream-copy them. The surest + observable signal: after publish the staged source is GONE (moved), + whereas a copy leaves it behind. + """ + kb_dir = tmp_path / "kb" + staging = kb_dir / ".openkb" / "staging" / "add-x" + src = _staged_raw(staging, "doc.pdf", b"%PDF-1.4 payload") + + publish_staged_tree(staging, kb_dir) + + published = kb_dir / "raw" / "doc.pdf" + assert published.read_bytes() == b"%PDF-1.4 payload" + assert not src.exists() # moved, not copied + + +def test_published_files_keep_umask_mode_not_0600(tmp_path): + """Published artifacts must be created at the process umask mode, not + inherit tempfile.mkstemp's 0600. 0600 would make the KB's published + files owner-only and inconsistent with atomic_write_bytes. + """ + prev_umask = os.umask(0o022) + try: + kb_dir = tmp_path / "kb" + staging = kb_dir / ".openkb" / "staging" / "add-y" + _staged_raw(staging, "doc.pdf", b"data") + + publish_staged_tree(staging, kb_dir) + + from openkb.locks import _default_file_mode + + published = kb_dir / "raw" / "doc.pdf" + assert (published.stat().st_mode & 0o777) == _default_file_mode() + finally: + os.umask(prev_umask) + + +def test_publish_falls_back_to_copy_on_cross_filesystem(tmp_path, monkeypatch): + """When staging and the live KB are on different filesystems, the publish + rename raises EXDEV; publish must fall back to a durable copy and still + land the file with correct content at the destination. + + Only the cross-device publish rename raises EXDEV — the fallback copy's + own temp-file rename is on the destination's filesystem and must succeed, + so the fake raises exactly once then delegates to the real ``os.replace``. + """ + import openkb.mutation as mut + + kb_dir = tmp_path / "kb" + staging = kb_dir / ".openkb" / "staging" / "add-z" + _staged_raw(staging, "doc.pdf", b"cross-fs payload") + + real_replace = os.replace + calls = {"n": 0} + + def fake_replace(src, dst, *args, **kwargs): + calls["n"] += 1 + if calls["n"] == 1: + raise OSError(errno.EXDEV, "cross-device link") + return real_replace(src, dst, *args, **kwargs) + + monkeypatch.setattr(mut.os, "replace", fake_replace) + + publish_staged_tree(staging, kb_dir) + + assert calls["n"] >= 2 # publish rename failed, fallback copy renamed + assert (kb_dir / "raw" / "doc.pdf").read_bytes() == b"cross-fs payload" + + +# --- snapshot_paths: hardlinked dir backups (review #1) -------------------- + +def test_snapshot_hardlinks_marked_directory_trees(tmp_path): + """Directory snapshots the caller marks hardlink-safe must hardlink the + live files into the backup (shared inode) — O(1), no per-file byte copy — + instead of streaming a fresh copy. This is what makes per-file concept / + entity / PageIndex-blob snapshots cheap on a large KB. + """ + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + existing = concepts / "old.md" + existing.write_text("old", encoding="utf-8") + live_inode = existing.stat().st_ino + + snapshot = snapshot_paths( + kb_dir, + [concepts], + operation="add", + details={}, + hardlink_dirs={concepts}, + ) + try: + backup_file = snapshot.backup_dir / "wiki" / "concepts" / "old.md" + assert backup_file.exists() + assert backup_file.stat().st_ino == live_inode # hardlink, not copy + finally: + snapshot.discard_best_effort() + + +def test_hardlinked_dir_rollback_correct_after_atomic_writes(tmp_path): + """With a hardlinked dir backup, an atomic (temp+replace) rewrite of an + existing page and creation of a new page must still roll back correctly: + existing page restored to its pre-snapshot content, new page removed. + + This is the correctness invariant hardlinking relies on — the wiki + writers must go through atomic temp+replace so the hardlink backup keeps + pointing at the old inode while the live file moves to a new one. + """ + from openkb.locks import atomic_write_text + + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + existing = concepts / "old.md" + existing.write_text("old-content", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + # Mirror the (now atomic) compiler writers: rewrite the existing page via + # atomic temp+replace, and add a brand-new page the doc creates. + atomic_write_text(existing, "rewritten-content") + (concepts / "new.md").write_text("new", encoding="utf-8") + + snapshot.rollback() + snapshot.discard_best_effort() + + assert existing.read_text(encoding="utf-8") == "old-content" + assert not (concepts / "new.md").exists() + + +def test_openkb_files_tree_is_hardlinked(tmp_path): + """The PageIndex blob store (.openkb/files) is append-only across docs — + each add creates new {doc_id} blobs and never modifies existing ones — so + it is hardlink-safe and must be snapshotted via hardlinks, not copied. + """ + kb_dir = tmp_path + blobs = kb_dir / ".openkb" / "files" / "col" + blobs.mkdir(parents=True) + existing = blobs / "an-existing-doc.pdf" + existing.write_bytes(b"existing-blob") + live_inode = existing.stat().st_ino + + snapshot = snapshot_paths( + kb_dir, [kb_dir / ".openkb" / "files"], operation="add", + details={}, hardlink_dirs={kb_dir / ".openkb" / "files"}, + ) + try: + backup = ( + snapshot.backup_dir / ".openkb" / "files" / "col" / "an-existing-doc.pdf" + ) + assert backup.stat().st_ino == live_inode + finally: + snapshot.discard_best_effort() + + +def test_concept_writer_is_atomic_so_hardlink_rollback_restores(tmp_path): + """Regression guard for the hardlink invariant: the wiki page writers must + go through atomic temp+replace (new inode). If any regresses to in-place + ``write_text`` (same inode), the hardlinked snapshot backup aliases that + inode and rollback restores the MUTATED content instead of the original. + + Exercises _write_concept's update path — the canonical in-place modify — + through a real hardlinked snapshot + rollback. + """ + from openkb.agent.compiler import _write_concept + + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + existing = concepts / "topic.md" + existing.write_text("---\nsources: []\n---\n\noriginal body", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + # The compiler rewrites the concept page as part of the doc ingest. If this + # write is in-place, the hardlink backup is corrupted and rollback fails. + _write_concept(kb_dir / "wiki", "topic", "rewritten body", "summaries/doc.md", is_update=True) + + snapshot.rollback() + snapshot.discard_best_effort() + + restored = existing.read_text(encoding="utf-8") + assert "original body" in restored + assert "rewritten body" not in restored + + +def test_fix_broken_links_is_atomic_so_hardlink_rollback_restores(tmp_path): + """Regression guard for lint --fix/remove cleanup writers. + + ``fix_broken_links`` rewrites concept/entity pages outside the add path. If + it writes in place, a hardlinked snapshot aliases the live inode and rollback + restores the cleaned content instead of the original page. + """ + from openkb.lint import fix_broken_links + + kb_dir = tmp_path + wiki = kb_dir / "wiki" + concepts = wiki / "concepts" + concepts.mkdir(parents=True) + page = concepts / "topic.md" + page.write_text("# Topic\n\nGhost [[concepts/missing]] link.\n", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + fix_broken_links(wiki, restrict_to=[page]) + + snapshot.rollback() + snapshot.discard_best_effort() + + restored = page.read_text(encoding="utf-8") + assert "[[concepts/missing]]" in restored + assert "Ghost link" not in restored + + +def test_hardlink_falls_back_to_copy_on_eacces(tmp_path, monkeypatch): + """A hardlink blocked by a Windows ACL / OneDrive sync folder surfaces as + EACCES, not EXDEV/EPERM. _hardlink_or_copy must fall back to a real copy so + the snapshot still succeeds — otherwise the POSIX-oriented errno set aborts + the whole add on Windows where a plain copy would have worked. + """ + import openkb.mutation as mut + + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + (concepts / "page.md").write_text("content", encoding="utf-8") + + def link_eacces(src, dst, *args, **kwargs): + raise OSError(errno.EACCES, "simulated Windows ACL hardlink block") + monkeypatch.setattr(mut.os, "link", link_eacces) + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + try: + backup = snapshot.backup_dir / "wiki" / "concepts" / "page.md" + assert backup.read_text(encoding="utf-8") == "content" # copy fallback landed + # It is a real copy, not a hardlink (distinct inode). + assert backup.stat().st_ino != (concepts / "page.md").stat().st_ino + finally: + snapshot.discard_best_effort() + + +# --- recover_pending_journals: bounded retry (pre-existing issue) ---------- + +def test_recovery_gives_up_on_persistently_failing_journal(tmp_path, monkeypatch): + """A journal whose rollback keeps failing (e.g. persistent ENOSPC) must + not be retried forever — otherwise the backup dir + journal leak and every + future lock acquisition re-attempts the same failing rollback. After + MAX_ROLLBACK_ATTEMPTS failed attempts recovery discards it with a loud + message so a human can intervene, bounding the on-disk retention. + """ + import openkb.mutation as mut + + kb_dir = tmp_path + (kb_dir / ".openkb").mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + # Leave an ACTIVE journal (simulating a crashed add). + snapshot_paths(kb_dir, [target], operation="add", details={}) + target.write_text("after", encoding="utf-8") + + # Make rollback deterministically fail. + def boom(self): + raise OSError("persistent rollback failure") + monkeypatch.setattr(mut.MutationSnapshot, "rollback", boom) + + for _ in range(mut.MAX_ROLLBACK_ATTEMPTS + 1): + recover_pending_journals(kb_dir) + + # Given up + discarded, not retained forever. + journal_dir = kb_dir / ".openkb" / "journal" + assert not any(journal_dir.glob("*.json")) + + +@pytest.mark.parametrize( + "payload", + [ + "", # empty file -> JSONDecodeError + "{not json", # truncated/invalid -> JSONDecodeError + '{"status": "active"}', # valid JSON missing kb_dir/backup_dir -> KeyError + '{"not": "a journal"}', # valid JSON, wrong shape -> KeyError + ], +) +def test_recover_skips_malformed_journal_without_bricking_lock(tmp_path, payload): + """A corrupt/empty/stray .json in journal/ must not crash recovery. + + ``snapshot`` is assigned inside the try (after json.loads / + _snapshot_from_journal), but the except block referenced it unconditionally + — so a single malformed journal raised NameError out of recovery, and thus + out of every exclusive kb_lock acquisition (draining runs on first + acquisition), bricking add/remove/recompile/chat for the whole KB. Recovery + must instead drop the unrecoverable journal, log loudly, and keep going so + the lock still acquires. + """ + from openkb.locks import kb_ingest_lock + + kb_dir = tmp_path + journal_dir = kb_dir / ".openkb" / "journal" + journal_dir.mkdir(parents=True) + (journal_dir / "deadbeef.json").write_text(payload, encoding="utf-8") + + messages = recover_pending_journals(kb_dir) # must not raise NameError + assert any("Unrecoverable mutation journal" in m for m in messages) + assert not any(journal_dir.glob("*.json")) # bad journal removed, not retained + + # The whole point: the KB's mutation lock still acquires afterwards. + with kb_ingest_lock(kb_dir / ".openkb"): + pass + + +# --- O(touched) rollback for hardlinked dirs (pre-existing issue) ---------- + +def test_hardlinked_dir_rollback_leaves_untouched_files_in_place(tmp_path): + """O(touched) rollback: an untouched file in a hardlinked dir shares the + backup's inode, so rollback must leave it in place (same inode) instead + of delete + recopy. A full-copy rollback would give it a new inode — this + is the regression driver for the inode-aware restore. + """ + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + keep = concepts / "keep.md" + keep.write_text("keep", encoding="utf-8") + keep_inode = keep.stat().st_ino + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + # keep.md is not mutated — it stays shared-inode with the backup. + snapshot.rollback() + snapshot.discard_best_effort() + + assert keep.exists() + assert keep.read_text(encoding="utf-8") == "keep" + assert keep.stat().st_ino == keep_inode # NOT recopied + + +def test_hardlinked_dir_rollback_removes_new_and_restores_modified(tmp_path): + from openkb.locks import atomic_write_text + + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + (concepts / "old.md").write_text("old", encoding="utf-8") + page = concepts / "page.md" + page.write_text("original", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + # Commit created a new page and atomically rewrote an existing one. + (concepts / "new.md").write_text("new", encoding="utf-8") + atomic_write_text(page, "rewritten") + + snapshot.rollback() + snapshot.discard_best_effort() + + assert (concepts / "old.md").read_text(encoding="utf-8") == "old" + assert page.read_text(encoding="utf-8") == "original" + assert not (concepts / "new.md").exists() + + +def test_hardlinked_dir_rollback_prunes_new_nested_blob_dirs(tmp_path): + """PageIndex blob-store scenario: an existing blob is untouched (shared + inode, left in place), while a new doc's blob + its nested images subdir + are removed on rollback — including the now-empty newdoc/ directory. + """ + kb_dir = tmp_path + files = kb_dir / ".openkb" / "files" + (files / "col").mkdir(parents=True) + existing = files / "col" / "existing.pdf" + existing.write_bytes(b"existing") + existing_inode = existing.stat().st_ino + + snapshot = snapshot_paths( + kb_dir, [files], operation="add", details={}, hardlink_dirs={files}, + ) + (files / "col" / "newdoc.pdf").write_bytes(b"new") + (files / "col" / "newdoc" / "images").mkdir(parents=True) + (files / "col" / "newdoc" / "images" / "p1.png").write_bytes(b"png") + + snapshot.rollback() + snapshot.discard_best_effort() + + assert existing.read_bytes() == b"existing" + assert existing.stat().st_ino == existing_inode # untouched, not recopied + assert not (files / "col" / "newdoc.pdf").exists() + assert not (files / "col" / "newdoc").exists() # empty new dir pruned From 7823795451793df77bb1ded2658212ef1d3d1155 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Tue, 30 Jun 2026 20:01:48 +0800 Subject: [PATCH 2/2] fix: stage URL add mutations Keep URL ingests on add_single_file's staged conversion path so source artifacts are published only after the mutation snapshot exists and can roll back on failure. Add regression coverage for URL failure cleanup while preserving downloaded raw files for retry, and replace legacy asyncio.run mocks with async compiler stubs to avoid unawaited coroutine warnings in the touched tests. --- openkb/cli.py | 11 +++--- tests/test_add_command.py | 9 ++++- tests/test_url_ingest.py | 82 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 91 insertions(+), 11 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index ff4e2228..f4297469 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -951,17 +951,16 @@ def add(ctx, path, from_pageindex_cloud): click.echo("Provide a PATH or use --from-pageindex-cloud .") return - # URL ingest: download into raw/ first, then call add_single_file - # explicitly so we can clean up the just-downloaded file if it - # turns out to be a duplicate (registry already has its hash). - # Without this, re-adding the same URL leaves an orphan in raw/ - # that the registry can't reach via openkb remove. + # URL ingest: download into raw/ first, then call add_single_file explicitly. + # Keep staged conversion enabled so converted source artifacts do not touch + # the live KB before the mutation snapshot exists. The tri-state outcome + # still lets us clean up the just-downloaded raw file on dedup. from openkb.url_ingest import looks_like_url, fetch_url_to_raw if looks_like_url(path): fetched = fetch_url_to_raw(path, kb_dir) if fetched is None: return - outcome = add_single_file(fetched, kb_dir, stage=False) + outcome = add_single_file(fetched, kb_dir) # Only clean up on dedup-skip. On "failed" we keep the file so # the user can retry (e.g. transient LLM error during compile) # without re-downloading — and so they don't lose data when diff --git a/tests/test_add_command.py b/tests/test_add_command.py index 8dacec26..a73ed4ac 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -167,12 +167,17 @@ def test_add_short_doc_runs_compiler(self, tmp_path): "stale-old-hash", {"name": "test.md", "doc_name": "test", "type": "md"} ) + compile_calls = [] + + async def compile_noop(*args, **kwargs): + compile_calls.append((args, kwargs)) + runner = CliRunner() with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ patch("openkb.cli.convert_document", return_value=mock_result), \ - patch("openkb.cli.asyncio.run") as mock_arun: + patch("openkb.agent.compiler.compile_short_doc", new=compile_noop): result = runner.invoke(cli, ["add", str(doc)]) - mock_arun.assert_called_once() + assert len(compile_calls) == 1 assert "OK" in result.output import json as json_mod diff --git a/tests/test_url_ingest.py b/tests/test_url_ingest.py index 1b8548ee..0b4cffc5 100644 --- a/tests/test_url_ingest.py +++ b/tests/test_url_ingest.py @@ -488,8 +488,11 @@ def test_add_single_file_returns_added_on_success(tmp_path): is_long_doc=False, file_hash="cafe" * 16, ) + async def compile_noop(*args, **kwargs): + return None + with patch("openkb.cli.convert_document", return_value=mock_result), \ - patch("openkb.cli.asyncio.run"): + patch("openkb.agent.compiler.compile_short_doc", new=compile_noop): outcome = add_single_file(doc, tmp_path) assert outcome == "added" @@ -538,9 +541,12 @@ def test_add_single_file_returns_failed_on_pipeline_error(tmp_path): is_long_doc=False, file_hash="cafe" * 16, ) + async def fail_compile(*args, **kwargs): + raise RuntimeError("LLM 503") + # Make both compile attempts raise to drive the failure path. with patch("openkb.cli.convert_document", return_value=mock_result), \ - patch("openkb.cli.asyncio.run", side_effect=RuntimeError("LLM 503")), \ + patch("openkb.agent.compiler.compile_short_doc", new=fail_compile), \ patch("openkb.cli.time.sleep"): outcome = add_single_file(doc, tmp_path) @@ -580,6 +586,34 @@ def test_url_ingest_cleans_up_orphan_on_dedup_skip(tmp_path, monkeypatch): assert not fetched_path.exists() +def test_url_ingest_uses_staged_add_for_crash_safe_conversion(tmp_path): + """URL ingest must keep add_single_file's default staged conversion path. + + Passing stage=False writes converted source artifacts into the live KB before + the mutation snapshot exists, which leaves URL adds outside the rollback + contract. + """ + from click.testing import CliRunner + from openkb.cli import cli + + (tmp_path / ".openkb").mkdir() + (tmp_path / ".openkb" / "config.yaml").write_text("model: gpt-4o-mini\n") + (tmp_path / ".openkb" / "hashes.json").write_text("{}") + (tmp_path / "raw").mkdir() + + fetched_path = tmp_path / "raw" / "paper.md" + fetched_path.write_text("# Paper", encoding="utf-8") + + runner = CliRunner() + with patch("openkb.cli._find_kb_dir", return_value=tmp_path), \ + patch("openkb.url_ingest.fetch_url_to_raw", return_value=fetched_path), \ + patch("openkb.cli.add_single_file", return_value="added") as mock_add: + result = runner.invoke(cli, ["add", "https://example.com/paper"]) + + assert result.exit_code == 0, result.output + mock_add.assert_called_once_with(fetched_path, tmp_path) + + def test_url_ingest_keeps_raw_file_on_pipeline_failure(tmp_path): """The point of the tri-state return: a pipeline failure (e.g. LLM timeout during compilation) must NOT delete the downloaded file — @@ -607,11 +641,14 @@ def test_url_ingest_keeps_raw_file_on_pipeline_failure(tmp_path): is_long_doc=False, file_hash="cafe" * 16, ) + async def fail_compile(*args, **kwargs): + raise RuntimeError("LLM 503") + runner = CliRunner() with patch("openkb.cli._find_kb_dir", return_value=tmp_path), \ patch("openkb.url_ingest.fetch_url_to_raw", return_value=fetched_path), \ patch("openkb.cli.convert_document", return_value=mock_result), \ - patch("openkb.cli.asyncio.run", side_effect=RuntimeError("LLM 503")), \ + patch("openkb.agent.compiler.compile_short_doc", new=fail_compile), \ patch("openkb.cli.time.sleep"): result = runner.invoke(cli, ["add", "https://example.com/paper.pdf"]) @@ -619,3 +656,42 @@ def test_url_ingest_keeps_raw_file_on_pipeline_failure(tmp_path): assert "[ERROR] Compilation failed" in result.output # The raw file must be preserved so the user can retry. assert fetched_path.exists() + + +def test_url_ingest_pipeline_failure_rolls_back_converted_source_but_keeps_download(tmp_path): + """A URL add that fails after conversion should not leave source artifacts. + + The downloaded raw file is intentionally kept for retry, but converted + artifacts must be published only under the mutation journal so rollback can + remove them. + """ + from click.testing import CliRunner + from openkb.cli import cli + + (tmp_path / ".openkb").mkdir() + (tmp_path / ".openkb" / "config.yaml").write_text("model: gpt-4o-mini\n") + (tmp_path / ".openkb" / "hashes.json").write_text("{}") + (tmp_path / "raw").mkdir() + (tmp_path / "wiki" / "summaries").mkdir(parents=True) + (tmp_path / "wiki" / "sources").mkdir(parents=True) + (tmp_path / "wiki" / "concepts").mkdir(parents=True) + (tmp_path / "wiki" / "log.md").write_text("", encoding="utf-8") + + fetched_path = tmp_path / "raw" / "paper.md" + fetched_path.write_text("# Paper\n\nBody", encoding="utf-8") + + async def fail_compile(*args, **kwargs): + raise RuntimeError("LLM 503") + + runner = CliRunner() + with patch("openkb.cli._find_kb_dir", return_value=tmp_path), \ + patch("openkb.url_ingest.fetch_url_to_raw", return_value=fetched_path), \ + patch("openkb.agent.compiler.compile_short_doc", new=fail_compile), \ + patch("openkb.cli.time.sleep"), \ + patch("openkb.cli._setup_llm_key"): + result = runner.invoke(cli, ["add", "https://example.com/paper"]) + + assert result.exit_code == 0, result.output + assert "[ERROR] Compilation failed" in result.output + assert fetched_path.exists() + assert not (tmp_path / "wiki" / "sources" / "paper.md").exists()