Source code for energydb._transaction

"""Transaction — atomic batch of scope mutations with preview support.

A :class:`Transaction` owns one borrowed pool connection for its whole
lifetime. Mutations executed through ``txn.get_node(...)`` /
``txn.get_edge(...)`` apply immediately to the open transaction's
connection but are not committed until :meth:`Transaction.commit` is
called explicitly. Exit without commit raises and rolls back.

Each mutator records a :class:`NodeChange` / :class:`EdgeChange` snapshot
pair into the txn's internal log so :meth:`preview` can return a
:class:`TreeDiff` aggregating everything done so far.
"""

from __future__ import annotations

from typing import TYPE_CHECKING
from uuid import UUID

from energydb._persist import register_tree_under
from energydb.diff import EdgeChange, NodeChange, TreeDiff
from energydb.paths import resolve_node_uuid

if TYPE_CHECKING:
    from energydb.client import Client
    from energydb.scope import EdgeScope, NodeScope, Path


[docs] class Transaction: """Context manager wrapping a single pool connection for atomic batches. Mid-transaction reads see the transaction's own uncommitted writes (single physical connection). Time-series I/O — ``scope.write(df, ...)`` / ``scope.read(...)`` — does not participate in the PG transaction and is **rejected with a RuntimeError** on a txn-bound scope: call :meth:`Client.write` / :meth:`Client.read` directly outside the transaction instead. """ def __init__(self, client: Client): self._client = client self._pool_cm = None self._conn = None self._committed = False self._node_changes: list[NodeChange] = [] self._edge_changes: list[EdgeChange] = []
[docs] def __repr__(self) -> str: """Plain-text repr — no I/O. Shows state + pending-change counts.""" if self._committed: state = "committed" elif self._conn is not None: state = "open" else: state = "unopened" return f"Transaction({state}, {len(self._node_changes)} node + {len(self._edge_changes)} edge changes pending)"
def __enter__(self) -> Transaction: self._pool_cm = self._client._pool.connection() self._conn = self._pool_cm.__enter__() return self def __exit__(self, exc_type, exc, tb): assert self._conn is not None and self._pool_cm is not None try: if exc_type is not None: # User code raised — rollback and let the exception propagate. self._conn.rollback() return False if not self._committed: # Clean exit without commit — rollback and raise loudly. self._conn.rollback() raise RuntimeError( "Transaction exited without .commit(); changes rolled back. " "Call txn.commit() before leaving the with-block, or raise to abort." ) return False finally: self._pool_cm.__exit__(exc_type, exc, tb) # ------------------------------------------------------------------ # Scope factories — mirror Client surface, scopes bound to this txn. # ------------------------------------------------------------------ def get_node(self, *names_or_path, uuid: UUID | None = None) -> NodeScope: scope = self._client.get_node(*names_or_path, uuid=uuid) return scope._with_txn(self) def get_edge( self, from_path: Path | list[str] | str | None = None, to_path: Path | list[str] | str | None = None, *, type: str | None = None, uuid: UUID | None = None, ) -> EdgeScope: scope = self._client.get_edge(from_path, to_path, type=type, uuid=uuid) return scope._with_txn(self)
[docs] def register_tree( self, edm_obj, *, under: Path | list[str] | str | None = None, ) -> UUID: """Create a new tree (or subtree) inside this transaction. Mirrors :meth:`Client.register_tree`'s create-only semantics, but reuses the transaction's connection and extends the change log so the inserts show up in :meth:`preview`. """ from energydb.scope import _coerce_path parent_uuid = resolve_node_uuid(self._conn, _coerce_path((), kwarg=under)) if under is not None else None root_uuid, diff = register_tree_under( self._conn, edm_obj, parent_uuid=parent_uuid, dry_run=False, ) self._node_changes.extend(diff.node_changes) self._edge_changes.extend(diff.edge_changes) return root_uuid
# ------------------------------------------------------------------ # Internal recording — called by scope mutators. # ------------------------------------------------------------------ def _record_node(self, old, new) -> None: self._node_changes.append(NodeChange(old=old, new=new)) def _record_edge(self, old, new) -> None: self._edge_changes.append(EdgeChange(old=old, new=new)) # ------------------------------------------------------------------ # Preview + commit # ------------------------------------------------------------------
[docs] def preview(self) -> TreeDiff: """Return a :class:`TreeDiff` aggregating every change so far. Repeated mutations on the same uuid appear as multiple entries — no collapsing is done. The result is read-only; call again to re-snapshot after additional mutations. """ return TreeDiff( node_changes=list(self._node_changes), edge_changes=list(self._edge_changes), )
[docs] def commit(self) -> None: """Commit the transaction. Required before exiting the with-block.""" if self._committed: raise RuntimeError("Transaction already committed.") assert self._conn is not None self._conn.commit() self._committed = True