From ee2291402f0f37bd30056c080e0a00634d886519 Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Sat, 4 Jul 2026 09:49:30 +0800 Subject: [PATCH 1/2] refactor(mutation): split lock-free prepare from serial commit for directory ingest Stage 4 of the parallel-add roadmap (#151). `openkb add ` now routes through a worker-safe prepare / serial commit split: prepare converts into private .openkb/staging/prepare output without the KB mutation lock and without touching official raw/, wiki/, or .openkb/ state; the serial owner commits under kb_ingest_lock, resolving the final name and publishing. `--jobs` (Stage 5) is not included. Batch coordinator (openkb/cli.py): - add_directory_serial runs a serial prepare -> commit loop. `add` already holds kb_ingest_lock for its whole body via @_with_kb_lock, so prepare runs under that one outer lock; the reaper (first lock acquisition, before this batch's staging exists) cannot collide with a live prepare. Per-file failure continues the batch; DirtyRollbackError stops it. Prepare (openkb/add_prepare.py, openkb/converter.py): - convert_document_for_prepare: lock-free conversion into private staging under a placeholder doc_name (sanitized stem); returns ConvertResult without registering the hash or resolving the final name. - prepare_document owns the staging-dir lifecycle (rmtree on interrupt). prepare/commit are coordinator-internal, called only by add_directory_serial. Serial commit (openkb/cli.py): - commit_prepared_document requires kb_ingest_lock held (reentrant acquire). - The prepared branch of _add_single_file_locked re-validates under the lock because prepare ran without it: re-decides skip from live registry state, re-hashes the source, and re-converts when the source changed or prepare had short-circuited with no artifacts (the stale-prepare contract). - _retarget_prepared_document_artifacts renames staged raw/source/images from the placeholder name to the owner-resolved final name. Reaper (openkb/locks.py): - _reap_prepare_staging reclaims orphaned prepare staging at first exclusive acquisition; skips symlinks, unlinks stray files, and logs INFO on success / WARNING on failure. No per-prepare marker is needed: directory add holds the lock across its whole batch, so a live batch's staging is never visible to another reaper. Tests: prepare writes only private staging and takes no lock; commit resolves the final name under the owner and requires its lock; the reaper reaps orphans and skips symlinks; stale skip and source-changed (TOCTOU) re-prepare at commit; `add ` end-to-end lands every file via prepare/commit; a prepare failure isolates the file while the batch continues. --- openkb/add_prepare.py | 66 ++++++++++++ openkb/cli.py | 178 +++++++++++++++++++++++++++--- openkb/converter.py | 220 ++++++++++++++++++++++---------------- openkb/locks.py | 36 +++++++ tests/test_add_command.py | 125 ++++++++++++++++++---- tests/test_add_prepare.py | 194 +++++++++++++++++++++++++++++++++ tests/test_converter.py | 25 +++++ tests/test_locks.py | 82 ++++++++++++++ 8 files changed, 797 insertions(+), 129 deletions(-) create mode 100644 openkb/add_prepare.py create mode 100644 tests/test_add_prepare.py diff --git a/openkb/add_prepare.py b/openkb/add_prepare.py new file mode 100644 index 00000000..ad912711 --- /dev/null +++ b/openkb/add_prepare.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import shutil +import uuid +from dataclasses import dataclass +from pathlib import Path + +from openkb.converter import ConvertResult, _sanitize_stem, convert_document_for_prepare + + +@dataclass(slots=True) +class PreparedDocument: + input_index: int + source_path: Path + staging_dir: Path + result: ConvertResult + + @property + def doc_name_candidate(self) -> str | None: + return self.result.doc_name + + +def _prepare_staging_dir(kb_dir: Path, input_index: int, source: Path) -> Path: + safe = _sanitize_stem(source.stem) + path = ( + kb_dir + / ".openkb" + / "staging" + / "prepare" + / f"{input_index:06d}-{safe}-{uuid.uuid4().hex[:8]}" + ) + path.mkdir(parents=True, exist_ok=False) + return path + + +def prepare_document(source: Path, kb_dir: Path, *, input_index: int) -> PreparedDocument: + """Convert ``source`` into private staging without the KB mutation lock. + + Coordinator-internal: callers must run this under the serial batch owner's + held ``kb_ingest_lock`` (the ``add`` command acquires it via + ``@_with_kb_lock``). The reaper reclaims any staging present at the owner's + first lock acquisition, so once this runs under the held lock the staging + tree is private to this batch. + """ + staging_dir = _prepare_staging_dir(kb_dir, input_index, source) + try: + result = convert_document_for_prepare(source, kb_dir, staging_dir=staging_dir) + return PreparedDocument( + input_index=input_index, + source_path=source, + staging_dir=staging_dir, + result=result, + ) + except BaseException: + shutil.rmtree(staging_dir, ignore_errors=True) + raise + + +def _clear_staging_artifacts(staging_dir: Path) -> None: + """Drop convertible artifacts (raw/, wiki/) from a prepare staging dir. + + Used before re-converting into the same staging dir at commit time so stale + images/raw from the prior prepare don't leak into the re-convert. + """ + for sub in ("raw", "wiki"): + shutil.rmtree(staging_dir / sub, ignore_errors=True) diff --git a/openkb/cli.py b/openkb/cli.py index f39d0b44..0fa8e252 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -62,9 +62,11 @@ def filter(self, record: logging.LogRecord) -> bool: ) from openkb.add_coordinator import _cleanup_staging_dirs from openkb.converter import ( + ConvertResult, _registry_path, _sanitize_stem, convert_document, + resolve_doc_name, resolve_doc_name_from_key, ) from openkb.indexer import ( @@ -72,7 +74,13 @@ def filter(self, record: logging.LogRecord) -> bool: _write_long_doc_artifacts, prepare_cloud_import, ) -from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock +from openkb.locks import ( + atomic_write_json, + atomic_write_text, + kb_ingest_lock, + kb_ingest_lock_held, + kb_read_lock, +) from openkb.log import append_log from openkb.mutation import publish_staged_tree from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS @@ -452,8 +460,113 @@ def add_single_file( return _add_single_file_locked(file_path, kb_dir, stage=stage) +def commit_prepared_document(prepared, kb_dir: Path) -> Literal["added", "skipped", "failed"]: + """Commit a prepared document under the serial KB mutation owner. + + Coordinator-internal: called only by the serial batch owner + (:func:`add_directory_serial`), which already holds ``kb_ingest_lock`` for + the whole prepare+commit batch. The reentrant acquire below is only safe + because of that. Propagates :class:`DirtyRollbackError` if the mutation's + rollback failed, so the batch owner can stop on dirty state. + """ + openkb_dir = kb_dir / ".openkb" + if not kb_ingest_lock_held(openkb_dir): + raise RuntimeError("commit_prepared_document requires the caller to hold kb_ingest_lock") + with kb_ingest_lock(openkb_dir): + return _add_single_file_locked( + prepared.source_path, + kb_dir, + stage=True, + prepared=prepared, + ) + + +def add_directory_serial(files: list[Path], kb_dir: Path) -> None: + """Serially prepare and commit each file in input order. + + Stage 4 batch coordinator for ``openkb add ``. Must be called with + ``kb_ingest_lock`` already held — the ``add`` command's ``@_with_kb_lock`` + decorator holds it for the whole batch. Prepares are lock-free and run under + that held lock, so the reaper (which fires once at the owner's first lock + acquisition, before this batch's staging exists) cannot collide with a live + prepare. Stage 5 will add a parallel-prepare variant alongside this one. + + A per-file prepare/commit failure is logged and the batch continues; a + :class:`DirtyRollbackError` propagates so the caller can stop the batch on + dirty state. + """ + from openkb.add_prepare import prepare_document + + total = len(files) + for i, f in enumerate(files, 1): + click.echo(f"\n[{i}/{total}] ", nl=False) + try: + prepared = prepare_document(f, kb_dir, input_index=i - 1) + except Exception as exc: + click.echo(f" [ERROR] Prepare failed: {exc}") + logger.debug("Prepare traceback:", exc_info=True) + continue + commit_prepared_document(prepared, kb_dir) + + +def _retarget_prepared_document_artifacts(prepared, doc_name: str) -> None: + result = prepared.result + old_doc_name = prepared.doc_name_candidate or prepared.source_path.stem + if old_doc_name == doc_name: + return + + suffix = prepared.source_path.suffix.lower() + raw_dir = prepared.staging_dir / "raw" + old_raw = raw_dir / f"{old_doc_name}{suffix}" + new_raw = raw_dir / f"{doc_name}{suffix}" + if old_raw.exists(): + old_raw.rename(new_raw) + result.raw_path = new_raw + # Defensive: if prepare ever writes raw_path off the + # convention, retarget via the recorded path rather than silently no-op. + elif result.raw_path is not None and result.raw_path.exists(): + new_raw = result.raw_path.with_name(f"{doc_name}{result.raw_path.suffix}") + result.raw_path.rename(new_raw) + result.raw_path = new_raw + + sources_dir = prepared.staging_dir / "wiki" / "sources" + old_source = sources_dir / f"{old_doc_name}.md" + new_source = sources_dir / f"{doc_name}.md" + if old_source.exists(): + text = old_source.read_text(encoding="utf-8") + text = text.replace(f"sources/images/{old_doc_name}/", f"sources/images/{doc_name}/") + old_source.write_text(text, encoding="utf-8") + old_source.rename(new_source) + result.source_path = new_source + # Defensive: if prepare ever writes source_path off the .md + # convention, retarget via the recorded path rather than silently no-op. + elif result.source_path is not None and result.source_path.exists(): + new_source = result.source_path.with_name(f"{doc_name}{result.source_path.suffix}") + result.source_path.rename(new_source) + result.source_path = new_source + + images_dir = sources_dir / "images" + old_images = images_dir / old_doc_name + new_images = images_dir / doc_name + if old_images.exists() and old_images != new_images: + old_images.rename(new_images) + + result.doc_name = doc_name + + +def _convert_or_fail(src: Path, kb_dir: Path, staging_dir: Path | None) -> ConvertResult | None: + """Convert ``src``; on failure log, clean staging, and return ``None``.""" + try: + return convert_document(src, kb_dir, staging_dir=staging_dir) + except Exception as exc: + click.echo(f" [ERROR] Conversion failed: {exc}") + logger.debug("Conversion traceback:", exc_info=True) + _cleanup_staging_dirs([staging_dir]) + return None + + def _add_single_file_locked( - file_path: Path, kb_dir: Path, *, stage: bool = True + file_path: Path, kb_dir: Path, *, stage: bool = True, prepared=None ) -> Literal["added", "skipped", "failed"]: """Convert, index, and compile a single document into the knowledge base. @@ -478,18 +591,50 @@ def _add_single_file_locked( config = load_config(openkb_dir / "config.yaml") _setup_llm_key(kb_dir) model: str = config.get("model", DEFAULT_CONFIG["model"]) + # One registry instance covers the prepared-branch re-validation and the + # commit-time registration; the held kb_ingest_lock means nothing else + # mutates hashes.json in between, so reloading would just re-parse JSON. + registry = HashRegistry(openkb_dir / "hashes.json") - staging_dir = _staging_dir_for(kb_dir, file_path) if stage else None + staging_dir = ( + prepared.staging_dir + if prepared is not None + else (_staging_dir_for(kb_dir, file_path) if stage else None) + ) # 2. Convert document into staging when possible. click.echo(f"Adding: {file_path.name}") - try: - result = convert_document(file_path, kb_dir, staging_dir=staging_dir) - except Exception as exc: - click.echo(f" [ERROR] Conversion failed: {exc}") - logger.debug("Conversion traceback:", exc_info=True) - _cleanup_staging_dirs([staging_dir]) - return "failed" + result: ConvertResult + if prepared is None: + converted = _convert_or_fail(file_path, kb_dir, staging_dir) + if converted is None: + return "failed" + result = converted + else: + result = prepared.result + # Prepare ran without the lock: re-decide skip from live registry state + # (not the cached prepare-time decision) and re-validate the source. A + # hash removed between prepare and commit, or a source edited in that + # window, leaves the staged artifacts missing/stale — re-convert. + result.skipped = bool(result.file_hash and registry.is_known(result.file_hash)) + if not result.skipped: + source_intact = ( + result.file_hash is not None + and file_path.exists() + and HashRegistry.hash_file(file_path) == result.file_hash + and result.raw_path is not None + ) + if source_intact: + doc_name = resolve_doc_name(file_path, kb_dir, registry, persist_legacy=False) + _retarget_prepared_document_artifacts(prepared, doc_name) + else: + from openkb.add_prepare import _clear_staging_artifacts + + _clear_staging_artifacts(prepared.staging_dir) + converted = _convert_or_fail(file_path, kb_dir, staging_dir) + if converted is None: + return "failed" + result = converted if result.skipped: click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") @@ -569,7 +714,6 @@ def commit_body(snapshot) -> None: # Register hash only after successful compilation. if result.file_hash: - registry = HashRegistry(openkb_dir / "hashes.json") doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") meta = { "name": file_path.name, @@ -1056,9 +1200,15 @@ def add(ctx, path, from_pageindex_cloud): return total = len(files) click.echo(f"Found {total} supported file(s) in {path}.") - for i, f in enumerate(files, 1): - click.echo(f"\n[{i}/{total}] ", nl=False) - add_single_file(f, kb_dir) + from openkb.add_coordinator import DirtyRollbackError + + try: + add_directory_serial(files, kb_dir) + except DirtyRollbackError as exc: + # A normal "failed"/"skipped" outcome returns and the batch + # continues; a dirty rollback is fatal and stops the batch. + click.echo(f" [ERROR] {exc}") + ctx.exit(1) else: if target.suffix.lower() not in SUPPORTED_EXTENSIONS: click.echo( diff --git a/openkb/converter.py b/openkb/converter.py index 710d7a77..9ab79e91 100644 --- a/openkb/converter.py +++ b/openkb/converter.py @@ -139,109 +139,143 @@ def get_pdf_page_count(path: Path) -> int: return doc.page_count -def convert_document( +def _convert_document_impl( src: Path, kb_dir: Path, *, - staging_dir: Path | None = None, + staging_dir: Path | None, + resolve_final_name: bool, ) -> ConvertResult: - """Convert a document and integrate it into the knowledge base. - - Steps: - 1. Hash-check — skip if already known. - 2. Copy source to ``raw/``. - 3. If PDF and page count >= threshold → return :attr:`ConvertResult.is_long_doc`. - 4. If ``.md`` — read, process relative images, save to ``wiki/sources/``. - 5. Otherwise — run MarkItDown, extract base64 images, save to ``wiki/sources/``. - 6. Register hash in the registry. - """ - with kb_ingest_lock(kb_dir / ".openkb"): - # ------------------------------------------------------------------ - # Load config & state - # ------------------------------------------------------------------ - openkb_dir = kb_dir / ".openkb" - config = load_config(openkb_dir / "config.yaml") - threshold: int = config.get("pageindex_threshold", 20) - artifact_root = staging_dir if staging_dir is not None else kb_dir - registry = HashRegistry(openkb_dir / "hashes.json") - - # ------------------------------------------------------------------ - # 1. Hash check + identity resolution - # ------------------------------------------------------------------ - file_hash = HashRegistry.hash_file(src) - if registry.is_known(file_hash): - logger.info("Skipping already-known file: %s", src.name) - stored = registry.get(file_hash) or {} - return ConvertResult( - skipped=True, - file_hash=file_hash, - doc_name=stored.get("doc_name") or Path(stored.get("name", src.name)).stem, - ) + # ------------------------------------------------------------------ + # Load config & state + # ------------------------------------------------------------------ + openkb_dir = kb_dir / ".openkb" + config = load_config(openkb_dir / "config.yaml") + threshold: int = config.get("pageindex_threshold", 20) + artifact_root = staging_dir if staging_dir is not None else kb_dir + registry = HashRegistry(openkb_dir / "hashes.json") + + # ------------------------------------------------------------------ + # 1. Hash check + identity resolution + # ------------------------------------------------------------------ + file_hash = HashRegistry.hash_file(src) + if registry.is_known(file_hash): + logger.info("Skipping already-known file: %s", src.name) + stored = registry.get(file_hash) or {} + return ConvertResult( + skipped=True, + file_hash=file_hash, + doc_name=stored.get("doc_name") or Path(stored.get("name", src.name)).stem, + ) + if resolve_final_name: doc_name = resolve_doc_name( src, kb_dir, registry, persist_legacy=staging_dir is None, ) + else: + doc_name = _sanitize_stem(src.stem) + + # ------------------------------------------------------------------ + # 2. Copy to raw/ + # ------------------------------------------------------------------ + raw_dir = artifact_root / "raw" + raw_dir.mkdir(parents=True, exist_ok=True) + if staging_dir is None and src.resolve().is_relative_to(raw_dir.resolve()): + # Watch mode: the file already lives in raw/ — don't copy/rename. + raw_dest = src + else: + raw_dest = raw_dir / f"{doc_name}{src.suffix.lower()}" + shutil.copy2(src, raw_dest) + + # ------------------------------------------------------------------ + # 3. PDF long-doc detection + # ------------------------------------------------------------------ + if src.suffix.lower() == ".pdf": + page_count = get_pdf_page_count(src) + if page_count >= threshold: + logger.info( + "Long PDF detected (%d pages >= %d threshold): %s", + page_count, + threshold, + src.name, + ) + return ConvertResult( + raw_path=raw_dest, + is_long_doc=True, + file_hash=file_hash, + doc_name=doc_name, + ) - # ------------------------------------------------------------------ - # 2. Copy to raw/ - # ------------------------------------------------------------------ - raw_dir = artifact_root / "raw" - raw_dir.mkdir(parents=True, exist_ok=True) - if staging_dir is None and src.resolve().is_relative_to(raw_dir.resolve()): - # Watch mode: the file already lives in raw/ — don't copy/rename. - raw_dest = src - else: - raw_dest = raw_dir / f"{doc_name}{src.suffix.lower()}" - shutil.copy2(src, raw_dest) - - # ------------------------------------------------------------------ - # 3. PDF long-doc detection - # ------------------------------------------------------------------ - if src.suffix.lower() == ".pdf": - page_count = get_pdf_page_count(src) - if page_count >= threshold: - logger.info( - "Long PDF detected (%d pages >= %d threshold): %s", - page_count, - threshold, - src.name, - ) - return ConvertResult( - raw_path=raw_dest, - is_long_doc=True, - file_hash=file_hash, - doc_name=doc_name, - ) - - # ------------------------------------------------------------------ - # 4/5. Convert to Markdown - # ------------------------------------------------------------------ - sources_dir = artifact_root / "wiki" / "sources" - sources_dir.mkdir(parents=True, exist_ok=True) - images_dir = artifact_root / "wiki" / "sources" / "images" / doc_name - images_dir.mkdir(parents=True, exist_ok=True) - - if src.suffix.lower() == ".md": - markdown = src.read_text(encoding="utf-8") - markdown = copy_relative_images(markdown, src.parent, doc_name, images_dir) - elif src.suffix.lower() == ".pdf": - # Use pymupdf dict-mode for PDFs: text + images inline at correct positions - markdown = convert_pdf_with_images(src, doc_name, images_dir) - else: - # Non-PDF, non-MD: use markitdown (docx, pptx, html, etc.) - mid = MarkItDown() - result = mid.convert(str(src), keep_data_uris=True) - markdown = result.text_content - markdown = extract_base64_images(markdown, doc_name, images_dir) - - dest_md = sources_dir / f"{doc_name}.md" - atomic_write_text(dest_md, markdown) + # ------------------------------------------------------------------ + # 4/5. Convert to Markdown + # ------------------------------------------------------------------ + sources_dir = artifact_root / "wiki" / "sources" + sources_dir.mkdir(parents=True, exist_ok=True) + images_dir = artifact_root / "wiki" / "sources" / "images" / doc_name + images_dir.mkdir(parents=True, exist_ok=True) + + if src.suffix.lower() == ".md": + markdown = src.read_text(encoding="utf-8") + markdown = copy_relative_images(markdown, src.parent, doc_name, images_dir) + elif src.suffix.lower() == ".pdf": + # Use pymupdf dict-mode for PDFs: text + images inline at correct positions + markdown = convert_pdf_with_images(src, doc_name, images_dir) + else: + # Non-PDF, non-MD: use markitdown (docx, pptx, html, etc.) + mid = MarkItDown() + result = mid.convert(str(src), keep_data_uris=True) + markdown = result.text_content + markdown = extract_base64_images(markdown, doc_name, images_dir) + + dest_md = sources_dir / f"{doc_name}.md" + atomic_write_text(dest_md, markdown) + + return ConvertResult( + raw_path=raw_dest, + source_path=dest_md, + file_hash=file_hash, + doc_name=doc_name, + ) - return ConvertResult( - raw_path=raw_dest, - source_path=dest_md, - file_hash=file_hash, - doc_name=doc_name, + +def convert_document( + src: Path, + kb_dir: Path, + *, + staging_dir: Path | None = None, +) -> ConvertResult: + """Convert a document into live or staged raw/source artifacts. + + The returned metadata is consumed by the serial add commit path, which owns + final registry updates. When ``staging_dir`` is provided, artifacts are + written under that private tree and later published by the mutation owner. + """ + with kb_ingest_lock(kb_dir / ".openkb"): + return _convert_document_impl( + src, + kb_dir, + staging_dir=staging_dir, + resolve_final_name=True, ) + + +def convert_document_for_prepare( + src: Path, + kb_dir: Path, + *, + staging_dir: Path, +) -> ConvertResult: + """Convert into private staging without taking the KB mutation lock. + + The prepared doc name is a sanitized source-stem candidate only. The serial + commit owner resolves the final collision-safe name against live registry + state before publishing staged artifacts. + """ + return _convert_document_impl( + src, + kb_dir, + staging_dir=staging_dir, + resolve_final_name=False, + ) diff --git a/openkb/locks.py b/openkb/locks.py index a085da75..a7abf4f1 100644 --- a/openkb/locks.py +++ b/openkb/locks.py @@ -12,6 +12,7 @@ import json import logging import os +import shutil import tempfile import threading from pathlib import Path @@ -123,6 +124,41 @@ def _drain_pending_journals(openkb_dir: Path) -> None: log = logging.getLogger(__name__) for message in recover_pending_journals(openkb_dir.parent): log.warning(message) + _reap_prepare_staging(openkb_dir) + + +def _reap_prepare_staging(openkb_dir: Path) -> None: + """Remove orphaned prepare-staging dirs left by interrupted prepares. + + Prepare staging (``.openkb/staging/prepare/--/``) has no + mutation journal, so an interrupted prepare can strand it with nothing to + reclaim it. This runs at first exclusive-lock acquisition — under the OS + ``flock``, before this process creates staging of its own — so anything + present is an orphan from a crashed prior run and is reaped. Directory add + holds the same lock across its whole prepare+commit batch (via the ``add`` + command's ``@_with_kb_lock``), so a live batch's staging is never visible to + another process's reaper. Scoped strictly to ``staging/prepare/``; mutation + journals and ``staging/rollback-*`` backups are left to + :func:`openkb.mutation.recover_pending_journals`. + """ + log = logging.getLogger(__name__) + prepare_root = openkb_dir / "staging" / "prepare" + if not prepare_root.is_dir(): + return + # Materialize first: entries are removed from this dir inside the loop. + for orphan in list(prepare_root.iterdir()): + if orphan.is_symlink(): + # Never follow a symlink — rmtree could descend into its target. + log.warning("Skipping symlink in prepare staging (not followed): %s", orphan) + continue + if orphan.is_dir(): + shutil.rmtree(orphan, ignore_errors=True) + else: + orphan.unlink(missing_ok=True) + if orphan.exists(): + log.warning("Could not fully reap orphaned prepare staging: %s", orphan) + else: + log.info("Reaped orphaned prepare staging: %s", orphan) @contextlib.contextmanager diff --git a/tests/test_add_command.py b/tests/test_add_command.py index ae84461e..8e0bf7a6 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -223,43 +223,124 @@ def test_add_directory_calls_helper_for_each_file(self, tmp_path): runner = CliRunner() with ( - patch("openkb.cli.add_single_file") as mock_add, + patch("openkb.cli.add_directory_serial") as mock_serial, patch("openkb.cli._find_kb_dir", return_value=kb_dir), ): runner.invoke(cli, ["add", str(docs_dir)]) - # Should be called for .md and .txt but not .xyz - assert mock_add.call_count == 2 - called_names = {call.args[0].name for call in mock_add.call_args_list} - assert "a.md" in called_names - assert "b.txt" in called_names - assert "ignore.xyz" not in called_names - - def test_add_directory_stops_after_dirty_rollback(self, tmp_path): - import pytest + # The filtered, sorted file list is passed to the serial coordinator. + mock_serial.assert_called_once() + processed = [p.name for p in mock_serial.call_args.args[0]] + assert processed == ["a.md", "b.txt"] # .md and .txt, not .xyz + + def test_add_does_not_expose_jobs_option(self, tmp_path): + self._setup_kb(tmp_path) + runner = CliRunner() + result = runner.invoke(cli, ["add", "--jobs", "2"]) + + assert result.exit_code != 0 + assert "No such option '--jobs'" in result.output + + def test_add_serial_loop_stops_on_dirty_rollback(self, tmp_path): from openkb.add_coordinator import DirtyRollbackError kb_dir = self._setup_kb(tmp_path) docs_dir = tmp_path / "docs" docs_dir.mkdir() - (docs_dir / "a.md").write_text("# A") - (docs_dir / "b.md").write_text("# B") - dirty_error = DirtyRollbackError( - "add", - kb_dir / ".openkb" / "journal" / "retained.json", - ) + (docs_dir / "a.md").write_text("# A\n", encoding="utf-8") + (docs_dir / "b.md").write_text("# B\n", encoding="utf-8") runner = CliRunner() with ( - patch("openkb.cli.add_single_file", side_effect=dirty_error) as mock_add, patch("openkb.cli._find_kb_dir", return_value=kb_dir), + patch("openkb.add_prepare.prepare_document", return_value=None), + patch( + "openkb.cli.commit_prepared_document", + side_effect=DirtyRollbackError("add", tmp_path / "j.json"), + ) as mock_commit, ): - with pytest.raises(DirtyRollbackError) as exc_info: - runner.invoke(cli, ["add", str(docs_dir)], catch_exceptions=False) + result = runner.invoke(cli, ["add", str(docs_dir)]) - assert exc_info.value is dirty_error - mock_add.assert_called_once() - assert mock_add.call_args.args[0].name == "a.md" + assert result.exit_code == 1 + assert mock_commit.call_count == 1 # batch stopped after the dirty rollback + assert "Dirty rollback" in result.output + + def test_add_serial_loop_continues_on_normal_failure(self, tmp_path): + kb_dir = self._setup_kb(tmp_path) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + (docs_dir / "a.md").write_text("# A\n", encoding="utf-8") + (docs_dir / "b.md").write_text("# B\n", encoding="utf-8") + + runner = CliRunner() + with ( + patch("openkb.cli._find_kb_dir", return_value=kb_dir), + patch("openkb.add_prepare.prepare_document", return_value=None), + patch("openkb.cli.commit_prepared_document", return_value="failed") as mock_commit, + ): + result = runner.invoke(cli, ["add", str(docs_dir)]) + + assert result.exit_code == 0 # a normal "failed" does NOT stop the batch + assert mock_commit.call_count == 2 + + def test_add_directory_prepare_commit_end_to_end(self, tmp_path): + """`openkb add ` routes through prepare/commit and lands every file + in raw/ + wiki/sources/ and the registry, in input order.""" + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + (docs_dir / "a.md").write_text("# Alpha\n", encoding="utf-8") + (docs_dir / "b.md").write_text("# Beta\n", encoding="utf-8") + + runner = CliRunner() + with ( + patch("openkb.cli._find_kb_dir", return_value=kb_dir), + patch("openkb.cli.asyncio.run"), + patch("openkb.cli._setup_llm_key"), + ): + result = runner.invoke(cli, ["add", str(docs_dir)]) + + assert result.exit_code == 0 + assert (kb_dir / "raw" / "a.md").exists() + assert (kb_dir / "raw" / "b.md").exists() + assert (kb_dir / "wiki" / "sources" / "a.md").exists() + assert (kb_dir / "wiki" / "sources" / "b.md").exists() + names = { + meta.get("name") + for meta in HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries().values() + } + assert {"a.md", "b.md"} <= names + + def test_add_directory_prepare_failure_does_not_stop_batch(self, tmp_path): + """A prepare failure for one file is caught; the batch continues and the + other file is still added, with no official state change for the failure.""" + from openkb.add_prepare import prepare_document as real_prepare + + kb_dir = self._setup_kb(tmp_path) + docs_dir = tmp_path / "docs" + docs_dir.mkdir() + (docs_dir / "a.md").write_text("# Alpha\n", encoding="utf-8") + (docs_dir / "b.md").write_text("# Beta\n", encoding="utf-8") + + def fake_prepare(f, kb_dir, *, input_index): + if f.name == "a.md": + raise RuntimeError("boom") + return real_prepare(f, kb_dir, input_index=input_index) + + runner = CliRunner() + with ( + patch("openkb.cli._find_kb_dir", return_value=kb_dir), + patch("openkb.cli.asyncio.run"), + patch("openkb.cli._setup_llm_key"), + patch("openkb.add_prepare.prepare_document", side_effect=fake_prepare), + ): + result = runner.invoke(cli, ["add", str(docs_dir)]) + + assert result.exit_code == 0 # batch continues past the failed prepare + assert not (kb_dir / "raw" / "a.md").exists() # a.md not added + assert (kb_dir / "raw" / "b.md").exists() # b.md still added def test_add_unsupported_extension(self, tmp_path): kb_dir = self._setup_kb(tmp_path) diff --git a/tests/test_add_prepare.py b/tests/test_add_prepare.py new file mode 100644 index 00000000..176c0a98 --- /dev/null +++ b/tests/test_add_prepare.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import patch + + +def _setup_kb(tmp_path: Path) -> Path: + (tmp_path / "raw").mkdir(parents=True) + (tmp_path / "wiki" / "sources" / "images").mkdir(parents=True) + (tmp_path / "wiki" / "summaries").mkdir(parents=True) + (tmp_path / "wiki" / "concepts").mkdir(parents=True) + (tmp_path / "wiki" / "entities").mkdir(parents=True) + openkb_dir = tmp_path / ".openkb" + openkb_dir.mkdir() + (openkb_dir / "config.yaml").write_text("model: gpt-4o-mini\n", encoding="utf-8") + (openkb_dir / "hashes.json").write_text(json.dumps({}), encoding="utf-8") + return tmp_path + + +def test_prepare_document_writes_only_private_staging(tmp_path): + from openkb.add_prepare import prepare_document + + kb_dir = _setup_kb(tmp_path / "kb") + doc = tmp_path / "source.md" + doc.write_text("# Source\n", encoding="utf-8") + + prepared = prepare_document(doc, kb_dir, input_index=0) + + assert prepared.input_index == 0 + assert prepared.source_path == doc + assert prepared.staging_dir.is_dir() + assert prepared.staging_dir.is_relative_to(kb_dir / ".openkb" / "staging" / "prepare") + assert not (kb_dir / "raw" / "source.md").exists() + assert not (kb_dir / "wiki" / "sources" / "source.md").exists() + assert json.loads((kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")) == {} + assert not (kb_dir / ".openkb" / "journal").exists() + + +def test_prepare_document_does_not_take_mutation_lock(tmp_path): + from openkb.add_prepare import prepare_document + + kb_dir = _setup_kb(tmp_path / "kb") + doc = tmp_path / "source.md" + doc.write_text("# Source\n", encoding="utf-8") + + with patch("openkb.converter.kb_ingest_lock", side_effect=AssertionError("lock acquired")): + prepared = prepare_document(doc, kb_dir, input_index=3) + + assert prepared.input_index == 3 + + +def test_prepare_document_cleans_staging_on_keyboard_interrupt(tmp_path): + import pytest + + from openkb.add_prepare import prepare_document + + kb_dir = _setup_kb(tmp_path / "kb") + doc = tmp_path / "source.md" + doc.write_text("# Source\n", encoding="utf-8") + + with patch("openkb.add_prepare.convert_document_for_prepare", side_effect=KeyboardInterrupt): + with pytest.raises(KeyboardInterrupt): + prepare_document(doc, kb_dir, input_index=0) + + prepare_root = kb_dir / ".openkb" / "staging" / "prepare" + assert not list(prepare_root.glob("*")) + + +def test_prepare_staging_dir_uses_sanitize_stem_fallback(tmp_path): + from openkb.add_prepare import _prepare_staging_dir + + kb_dir = tmp_path / "kb" + # A stem of all non-word characters: the old hand-rolled sanitizer produced + # an empty segment; the shared _sanitize_stem falls back to "document". + source = tmp_path / "!!!.md" + staging = _prepare_staging_dir(kb_dir, 0, source) + + assert "document" in staging.name + assert staging.is_dir() + + +def test_commit_prepared_document_resolves_final_name_under_serial_owner(tmp_path): + from openkb.add_prepare import prepare_document + from openkb.cli import commit_prepared_document + from openkb.locks import kb_ingest_lock + from openkb.state import HashRegistry + + kb_dir = _setup_kb(tmp_path / "kb") + first_dir = tmp_path / "first" + second_dir = tmp_path / "second" + first_dir.mkdir() + second_dir.mkdir() + first = first_dir / "note.md" + second = second_dir / "note.md" + first.write_text("# First\n", encoding="utf-8") + second.write_text("# Second\n", encoding="utf-8") + + HashRegistry(kb_dir / ".openkb" / "hashes.json").add( + "existing", + {"name": "note.md", "doc_name": "note", "type": "md", "path": str(first)}, + ) + with patch("openkb.cli.asyncio.run"), patch("openkb.cli._setup_llm_key"): + with kb_ingest_lock(kb_dir / ".openkb"): + prepared = prepare_document(second, kb_dir, input_index=0) + outcome = commit_prepared_document(prepared, kb_dir) + + assert outcome == "added" + entries = HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() + committed = [meta for meta in entries.values() if meta.get("name") == "note.md"] + assert any(meta.get("doc_name") == "note" for meta in committed) + renamed = [meta for meta in committed if meta.get("doc_name", "").startswith("note-")] + assert renamed + assert (kb_dir / "raw" / f"{renamed[0]['doc_name']}.md").exists() + assert (kb_dir / "wiki" / "sources" / f"{renamed[0]['doc_name']}.md").exists() + + +def test_commit_prepared_document_requires_serial_owner_lock(tmp_path): + import pytest + + from openkb.add_prepare import prepare_document + from openkb.cli import commit_prepared_document + + kb_dir = _setup_kb(tmp_path / "kb") + doc = tmp_path / "source.md" + doc.write_text("# Source\n", encoding="utf-8") + prepared = prepare_document(doc, kb_dir, input_index=0) + + with pytest.raises(RuntimeError, match="requires the caller to hold kb_ingest_lock"): + commit_prepared_document(prepared, kb_dir) + + assert prepared.staging_dir.exists() + assert not (kb_dir / "raw" / "source.md").exists() + + +def test_commit_reprepares_when_prepare_skip_is_stale(tmp_path): + """Prepare saw the hash as known (skipped, no artifacts); the entry is then + removed before commit. Commit must re-decide skip authoritatively and + re-prepare, not silently drop the document.""" + from openkb.add_prepare import prepare_document + from openkb.cli import commit_prepared_document + from openkb.locks import kb_ingest_lock + from openkb.state import HashRegistry + + kb_dir = _setup_kb(tmp_path / "kb") + doc = tmp_path / "source.md" + doc.write_text("# Source\n", encoding="utf-8") + registry_path = kb_dir / ".openkb" / "hashes.json" + real_hash = HashRegistry.hash_file(doc) + HashRegistry(registry_path).add( + real_hash, {"name": "source.md", "doc_name": "source", "type": "md"} + ) + + with patch("openkb.cli.asyncio.run"), patch("openkb.cli._setup_llm_key"): + with kb_ingest_lock(kb_dir / ".openkb"): + prepared = prepare_document(doc, kb_dir, input_index=0) + assert prepared.result.skipped is True + assert prepared.result.raw_path is None # prepare short-circuited + # The hash is removed between prepare and commit (e.g. `openkb remove`). + HashRegistry(registry_path).remove_by_hash(real_hash) + outcome = commit_prepared_document(prepared, kb_dir) + + assert outcome == "added" # not silently skipped + assert (kb_dir / "wiki" / "sources" / "source.md").exists() + assert HashRegistry(registry_path).is_known(real_hash) + + +def test_commit_reprepares_when_source_changed_after_prepare(tmp_path): + """The source is edited between the lock-free prepare and commit. The KB must + hold the NEW content registered under the NEW hash, not the stale prepare-time + bytes/hash (the TOCTOU regression).""" + from openkb.add_prepare import prepare_document + from openkb.cli import commit_prepared_document + from openkb.locks import kb_ingest_lock + from openkb.state import HashRegistry + + kb_dir = _setup_kb(tmp_path / "kb") + doc = tmp_path / "source.md" + doc.write_text("# Original\n", encoding="utf-8") + + with patch("openkb.cli.asyncio.run"), patch("openkb.cli._setup_llm_key"): + with kb_ingest_lock(kb_dir / ".openkb"): + prepared = prepare_document(doc, kb_dir, input_index=0) + original_hash = prepared.result.file_hash + doc.write_text("# Tampered\n", encoding="utf-8") # edited mid-flight + outcome = commit_prepared_document(prepared, kb_dir) + + assert outcome == "added" + tampered_hash = HashRegistry.hash_file(doc) + assert tampered_hash != original_hash + registered = HashRegistry(kb_dir / ".openkb" / "hashes.json").get(tampered_hash) + assert registered is not None + assert registered["doc_name"] == "source" + assert (kb_dir / "raw" / "source.md").read_text(encoding="utf-8") == "# Tampered\n" diff --git a/tests/test_converter.py b/tests/test_converter.py index 1349b6fb..d8101866 100644 --- a/tests/test_converter.py +++ b/tests/test_converter.py @@ -450,3 +450,28 @@ def test_duplicate_copy_skip_does_not_backfill_path(self, kb_dir): # have backfilled the copy's path onto the legacy entry reg2 = HashRegistry(kb_dir / ".openkb" / "hashes.json") assert "path" not in reg2.get(first.file_hash) # not poisoned + + +def test_convert_document_for_prepare_does_not_acquire_mutation_lock_or_write_official_paths( + tmp_path, +): + from openkb.converter import convert_document_for_prepare + + kb_dir = tmp_path / "kb" + (kb_dir / ".openkb").mkdir(parents=True) + (kb_dir / ".openkb" / "config.yaml").write_text("model: gpt-4o-mini\n", encoding="utf-8") + (kb_dir / ".openkb" / "hashes.json").write_text("{}", encoding="utf-8") + (kb_dir / "raw").mkdir() + (kb_dir / "wiki" / "sources").mkdir(parents=True) + staging_dir = tmp_path / "private-staging" + doc = tmp_path / "note.md" + doc.write_text("# Note\n", encoding="utf-8") + + with patch("openkb.converter.kb_ingest_lock", side_effect=AssertionError("lock acquired")): + result = convert_document_for_prepare(doc, kb_dir, staging_dir=staging_dir) + + assert result.doc_name == "note" + assert (staging_dir / "raw" / "note.md").exists() + assert (staging_dir / "wiki" / "sources" / "note.md").exists() + assert not (kb_dir / "raw" / "note.md").exists() + assert not (kb_dir / "wiki" / "sources" / "note.md").exists() diff --git a/tests/test_locks.py b/tests/test_locks.py index fec8289f..34ef1741 100644 --- a/tests/test_locks.py +++ b/tests/test_locks.py @@ -127,3 +127,85 @@ def test_atomic_write_json_replaces_file(tmp_path): atomic_write_json(target, {"a": {"name": "doc.pdf"}}, ensure_ascii=False) assert json.loads(target.read_text(encoding="utf-8")) == {"a": {"name": "doc.pdf"}} + + +def test_kb_ingest_lock_held_is_per_thread(tmp_path): + from openkb.locks import kb_ingest_lock_held + + openkb_dir = tmp_path / ".openkb" + assert kb_ingest_lock_held(openkb_dir) is False + + holder_seen = {} + worker_seen = {} + + def worker(): + worker_seen["held"] = kb_ingest_lock_held(openkb_dir) + + with kb_ingest_lock(openkb_dir): + holder_seen["held"] = kb_ingest_lock_held(openkb_dir) + t = threading.Thread(target=worker) + t.start() + t.join() + + assert holder_seen["held"] is True + # Per-thread (threading.local): a worker does not see the main thread's lock. + assert worker_seen["held"] is False + assert kb_ingest_lock_held(openkb_dir) is False + + +def test_first_exclusive_lock_reaps_orphaned_prepare_staging(tmp_path): + openkb_dir = tmp_path / ".openkb" + prepare_root = openkb_dir / "staging" / "prepare" + prepare_root.mkdir(parents=True) + orphan = prepare_root / "000001-doc-abcdef12" + orphan.mkdir() + (orphan / "artifact.md").write_text("orphan", encoding="utf-8") + + # A non-prepare staging dir must NOT be touched (reaper is scoped to prepare/). + other = openkb_dir / "staging" / "rollback-deadbeef" + other.mkdir(parents=True) + (other / "backup").write_text("keep", encoding="utf-8") + + with kb_ingest_lock(openkb_dir): + pass + + assert not orphan.exists() + assert not any(prepare_root.iterdir()) + assert other.exists() # scope: only staging/prepare/ is reaped + + +def test_prepare_staging_created_under_lock_survives_then_is_reaped_next_acquire(tmp_path): + openkb_dir = tmp_path / ".openkb" + prepare_root = openkb_dir / "staging" / "prepare" + + with kb_ingest_lock(openkb_dir): + prepare_root.mkdir(parents=True) + live = prepare_root / "000002-note-12345678" + live.mkdir() + (live / "x.md").write_text("live", encoding="utf-8") + # Reaper does not run on the reentrant acquire a commit would make, and + # it already ran before this staging was created, so it survives the lock. + assert live.exists() + + # Released and re-acquired: the staging from the prior hold is now an orphan + # and is reaped at this new 0->1 acquisition. + with kb_ingest_lock(openkb_dir): + assert not live.exists() + + +def test_reaper_does_not_follow_symlink_in_prepare_staging(tmp_path): + openkb_dir = tmp_path / ".openkb" + prepare_root = openkb_dir / "staging" / "prepare" + prepare_root.mkdir(parents=True) + # A symlink inside prepare/ must never be followed by rmtree (its target + # must be left intact). + target = tmp_path / "secret" + target.mkdir() + (target / "keep.txt").write_text("do-not-delete", encoding="utf-8") + link = prepare_root / "000004-link-dead0000" + link.symlink_to(target, target_is_directory=True) + + with kb_ingest_lock(openkb_dir): + pass + + assert (target / "keep.txt").exists() From 6969833f69c89ffc6b6b4507666fb4037488e9df Mon Sep 17 00:00:00 2001 From: Guohao Zhang Date: Sat, 4 Jul 2026 14:24:30 +0800 Subject: [PATCH 2/2] fix(mutation): harden prepare reaper and retarget for Windows Three cross-platform defects in the lock-free prepare path (ee22914), each with a POSIX-simulated regression test (CI is Linux-only): - reaper stalled the KB on a locked loose file: the file branch's unlink(missing_ok=True) only swallows FileNotFoundError, so a PermissionError (AV/indexer on Windows) escaped kb_lock and crashed every exclusive-lock command. Now wrapped in try/except OSError. - read-only staging never self-healed: copy2 preserves a read-only source's bit, and rmtree(ignore_errors=True) can't delete read-only entries on Windows, so the orphan re-logged 'Could not fully reap' on every lock acquisition. rmtree now passes an onerror that clears the read-only bit and retries. - retarget rewrote line endings: write_text without newline= translated \n to \r\n on Windows, so a collision-renamed source became CRLF while all others stayed LF. Switched to atomic_write_text (LF-preserving). --- openkb/cli.py | 5 ++- openkb/locks.py | 38 ++++++++++++++++++- tests/test_add_prepare.py | 48 ++++++++++++++++++++++++ tests/test_locks.py | 77 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 165 insertions(+), 3 deletions(-) diff --git a/openkb/cli.py b/openkb/cli.py index 0fa8e252..e698a41e 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -535,7 +535,10 @@ def _retarget_prepared_document_artifacts(prepared, doc_name: str) -> None: if old_source.exists(): text = old_source.read_text(encoding="utf-8") text = text.replace(f"sources/images/{old_doc_name}/", f"sources/images/{doc_name}/") - old_source.write_text(text, encoding="utf-8") + # LF-preserving rewrite: write_text without newline= would translate \n + # to \r\n on Windows, leaving a collision-renamed source CRLF while every + # other source (written via atomic_write_text in convert) stays LF. + atomic_write_text(old_source, text) old_source.rename(new_source) result.source_path = new_source # Defensive: if prepare ever writes source_path off the .md diff --git a/openkb/locks.py b/openkb/locks.py index a7abf4f1..f47c47ad 100644 --- a/openkb/locks.py +++ b/openkb/locks.py @@ -13,6 +13,7 @@ import logging import os import shutil +import stat import tempfile import threading from pathlib import Path @@ -127,6 +128,32 @@ def _drain_pending_journals(openkb_dir: Path) -> None: _reap_prepare_staging(openkb_dir) +def _reap_prepare_staging_onerror(func, path, exc_info) -> None: + """``shutil.rmtree`` onerror: clear a read-only bit so the reap self-heals. + + ``shutil.copy2`` preserves a read-only source's attribute into staging; on + Windows ``os.unlink``/``os.rmdir`` deny a read-only entry, so rmtree would + leave the orphan behind on every reap ("Could not fully reap …" forever, + re-logged on each exclusive-lock acquisition). Adding the owner-write bit + clears ``FILE_ATTRIBUTE_READONLY`` and the retry succeeds (POSIX is + unaffected — a read-only bit never blocks unlink there). Any other error is + swallowed to preserve the best-effort reap this branch has always had; a + still-stuck orphan is reported via the ``orphan.exists()`` check below. + ``path`` is the absolute path shutil reports (``_rmtree_safe_fd`` → + ``onexc`` → here), so ``chmod`` and the retry resolve correctly. + """ + exc = exc_info[1] if exc_info else None + if isinstance(exc, PermissionError) and func in (os.unlink, os.rmdir): + try: + os.chmod(path, os.stat(path).st_mode | stat.S_IWUSR) + except OSError: + return + try: + func(path) + except OSError: + return + + def _reap_prepare_staging(openkb_dir: Path) -> None: """Remove orphaned prepare-staging dirs left by interrupted prepares. @@ -152,9 +179,16 @@ def _reap_prepare_staging(openkb_dir: Path) -> None: log.warning("Skipping symlink in prepare staging (not followed): %s", orphan) continue if orphan.is_dir(): - shutil.rmtree(orphan, ignore_errors=True) + shutil.rmtree(orphan, onerror=_reap_prepare_staging_onerror) else: - orphan.unlink(missing_ok=True) + # Best-effort: a loose file an AV/indexer holds open on Windows + # raises PermissionError (missing_ok only swallows FileNotFoundError). + # Swallow OSError so one unreachable orphan can't escape the reap, + # run up through kb_lock, and stall every exclusive-lock command. + try: + orphan.unlink(missing_ok=True) + except OSError as exc: + log.debug("Could not unlink orphaned prepare staging %s: %s", orphan, exc) if orphan.exists(): log.warning("Could not fully reap orphaned prepare staging: %s", orphan) else: diff --git a/tests/test_add_prepare.py b/tests/test_add_prepare.py index 176c0a98..5dc36532 100644 --- a/tests/test_add_prepare.py +++ b/tests/test_add_prepare.py @@ -115,6 +115,54 @@ def test_commit_prepared_document_resolves_final_name_under_serial_owner(tmp_pat assert (kb_dir / "wiki" / "sources" / f"{renamed[0]['doc_name']}.md").exists() +def test_retarget_preserves_lf_line_endings(tmp_path, monkeypatch): + """Renaming a prepared source on collision must keep it LF. + + The convert phase writes via atomic_write_text (binary, LF preserved), but + retarget used Path.write_text without newline=, which on Windows translates + \\n to \\r\\n — so a collision-renamed source ends up CRLF while every other + source stays LF (inconsistent KB, noisy git diffs, stray \\r in \\n-split + parsers). POSIX write_text already leaves LF, so we force the Windows + translation to prove the contract holds regardless of platform. + """ + from openkb.add_prepare import PreparedDocument + from openkb.cli import _retarget_prepared_document_artifacts + from openkb.converter import ConvertResult + + staging = tmp_path / "staging" + (staging / "raw").mkdir(parents=True) + sources = staging / "wiki" / "sources" + sources.mkdir(parents=True) + + old_name = "orig" + new_name = "orig-deadbeef" + old_source = sources / f"{old_name}.md" + old_source.write_text("# Title\n\nsources/images/orig/x.png\n", encoding="utf-8") + old_raw = staging / "raw" / f"{old_name}.md" + old_raw.write_text("raw\n", encoding="utf-8") + + prepared = PreparedDocument( + input_index=0, + source_path=Path(f"{old_name}.md"), + staging_dir=staging, + result=ConvertResult(doc_name=old_name, raw_path=old_raw, source_path=old_source), + ) + + real_write_text = Path.write_text + + def windows_translate(self, data, *args, **kwargs): + # Mimic Windows default text-mode write (newline=None): \n -> \r\n. + return real_write_text(self, data.replace("\n", "\r\n"), *args, **kwargs) + + monkeypatch.setattr(Path, "write_text", windows_translate) + + _retarget_prepared_document_artifacts(prepared, new_name) + + new_source = sources / f"{new_name}.md" + assert new_source.exists() + assert b"\r\n" not in new_source.read_bytes() + + def test_commit_prepared_document_requires_serial_owner_lock(tmp_path): import pytest diff --git a/tests/test_locks.py b/tests/test_locks.py index 34ef1741..d2a96588 100644 --- a/tests/test_locks.py +++ b/tests/test_locks.py @@ -3,8 +3,10 @@ from __future__ import annotations import json +import os import stat import threading +from pathlib import Path import pytest @@ -209,3 +211,78 @@ def test_reaper_does_not_follow_symlink_in_prepare_staging(tmp_path): pass assert (target / "keep.txt").exists() + + +def test_reaper_survives_denied_unlink_of_loose_file_in_prepare_staging(tmp_path, monkeypatch): + """A loose file (not a dir) in staging/prepare/ whose unlink is denied must + not escape kb_lock and stall the whole KB. + + On Windows an AV/indexer holding such a file makes os.unlink raise + PermissionError; ``missing_ok=True`` only swallows FileNotFoundError, so the + error currently propagates out of every exclusive-lock acquisition + (add/remove/recompile/chat). The directory branch uses rmtree and is safe; + this guards the asymmetric file branch. + """ + openkb_dir = tmp_path / ".openkb" + prepare_root = openkb_dir / "staging" / "prepare" + prepare_root.mkdir(parents=True) + loose = prepare_root / "stray.dat" + loose.write_text("x", encoding="utf-8") + + real_unlink = Path.unlink + + def deny_unlink(self, *args, **kwargs): + if Path(self) == loose: + raise PermissionError(13, "Access is denied", str(self)) + return real_unlink(self, *args, **kwargs) + + monkeypatch.setattr(Path, "unlink", deny_unlink) + + # Must not raise: the reap is best-effort and must not stall the lock holder. + with kb_ingest_lock(openkb_dir): + pass + + +def test_reaper_self_heals_readonly_dir_under_prepare_staging(tmp_path, monkeypatch): + """A read-only file inside an orphaned prepare dir must be reaped, not left + behind forever. + + shutil.copy2 preserves a read-only source's attribute into staging; on + Windows os.unlink denies a read-only file and rmtree(ignore_errors=True) + leaves the tree behind, so the orphan resurfaces as "Could not fully reap" + on every lock acquisition and never self-heals. POSIX deletes read-only + files fine, so we simulate the Windows denial: deny once, then let the + retry (after the handler clears the read-only bit) succeed. + """ + openkb_dir = tmp_path / ".openkb" + prepare_root = openkb_dir / "staging" / "prepare" + prepare_root.mkdir(parents=True) + orphan = prepare_root / "000005-doc-11223344" + orphan.mkdir() + readonly = orphan / "readonly.md" + readonly.write_text("locked", encoding="utf-8") + + real_unlink = os.unlink + attempts = {"n": 0} + + def deny_once_then_succeed(*args, **kwargs): + # shutil's POSIX fast path calls os.unlink(entry_name, dir_fd=topfd) — a + # relative name — so identify the read-only file by basename. Deny the + # first attempt (Windows Access-denied on a read-only file), then let the + # handler's retry (after it clears the read-only bit) succeed. + name = args[0] if args else kwargs.get("path") + if Path(str(name)).name == readonly.name: + attempts["n"] += 1 + if attempts["n"] == 1: + raise PermissionError(13, "Access is denied", str(name)) + return real_unlink(*args, **kwargs) + + # shutil.rmtree calls os.unlink internally; patching the shared os module + # makes its first attempt on the read-only file raise (Windows behaviour). + monkeypatch.setattr(os, "unlink", deny_once_then_succeed) + + with kb_ingest_lock(openkb_dir): + pass + + assert not orphan.exists() # fully reaped, no residue to warn about forever + assert attempts["n"] == 2 # denied once, recovered on the retry