diff --git a/openkb/agent/compiler.py b/openkb/agent/compiler.py index af3ac1bd..29a47066 100644 --- a/openkb/agent/compiler.py +++ b/openkb/agent/compiler.py @@ -36,6 +36,7 @@ resolve_entity_types, ) from openkb.lint import list_existing_wiki_targets, strip_ghost_wikilinks +from openkb.locks import atomic_write_text from openkb.schema import INDEX_SEED, get_agents_md logger = logging.getLogger(__name__) @@ -779,7 +780,7 @@ def _write_summary(wiki_dir: Path, doc_name: str, summary: str, fm_lines.append(f"doc_type: {doc_type}") fm_lines.append(_yaml_kv_line("full_text", f"sources/{doc_name}.{ext}")) fm_block = "---\n" + "\n".join(fm_lines) + "\n---\n\n" - (summaries_dir / f"{doc_name}.md").write_text(fm_block + summary, encoding="utf-8") + atomic_write_text(summaries_dir / f"{doc_name}.md", fm_block + summary) _SAFE_NAME_RE = re.compile(r'[^\w\-]') @@ -839,7 +840,7 @@ def _write_concept(wiki_dir: Path, name: str, content: str, source_file: str, is if brief: fm_lines.append(_yaml_kv_line("description", brief)) existing = frontmatter.block(fm_lines) + clean - path.write_text(existing, encoding="utf-8") + atomic_write_text(path, existing) return # Guarantee type + refresh description on update; remove legacy brief:. ex_parts2 = frontmatter.split(existing) @@ -851,7 +852,7 @@ def _write_concept(wiki_dir: Path, name: str, content: str, source_file: str, is # Drop legacy brief: lines (migrated to description:). fm_block = frontmatter.drop_line(fm_block, "brief") existing = fm_block + body - path.write_text(existing, encoding="utf-8") + atomic_write_text(path, existing) else: clean_parts = frontmatter.split(content) if clean_parts is not None: @@ -863,7 +864,7 @@ def _write_concept(wiki_dir: Path, name: str, content: str, source_file: str, is if brief: fm_lines.append(_yaml_kv_line("description", brief)) fm_block = "---\n" + "\n".join(fm_lines) + "\n---\n\n" - path.write_text(fm_block + content, encoding="utf-8") + atomic_write_text(path, fm_block + content) def _write_entity( @@ -927,10 +928,10 @@ def _build_entity_frontmatter(sources: list[str]) -> str: break merged = [source_file] + [s for s in recovered if s != source_file] existing = _build_entity_frontmatter(merged) + clean - path.write_text(existing, encoding="utf-8") + atomic_write_text(path, existing) return - path.write_text(_build_entity_frontmatter([source_file]) + clean, encoding="utf-8") + atomic_write_text(path, _build_entity_frontmatter([source_file]) + clean) _set_fm_line = frontmatter.set_line @@ -1041,7 +1042,7 @@ def _add_related_link( text = _prepend_source_to_frontmatter(text, source_file) text += f"\n\nSee also: {link}" - path.write_text(text, encoding="utf-8") + atomic_write_text(path, text) return True @@ -1068,7 +1069,7 @@ def _backlink_summary_pages( _ensure_h2_section(lines, section, quiet=True) for slug in reversed(missing): _insert_section_entry(lines, section, f"- [[{page_dir}/{slug}]]") - summary_path.write_text("\n".join(lines), encoding="utf-8") + atomic_write_text(summary_path, "\n".join(lines)) def _backlink_pages( @@ -1089,7 +1090,7 @@ def _backlink_pages( lines = text.split("\n") _ensure_h2_section(lines, "## Related Documents", quiet=True) _insert_section_entry(lines, "## Related Documents", f"- {link}") - path.write_text("\n".join(lines), encoding="utf-8") + atomic_write_text(path, "\n".join(lines)) def _backlink_summary(wiki_dir: Path, doc_name: str, concept_slugs: list[str]) -> None: @@ -1195,7 +1196,7 @@ def _remove_doc_from_pages( path.unlink() deleted.append(path.stem) elif new_text != text: - path.write_text(new_text, encoding="utf-8") + atomic_write_text(path, new_text) modified.append(path.stem) return {"modified": modified, "deleted": deleted} @@ -1291,7 +1292,7 @@ def remove_doc_from_index(wiki_dir: Path, doc_name: str, concept_slugs_deleted: while _remove_section_entry(lines, "## Entities", entity_link): pass - index_path.write_text("\n".join(lines), encoding="utf-8") + atomic_write_text(index_path, "\n".join(lines)) def _update_index( @@ -1315,7 +1316,7 @@ def _update_index( index_path = wiki_dir / "index.md" if not index_path.exists(): - index_path.write_text(INDEX_SEED, encoding="utf-8") + atomic_write_text(index_path, INDEX_SEED) lines = index_path.read_text(encoding="utf-8").split("\n") @@ -1361,7 +1362,7 @@ def _update_index( else: _insert_section_entry(lines, "## Entities", entry) - index_path.write_text("\n".join(lines), encoding="utf-8") + atomic_write_text(index_path, "\n".join(lines)) # --------------------------------------------------------------------------- @@ -2035,7 +2036,7 @@ async def compile_long_doc( updated = fm_block + body if updated != summary_content: summary_content = updated - summary_path.write_text(summary_content, encoding="utf-8") + atomic_write_text(summary_path, summary_content) # Base context A. cache_control marker on the doc message creates a # cache breakpoint covering (system + doc) for every concept call. diff --git a/openkb/cli.py b/openkb/cli.py index 28694987..ff4e2228 100644 --- a/openkb/cli.py +++ b/openkb/cli.py @@ -13,6 +13,7 @@ import shutil import sys import time +import uuid from functools import wraps from pathlib import Path from typing import Literal @@ -47,10 +48,11 @@ def filter(self, record: logging.LogRecord) -> bool: resolve_extra_headers, set_extra_headers, resolve_timeout, set_timeout, resolve_litellm_settings, ) -from openkb.converter import _registry_path, convert_document -from openkb.indexer import import_cloud_document +from openkb.converter import _registry_path, _sanitize_stem, convert_document +from openkb.indexer import _write_long_doc_artifacts, prepare_cloud_import from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock from openkb.log import append_log +from openkb.mutation import MutationSnapshot, publish_staged_tree, snapshot_paths from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS # Suppress warnings after all imports — markitdown overrides filters at import time @@ -332,13 +334,83 @@ def _clear_existing_skill_dir(kb_dir: Path, name: str) -> None: shutil.rmtree(target) -def add_single_file(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]: +def _staging_dir_for(kb_dir: Path, file_path: Path) -> Path: + safe = _sanitize_stem(file_path.stem) + path = kb_dir / ".openkb" / "staging" / f"add-{safe}-{uuid.uuid4().hex[:8]}" + path.mkdir(parents=True, exist_ok=False) + return path + + +def _cleanup_staging(path: Path | None) -> None: + if path is not None: + shutil.rmtree(path, ignore_errors=True) + + +def _final_artifact_paths(result, kb_dir: Path) -> tuple[Path | None, Path | None]: + final_raw = None + final_source = None + if result.raw_path is not None: + final_raw = kb_dir / "raw" / result.raw_path.name + if result.source_path is not None: + final_source = kb_dir / "wiki" / "sources" / result.source_path.name + return final_raw, final_source + + +def _snapshot_add_paths( + kb_dir: Path, + doc_name: str, + final_raw: Path | None, + final_source: Path | None, +) -> list[Path]: + paths = [ + kb_dir / ".openkb" / "hashes.json", + kb_dir / ".openkb" / "pageindex.db", + kb_dir / ".openkb" / "pageindex.db-wal", + kb_dir / ".openkb" / "pageindex.db-shm", + kb_dir / ".openkb" / "pageindex.db-journal", + kb_dir / ".openkb" / "files", + kb_dir / "wiki" / "summaries" / f"{doc_name}.md", + kb_dir / "wiki" / "sources" / f"{doc_name}.json", + kb_dir / "wiki" / "sources" / "images" / doc_name, + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + kb_dir / "wiki" / "index.md", + kb_dir / "wiki" / "log.md", + ] + if final_raw is not None: + paths.append(final_raw) + if final_source is not None: + paths.append(final_source) + return paths + + +def _run_compile_with_retry(coro_factory, label: str) -> None: + click.echo(f" {label}...") + for attempt in range(2): + try: + asyncio.run(coro_factory()) + return + except Exception as exc: + if attempt == 0: + click.echo(" Retrying compilation in 2s...") + time.sleep(2) + else: + click.echo(f" [ERROR] Compilation failed: {exc}") + logger.debug("Compilation traceback:", exc_info=True) + raise + + +def add_single_file( + file_path: Path, kb_dir: Path, *, stage: bool = True +) -> Literal["added", "skipped", "failed"]: """Convert, index, and compile a single document under the KB mutation lock.""" with kb_ingest_lock(kb_dir / ".openkb"): - return _add_single_file_locked(file_path, kb_dir) + return _add_single_file_locked(file_path, kb_dir, stage=stage) -def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]: +def _add_single_file_locked( + file_path: Path, kb_dir: Path, *, stage: bool = True +) -> Literal["added", "skipped", "failed"]: """Convert, index, and compile a single document into the knowledge base. Steps: @@ -363,129 +435,145 @@ def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", " _setup_llm_key(kb_dir) model: str = config.get("model", DEFAULT_CONFIG["model"]) - # 2. Convert document + staging_dir = _staging_dir_for(kb_dir, file_path) if stage else None + snapshot: MutationSnapshot | None = None + + # 2. Convert document into staging when possible. click.echo(f"Adding: {file_path.name}") try: - result = convert_document(file_path, kb_dir) + result = convert_document(file_path, kb_dir, staging_dir=staging_dir) except Exception as exc: click.echo(f" [ERROR] Conversion failed: {exc}") logger.debug("Conversion traceback:", exc_info=True) + _cleanup_staging(staging_dir) return "failed" if result.skipped: click.echo(f" [SKIP] Already in knowledge base: {file_path.name}") + _cleanup_staging(staging_dir) return "skipped" doc_name = result.doc_name or file_path.stem index_result = None # populated only on the long-doc branch - # 3/4. Index and compile - if result.is_long_doc: - click.echo(f" Long document detected — indexing with PageIndex...") - try: - from openkb.indexer import index_long_document - index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name) - except Exception as exc: - click.echo(f" [ERROR] Indexing failed: {exc}") - logger.debug("Indexing traceback:", exc_info=True) - return "failed" - - summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" - click.echo(f" Compiling long doc (doc_id={index_result.doc_id})...") - for attempt in range(2): + final_raw, final_source = _final_artifact_paths(result, kb_dir) + try: + snapshot = snapshot_paths( + kb_dir, + _snapshot_add_paths(kb_dir, doc_name, final_raw, final_source), + operation="add", + details={ + "file_hash": result.file_hash, + "name": file_path.name, + "doc_name": doc_name, + }, + hardlink_dirs={ + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + kb_dir / ".openkb" / "files", + }, + ) + publish_staged_tree(staging_dir, kb_dir) + if final_raw is not None: + result.raw_path = final_raw + if final_source is not None: + result.source_path = final_source + + # 3/4. Index and compile + if result.is_long_doc: + if result.raw_path is None: + raise RuntimeError(f"Converted long document has no raw artifact: {file_path.name}") + click.echo(" Long document detected — indexing with PageIndex...") try: - asyncio.run( - compile_long_doc(doc_name, summary_path, index_result.doc_id, kb_dir, model, - doc_description=index_result.description) + from openkb.indexer import index_long_document + + index_result = index_long_document( + result.raw_path, kb_dir, doc_name=doc_name ) - break except Exception as exc: - if attempt == 0: - click.echo(f" Retrying compilation in 2s...") - time.sleep(2) - else: - click.echo(f" [ERROR] Compilation failed: {exc}") - logger.debug("Compilation traceback:", exc_info=True) - return "failed" - else: - click.echo(f" Compiling short doc...") - for attempt in range(2): - try: - asyncio.run(compile_short_doc(doc_name, result.source_path, kb_dir, model)) - break - except Exception as exc: - if attempt == 0: - click.echo(f" Retrying compilation in 2s...") - time.sleep(2) - else: - click.echo(f" [ERROR] Compilation failed: {exc}") - logger.debug("Compilation traceback:", exc_info=True) - return "failed" - - # Register hash only after successful compilation - if result.file_hash: - # Construct the registry NOW, not earlier: convert_document may have - # backfilled a legacy entry (doc_name/path) on disk via its own - # instance, and an earlier snapshot would clobber that backfill on - # the full rewrite in add(). - registry = HashRegistry(openkb_dir / "hashes.json") - doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") - meta = { - "name": file_path.name, - "doc_name": doc_name, - "type": doc_type, - "path": _registry_path(file_path, kb_dir), - } - if result.raw_path is not None: - meta["raw_path"] = _registry_path(result.raw_path, kb_dir) - if result.source_path is not None: - meta["source_path"] = _registry_path(result.source_path, kb_dir) - # For long PDFs we also persist the PageIndex doc_id so `openkb - # remove` can later call ``Collection.delete_document(doc_id)`` - # to free the managed PDF copy + SQLite row. - if index_result is not None: - meta["doc_id"] = index_result.doc_id - # An edited document arrives with a new content hash; drop the - # stale entry for the same doc_name so the registry keeps exactly - # one entry per document. - registry.remove_by_doc_name(doc_name) - registry.add(result.file_hash, meta) + click.echo(f" [ERROR] Indexing failed: {exc}") + logger.debug("Indexing traceback:", exc_info=True) + raise + + summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" + _run_compile_with_retry( + lambda: compile_long_doc( + doc_name, + summary_path, + index_result.doc_id, + kb_dir, + model, + doc_description=index_result.description, + ), + label=f"Compiling long doc (doc_id={index_result.doc_id})", + ) + else: + if result.source_path is None: + raise RuntimeError(f"Converted document has no source artifact: {file_path.name}") + source_path = result.source_path + _run_compile_with_retry( + lambda: compile_short_doc(doc_name, source_path, kb_dir, model), + label="Compiling short doc", + ) + + # Register hash only after successful compilation. + if result.file_hash: + registry = HashRegistry(openkb_dir / "hashes.json") + doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".") + meta = { + "name": file_path.name, + "doc_name": doc_name, + "type": doc_type, + "path": _registry_path(file_path, kb_dir), + } + if result.raw_path is not None: + meta["raw_path"] = _registry_path(result.raw_path, kb_dir) + if result.source_path is not None: + meta["source_path"] = _registry_path(result.source_path, kb_dir) + if index_result is not None: + meta["doc_id"] = index_result.doc_id + registry.remove_by_doc_name(doc_name) + for existing_hash, existing_meta in list(registry.all_entries().items()): + if ( + existing_hash != result.file_hash + and not existing_meta.get("doc_name") + and existing_meta.get("name") == file_path.name + ): + registry.remove_by_hash(existing_hash) + registry.add(result.file_hash, meta) + + snapshot.mark_committed() + except Exception: + if snapshot is None: + click.echo(f" [ERROR] Failed to prepare mutation snapshot for {file_path.name}.") + _cleanup_staging(staging_dir) + return "failed" + rollback_error = snapshot.rollback_best_effort() + if rollback_error is None: + snapshot.discard_best_effort() + else: + click.echo( + " [ERROR] Rollback failed; mutation journal retained for recovery: " + f"{snapshot.journal_path}" + ) + _cleanup_staging(staging_dir) + return "failed" + finally: + _cleanup_staging(staging_dir) - append_log(kb_dir / "wiki", "ingest", file_path.name) + try: + append_log(kb_dir / "wiki", "ingest", file_path.name) + except Exception as exc: + logger.warning("Failed to append ingest log for %s: %s", file_path.name, exc) + cleanup_error = snapshot.discard_best_effort() + if cleanup_error is not None: + click.echo( + f" [WARN] {file_path.name} added, but mutation journal cleanup failed: {cleanup_error}" + ) click.echo(f" [OK] {file_path.name} added to knowledge base.") return "added" -def _cleanup_failed_cloud_import(kb_dir: Path, doc_name: str) -> None: - """Best-effort wiki cleanup after a cloud import whose compilation failed. - - import_cloud_document writes the summary + per-page JSON source before - compile, and compile_long_doc writes concept/entity pages incrementally — so - a compile failure (which happens before the registry entry is added) would - otherwise strand wiki artifacts that ``openkb remove`` cannot reach. Mirror - remove's wiki cleanup (by doc_name, idempotent) but touch neither the - registry (no entry was added) nor PageIndex (the cloud doc is the user's). - """ - from openkb.agent.compiler import ( - remove_doc_from_concept_pages, - remove_doc_from_entity_pages, - remove_doc_from_index, - ) - - wiki_dir = kb_dir / "wiki" - (wiki_dir / "summaries" / f"{doc_name}.md").unlink(missing_ok=True) - (wiki_dir / "sources" / f"{doc_name}.json").unlink(missing_ok=True) - images_dir = wiki_dir / "sources" / "images" / doc_name - if images_dir.is_dir(): - shutil.rmtree(images_dir, ignore_errors=True) - concept_result = remove_doc_from_concept_pages(wiki_dir, doc_name, keep_empty=False) - entity_result = remove_doc_from_entity_pages(wiki_dir, doc_name, keep_empty=False) - remove_doc_from_index( - wiki_dir, doc_name, concept_result["deleted"], - entity_slugs_deleted=entity_result["deleted"], - ) - - def import_from_pageindex_cloud( doc_id: str, kb_dir: Path ) -> Literal["added", "skipped", "failed"]: @@ -514,61 +602,87 @@ def import_from_pageindex_cloud( return "skipped" click.echo(f"Importing from PageIndex Cloud: {doc_id}") + snapshot: MutationSnapshot | None = None + doc_name = "" try: - import_result = import_cloud_document(doc_id, kb_dir, path_key) - except Exception as exc: - click.echo(f" [ERROR] Import failed: {exc}") - logger.debug("Cloud import traceback:", exc_info=True) - return "failed" - - doc_name = import_result.doc_name - summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md" - click.echo(f" Compiling imported doc (doc_id={doc_id})...") - compiled = False - for attempt in range(2): try: - asyncio.run( - compile_long_doc( - doc_name, summary_path, doc_id, kb_dir, model, - doc_description=import_result.description, - ) - ) - compiled = True - break + cloud = prepare_cloud_import(doc_id, kb_dir, path_key) except Exception as exc: - if attempt == 0: - click.echo(" Retrying compilation in 2s...") - time.sleep(2) - else: - click.echo(f" [ERROR] Compilation failed: {exc}") - logger.debug("Compilation traceback:", exc_info=True) - if not compiled: - # No registry entry exists yet, so `openkb remove` can't reach the - # summary/source/concept/entity artifacts already written; clean them - # best-effort so a failed import leaves no orphans and a retry is clean. - try: - _cleanup_failed_cloud_import(kb_dir, doc_name) - except Exception: - logger.debug("Cleanup after failed cloud import errored:", exc_info=True) - return "failed" + click.echo(f" [ERROR] Import failed: {exc}") + logger.debug("Cloud import traceback:", exc_info=True) + return "failed" - # Register the raw-less cloud entry only after successful compilation. - registry = HashRegistry(openkb_dir / "hashes.json") - meta = { - "name": import_result.name, - "doc_name": doc_name, - "type": "pageindex_cloud", - "origin": "cloud", - "path": path_key, - "source_path": _registry_path( - kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir - ), - "doc_id": doc_id, - } - registry.remove_by_doc_name(doc_name) - registry.add(synthetic_hash, meta) + doc_name = cloud.doc_name + snapshot = snapshot_paths( + kb_dir, + _snapshot_add_paths(kb_dir, doc_name, None, None), + operation="cloud_import", + details={"doc_id": doc_id, "doc_name": doc_name}, + hardlink_dirs={ + kb_dir / "wiki" / "concepts", + kb_dir / "wiki" / "entities", + kb_dir / ".openkb" / "files", + }, + ) + summary_path = _write_long_doc_artifacts( + cloud.tree, + cloud.all_pages, + doc_name, + doc_id, + kb_dir, + description=cloud.description, + ) + _run_compile_with_retry( + lambda: compile_long_doc( + doc_name, + summary_path, + doc_id, + kb_dir, + model, + doc_description=cloud.description, + ), + label=f"Compiling imported doc (doc_id={doc_id})", + ) + + # Register the raw-less cloud entry only after successful compilation. + registry = HashRegistry(openkb_dir / "hashes.json") + meta = { + "name": cloud.cloud_name, + "doc_name": doc_name, + "type": "pageindex_cloud", + "origin": "cloud", + "path": path_key, + "source_path": _registry_path( + kb_dir / "wiki" / "sources" / f"{doc_name}.json", kb_dir + ), + "doc_id": doc_id, + } + registry.remove_by_doc_name(doc_name) + registry.add(synthetic_hash, meta) + snapshot.mark_committed() + except Exception: + if snapshot is None: + click.echo(f" [ERROR] Failed to prepare mutation snapshot for cloud import {doc_id}.") + return "failed" + rollback_error = snapshot.rollback_best_effort() + if rollback_error is None: + snapshot.discard_best_effort() + else: + click.echo( + " [ERROR] Rollback failed; mutation journal retained for recovery: " + f"{snapshot.journal_path}" + ) + return "failed" - append_log(kb_dir / "wiki", "ingest", doc_name) + try: + append_log(kb_dir / "wiki", "ingest", doc_name) + except Exception as exc: + logger.warning("Failed to append ingest log for cloud import %s: %s", doc_id, exc) + cleanup_error = snapshot.discard_best_effort() + if cleanup_error is not None: + click.echo( + f" [WARN] {doc_name} imported, but mutation journal cleanup failed: {cleanup_error}" + ) click.echo(f" [OK] {doc_name} imported from PageIndex Cloud.") return "added" @@ -847,7 +961,7 @@ def add(ctx, path, from_pageindex_cloud): fetched = fetch_url_to_raw(path, kb_dir) if fetched is None: return - outcome = add_single_file(fetched, kb_dir) + outcome = add_single_file(fetched, kb_dir, stage=False) # Only clean up on dedup-skip. On "failed" we keep the file so # the user can retry (e.g. transient LLM error during compile) # without re-downloading — and so they don't lose data when diff --git a/openkb/converter.py b/openkb/converter.py index 4b9246ec..fc3c96b5 100644 --- a/openkb/converter.py +++ b/openkb/converter.py @@ -14,6 +14,7 @@ from openkb.config import load_config from openkb.images import copy_relative_images, extract_base64_images, convert_pdf_with_images +from openkb.locks import atomic_write_text, kb_ingest_lock from openkb.state import HashRegistry logger = logging.getLogger(__name__) @@ -70,7 +71,13 @@ def _name_taken(candidate: str, registry: HashRegistry) -> bool: return False -def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str: +def resolve_doc_name( + src: Path, + kb_dir: Path, + registry: HashRegistry, + *, + persist_legacy: bool = True, +) -> str: """Resolve the stable wiki name for ``src`` (Scheme A). Identity is keyed by path: a source we've seen before (same path, even @@ -93,9 +100,10 @@ def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str: file_hash, meta = legacy meta = dict(meta) name = meta.get("doc_name") or Path(meta.get("name", "")).stem - meta["doc_name"] = name - meta["path"] = path_key - registry.add(file_hash, meta) # backfill + persist + if persist_legacy: + meta["doc_name"] = name + meta["path"] = path_key + registry.add(file_hash, meta) # backfill + persist return name return resolve_doc_name_from_key(src.stem, path_key, registry) @@ -130,7 +138,12 @@ def get_pdf_page_count(path: Path) -> int: return doc.page_count -def convert_document(src: Path, kb_dir: Path) -> ConvertResult: +def convert_document( + src: Path, + kb_dir: Path, + *, + staging_dir: Path | None = None, +) -> ConvertResult: """Convert a document and integrate it into the knowledge base. Steps: @@ -141,86 +154,93 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult: 5. Otherwise — run MarkItDown, extract base64 images, save to ``wiki/sources/``. 6. Register hash in the registry. """ - # ------------------------------------------------------------------ - # Load config & state - # ------------------------------------------------------------------ - openkb_dir = kb_dir / ".openkb" - config = load_config(openkb_dir / "config.yaml") - threshold: int = config.get("pageindex_threshold", 20) - registry = HashRegistry(openkb_dir / "hashes.json") - - # ------------------------------------------------------------------ - # 1. Hash check + identity resolution - # ------------------------------------------------------------------ - file_hash = HashRegistry.hash_file(src) - if registry.is_known(file_hash): - logger.info("Skipping already-known file: %s", src.name) - stored = registry.get(file_hash) or {} - return ConvertResult( - skipped=True, - file_hash=file_hash, - doc_name=stored.get("doc_name") or Path(stored.get("name", src.name)).stem, - ) - doc_name = resolve_doc_name(src, kb_dir, registry) - - # ------------------------------------------------------------------ - # 2. Copy to raw/ - # ------------------------------------------------------------------ - raw_dir = kb_dir / "raw" - raw_dir.mkdir(parents=True, exist_ok=True) - if src.resolve().is_relative_to(raw_dir.resolve()): - # Watch mode: the file already lives in raw/ — don't copy/rename. - raw_dest = src - else: - raw_dest = raw_dir / f"{doc_name}{src.suffix.lower()}" - shutil.copy2(src, raw_dest) - - # ------------------------------------------------------------------ - # 3. PDF long-doc detection - # ------------------------------------------------------------------ - if src.suffix.lower() == ".pdf": - page_count = get_pdf_page_count(src) - if page_count >= threshold: - logger.info( - "Long PDF detected (%d pages >= %d threshold): %s", - page_count, - threshold, - src.name, - ) + with kb_ingest_lock(kb_dir / ".openkb"): + # ------------------------------------------------------------------ + # Load config & state + # ------------------------------------------------------------------ + openkb_dir = kb_dir / ".openkb" + config = load_config(openkb_dir / "config.yaml") + threshold: int = config.get("pageindex_threshold", 20) + artifact_root = staging_dir if staging_dir is not None else kb_dir + registry = HashRegistry(openkb_dir / "hashes.json") + + # ------------------------------------------------------------------ + # 1. Hash check + identity resolution + # ------------------------------------------------------------------ + file_hash = HashRegistry.hash_file(src) + if registry.is_known(file_hash): + logger.info("Skipping already-known file: %s", src.name) + stored = registry.get(file_hash) or {} return ConvertResult( - raw_path=raw_dest, - is_long_doc=True, + skipped=True, file_hash=file_hash, - doc_name=doc_name, + doc_name=stored.get("doc_name") or Path(stored.get("name", src.name)).stem, ) + doc_name = resolve_doc_name( + src, + kb_dir, + registry, + persist_legacy=staging_dir is None, + ) - # ------------------------------------------------------------------ - # 4/5. Convert to Markdown - # ------------------------------------------------------------------ - sources_dir = kb_dir / "wiki" / "sources" - sources_dir.mkdir(parents=True, exist_ok=True) - images_dir = kb_dir / "wiki" / "sources" / "images" / doc_name - images_dir.mkdir(parents=True, exist_ok=True) - - if src.suffix.lower() == ".md": - markdown = src.read_text(encoding="utf-8") - markdown = copy_relative_images(markdown, src.parent, doc_name, images_dir) - elif src.suffix.lower() == ".pdf": - # Use pymupdf dict-mode for PDFs: text + images inline at correct positions - markdown = convert_pdf_with_images(src, doc_name, images_dir) - else: - # Non-PDF, non-MD: use markitdown (docx, pptx, html, etc.) - mid = MarkItDown() - result = mid.convert(str(src)) - markdown = result.text_content - markdown = extract_base64_images(markdown, doc_name, images_dir) - - dest_md = sources_dir / f"{doc_name}.md" - dest_md.write_text(markdown, encoding="utf-8") - - return ConvertResult( - raw_path=raw_dest, - source_path=dest_md, - file_hash=file_hash, - doc_name=doc_name, - ) + # ------------------------------------------------------------------ + # 2. Copy to raw/ + # ------------------------------------------------------------------ + raw_dir = artifact_root / "raw" + raw_dir.mkdir(parents=True, exist_ok=True) + if staging_dir is None and src.resolve().is_relative_to(raw_dir.resolve()): + # Watch mode: the file already lives in raw/ — don't copy/rename. + raw_dest = src + else: + raw_dest = raw_dir / f"{doc_name}{src.suffix.lower()}" + shutil.copy2(src, raw_dest) + + # ------------------------------------------------------------------ + # 3. PDF long-doc detection + # ------------------------------------------------------------------ + if src.suffix.lower() == ".pdf": + page_count = get_pdf_page_count(src) + if page_count >= threshold: + logger.info( + "Long PDF detected (%d pages >= %d threshold): %s", + page_count, + threshold, + src.name, + ) + return ConvertResult( + raw_path=raw_dest, + is_long_doc=True, + file_hash=file_hash, + doc_name=doc_name, + ) + + # ------------------------------------------------------------------ + # 4/5. Convert to Markdown + # ------------------------------------------------------------------ + sources_dir = artifact_root / "wiki" / "sources" + sources_dir.mkdir(parents=True, exist_ok=True) + images_dir = artifact_root / "wiki" / "sources" / "images" / doc_name + images_dir.mkdir(parents=True, exist_ok=True) + + if src.suffix.lower() == ".md": + markdown = src.read_text(encoding="utf-8") + markdown = copy_relative_images(markdown, src.parent, doc_name, images_dir) + elif src.suffix.lower() == ".pdf": + # Use pymupdf dict-mode for PDFs: text + images inline at correct positions + markdown = convert_pdf_with_images(src, doc_name, images_dir) + else: + # Non-PDF, non-MD: use markitdown (docx, pptx, html, etc.) + mid = MarkItDown() + result = mid.convert(str(src)) + markdown = result.text_content + markdown = extract_base64_images(markdown, doc_name, images_dir) + + dest_md = sources_dir / f"{doc_name}.md" + atomic_write_text(dest_md, markdown) + + return ConvertResult( + raw_path=raw_dest, + source_path=dest_md, + file_hash=file_hash, + doc_name=doc_name, + ) diff --git a/openkb/indexer.py b/openkb/indexer.py index 691a126c..fd50a38c 100644 --- a/openkb/indexer.py +++ b/openkb/indexer.py @@ -5,7 +5,7 @@ import logging from dataclasses import dataclass -from pathlib import Path +from pathlib import Path, PurePosixPath from typing import Any import os @@ -37,6 +37,30 @@ class CloudImportResult: description: str +@dataclass +class CloudImportData: + """A fetched cloud doc + its resolved wiki name, before any KB write. + + Returned by :func:`prepare_cloud_import` so the caller can snapshot this + doc's specific paths (O(1)) before :func:`_write_long_doc_artifacts` writes + them — instead of copying the whole summaries/sources trees on every import. + """ + + doc_id: str + doc_name: str # collision-resistant wiki slug (resolved, not yet written) + cloud_name: str # cloud display name (original filename in the cloud) + description: str + tree: dict + all_pages: list + + +def _cloud_display_stem(cloud_name: str, fallback: str) -> str: + """Return a platform-independent stem for a PageIndex Cloud display name.""" + normalized = cloud_name.replace("\\", "/").rstrip("/") + leaf = normalized.rsplit("/", 1)[-1] if normalized else "" + return PurePosixPath(leaf).stem or fallback + + def _normalize_page_content(raw_pages: Any) -> list[dict[str, Any]]: """Normalize PageIndex/local PDF page content into OpenKB's JSON shape.""" if not isinstance(raw_pages, list): @@ -246,14 +270,13 @@ def _fetch_cloud_pages(col, doc_id: str) -> list[dict[str, Any]]: return pages -def import_cloud_document(doc_id: str, kb_dir: Path, path_key: str) -> CloudImportResult: - """Import an already-indexed PageIndex Cloud document by ``doc_id``. +def prepare_cloud_import(doc_id: str, kb_dir: Path, path_key: str) -> CloudImportData: + """Fetch a PageIndex Cloud doc and resolve its wiki name WITHOUT writing. - Fetches structure + OCR'd page content from the cloud (no local PDF) and - writes the same wiki artifacts as :func:`index_long_document`. Requires - ``PAGEINDEX_API_KEY``. ``path_key`` is the synthetic identity key - (``pageindex-cloud:``) used to resolve a collision-resistant - wiki name. + Cloud fetch + collision-resistant name resolution only — no KB mutation — + so the caller knows ``doc_name`` before writing and can snapshot just this + doc's paths instead of copying the whole summaries/sources trees. Name + resolution reads the registry but does not mutate it. """ from openkb.converter import resolve_doc_name_from_key from openkb.state import HashRegistry @@ -274,7 +297,7 @@ def import_cloud_document(doc_id: str, kb_dir: Path, path_key: str) -> CloudImpo structure: list = doc.get("structure", []) registry = HashRegistry(kb_dir / ".openkb" / "hashes.json") - stem = Path(cloud_name).stem or doc_id + stem = _cloud_display_stem(cloud_name, doc_id) doc_name = resolve_doc_name_from_key(stem, path_key, registry) tree = { @@ -289,7 +312,32 @@ def import_cloud_document(doc_id: str, kb_dir: Path, path_key: str) -> CloudImpo f"No page content returned from PageIndex Cloud for doc_id={doc_id}" ) - _write_long_doc_artifacts(tree, all_pages, doc_name, doc_id, kb_dir, description=description) + return CloudImportData( + doc_id=doc_id, doc_name=doc_name, cloud_name=cloud_name, + description=description, tree=tree, all_pages=all_pages, + ) + + +def import_cloud_document(doc_id: str, kb_dir: Path, path_key: str) -> CloudImportResult: + """Import an already-indexed PageIndex Cloud document by ``doc_id``. + + Fetches structure + OCR'd page content from the cloud (no local PDF) and + writes the same wiki artifacts as :func:`index_long_document`. Requires + ``PAGEINDEX_API_KEY``. ``path_key`` is the synthetic identity key + (``pageindex-cloud:``) used to resolve a collision-resistant + wiki name. + + Writes immediately. Callers that need to snapshot before writing (e.g. the + crash-safe CLI path) should call :func:`prepare_cloud_import` then + :func:`_write_long_doc_artifacts`, so the snapshot can cover only this + doc's paths. + """ + cloud = prepare_cloud_import(doc_id, kb_dir, path_key) + _write_long_doc_artifacts( + cloud.tree, cloud.all_pages, cloud.doc_name, cloud.doc_id, kb_dir, + description=cloud.description, + ) return CloudImportResult( - doc_id=doc_id, doc_name=doc_name, name=cloud_name, description=description, + doc_id=cloud.doc_id, doc_name=cloud.doc_name, + name=cloud.cloud_name, description=cloud.description, ) diff --git a/openkb/lint.py b/openkb/lint.py index 8b7674b7..ce695ecc 100644 --- a/openkb/lint.py +++ b/openkb/lint.py @@ -16,6 +16,7 @@ import yaml from openkb import frontmatter +from openkb.locks import atomic_write_text from openkb.schema import PAGE_CONTENT_DIRS # Matches [[wikilink]] or [[subdir/link]] @@ -249,7 +250,7 @@ def fix_broken_links( text, known_targets, norm_index=norm_index, ) if cleaned != text: - md.write_text(cleaned, encoding="utf-8") + atomic_write_text(md, cleaned) files_changed += 1 ghosts_stripped += len(ghosts) return files_changed, ghosts_stripped diff --git a/openkb/locks.py b/openkb/locks.py index 2fa2815a..72966fc1 100644 --- a/openkb/locks.py +++ b/openkb/locks.py @@ -9,6 +9,7 @@ import contextlib import json +import logging import os import tempfile import threading @@ -97,6 +98,32 @@ def _local_lock(lock_path: Path) -> _LocalRwLock: return lock +def _drain_pending_journals(openkb_dir: Path) -> None: + """Roll back any mutation journals an interrupted process left behind. + + Draining recovery is part of *taking* the mutation lock, not part of any + one command: a process that acquires the exclusive lock must restore the + KB to a known state before mutating it. Wiring this into ``kb_lock`` means + every exclusive-lock holder — ``add``, ``remove``, ``recompile``, ``lint``, + ``chat`` — drains on first acquisition, so an ``add`` that crashed mid- + commit cannot leave an active journal on disk that a later ``add`` rolls + back over the top of an intervening ``remove``/``recompile`` (clobbering + those edits). ``openkb_dir`` is the ``kb_dir/.openkb`` directory callers + pass to ``kb_lock``; the journal lives at ``openkb_dir/journal``, so the + KB root is ``openkb_dir.parent``. + + The delayed import breaks the ``locks`` ↔ ``mutation`` cycle (``mutation`` + imports atomic-write helpers from this module at top level). Called only + on first OS-lock acquisition (the reentrant branch above returns early), + never on a read lock, so queries pay nothing. + """ + from openkb.mutation import recover_pending_journals + + log = logging.getLogger(__name__) + for message in recover_pending_journals(openkb_dir.parent): + log.warning(message) + + @contextlib.contextmanager def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]: """Hold a KB-level advisory lock.""" @@ -134,6 +161,8 @@ def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]: flock(fh, exclusive=exclusive) held[resolved] = (1, 0) if exclusive else (0, 1) try: + if exclusive: + _drain_pending_journals(openkb_dir) yield finally: held.pop(resolved, None) diff --git a/openkb/mutation.py b/openkb/mutation.py new file mode 100644 index 00000000..cb1353b5 --- /dev/null +++ b/openkb/mutation.py @@ -0,0 +1,431 @@ +"""Transactional helpers for KB mutation paths.""" +from __future__ import annotations + +import errno +import json +import logging +import os +import shutil +import tempfile +import uuid +from dataclasses import dataclass, field +from pathlib import Path + +from openkb.locks import _fsync_directory, _target_mode, atomic_write_json + +logger = logging.getLogger(__name__) + +# Cap how many times recover_pending_journals retries an active journal whose +# rollback keeps failing. Without a cap, a deterministically-failing rollback +# (e.g. persistent ENOSPC) is retried on every lock acquisition forever, +# re-doing the failed work and never releasing the backup dir + journal. +MAX_ROLLBACK_ATTEMPTS = 5 + + +def _apply_mode(path: Path, mode: int) -> None: + """Set ``path``'s permission bits (no-op where ``os.chmod`` is absent).""" + if hasattr(os, "chmod"): + os.chmod(path, mode) + + +def _fsync_file(path: Path) -> None: + """Best-effort fsync of a file's data, for durability after a rename. + + Opens read+write so ``FlushFileBuffers`` works on Windows (a read-only + handle can be denied). Best-effort: a failure here only weakens durability + of already-written bytes (the OS write-back still flushes them); it must + not fail the publish. + """ + try: + with open(path, "r+b") as fh: + os.fsync(fh.fileno()) + except OSError: + pass + + +def _hardlink_or_copy(src: str, dst: str) -> None: + """``copytree`` copy_function that hardlinks (O(1), shares the inode). + + Used for directory backups the caller has marked hardlink-safe — trees + whose writers all go through atomic temp+replace (so the live file moves + to a new inode) or that are append-only across documents. The hardlink + backup then keeps pointing at the old inode while the live tree is + mutated, so rollback restores the pre-mutation bytes without copying them + up front. Falls back to a real copy on EXDEV/EPERM/EACCES — cross-device, + a filesystem that forbids hardlinks, or (Windows) an ACL / cloud-sync + folder (OneDrive/Dropbox) that blocks CREATE_HARD_LINK. If the copy also + fails it surfaces the real error. + """ + src_path = Path(src) + dst_path = Path(dst) + try: + os.link(src_path, dst_path) + except OSError as exc: + if exc.errno not in (errno.EXDEV, errno.EPERM, errno.EACCES): + raise + shutil.copy2(src_path, dst_path) + + +def _copy_file_atomic(src: Path, dest: Path) -> None: + """Stream ``src`` to ``dest`` through a temp file, then atomically replace. + + Streams (never buffers the whole file) so copying a large raw PDF does + not spike peak memory. The temp-file + ``os.replace`` means a torn + intermediate state can never be observed at ``dest``. Used by snapshot + backup creation, rollback restore, and the cross-filesystem fallback of + :func:`_publish_staged_file` — so every byte copy in this module shares + one atomic, streaming, durable semantic: the parent directory is fsynced + and the result carries the umask mode (not ``mkstemp``'s 0600). + """ + dest.parent.mkdir(parents=True, exist_ok=True) + # Capture the destination mode before the temp file shadows it: a brand- + # new file gets the process umask mode (0o666 & ~umask), an existing file + # keeps its current mode — the same rule ``atomic_write_bytes`` applies. + mode = _target_mode(dest) + fd, tmp_name = tempfile.mkstemp(prefix=f".{dest.name}.", suffix=".tmp", dir=dest.parent) + tmp_path = Path(tmp_name) + try: + with os.fdopen(fd, "wb") as out, src.open("rb") as inp: + shutil.copyfileobj(inp, out) + out.flush() + os.fsync(out.fileno()) + os.replace(tmp_path, dest) + _apply_mode(dest, mode) + _fsync_directory(dest.parent) + finally: + tmp_path.unlink(missing_ok=True) + + +def _publish_staged_file(src: Path, dest: Path) -> None: + """Publish one staged file into its live-KB location. + + Staging sits on the same filesystem as ``raw/`` and ``wiki/sources/``, so + an O(1) atomic ``os.replace`` (rename) is used instead of streaming the + bytes — a full copy + fsync per published file was the old per-file cost. + Only on ``EXDEV`` (staging and the live KB genuinely on different devices) + does it fall back to :func:`_copy_file_atomic`. Both branches leave the + result durable (file data + parent dir fsynced) and at the umask mode. + """ + dest.parent.mkdir(parents=True, exist_ok=True) + mode = _target_mode(dest) + try: + os.replace(src, dest) + except OSError as exc: + if exc.errno != errno.EXDEV: + raise + _copy_file_atomic(src, dest) # already fsyncs data + dir + sets mode + return + _apply_mode(dest, mode) + # Parity with _copy_file_atomic: the renamed inode's data may still be in + # the page cache. Without this, a crash right after publish can leave a + # 0-byte / stale raw or source file that committed metadata points at, + # even though the directory entry (fsynced below) survived. + _fsync_file(dest) + _fsync_directory(dest.parent) + + +@dataclass +class MutationSnapshot: + """Snapshot of final KB paths touched by a mutation attempt.""" + + kb_dir: Path + backup_dir: Path + journal_path: Path + operation: str + details: dict = field(default_factory=dict) + entries: dict[Path, Path | None] = field(default_factory=dict) + attempts: int = 0 + # Dirs whose backup was hardlinked (in-process only; not persisted, so a + # crash-rebuilt snapshot leaves this empty and rollback falls back to the + # safe full-copy path). Drives O(touched) rollback via inode-diff restore. + hardlinked_dirs: set[Path] = field(default_factory=set) + + def _journal_data(self, status: str) -> dict: + return { + "version": 1, + "operation": self.operation, + "status": status, + "kb_dir": str(self.kb_dir), + "backup_dir": str(self.backup_dir), + "details": self.details, + "attempts": self.attempts, + "entries": [ + { + "target": str(target), + "backup": str(backup) if backup is not None else None, + } + for target, backup in self.entries.items() + ], + } + + def write_journal(self, status: str) -> None: + self.journal_path.parent.mkdir(parents=True, exist_ok=True) + atomic_write_json(self.journal_path, self._journal_data(status)) + + def mark_committed(self) -> None: + """Mark the journal committed without removing the backup. + + Call this the instant the mutation is durably applied (e.g. the + registry write has landed) so a subsequent + :func:`recover_pending_journals` discards the journal instead of + rolling it back. This is the commit signal; :meth:`discard` is the + post-commit cleanup that also removes the backup dir and journal + file and must itself be best-effort — it runs *after* the commit + point and its failure must never trigger a rollback. + """ + self.write_journal("committed") + + def rollback(self) -> None: + # Restore children before parents so directory deletes cannot remove + # paths that still need to be restored from a more specific backup. + for target, backup in sorted( + self.entries.items(), + key=lambda item: len(item[0].parts), + reverse=True, + ): + # A hardlinked dir backup supports an O(touched) inode-diff restore + # (leave untouched shared-inode files, only touch changed ones) — + # do NOT rmtree it first, which would discard those shared inodes. + if target.is_dir() and target in self.hardlinked_dirs: + if backup is not None and backup.is_dir(): + _restore_hardlinked_dir(backup, target) + else: + shutil.rmtree(target, ignore_errors=True) # new dir, no backup + continue + # Non-hardlinked (file, or copied dir): unconditional remove + restore. + if target.is_dir(): + shutil.rmtree(target, ignore_errors=True) + else: + target.unlink(missing_ok=True) + if backup is None: + continue + target.parent.mkdir(parents=True, exist_ok=True) + if backup.is_dir(): + shutil.copytree(backup, target) + else: + _copy_file_atomic(backup, target) + self.write_journal("rolled_back") + + def rollback_best_effort(self) -> Exception | None: + try: + self.rollback() + except Exception as exc: + logger.warning("Mutation rollback failed: %s", exc) + return exc + return None + + def discard(self) -> None: + # Best-effort post-commit/post-rollback cleanup: callers have already + # written a terminal status (mark_committed or rollback), so there is + # nothing to re-write here — doing so would be dead work and would + # silently downgrade a "rolled_back" journal to "committed" moments + # before it is deleted. + shutil.rmtree(self.backup_dir, ignore_errors=True) + self.journal_path.unlink(missing_ok=True) + + def discard_best_effort(self) -> Exception | None: + try: + self.discard() + except Exception as exc: + logger.warning("Mutation journal cleanup failed: %s", exc) + return exc + return None + + +def _restore_hardlinked_dir(backup: Path, target: Path) -> None: + """O(touched) restore for a hardlinked directory backup. + + The backup was built with ``os.link``, so live files the mutation never + touched still share the backup's inode — leave them. Only files the + mutation changed need work: new files (no backup counterpart) are removed, + modified files (atomic temp+replace → new inode) and deleted files are + restored from the backup's pre-mutation bytes. This avoids recopying the + whole tree on rollback — the cost that bit ``.openkb/files`` (the blob + store) and large concept/entity trees on every failed add. + + Degrades gracefully to a full copy if the backup isn't actually hardlinked + (e.g. the EXDEV/EACCES fallback fired at snapshot time): every file then has + a different inode, so every file is treated as modified and recopied. + """ + def _file_key(path: Path) -> tuple[int, int]: + st = path.stat() # follows symlinks; these trees hold regular files only + return (st.st_dev, st.st_ino) + + backup_files = {p.relative_to(backup): p for p in backup.rglob("*") if p.is_file()} + + # Pass 1: remove new + modified live regular files; leave untouched ones + # (they share the backup inode) in place. + if target.exists(): + for live in list(target.rglob("*")): + if not live.is_file(): + continue + counterpart = backup_files.get(live.relative_to(target)) + if counterpart is None or _file_key(live) != _file_key(counterpart): + live.unlink() + + # Pass 2: restore modified + deleted files from backup. + for rel, src in backup_files.items(): + dest = target / rel + if not dest.exists() or _file_key(dest) != _file_key(src): + dest.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(src, dest) + + # Pass 3: prune directories the mutation created that are now empty. + if target.exists(): + for d in sorted((p for p in target.rglob("*") if p.is_dir()), + key=lambda p: len(p.parts), reverse=True): + if not (backup / d.relative_to(target)).exists() and not any(d.iterdir()): + d.rmdir() + + +def snapshot_paths( + kb_dir: Path, + paths: list[Path], + *, + operation: str, + details: dict | None = None, + hardlink_dirs: set[Path] | None = None, +) -> MutationSnapshot: + """Snapshot final KB paths before a mutation starts. + + ``hardlink_dirs`` marks directories whose backup may be hardlinks instead + of copies (O(1), no per-file byte copy). A directory is only safe to list + here if every writer into it is either atomic temp+replace (new inode, so + the hardlink backup keeps the old bytes) or append-only. This is the + required caller contract for hardlinked dirs; any in-place writer into one + of those trees would silently corrupt the backup and make rollback a no-op + for that file. + """ + kb_dir = kb_dir.resolve() + hardlink_resolved = {p.resolve() for p in (hardlink_dirs or ())} + journal_id = uuid.uuid4().hex + backup_dir = kb_dir / ".openkb" / "staging" / f"rollback-{journal_id}" + backup_dir.mkdir(parents=True, exist_ok=False) + snapshot = MutationSnapshot( + kb_dir=kb_dir, + backup_dir=backup_dir, + journal_path=kb_dir / ".openkb" / "journal" / f"{journal_id}.json", + operation=operation, + details=details or {}, + ) + try: + for path in paths: + target = path.resolve() + if target in snapshot.entries: + continue + if not target.exists(): + snapshot.entries[target] = None + continue + rel = target.relative_to(kb_dir) + backup = backup_dir / rel + backup.parent.mkdir(parents=True, exist_ok=True) + if target.is_dir(): + if target in hardlink_resolved: + shutil.copytree(target, backup, copy_function=_hardlink_or_copy) + snapshot.hardlinked_dirs.add(target) + else: + shutil.copytree(target, backup) + else: + _copy_file_atomic(target, backup) + snapshot.entries[target] = backup + # The active journal is the recovery signal: once this exists, a future + # process can restore every recorded target even if the current one exits. + snapshot.write_journal("active") + except Exception: + # Partial snapshot: backup_dir exists on disk but no journal was + # written. recover_pending_journals only scans journals, so remove the + # orphan backup here — otherwise it leaks forever with nothing able to + # reach or clean it. + shutil.rmtree(backup_dir, ignore_errors=True) + raise + return snapshot + + +def _snapshot_from_journal(path: Path, data: dict) -> MutationSnapshot: + snapshot = MutationSnapshot( + kb_dir=Path(data["kb_dir"]), + backup_dir=Path(data["backup_dir"]), + journal_path=path, + operation=data.get("operation", "mutation"), + details=data.get("details") or {}, + ) + snapshot.entries = { + Path(item["target"]): Path(item["backup"]) if item.get("backup") else None + for item in data.get("entries", []) + } + snapshot.attempts = int(data.get("attempts", 0) or 0) + return snapshot + + +def recover_pending_journals(kb_dir: Path) -> list[str]: + """Rollback active journals left by an interrupted process.""" + journal_dir = kb_dir / ".openkb" / "journal" + if not journal_dir.is_dir(): + return [] + messages: list[str] = [] + for journal_path in sorted(journal_dir.glob("*.json")): + snapshot: MutationSnapshot | None = None + try: + data = json.loads(journal_path.read_text(encoding="utf-8")) + snapshot = _snapshot_from_journal(journal_path, data) + status = data.get("status", "active") + if status in {"committed", "rolled_back"}: + snapshot.discard() + messages.append(f"Cleaned terminal mutation journal {journal_path.name}.") + continue + snapshot.rollback() + snapshot.discard() + messages.append( + f"Rolled back interrupted {snapshot.operation} journal {journal_path.name}." + ) + except Exception as exc: + if snapshot is None: + # The journal couldn't be read or reconstructed (corrupt/empty/ + # stray .json, or missing the kb_dir/backup_dir keys recovery + # needs). There is nothing to roll back or retry — and leaving + # it in place would re-trigger this failure on every future lock + # acquisition (draining runs on first exclusive acquisition), + # bricking add/remove/recompile/chat for the whole KB. Best-effort + # remove the unrecoverable journal and log loudly; any backup_dir + # it referenced is unreachable now and may leak. + journal_path.unlink(missing_ok=True) + messages.append( + f"Unrecoverable mutation journal {journal_path.name} " + f"({type(exc).__name__}: {exc}); removed so it can't block " + f"recovery. The KB may need manual review." + ) + continue + # Rollback failed. Retry a bounded number of times across recovery + # runs (a later attempt may succeed once the cause clears, e.g. disk + # space freed), then give up: discard the journal + backup and log + # loudly so it can't leak forever re-doing the same failing rollback. + snapshot.attempts += 1 + if snapshot.attempts >= MAX_ROLLBACK_ATTEMPTS: + snapshot.discard() + messages.append( + f"GAVE UP on {snapshot.operation} journal {journal_path.name} after " + f"{snapshot.attempts} failed rollback(s): {type(exc).__name__}: {exc}. " + f"The KB may be in a partially-rolled-back state — manual review needed." + ) + else: + snapshot.write_journal("active") # persist incremented attempts + messages.append( + f"Rollback of {snapshot.operation} journal {journal_path.name} failed " + f"(attempt {snapshot.attempts}/{MAX_ROLLBACK_ATTEMPTS}): " + f"{type(exc).__name__}: {exc}; retained for retry." + ) + return messages + + +def publish_staged_tree(staging_dir: Path | None, kb_dir: Path) -> None: + """Move staged raw/source artifacts into their final KB locations.""" + if staging_dir is None or not staging_dir.exists(): + return + for rel in ("raw", "wiki/sources"): + src_root = staging_dir / rel + if not src_root.exists(): + continue + for src in src_root.rglob("*"): + if not src.is_file(): + continue + _publish_staged_file(src, kb_dir / rel / src.relative_to(src_root)) diff --git a/tests/test_add_command.py b/tests/test_add_command.py index f819900f..8dacec26 100644 --- a/tests/test_add_command.py +++ b/tests/test_add_command.py @@ -72,6 +72,24 @@ def test_add_single_file_calls_helper(self, tmp_path): runner.invoke(cli, ["add", str(doc)]) mock_add.assert_called_once_with(doc, kb_dir) + def test_add_single_file_compile_failure_rolls_back_converted_artifacts(self, tmp_path): + from openkb.cli import add_single_file + from openkb.state import HashRegistry + + kb_dir = self._setup_kb(tmp_path) + doc = tmp_path / "notes.md" + doc.write_text("# Notes\n\nBody", encoding="utf-8") + + with patch("openkb.agent.compiler.compile_short_doc", side_effect=RuntimeError("boom")), \ + patch("openkb.cli.time.sleep"), \ + patch("openkb.cli._setup_llm_key"): + outcome = add_single_file(doc, kb_dir) + + assert outcome == "failed" + assert not (kb_dir / "raw" / "notes.md").exists() + assert not (kb_dir / "wiki" / "sources" / "notes.md").exists() + assert HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries() == {} + def test_add_directory_calls_helper_for_each_file(self, tmp_path): kb_dir = self._setup_kb(tmp_path) docs_dir = tmp_path / "docs" @@ -188,11 +206,15 @@ def test_add_oldest_legacy_entry_converges_to_single_entry(self, tmp_path): doc = tmp_path / "notes.md" doc.write_text("# Notes, edited") # new content hash != "old-hash" - # Compilation mocked out (asyncio.run), but convert_document REAL so + # Compilation mocked out, but convert_document REAL so # the legacy backfill actually happens on disk mid-pipeline. + def close_coro(coro): + if hasattr(coro, "close"): + coro.close() + runner = CliRunner() with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \ - patch("openkb.cli.asyncio.run"): + patch("openkb.cli.asyncio.run", side_effect=close_coro): result = runner.invoke(cli, ["add", str(doc)]) assert "OK" in result.output @@ -255,19 +277,31 @@ def _setup_kb(self, tmp_path): (openkb_dir / "hashes.json").write_text(json.dumps({})) return tmp_path + def _cloud_data(self, doc_name="Cloud-Paper"): + from openkb.indexer import CloudImportData + + return CloudImportData( + doc_id="cloud-1", + doc_name=doc_name, + cloud_name="Cloud Paper.pdf", + description="desc", + tree={ + "doc_name": "Cloud Paper.pdf", + "doc_description": "desc", + "structure": [], + }, + all_pages=[{"page": 1, "content": "Cloud page", "images": []}], + ) + def test_registers_rawless_cloud_entry(self, tmp_path): import hashlib from openkb.cli import import_from_pageindex_cloud - from openkb.indexer import CloudImportResult from openkb.state import HashRegistry kb_dir = self._setup_kb(tmp_path) - result = CloudImportResult( - doc_id="cloud-1", doc_name="Cloud-Paper", name="Cloud Paper.pdf", - description="desc", - ) + cloud = self._cloud_data() - with patch("openkb.cli.import_cloud_document", return_value=result), \ + with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ patch("openkb.cli.compile_long_doc", return_value=None) as mock_compile, \ patch("openkb.cli._setup_llm_key"): outcome = import_from_pageindex_cloud("cloud-1", kb_dir) @@ -286,29 +320,25 @@ def test_registers_rawless_cloud_entry(self, tmp_path): def test_second_import_is_skipped(self, tmp_path): from openkb.cli import import_from_pageindex_cloud - from openkb.indexer import CloudImportResult kb_dir = self._setup_kb(tmp_path) - result = CloudImportResult( - doc_id="cloud-1", doc_name="Cloud-Paper", name="Cloud Paper.pdf", - description="desc", - ) + cloud = self._cloud_data() - with patch("openkb.cli.import_cloud_document", return_value=result) as mock_import, \ + with patch("openkb.cli.prepare_cloud_import", return_value=cloud) as mock_prepare, \ patch("openkb.cli.compile_long_doc", return_value=None), \ patch("openkb.cli._setup_llm_key"): import_from_pageindex_cloud("cloud-1", kb_dir) second = import_from_pageindex_cloud("cloud-1", kb_dir) assert second == "skipped" - assert mock_import.call_count == 1 # not fetched again + assert mock_prepare.call_count == 1 # not fetched again def test_import_failure_returns_failed_and_registers_nothing(self, tmp_path): from openkb.cli import import_from_pageindex_cloud from openkb.state import HashRegistry kb_dir = self._setup_kb(tmp_path) - with patch("openkb.cli.import_cloud_document", side_effect=RuntimeError("boom")), \ + with patch("openkb.cli.prepare_cloud_import", side_effect=RuntimeError("boom")), \ patch("openkb.cli._setup_llm_key"): outcome = import_from_pageindex_cloud("cloud-9", kb_dir) @@ -322,21 +352,15 @@ def test_compile_failure_cleans_up_orphan_artifacts(self, tmp_path): `openkb remove` couldn't reach them otherwise — and nothing is registered (so a retry isn't skipped).""" from openkb.cli import import_from_pageindex_cloud - from openkb.indexer import CloudImportResult from openkb.state import HashRegistry kb_dir = self._setup_kb(tmp_path) (kb_dir / "wiki" / "entities").mkdir(parents=True, exist_ok=True) (kb_dir / "wiki" / "index.md").write_text("# Index\n", encoding="utf-8") doc_name = "Cloud-Paper" - # Simulate the artifacts import_cloud_document writes before compile. - (kb_dir / "wiki" / "summaries" / f"{doc_name}.md").write_text("---\n---\n# s\n") - (kb_dir / "wiki" / "sources" / f"{doc_name}.json").write_text("[]") - result = CloudImportResult( - doc_id="cloud-1", doc_name=doc_name, name="Cloud Paper.pdf", description="d", - ) + cloud = self._cloud_data(doc_name=doc_name) - with patch("openkb.cli.import_cloud_document", return_value=result), \ + with patch("openkb.cli.prepare_cloud_import", return_value=cloud), \ patch("openkb.cli.compile_long_doc", side_effect=RuntimeError("boom")), \ patch("openkb.cli.time.sleep"), \ patch("openkb.cli._setup_llm_key"): diff --git a/tests/test_mutation.py b/tests/test_mutation.py new file mode 100644 index 00000000..f3dbf614 --- /dev/null +++ b/tests/test_mutation.py @@ -0,0 +1,520 @@ +from __future__ import annotations + +import errno +import os +from pathlib import Path + +import pytest + +from openkb.mutation import publish_staged_tree, recover_pending_journals, snapshot_paths + + +def test_recover_pending_add_journal_rolls_back_files(tmp_path): + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + new_file = kb_dir / "wiki" / "sources" / "doc.md" + + snapshot_paths( + kb_dir, + [target, new_file], + operation="add", + details={"doc_name": "doc"}, + ) + target.write_text("after", encoding="utf-8") + new_file.parent.mkdir(parents=True) + new_file.write_text("new", encoding="utf-8") + + messages = recover_pending_journals(kb_dir) + + assert any("Rolled back interrupted add journal" in message for message in messages) + assert target.read_text(encoding="utf-8") == "before" + assert not new_file.exists() + assert not any((openkb_dir / "journal").glob("*.json")) + + +def test_mark_committed_prevents_recovery_rollback(tmp_path): + """A snapshot marked committed must be discarded (not rolled back) by + recovery — the commit signal that protects a completed mutation from + being undone when post-commit cleanup fails. + """ + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [target], operation="add", details={"doc_name": "doc"} + ) + target.write_text("after", encoding="utf-8") # the "committed" mutation + snapshot.mark_committed() + + messages = recover_pending_journals(kb_dir) + + assert any("Cleaned terminal mutation journal" in m for m in messages) + assert target.read_text(encoding="utf-8") == "after" # NOT rolled back + assert not any((openkb_dir / "journal").glob("*.json")) + + +def test_snapshot_paths_cleans_backup_dir_on_failure(tmp_path): + """A partially-created snapshot must not leak its backup dir: on any + failure before the journal is written, snapshot_paths removes the + rollback dir it created (recover_pending_journals only scans journals + and could never reach it otherwise). + """ + kb_dir = tmp_path / "kb" + kb_dir.mkdir() + # A target that resolves OUTSIDE kb_dir makes relative_to(kb_dir) raise + # mid-loop, after backup_dir was already mkdir'd. + outside = tmp_path / "outside.txt" + outside.write_text("hi", encoding="utf-8") + + with pytest.raises(ValueError): + snapshot_paths(kb_dir, [outside], operation="add", details={}) + + staging = kb_dir / ".openkb" / "staging" + if staging.exists(): + assert not any(staging.iterdir()) # no orphan rollback- dir + + +def test_exclusive_lock_drains_active_journal_before_yielding(tmp_path): + """Recovery runs on every exclusive-lock acquisition, not just the add path. + + ``recover_pending_journals`` is wired into ``kb_lock``'s first exclusive + acquisition, so any mutation command — ``remove``/``recompile``/``lint``/ + ``chat``, all of which take ``kb_ingest_lock`` directly — drains a crashed + predecessor's active journal before it mutates. This is the regression + guard for the bug where an ``add`` crash left an active journal that an + intervening ``remove`` ignored and a later ``add`` then rolled back over + the remove's edits. + """ + from openkb.locks import kb_ingest_lock + + kb_dir = tmp_path + openkb_dir = kb_dir / ".openkb" + openkb_dir.mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + + # Simulate a crashed add: snapshot taken, file mutated, but mark_committed + # never ran — an ACTIVE journal is left on disk. + snapshot_paths(kb_dir, [target], operation="add", details={"doc_name": "doc"}) + target.write_text("after", encoding="utf-8") + + # Any exclusive-lock holder drains before its body runs. + with kb_ingest_lock(openkb_dir): + assert target.read_text(encoding="utf-8") == "before" + + assert target.read_text(encoding="utf-8") == "before" + assert not any((openkb_dir / "journal").glob("*.json")) + + +# --- publish_staged_tree: O(1) rename + durability (review #2) ------------- + +def _staged_raw(staging: Path, name: str, payload: bytes) -> Path: + src = staging / "raw" / name + src.parent.mkdir(parents=True, exist_ok=True) + src.write_bytes(payload) + return src + + +def test_publish_moves_staged_files_on_same_filesystem(tmp_path): + """Publish must rename staged files into place (O(1)) when staging and + the live KB share a filesystem, not stream-copy them. The surest + observable signal: after publish the staged source is GONE (moved), + whereas a copy leaves it behind. + """ + kb_dir = tmp_path / "kb" + staging = kb_dir / ".openkb" / "staging" / "add-x" + src = _staged_raw(staging, "doc.pdf", b"%PDF-1.4 payload") + + publish_staged_tree(staging, kb_dir) + + published = kb_dir / "raw" / "doc.pdf" + assert published.read_bytes() == b"%PDF-1.4 payload" + assert not src.exists() # moved, not copied + + +def test_published_files_keep_umask_mode_not_0600(tmp_path): + """Published artifacts must be created at the process umask mode, not + inherit tempfile.mkstemp's 0600. 0600 would make the KB's published + files owner-only and inconsistent with atomic_write_bytes. + """ + prev_umask = os.umask(0o022) + try: + kb_dir = tmp_path / "kb" + staging = kb_dir / ".openkb" / "staging" / "add-y" + _staged_raw(staging, "doc.pdf", b"data") + + publish_staged_tree(staging, kb_dir) + + from openkb.locks import _default_file_mode + + published = kb_dir / "raw" / "doc.pdf" + assert (published.stat().st_mode & 0o777) == _default_file_mode() + finally: + os.umask(prev_umask) + + +def test_publish_falls_back_to_copy_on_cross_filesystem(tmp_path, monkeypatch): + """When staging and the live KB are on different filesystems, the publish + rename raises EXDEV; publish must fall back to a durable copy and still + land the file with correct content at the destination. + + Only the cross-device publish rename raises EXDEV — the fallback copy's + own temp-file rename is on the destination's filesystem and must succeed, + so the fake raises exactly once then delegates to the real ``os.replace``. + """ + import openkb.mutation as mut + + kb_dir = tmp_path / "kb" + staging = kb_dir / ".openkb" / "staging" / "add-z" + _staged_raw(staging, "doc.pdf", b"cross-fs payload") + + real_replace = os.replace + calls = {"n": 0} + + def fake_replace(src, dst, *args, **kwargs): + calls["n"] += 1 + if calls["n"] == 1: + raise OSError(errno.EXDEV, "cross-device link") + return real_replace(src, dst, *args, **kwargs) + + monkeypatch.setattr(mut.os, "replace", fake_replace) + + publish_staged_tree(staging, kb_dir) + + assert calls["n"] >= 2 # publish rename failed, fallback copy renamed + assert (kb_dir / "raw" / "doc.pdf").read_bytes() == b"cross-fs payload" + + +# --- snapshot_paths: hardlinked dir backups (review #1) -------------------- + +def test_snapshot_hardlinks_marked_directory_trees(tmp_path): + """Directory snapshots the caller marks hardlink-safe must hardlink the + live files into the backup (shared inode) — O(1), no per-file byte copy — + instead of streaming a fresh copy. This is what makes per-file concept / + entity / PageIndex-blob snapshots cheap on a large KB. + """ + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + existing = concepts / "old.md" + existing.write_text("old", encoding="utf-8") + live_inode = existing.stat().st_ino + + snapshot = snapshot_paths( + kb_dir, + [concepts], + operation="add", + details={}, + hardlink_dirs={concepts}, + ) + try: + backup_file = snapshot.backup_dir / "wiki" / "concepts" / "old.md" + assert backup_file.exists() + assert backup_file.stat().st_ino == live_inode # hardlink, not copy + finally: + snapshot.discard_best_effort() + + +def test_hardlinked_dir_rollback_correct_after_atomic_writes(tmp_path): + """With a hardlinked dir backup, an atomic (temp+replace) rewrite of an + existing page and creation of a new page must still roll back correctly: + existing page restored to its pre-snapshot content, new page removed. + + This is the correctness invariant hardlinking relies on — the wiki + writers must go through atomic temp+replace so the hardlink backup keeps + pointing at the old inode while the live file moves to a new one. + """ + from openkb.locks import atomic_write_text + + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + existing = concepts / "old.md" + existing.write_text("old-content", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + # Mirror the (now atomic) compiler writers: rewrite the existing page via + # atomic temp+replace, and add a brand-new page the doc creates. + atomic_write_text(existing, "rewritten-content") + (concepts / "new.md").write_text("new", encoding="utf-8") + + snapshot.rollback() + snapshot.discard_best_effort() + + assert existing.read_text(encoding="utf-8") == "old-content" + assert not (concepts / "new.md").exists() + + +def test_openkb_files_tree_is_hardlinked(tmp_path): + """The PageIndex blob store (.openkb/files) is append-only across docs — + each add creates new {doc_id} blobs and never modifies existing ones — so + it is hardlink-safe and must be snapshotted via hardlinks, not copied. + """ + kb_dir = tmp_path + blobs = kb_dir / ".openkb" / "files" / "col" + blobs.mkdir(parents=True) + existing = blobs / "an-existing-doc.pdf" + existing.write_bytes(b"existing-blob") + live_inode = existing.stat().st_ino + + snapshot = snapshot_paths( + kb_dir, [kb_dir / ".openkb" / "files"], operation="add", + details={}, hardlink_dirs={kb_dir / ".openkb" / "files"}, + ) + try: + backup = ( + snapshot.backup_dir / ".openkb" / "files" / "col" / "an-existing-doc.pdf" + ) + assert backup.stat().st_ino == live_inode + finally: + snapshot.discard_best_effort() + + +def test_concept_writer_is_atomic_so_hardlink_rollback_restores(tmp_path): + """Regression guard for the hardlink invariant: the wiki page writers must + go through atomic temp+replace (new inode). If any regresses to in-place + ``write_text`` (same inode), the hardlinked snapshot backup aliases that + inode and rollback restores the MUTATED content instead of the original. + + Exercises _write_concept's update path — the canonical in-place modify — + through a real hardlinked snapshot + rollback. + """ + from openkb.agent.compiler import _write_concept + + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + existing = concepts / "topic.md" + existing.write_text("---\nsources: []\n---\n\noriginal body", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + # The compiler rewrites the concept page as part of the doc ingest. If this + # write is in-place, the hardlink backup is corrupted and rollback fails. + _write_concept(kb_dir / "wiki", "topic", "rewritten body", "summaries/doc.md", is_update=True) + + snapshot.rollback() + snapshot.discard_best_effort() + + restored = existing.read_text(encoding="utf-8") + assert "original body" in restored + assert "rewritten body" not in restored + + +def test_fix_broken_links_is_atomic_so_hardlink_rollback_restores(tmp_path): + """Regression guard for lint --fix/remove cleanup writers. + + ``fix_broken_links`` rewrites concept/entity pages outside the add path. If + it writes in place, a hardlinked snapshot aliases the live inode and rollback + restores the cleaned content instead of the original page. + """ + from openkb.lint import fix_broken_links + + kb_dir = tmp_path + wiki = kb_dir / "wiki" + concepts = wiki / "concepts" + concepts.mkdir(parents=True) + page = concepts / "topic.md" + page.write_text("# Topic\n\nGhost [[concepts/missing]] link.\n", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + fix_broken_links(wiki, restrict_to=[page]) + + snapshot.rollback() + snapshot.discard_best_effort() + + restored = page.read_text(encoding="utf-8") + assert "[[concepts/missing]]" in restored + assert "Ghost link" not in restored + + +def test_hardlink_falls_back_to_copy_on_eacces(tmp_path, monkeypatch): + """A hardlink blocked by a Windows ACL / OneDrive sync folder surfaces as + EACCES, not EXDEV/EPERM. _hardlink_or_copy must fall back to a real copy so + the snapshot still succeeds — otherwise the POSIX-oriented errno set aborts + the whole add on Windows where a plain copy would have worked. + """ + import openkb.mutation as mut + + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + (concepts / "page.md").write_text("content", encoding="utf-8") + + def link_eacces(src, dst, *args, **kwargs): + raise OSError(errno.EACCES, "simulated Windows ACL hardlink block") + monkeypatch.setattr(mut.os, "link", link_eacces) + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + try: + backup = snapshot.backup_dir / "wiki" / "concepts" / "page.md" + assert backup.read_text(encoding="utf-8") == "content" # copy fallback landed + # It is a real copy, not a hardlink (distinct inode). + assert backup.stat().st_ino != (concepts / "page.md").stat().st_ino + finally: + snapshot.discard_best_effort() + + +# --- recover_pending_journals: bounded retry (pre-existing issue) ---------- + +def test_recovery_gives_up_on_persistently_failing_journal(tmp_path, monkeypatch): + """A journal whose rollback keeps failing (e.g. persistent ENOSPC) must + not be retried forever — otherwise the backup dir + journal leak and every + future lock acquisition re-attempts the same failing rollback. After + MAX_ROLLBACK_ATTEMPTS failed attempts recovery discards it with a loud + message so a human can intervene, bounding the on-disk retention. + """ + import openkb.mutation as mut + + kb_dir = tmp_path + (kb_dir / ".openkb").mkdir() + target = kb_dir / "wiki" / "summaries" / "doc.md" + target.parent.mkdir(parents=True) + target.write_text("before", encoding="utf-8") + # Leave an ACTIVE journal (simulating a crashed add). + snapshot_paths(kb_dir, [target], operation="add", details={}) + target.write_text("after", encoding="utf-8") + + # Make rollback deterministically fail. + def boom(self): + raise OSError("persistent rollback failure") + monkeypatch.setattr(mut.MutationSnapshot, "rollback", boom) + + for _ in range(mut.MAX_ROLLBACK_ATTEMPTS + 1): + recover_pending_journals(kb_dir) + + # Given up + discarded, not retained forever. + journal_dir = kb_dir / ".openkb" / "journal" + assert not any(journal_dir.glob("*.json")) + + +@pytest.mark.parametrize( + "payload", + [ + "", # empty file -> JSONDecodeError + "{not json", # truncated/invalid -> JSONDecodeError + '{"status": "active"}', # valid JSON missing kb_dir/backup_dir -> KeyError + '{"not": "a journal"}', # valid JSON, wrong shape -> KeyError + ], +) +def test_recover_skips_malformed_journal_without_bricking_lock(tmp_path, payload): + """A corrupt/empty/stray .json in journal/ must not crash recovery. + + ``snapshot`` is assigned inside the try (after json.loads / + _snapshot_from_journal), but the except block referenced it unconditionally + — so a single malformed journal raised NameError out of recovery, and thus + out of every exclusive kb_lock acquisition (draining runs on first + acquisition), bricking add/remove/recompile/chat for the whole KB. Recovery + must instead drop the unrecoverable journal, log loudly, and keep going so + the lock still acquires. + """ + from openkb.locks import kb_ingest_lock + + kb_dir = tmp_path + journal_dir = kb_dir / ".openkb" / "journal" + journal_dir.mkdir(parents=True) + (journal_dir / "deadbeef.json").write_text(payload, encoding="utf-8") + + messages = recover_pending_journals(kb_dir) # must not raise NameError + assert any("Unrecoverable mutation journal" in m for m in messages) + assert not any(journal_dir.glob("*.json")) # bad journal removed, not retained + + # The whole point: the KB's mutation lock still acquires afterwards. + with kb_ingest_lock(kb_dir / ".openkb"): + pass + + +# --- O(touched) rollback for hardlinked dirs (pre-existing issue) ---------- + +def test_hardlinked_dir_rollback_leaves_untouched_files_in_place(tmp_path): + """O(touched) rollback: an untouched file in a hardlinked dir shares the + backup's inode, so rollback must leave it in place (same inode) instead + of delete + recopy. A full-copy rollback would give it a new inode — this + is the regression driver for the inode-aware restore. + """ + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + keep = concepts / "keep.md" + keep.write_text("keep", encoding="utf-8") + keep_inode = keep.stat().st_ino + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + # keep.md is not mutated — it stays shared-inode with the backup. + snapshot.rollback() + snapshot.discard_best_effort() + + assert keep.exists() + assert keep.read_text(encoding="utf-8") == "keep" + assert keep.stat().st_ino == keep_inode # NOT recopied + + +def test_hardlinked_dir_rollback_removes_new_and_restores_modified(tmp_path): + from openkb.locks import atomic_write_text + + kb_dir = tmp_path + concepts = kb_dir / "wiki" / "concepts" + concepts.mkdir(parents=True) + (concepts / "old.md").write_text("old", encoding="utf-8") + page = concepts / "page.md" + page.write_text("original", encoding="utf-8") + + snapshot = snapshot_paths( + kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts}, + ) + # Commit created a new page and atomically rewrote an existing one. + (concepts / "new.md").write_text("new", encoding="utf-8") + atomic_write_text(page, "rewritten") + + snapshot.rollback() + snapshot.discard_best_effort() + + assert (concepts / "old.md").read_text(encoding="utf-8") == "old" + assert page.read_text(encoding="utf-8") == "original" + assert not (concepts / "new.md").exists() + + +def test_hardlinked_dir_rollback_prunes_new_nested_blob_dirs(tmp_path): + """PageIndex blob-store scenario: an existing blob is untouched (shared + inode, left in place), while a new doc's blob + its nested images subdir + are removed on rollback — including the now-empty newdoc/ directory. + """ + kb_dir = tmp_path + files = kb_dir / ".openkb" / "files" + (files / "col").mkdir(parents=True) + existing = files / "col" / "existing.pdf" + existing.write_bytes(b"existing") + existing_inode = existing.stat().st_ino + + snapshot = snapshot_paths( + kb_dir, [files], operation="add", details={}, hardlink_dirs={files}, + ) + (files / "col" / "newdoc.pdf").write_bytes(b"new") + (files / "col" / "newdoc" / "images").mkdir(parents=True) + (files / "col" / "newdoc" / "images" / "p1.png").write_bytes(b"png") + + snapshot.rollback() + snapshot.discard_best_effort() + + assert existing.read_bytes() == b"existing" + assert existing.stat().st_ino == existing_inode # untouched, not recopied + assert not (files / "col" / "newdoc.pdf").exists() + assert not (files / "col" / "newdoc").exists() # empty new dir pruned