Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions openkb/add_prepare.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from __future__ import annotations

import shutil
import uuid
from dataclasses import dataclass
from pathlib import Path

from openkb.converter import ConvertResult, _sanitize_stem, convert_document_for_prepare


@dataclass(slots=True)
class PreparedDocument:
input_index: int
source_path: Path
staging_dir: Path
result: ConvertResult

@property
def doc_name_candidate(self) -> str | None:
return self.result.doc_name


def _prepare_staging_dir(kb_dir: Path, input_index: int, source: Path) -> Path:
safe = _sanitize_stem(source.stem)
path = (
kb_dir
/ ".openkb"
/ "staging"
/ "prepare"
/ f"{input_index:06d}-{safe}-{uuid.uuid4().hex[:8]}"
)
path.mkdir(parents=True, exist_ok=False)
return path


def prepare_document(source: Path, kb_dir: Path, *, input_index: int) -> PreparedDocument:
"""Convert ``source`` into private staging without the KB mutation lock.

Coordinator-internal: callers must run this under the serial batch owner's
held ``kb_ingest_lock`` (the ``add`` command acquires it via
``@_with_kb_lock``). The reaper reclaims any staging present at the owner's
first lock acquisition, so once this runs under the held lock the staging
tree is private to this batch.
"""
staging_dir = _prepare_staging_dir(kb_dir, input_index, source)
try:
result = convert_document_for_prepare(source, kb_dir, staging_dir=staging_dir)
return PreparedDocument(
input_index=input_index,
source_path=source,
staging_dir=staging_dir,
result=result,
)
except BaseException:
shutil.rmtree(staging_dir, ignore_errors=True)
raise


def _clear_staging_artifacts(staging_dir: Path) -> None:
"""Drop convertible artifacts (raw/, wiki/) from a prepare staging dir.

Used before re-converting into the same staging dir at commit time so stale
images/raw from the prior prepare don't leak into the re-convert.
"""
for sub in ("raw", "wiki"):
shutil.rmtree(staging_dir / sub, ignore_errors=True)
181 changes: 167 additions & 14 deletions openkb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,25 @@ def filter(self, record: logging.LogRecord) -> bool:
)
from openkb.add_coordinator import _cleanup_staging_dirs
from openkb.converter import (
ConvertResult,
_registry_path,
_sanitize_stem,
convert_document,
resolve_doc_name,
resolve_doc_name_from_key,
)
from openkb.indexer import (
_cloud_display_stem,
_write_long_doc_artifacts,
prepare_cloud_import,
)
from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock
from openkb.locks import (
atomic_write_json,
atomic_write_text,
kb_ingest_lock,
kb_ingest_lock_held,
kb_read_lock,
)
from openkb.log import append_log
from openkb.mutation import publish_staged_tree
from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS
Expand Down Expand Up @@ -452,8 +460,116 @@ def add_single_file(
return _add_single_file_locked(file_path, kb_dir, stage=stage)


def commit_prepared_document(prepared, kb_dir: Path) -> Literal["added", "skipped", "failed"]:
"""Commit a prepared document under the serial KB mutation owner.

Coordinator-internal: called only by the serial batch owner
(:func:`add_directory_serial`), which already holds ``kb_ingest_lock`` for
the whole prepare+commit batch. The reentrant acquire below is only safe
because of that. Propagates :class:`DirtyRollbackError` if the mutation's
rollback failed, so the batch owner can stop on dirty state.
"""
openkb_dir = kb_dir / ".openkb"
if not kb_ingest_lock_held(openkb_dir):
raise RuntimeError("commit_prepared_document requires the caller to hold kb_ingest_lock")
with kb_ingest_lock(openkb_dir):
return _add_single_file_locked(
prepared.source_path,
kb_dir,
stage=True,
prepared=prepared,
)


def add_directory_serial(files: list[Path], kb_dir: Path) -> None:
"""Serially prepare and commit each file in input order.

Stage 4 batch coordinator for ``openkb add <dir>``. Must be called with
``kb_ingest_lock`` already held — the ``add`` command's ``@_with_kb_lock``
decorator holds it for the whole batch. Prepares are lock-free and run under
that held lock, so the reaper (which fires once at the owner's first lock
acquisition, before this batch's staging exists) cannot collide with a live
prepare. Stage 5 will add a parallel-prepare variant alongside this one.

A per-file prepare/commit failure is logged and the batch continues; a
:class:`DirtyRollbackError` propagates so the caller can stop the batch on
dirty state.
"""
from openkb.add_prepare import prepare_document

total = len(files)
for i, f in enumerate(files, 1):
click.echo(f"\n[{i}/{total}] ", nl=False)
try:
prepared = prepare_document(f, kb_dir, input_index=i - 1)
except Exception as exc:
click.echo(f" [ERROR] Prepare failed: {exc}")
logger.debug("Prepare traceback:", exc_info=True)
continue
commit_prepared_document(prepared, kb_dir)


def _retarget_prepared_document_artifacts(prepared, doc_name: str) -> None:
result = prepared.result
old_doc_name = prepared.doc_name_candidate or prepared.source_path.stem
if old_doc_name == doc_name:
return

suffix = prepared.source_path.suffix.lower()
raw_dir = prepared.staging_dir / "raw"
old_raw = raw_dir / f"{old_doc_name}{suffix}"
new_raw = raw_dir / f"{doc_name}{suffix}"
if old_raw.exists():
old_raw.rename(new_raw)
result.raw_path = new_raw
# Defensive: if prepare ever writes raw_path off the <doc_name><suffix>
# convention, retarget via the recorded path rather than silently no-op.
elif result.raw_path is not None and result.raw_path.exists():
new_raw = result.raw_path.with_name(f"{doc_name}{result.raw_path.suffix}")
result.raw_path.rename(new_raw)
result.raw_path = new_raw

sources_dir = prepared.staging_dir / "wiki" / "sources"
old_source = sources_dir / f"{old_doc_name}.md"
new_source = sources_dir / f"{doc_name}.md"
if old_source.exists():
text = old_source.read_text(encoding="utf-8")
text = text.replace(f"sources/images/{old_doc_name}/", f"sources/images/{doc_name}/")
# LF-preserving rewrite: write_text without newline= would translate \n
# to \r\n on Windows, leaving a collision-renamed source CRLF while every
# other source (written via atomic_write_text in convert) stays LF.
atomic_write_text(old_source, text)
old_source.rename(new_source)
result.source_path = new_source
# Defensive: if prepare ever writes source_path off the <doc_name>.md
# convention, retarget via the recorded path rather than silently no-op.
elif result.source_path is not None and result.source_path.exists():
new_source = result.source_path.with_name(f"{doc_name}{result.source_path.suffix}")
result.source_path.rename(new_source)
result.source_path = new_source

images_dir = sources_dir / "images"
old_images = images_dir / old_doc_name
new_images = images_dir / doc_name
if old_images.exists() and old_images != new_images:
old_images.rename(new_images)

result.doc_name = doc_name


def _convert_or_fail(src: Path, kb_dir: Path, staging_dir: Path | None) -> ConvertResult | None:
"""Convert ``src``; on failure log, clean staging, and return ``None``."""
try:
return convert_document(src, kb_dir, staging_dir=staging_dir)
except Exception as exc:
click.echo(f" [ERROR] Conversion failed: {exc}")
logger.debug("Conversion traceback:", exc_info=True)
_cleanup_staging_dirs([staging_dir])
return None


def _add_single_file_locked(
file_path: Path, kb_dir: Path, *, stage: bool = True
file_path: Path, kb_dir: Path, *, stage: bool = True, prepared=None
) -> Literal["added", "skipped", "failed"]:
"""Convert, index, and compile a single document into the knowledge base.

Expand All @@ -478,18 +594,50 @@ def _add_single_file_locked(
config = load_config(openkb_dir / "config.yaml")
_setup_llm_key(kb_dir)
model: str = config.get("model", DEFAULT_CONFIG["model"])
# One registry instance covers the prepared-branch re-validation and the
# commit-time registration; the held kb_ingest_lock means nothing else
# mutates hashes.json in between, so reloading would just re-parse JSON.
registry = HashRegistry(openkb_dir / "hashes.json")

staging_dir = _staging_dir_for(kb_dir, file_path) if stage else None
staging_dir = (
prepared.staging_dir
if prepared is not None
else (_staging_dir_for(kb_dir, file_path) if stage else None)
)

# 2. Convert document into staging when possible.
click.echo(f"Adding: {file_path.name}")
try:
result = convert_document(file_path, kb_dir, staging_dir=staging_dir)
except Exception as exc:
click.echo(f" [ERROR] Conversion failed: {exc}")
logger.debug("Conversion traceback:", exc_info=True)
_cleanup_staging_dirs([staging_dir])
return "failed"
result: ConvertResult
if prepared is None:
converted = _convert_or_fail(file_path, kb_dir, staging_dir)
if converted is None:
return "failed"
result = converted
else:
result = prepared.result
# Prepare ran without the lock: re-decide skip from live registry state
# (not the cached prepare-time decision) and re-validate the source. A
# hash removed between prepare and commit, or a source edited in that
# window, leaves the staged artifacts missing/stale — re-convert.
result.skipped = bool(result.file_hash and registry.is_known(result.file_hash))
if not result.skipped:
source_intact = (
result.file_hash is not None
and file_path.exists()
and HashRegistry.hash_file(file_path) == result.file_hash
and result.raw_path is not None
)
if source_intact:
doc_name = resolve_doc_name(file_path, kb_dir, registry, persist_legacy=False)
_retarget_prepared_document_artifacts(prepared, doc_name)
else:
from openkb.add_prepare import _clear_staging_artifacts

_clear_staging_artifacts(prepared.staging_dir)
converted = _convert_or_fail(file_path, kb_dir, staging_dir)
if converted is None:
return "failed"
result = converted

if result.skipped:
click.echo(f" [SKIP] Already in knowledge base: {file_path.name}")
Expand Down Expand Up @@ -569,7 +717,6 @@ def commit_body(snapshot) -> None:

# Register hash only after successful compilation.
if result.file_hash:
registry = HashRegistry(openkb_dir / "hashes.json")
doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".")
meta = {
"name": file_path.name,
Expand Down Expand Up @@ -1056,9 +1203,15 @@ def add(ctx, path, from_pageindex_cloud):
return
total = len(files)
click.echo(f"Found {total} supported file(s) in {path}.")
for i, f in enumerate(files, 1):
click.echo(f"\n[{i}/{total}] ", nl=False)
add_single_file(f, kb_dir)
from openkb.add_coordinator import DirtyRollbackError

try:
add_directory_serial(files, kb_dir)
except DirtyRollbackError as exc:
# A normal "failed"/"skipped" outcome returns and the batch
# continues; a dirty rollback is fatal and stops the batch.
click.echo(f" [ERROR] {exc}")
ctx.exit(1)
else:
if target.suffix.lower() not in SUPPORTED_EXTENSIONS:
click.echo(
Expand Down
Loading
Loading