# Source code for energydb.diff

"""TreeDiff — compute and represent the difference produced by a mutation.

Categorizes node/edge changes into insert / update / delete buckets, keyed
by UUID, and renders a tree-shaped preview. Returned by
:meth:`Client.register_tree` (``dry_run=True``), by each scope mutator
when called with ``dry_run=True``, and by :meth:`Transaction.preview`.

With UUID identity, a rename (name changed) and a move (parent_uuid
changed) are *not* delete-then-insert — the row keeps its uuid and just
has its column updated. The diff structure exposes "renamed" and "moved"
booleans on each update so the user-facing print can label changes
accurately.
"""

from __future__ import annotations

import sys
from dataclasses import dataclass, field
from typing import IO, Any
from uuid import UUID

# ---------------------------------------------------------------------------
# Snapshots — the canonical content of one row at one point in time.
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True) class NodeSnapshot: """One node row's content.""" uuid: UUID node_type: str name: str parent_uuid: UUID | None data: dict[str, Any]
[docs] @dataclass(frozen=True) class EdgeSnapshot: """One edge row's content.""" uuid: UUID edge_type: str name: str | None from_node_uuid: UUID to_node_uuid: UUID data: dict[str, Any]
# --------------------------------------------------------------------------- # Per-row change records # --------------------------------------------------------------------------- @dataclass(frozen=True) class _BaseChange[SnapshotT]: """Shared structure for :class:`NodeChange` and :class:`EdgeChange`. ``old`` is the DB state before the change; ``new`` is the target state after. Inserts have ``old=None``; deletes have ``new=None``. At least one must be set. """ old: SnapshotT | None new: SnapshotT | None def __post_init__(self): if self.old is None and self.new is None: raise ValueError(f"{type(self).__name__} must have at least one of old/new set.") def _present(self) -> SnapshotT: """Return whichever of ``new`` / ``old`` is non-null. For inserts and updates that's ``new``; for deletes it's ``old``.""" return self.new if self.new is not None else self.old # type: ignore[return-value] @property def uuid(self) -> UUID: return self._present().uuid # type: ignore[attr-defined] @property def kind(self) -> str: if self.old is None: return "insert" if self.new is None: return "delete" return "update" @property def data_changed(self) -> bool: return self.kind == "update" and self.old.data != self.new.data # type: ignore[union-attr]
@dataclass(frozen=True)
class NodeChange(_BaseChange[NodeSnapshot]):
    """One entry of a node-level diff (insert, update or delete)."""

    @property
    def display_name(self) -> str:
        """Name taken from whichever snapshot is present (``new`` first)."""
        snapshot = self._present()
        return snapshot.name

    @property
    def display_type(self) -> str:
        """Node type taken from whichever snapshot is present (``new`` first)."""
        snapshot = self._present()
        return snapshot.node_type

    @property
    def renamed(self) -> bool:
        """``True`` only for updates whose ``name`` differs between old and new."""
        if self.kind != "update":
            return False
        return self.old.name != self.new.name  # type: ignore[union-attr]

    @property
    def moved(self) -> bool:
        """``True`` only for updates whose ``parent_uuid`` differs between old and new."""
        if self.kind != "update":
            return False
        return self.old.parent_uuid != self.new.parent_uuid  # type: ignore[union-attr]
@dataclass(frozen=True)
class EdgeChange(_BaseChange[EdgeSnapshot]):
    """One entry of an edge-level diff (insert, update or delete)."""

    @property
    def display_name(self) -> str:
        """Edge name, falling back to the edge type when the name is empty or ``None``."""
        snapshot = self._present()
        return snapshot.name if snapshot.name else snapshot.edge_type

    @property
    def display_type(self) -> str:
        """Edge type taken from whichever snapshot is present (``new`` first)."""
        snapshot = self._present()
        return snapshot.edge_type

    @property
    def endpoints_changed(self) -> bool:
        """``True`` only for updates where either endpoint UUID differs."""
        if self.kind != "update":
            return False
        before = (self.old.from_node_uuid, self.old.to_node_uuid)  # type: ignore[union-attr]
        after = (self.new.from_node_uuid, self.new.to_node_uuid)  # type: ignore[union-attr]
        return before != after
# ---------------------------------------------------------------------------
# TreeDiff — the structured result of comparing target vs current state.
# ---------------------------------------------------------------------------
@dataclass
class TreeDiff:
    """Structured diff between a target EDM tree and the persisted subtree.

    Holds two flat lists of :class:`NodeChange` / :class:`EdgeChange`
    records. The convenience properties (``node_inserts``,
    ``node_updates``, ``node_renames``, ``node_moves``,
    ``node_data_edits``, ``node_deletes``, ``edge_inserts``,
    ``edge_updates``, ``edge_deletes``) filter those lists by kind for
    callers that want to render or inspect specific subsets.
    """

    node_changes: list[NodeChange] = field(default_factory=list)
    edge_changes: list[EdgeChange] = field(default_factory=list)

    # ------------------------------------------------------------------
    # Convenience views (computed on the fly — diff is small)
    # ------------------------------------------------------------------

    @property
    def has_changes(self) -> bool:
        """``True`` if any node or edge change exists."""
        return bool(self.node_changes or self.edge_changes)

    @property
    def node_inserts(self) -> list[NodeChange]:
        """Node changes of kind ``"insert"``."""
        return [c for c in self.node_changes if c.kind == "insert"]

    @property
    def node_updates(self) -> list[NodeChange]:
        """All node updates (renames, moves, and/or data edits)."""
        return [c for c in self.node_changes if c.kind == "update"]

    @property
    def node_renames(self) -> list[NodeChange]:
        """Node updates whose name changed."""
        return [c for c in self.node_updates if c.renamed]

    @property
    def node_moves(self) -> list[NodeChange]:
        """Node updates whose parent changed."""
        return [c for c in self.node_updates if c.moved]

    @property
    def node_data_edits(self) -> list[NodeChange]:
        """Node updates that changed only ``data`` (no rename / no move)."""
        return [
            c
            for c in self.node_updates
            if c.data_changed and not (c.renamed or c.moved)
        ]

    @property
    def node_deletes(self) -> list[NodeChange]:
        """Node changes of kind ``"delete"``."""
        return [c for c in self.node_changes if c.kind == "delete"]

    @property
    def edge_inserts(self) -> list[EdgeChange]:
        """Edge changes of kind ``"insert"``."""
        return [c for c in self.edge_changes if c.kind == "insert"]

    @property
    def edge_updates(self) -> list[EdgeChange]:
        """Edge changes of kind ``"update"``."""
        return [c for c in self.edge_changes if c.kind == "update"]

    @property
    def edge_deletes(self) -> list[EdgeChange]:
        """Edge changes of kind ``"delete"``."""
        return [c for c in self.edge_changes if c.kind == "delete"]

    # ------------------------------------------------------------------
    # Pretty-print (tree-shaped)
    # ------------------------------------------------------------------

    def render(self, file: IO[str] | None = None) -> None:
        """Write a tree-shaped textual preview of the diff.

        Only changed rows are printed; unchanged nodes never appear. Each
        node line starts with a marker (``+`` insert, ``~`` update, ``-``
        delete) followed by the node type, the quoted name and a bracketed
        note. A changed node whose parent is not itself part of the diff
        is printed as a top-level line; changed children are nested under
        their changed parent. Edge changes follow as a flat list with the
        two endpoint UUIDs separated by an arrow. Example::

            + Site 'Alpha' [insert]
            ├── + WindTurbine 'T01' [insert]
            └── + Battery 'B1' [insert]
            - Site 'Beta' [delete]
            edges:
              + Line 'Cable-1' <from-uuid> → <to-uuid> [insert]

        When the diff is empty the single line ``(no changes)`` is written.

        Args:
            file: Text stream to write to. Defaults to ``sys.stdout``,
                looked up at call time.
        """
        out = file if file is not None else sys.stdout

        if not self.has_changes:
            out.write("(no changes)\n")
            return

        # One render row per changed node, plus a parent -> children index
        # so each subtree can be walked top-down.
        view: dict[UUID, _RenderRow] = {}
        children_by_parent: dict[UUID | None, list[UUID]] = {}
        for change in self.node_changes:
            row = _render_row_for_node(change)
            view[change.uuid] = row
            children_by_parent.setdefault(row.parent_uuid, []).append(change.uuid)

        # A change is a render trunk if its parent isn't itself a change —
        # either parent_uuid is None, or the parent is an unchanged ancestor
        # (e.g. a renamed Site whose Portfolio parent was not modified).
        for uid, row in view.items():
            if row.parent_uuid not in view:
                _render_subtree(
                    out,
                    view,
                    children_by_parent,
                    uid,
                    prefix="",
                    is_last=True,
                    is_root=True,
                )

        # Edges: flat at the bottom.
        if self.edge_changes:
            out.write("edges:\n")
            for change in self.edge_changes:
                marker = _MARKER_BY_KIND[change.kind]
                note = _edge_change_note(change)
                ftxt, ttxt = _edge_endpoint_str(change)
                # The arrow keeps the two endpoint UUIDs from running together.
                out.write(
                    f"  {marker} {change.display_type} {change.display_name!r} "
                    f"{ftxt} → {ttxt}{note}\n"
                )
# ---------------------------------------------------------------------------
# Render helpers (private)
# ---------------------------------------------------------------------------


@dataclass
class _RenderRow:
    """One printable line of the node tree."""

    marker: str  # '+', '~' or '-'
    label: str  # "<node_type> '<name>'"
    note: str  # bracketed annotation with a leading space, or ""
    parent_uuid: UUID | None  # parent of this row in the rendered tree


_MARKER_BY_KIND = {"insert": "+", "update": "~", "delete": "-"}

# Sentinel for "key absent", so a missing key cannot be confused with a
# real value that happens to be the string "<unset>".
_UNSET = object()


def _render_row_for_node(change: NodeChange) -> _RenderRow:
    """Build the printable row for a single node change.

    Inserts and deletes are annotated ``[insert]`` / ``[delete]`` and take
    their label and parent from the only snapshot they have. Updates take
    their label and parent from ``new`` and list what changed (rename,
    move, data) in the note; the note is empty when none of those apply.
    """
    kind = change.kind
    marker = _MARKER_BY_KIND[kind]

    if kind != "update":
        snap = change.new if kind == "insert" else change.old
        assert snap is not None  # guaranteed by ``kind``; narrows the type
        return _RenderRow(
            marker=marker,
            label=f"{snap.node_type} {snap.name!r}",
            note=f" [{kind}]",
            parent_uuid=snap.parent_uuid,
        )

    old, new = change.old, change.new
    assert old is not None and new is not None  # guaranteed by ``kind``
    notes: list[str] = []
    if change.renamed:
        notes.append(f"rename {old.name!r} → {new.name!r}")
    if change.moved:
        notes.append(f"moved (parent {old.parent_uuid} → {new.parent_uuid})")
    if change.data_changed:
        notes.append(_data_diff_summary(old.data, new.data))
    note = " [" + "; ".join(notes) + "]" if notes else ""
    return _RenderRow(
        marker=marker,
        label=f"{new.node_type} {new.name!r}",
        note=note,
        parent_uuid=new.parent_uuid,
    )


def _format_data_value(value: object) -> str:
    """Render one side of a data diff; absent keys print as ``<unset>``."""
    return "<unset>" if value is _UNSET else repr(value)


def _data_diff_summary(old: dict, new: dict) -> str:
    """Human-readable summary of changed keys in two JSONB blobs.

    Returns ``"key: old → new"`` entries joined by ``"; "``, in sorted key
    order, for every key whose value differs or that exists on one side
    only. Falls back to ``"data changed"`` when no key-level difference
    is found.
    """
    parts: list[str] = []
    # key=str keeps the sort from failing if keys are not all strings.
    for key in sorted(old.keys() | new.keys(), key=str):
        before = old.get(key, _UNSET)
        after = new.get(key, _UNSET)
        if before != after:
            parts.append(
                f"{key}: {_format_data_value(before)} → {_format_data_value(after)}"
            )
    return "; ".join(parts) if parts else "data changed"


def _render_subtree(
    out: IO[str],
    view: dict[UUID, _RenderRow],
    children_by_parent: dict[UUID | None, list[UUID]],
    node_uuid: UUID,
    *,
    prefix: str,
    is_last: bool,
    is_root: bool,
) -> None:
    """Write ``node_uuid`` and, recursively, its changed descendants.

    Args:
        out: Text stream to write to.
        view: Render row for every changed node, keyed by UUID.
        children_by_parent: Changed children of each parent, in order.
        node_uuid: Node to print first.
        prefix: Indentation inherited from the ancestors.
        is_last: Whether this node is the last child of its parent.
        is_root: Whether this node is a top-level line (no connector).
    """
    row = view[node_uuid]
    connector = "" if is_root else ("└── " if is_last else "├── ")
    out.write(f"{prefix}{connector}{row.marker} {row.label}{row.note}\n")

    # Connectors are four columns wide, so the continuation prefix must be
    # four columns too or nested levels drift out of alignment.
    child_prefix = prefix if is_root else prefix + ("    " if is_last else "│   ")
    children = children_by_parent.get(node_uuid, [])
    last_index = len(children) - 1
    for index, child in enumerate(children):
        _render_subtree(
            out,
            view,
            children_by_parent,
            child,
            prefix=child_prefix,
            is_last=(index == last_index),
            is_root=False,
        )


def _edge_endpoint_str(change: EdgeChange) -> tuple[str, str]:
    """Return ``(from, to)`` endpoint UUIDs as strings.

    Deletes read the endpoints from ``old``; inserts and updates from ``new``.
    """
    snap = change.old if change.kind == "delete" else change.new
    assert snap is not None  # guaranteed by ``kind``; narrows the type
    return str(snap.from_node_uuid), str(snap.to_node_uuid)


def _edge_change_note(change: EdgeChange) -> str:
    """Return the bracketed annotation (with leading space) for an edge change.

    Inserts and deletes yield ``" [insert]"`` / ``" [delete]"``. Updates
    list a rename, changed endpoints and data differences; the result is
    an empty string when none of those apply.
    """
    if change.kind != "update":
        return f" [{change.kind}]"

    old, new = change.old, change.new
    assert old is not None and new is not None  # guaranteed by ``kind``
    notes: list[str] = []
    if old.name != new.name:
        notes.append(f"rename {old.name!r} → {new.name!r}")
    if change.endpoints_changed:
        notes.append("endpoints changed")
    if change.data_changed:
        notes.append(_data_diff_summary(old.data, new.data))
    return " [" + "; ".join(notes) + "]" if notes else ""