"""Client — owns the psycopg pool and constructs TimeDBClient.
UUID identity model:
* A node is uniquely identified by its ``uuid`` (UUID7, set on the EDM
Element at construction).
* An edge is uniquely identified by its ``uuid`` (also UUID7).
* Path-based addressing (``client.get_node("Europe", "Sweden")``) is
preserved as a user-friendly fluent CLI; resolution walks
``(parent_uuid, name)`` via one indexed recursive CTE.
* Edge endpoints in storage are ``from_node_uuid`` / ``to_node_uuid`` —
no path resolution at write or read time.
API split:
* ``register_tree`` — structure (nodes, edges, series declarations). Create-only;
raises if any UUID in the payload already exists, or on inline timeseries data.
* ``write`` / ``read`` — bulk timeseries data via manifest DataFrames.
* ``get_node`` / ``get_edge`` — fluent scope entry points. Reads like
English: ``client.get_node("p").where(type="WindTurbine").read()``.
Terminate with ``.get()`` to fetch the EDM object eagerly.
"""
from __future__ import annotations
import os
from datetime import datetime
from typing import TYPE_CHECKING, Any
from uuid import UUID
if TYPE_CHECKING:
from energydb._transaction import Transaction
import pandas as pd
import polars as pl
from psycopg_pool import ConnectionPool
from sqlalchemy import create_engine
from timedatamodel import DataType, TimeSeries, TimeSeriesType
from timedb import TimeDBClient, profiling
from energydb import runs as runs_mod
from energydb._frames import Backend, Output, to_backend, to_polars
from energydb._io import read_manifest, read_relative_manifest, write_manifest
from energydb._join import EdgeSeriesKey, SeriesKey
from energydb._persist import create_edge, register_tree_under
from energydb.diff import TreeDiff
from energydb.models import Base
from energydb.paths import (
Path,
_like_escape,
build_filter_conditions,
resolve_node_uuid,
resolve_subtree_uuids,
)
from energydb.scope import EdgeScope, NodeScope, _coerce_path
from energydb.serialization import reconstruct_edge, reconstruct_node
# Statement executed on every pooled connection by the ``configure`` hook that
# ``Client.__init__`` hands to the pool; it puts ``energydb`` ahead of
# ``public`` on the session's schema search path.
_SEARCH_PATH = "SET search_path TO energydb, public"
[docs]
class Client:
    """Client for energy assets, hierarchy, and time series.

    Owns the psycopg connection pool (used for all PG ops) and constructs
    a :class:`TimeDBClient` for ClickHouse I/O.

    Call :meth:`close` when finished to shut the connection pool down.
    """

    def __init__(
        self,
        *,
        pg_conninfo: str | None = None,
        ch_url: str | None = None,
    ) -> None:
        """Construct a client.

        ``pg_conninfo`` is the PostgreSQL connection URI. When omitted, the
        ``TIMEDB_PG_DSN`` and then ``DATABASE_URL`` environment variables are
        consulted. ``ch_url`` is forwarded unchanged to :class:`TimeDBClient`.

        Raises :class:`ValueError` if no connection string is configured or
        if it is not in URI form.
        """
        conninfo = pg_conninfo or os.environ.get("TIMEDB_PG_DSN") or os.environ.get("DATABASE_URL")
        if not conninfo:
            raise ValueError("PostgreSQL connection not configured. Pass pg_conninfo or set TIMEDB_PG_DSN.")
        if "://" not in conninfo:
            raise ValueError(
                "pg_conninfo must be a URI (e.g. postgresql://user:pass@host/db); "
                "key=value DSNs are not supported here because the schema-create path "
                "needs a SQLAlchemy URL."
            )
        self._dsn = conninfo

        def _configure(conn) -> None:
            conn.execute(_SEARCH_PATH)
            # ``prepare_threshold=1`` makes psycopg cache a server-side
            # prepared statement after the first execution of each SQL text.
            # Saves ~4-8ms on the repeated 6000-uuid resolve query at scale=200
            # (PG parse+plan stage skipped on subsequent calls).
            conn.prepare_threshold = 1
            # Leave the connection idle (no open transaction) before the
            # pool hands it out.
            conn.commit()

        self._pool = ConnectionPool(
            conninfo=conninfo,
            min_size=1,
            max_size=10,
            open=True,
            configure=_configure,
        )
        try:
            self.td = TimeDBClient(ch_url=ch_url)
        except BaseException:
            # The pool is already open at this point; do not leak it when the
            # ClickHouse client cannot be constructed.
            self._pool.close()
            raise

    def __repr__(self) -> str:
        """Repr with credentials stripped from the DSN.

        Shows scheme + host(:port) + db; the userinfo segment (user:pass) is
        replaced with ``***``, and ``password`` / ``sslpassword`` query
        parameters have their values masked. Pure formatting — no I/O.
        """
        return f"Client(pg={self._redact_dsn(self._dsn)!r})"

    @staticmethod
    def _redact_dsn(dsn: str) -> str:
        """Return ``dsn`` with userinfo and password query values masked."""
        scheme, sep, rest = dsn.partition("://")
        if not sep:
            return dsn
        # Split the query off first so an ``@`` inside a query value is never
        # mistaken for the userinfo delimiter.
        head, qmark, query = rest.partition("?")
        # Cut at the LAST ``@``: a password containing a raw ``@`` must not
        # leak its tail into the output.
        _userinfo, at, hostpart = head.rpartition("@")
        if at:
            head = f"***@{hostpart}"
        if query:
            masked = []
            for pair in query.split("&"):
                key, eq, _value = pair.partition("=")
                if eq and key.lower() in ("password", "sslpassword"):
                    pair = f"{key}=***"
                masked.append(pair)
            query = "&".join(masked)
        return f"{scheme}://{head}{qmark}{query}"

    # ------------------------------------------------------------------
    # Schema management
    # ------------------------------------------------------------------

    def create(self) -> None:
        """Create PG schema + CH tables.

        Schema is defined entirely by SQLAlchemy models in :mod:`energydb.models`
        — including the ``energydb`` schema, the partial unique index on root
        names, and the immutability trigger on ``series``. No raw SQL.
        """
        engine = create_engine(self._sqlalchemy_url())
        try:
            Base.metadata.create_all(engine, checkfirst=True)
        finally:
            # The engine exists only for this DDL pass; release its connections.
            engine.dispose()
        self.td.create()

    def delete(self) -> None:
        """Drop PG schema (CASCADE) and CH tables."""
        with self._pool.connection() as conn:
            conn.execute("DROP SCHEMA IF EXISTS energydb CASCADE")
            conn.commit()
        self.td.delete()

    def close(self) -> None:
        """Close the PostgreSQL connection pool."""
        self._pool.close()

    # ------------------------------------------------------------------
    # Fluent entry — scopes for navigation & single-element ops
    # ------------------------------------------------------------------

    def get_node(self, *names_or_path, uuid: UUID | None = None) -> NodeScope:
        """Return a :class:`NodeScope` for a node or subtree.

        * ``client.get_node("P/Site/T01")`` — canonical ``/``-joined string
        * ``client.get_node("P", "Site", "T01")`` — variadic — equivalent
        * ``client.get_node(("P", "Site", "T01"))`` — tuple/list path
        * ``client.get_node(uuid=...)`` — absolute by uuid

        ``/`` is reserved as the path separator; names containing ``/``
        are rejected at registration time. Empty segments (leading,
        trailing, or doubled ``/``) raise ``ValueError``.

        Terminate the chain with ``.get()`` to fetch the EDM object,
        ``.read()`` for time-series data, ``.where(...)`` to filter a
        subtree, etc.

        Raises :class:`ValueError` when both a path and ``uuid=`` are given,
        or when neither is.
        """
        if uuid is not None:
            if names_or_path:
                raise ValueError("Pass either uuid= or names, not both.")
            return NodeScope(self, node_uuid=uuid)
        if not names_or_path:
            raise ValueError("Provide a path or uuid=.")
        return NodeScope(self, path=_coerce_path(names_or_path))

    def get_edge(
        self,
        from_path: Path | list[str] | str | None = None,
        to_path: Path | list[str] | str | None = None,
        *,
        type: str | None = None,
        uuid: UUID | None = None,
    ) -> EdgeScope:
        """Return an :class:`EdgeScope` by uuid or by ``(from_path, to_path, type)``.

        ``from_path`` / ``to_path`` accept the canonical ``/``-joined string
        form (``"P/Site/T01"``) or a tuple/list of segments. Terminate with
        ``.get()`` to fetch the EDM edge eagerly.

        Raises :class:`ValueError` when ``uuid=`` is combined with any of the
        path/type arguments, or when the ``(from_path, to_path, type)``
        triple is incomplete.
        """
        if uuid is not None:
            if from_path is not None or to_path is not None or type is not None:
                raise ValueError("Pass uuid= alone, or (from_path, to_path, type=) — not both.")
            return EdgeScope(self, edge_uuid=uuid)
        if from_path is None or to_path is None or type is None:
            raise ValueError("Provide uuid= or (from_path, to_path, type=).")
        return EdgeScope(
            self,
            from_path=_coerce_path((), kwarg=from_path),
            to_path=_coerce_path((), kwarg=to_path),
            edge_type=type,
        )

    # ------------------------------------------------------------------
    # Transactions
    # ------------------------------------------------------------------

    def transaction(self) -> Transaction:
        """Open an atomic batch of scope mutations.

        Returns a :class:`Transaction` context manager. Mutations executed
        through ``txn.get_node(...)`` / ``txn.get_edge(...)`` /
        ``txn.register_tree(...)`` apply immediately to the open
        transaction's connection but are not committed until
        :meth:`Transaction.commit` is called explicitly. Exit without
        commit raises and rolls back.

        Time-series I/O (``scope.write(df, ...)`` / ``scope.read(...)``)
        inside a transaction does **not** participate in atomicity — it
        executes immediately against the pool / ClickHouse.
        """
        # Imported at call time; at module level the name is only imported
        # under TYPE_CHECKING.
        from energydb._transaction import Transaction

        return Transaction(self)

    # ------------------------------------------------------------------
    # Structure — register_tree, edges, queries
    # ------------------------------------------------------------------

    def register_tree(
        self,
        edm_obj,
        *,
        under: Path | list[str] | str | None = None,
        dry_run: bool = False,
    ) -> UUID | TreeDiff:
        """Persist an EDM tree's structure: nodes, edges, series declarations.

        Create-only. Raises :class:`ValueError` if any UUID in the payload
        already exists in the DB; modify existing rows via scope mutators
        (:meth:`NodeScope.rename`, ``.update``, ``.delete``, ``.move_to``)
        or batch them with :meth:`transaction`.

        ``dry_run=True`` returns the computed :class:`TreeDiff` without
        committing — the transaction is rolled back so no DB state changes.

        Inline ``TimeSeries.df`` data is rejected: write data separately
        via :meth:`write` against a manifest. ``under`` selects the parent
        under which the tree's root is grafted; ``None`` means create at
        root. Raises if ``under`` points at a non-existent parent.

        Series declarations attached to nodes/edges on the tree **are**
        registered alongside their owners but are not represented in the
        returned :class:`TreeDiff`. Adding a series to a node that
        already exists in the DB is not supported here (the create-only
        pre-check rejects the whole payload); use
        :meth:`NodeScope.register_series` /
        :meth:`EdgeScope.register_series` instead.

        Returns the ``uuid`` of the tree's root, except when
        ``dry_run=True`` (which returns the :class:`TreeDiff`).
        """
        with self._pool.connection() as conn:
            parent_uuid = None
            if under is not None:
                parent_uuid = resolve_node_uuid(conn, _coerce_path((), kwarg=under))
            root_uuid, diff = register_tree_under(
                conn,
                edm_obj,
                parent_uuid=parent_uuid,
                dry_run=dry_run,
            )
            if dry_run:
                conn.rollback()
            else:
                conn.commit()
        return diff if dry_run else root_uuid

    @staticmethod
    def _resolve_within(conn, within: Path | list[str] | str | UUID) -> UUID:
        """Return the node uuid for a ``within`` argument (uuid passes through)."""
        if isinstance(within, UUID):
            return within
        return resolve_node_uuid(conn, _coerce_path((), kwarg=within))

    def query_nodes(
        self,
        *,
        type: str | None = None,
        within: Path | list[str] | str | UUID | None = None,
        **property_filters,
    ) -> list:
        """Return matching nodes as a flat list of EDM objects.

        ``within`` accepts a ``/``-joined string (``"P/Site"``), a path
        tuple/list of segments, or a :class:`UUID`. Results are ordered by
        ``name``.
        """
        where_filters: dict[str, Any] = dict(property_filters)
        if type is not None:
            where_filters["node_type"] = type
        # Filter construction needs no connection; do it before checking one out.
        filter_conds, filter_params = build_filter_conditions(where_filters, type_col="node_type")
        conditions: list[str] = list(filter_conds)
        params: list[Any] = list(filter_params)

        with self._pool.connection() as conn:
            if within is not None:
                within_uuid = self._resolve_within(conn, within)
                conditions.append("uuid = ANY(%s)")
                params.append(resolve_subtree_uuids(conn, within_uuid))
            where = " AND ".join(conditions) if conditions else "TRUE"
            rows = conn.execute(
                f"SELECT uuid, node_type, name, data FROM energydb.node WHERE {where} ORDER BY name",
                params,
            ).fetchall()
        return [reconstruct_node({"uuid": r[0], "node_type": r[1], "name": r[2], "data": r[3]}) for r in rows]

    def create_edge(self, edm_obj) -> UUID:
        """Upsert an edge between two existing nodes. Idempotent.

        The edge's :class:`Reference` endpoints (``from_element`` /
        ``to_element``) carry the endpoint UUIDs directly — no path
        resolution. The endpoints must already exist as nodes; the FK
        constraint will fail otherwise.

        For edges that are part of a tree, prefer :meth:`register_tree` —
        it walks the structure and validates endpoints against the tree's
        index in one pass.

        Returns the edge's ``uuid``.
        """
        with self._pool.connection() as conn:
            # Inside the method the bare name resolves to the module-level
            # ``create_edge`` helper, not to this method.
            edge_uuid = create_edge(conn, edm_obj, tree_root=None)
            conn.commit()
        return edge_uuid

    def query_edges(
        self,
        *,
        type: str | None = None,
        within: Path | list[str] | str | UUID | None = None,
        **property_filters,
    ) -> list:
        """Return matching edges as a flat list of EDM objects.

        ``within`` (``/``-joined string ``"P/Site"``, path tuple/list of
        segments, or a :class:`UUID`) restricts to edges where either
        endpoint is in that subtree. Results are ordered by ``name`` with
        unnamed edges last.
        """
        where_filters: dict[str, Any] = dict(property_filters)
        if type is not None:
            where_filters["edge_type"] = type
        # Filter construction needs no connection; do it before checking one out.
        filter_conds, filter_params = build_filter_conditions(where_filters, type_col="edge_type")
        conditions: list[str] = list(filter_conds)
        params: list[Any] = list(filter_params)

        with self._pool.connection() as conn:
            if within is not None:
                subtree = resolve_subtree_uuids(conn, self._resolve_within(conn, within))
                conditions.append("(from_node_uuid = ANY(%s) OR to_node_uuid = ANY(%s))")
                # One bind value per placeholder: the subtree is matched
                # against both endpoints.
                params.extend((subtree, subtree))
            where = " AND ".join(conditions) if conditions else "TRUE"
            rows = conn.execute(
                f"SELECT uuid, edge_type, name, data, from_node_uuid, to_node_uuid "
                f"FROM energydb.edge WHERE {where} ORDER BY name NULLS LAST",
                params,
            ).fetchall()
        return [
            reconstruct_edge(
                {
                    "uuid": r[0],
                    "edge_type": r[1],
                    "name": r[2],
                    "data": r[3],
                    "from_node_uuid": r[4],
                    "to_node_uuid": r[5],
                }
            )
            for r in rows
        ]

    # ------------------------------------------------------------------
    # Tree reconstruction
    # ------------------------------------------------------------------

    def get_tree(
        self,
        *names_or_path,
        uuid: UUID | None = None,
        include_series: bool = False,
    ):
        """Reconstruct the full EDM subtree rooted at the given node.

        With ``include_series=True``, every reconstructed node has its
        registered series attached as metadata-only :class:`TimeSeries`
        entries (``df=None``) on ``timeseries``.

        **Edges are intentionally not attached to the returned tree.**
        The result is a node-only subtree walked via ``parent_uuid``.
        Edges (and their series) live alongside nodes in the schema but
        outside the tree shape — query them separately with
        :meth:`get_edge` or :meth:`query_edges`.

        When both a path and ``uuid=`` are supplied, ``uuid`` wins.
        Raises :class:`ValueError` if neither is supplied or the root node
        does not exist.
        """
        # Validate before touching the pool so a bad call never checks out
        # a connection.
        if uuid is None and not names_or_path:
            raise ValueError("Provide a path or uuid=.")

        with self._pool.connection() as conn:
            if uuid is not None:
                root_uuid = uuid
            else:
                root_uuid = resolve_node_uuid(conn, _coerce_path(names_or_path))

            # Two-step: fetch the root's path, then LIKE with the escaped
            # prefix as a bind param so PG picks Index Scan on
            # ``ix_node_path_prefix``. A column-source LIKE would Seq Scan.
            root_path_row = conn.execute(
                "SELECT path FROM energydb.node WHERE uuid = %s",
                (root_uuid,),
            ).fetchone()
            if root_path_row is None:
                raise ValueError(f"Node not found: uuid={root_uuid}")
            root_path = root_path_row[0]

            rows = conn.execute(
                r"""
                SELECT uuid, node_type, name, data, parent_uuid
                FROM energydb.node
                WHERE path = %s OR path LIKE %s || '/%%' ESCAPE '\'
                """,
                (root_path, _like_escape(root_path)),
            ).fetchall()

            nodes: dict[UUID, Any] = {}
            parent_map: dict[UUID, UUID | None] = {}
            for r in rows:
                node_uuid = r[0]
                parent_map[node_uuid] = r[4]
                nodes[node_uuid] = reconstruct_node({"uuid": r[0], "node_type": r[1], "name": r[2], "data": r[3]})

            if include_series:
                series_rows = conn.execute(
                    "SELECT node_uuid, data_type, name, canonical_unit, timeseries_type, description "
                    "FROM energydb.series WHERE node_uuid = ANY(%s)",
                    (list(nodes),),
                ).fetchall()
                for nid, dt, sname, unit, tstype, desc in series_rows:
                    node_obj = nodes.get(nid)
                    if node_obj is None:
                        continue
                    series = TimeSeries(
                        df=None,
                        name=sname,
                        unit=unit or "dimensionless",
                        data_type=DataType(dt.upper()) if dt else None,
                        timeseries_type=TimeSeriesType(tstype) if tstype else TimeSeriesType.FLAT,
                        description=desc,
                    )
                    if node_obj.timeseries is None:
                        node_obj.timeseries = []
                    node_obj.timeseries.append(series)

        # Attach children to their parents (flat pass — uuid-based, order-agnostic).
        # The root's own parent lies outside the fetched set and is skipped.
        for node_uuid, parent_uuid in parent_map.items():
            if parent_uuid is not None and parent_uuid in nodes:
                nodes[parent_uuid].add_child(nodes[node_uuid])
        return nodes[root_uuid]

    # ------------------------------------------------------------------
    # Bulk timeseries I/O — manifest DataFrames only
    # ------------------------------------------------------------------

    def write(
        self,
        df: pl.DataFrame | pd.DataFrame,
        *,
        knowledge_time: datetime | None = None,
        run_id: int | None = None,
        workflow_id: str | None = None,
        model_name: str | None = None,
        run_start_time: datetime | None = None,
        run_finish_time: datetime | None = None,
        run_params: dict | None = None,
    ) -> int:
        """Bulk-write timeseries data via a routing manifest.

        ``df`` is a pandas or polars DataFrame carrying one routing column
        (``node_uuid``, ``edge_uuid``, or ``path`` as ``Utf8`` joined with
        ``/``, e.g. ``"my-portfolio/Offshore-1/T01"``), plus ``data_type``,
        ``name``, and the timedb data columns (``valid_time``, ``value``,
        optional ``knowledge_time``). Optional ``unit`` column triggers
        per-row unit conversion to each series's canonical unit.

        The run-related keyword arguments are forwarded unchanged to the
        manifest writer.

        Series must already be registered (typically via
        :meth:`register_tree`). Returns the ``run_id`` used.
        """
        with profiling._phase(profiling.PHASE_EDB_OUTPUT_CONVERT):
            df_pl = to_polars(df)
        return write_manifest(
            self._pool,
            self.td,
            df_pl,
            knowledge_time=knowledge_time,
            run_id=run_id,
            workflow_id=workflow_id,
            model_name=model_name,
            run_start_time=run_start_time,
            run_finish_time=run_finish_time,
            run_params=run_params,
        )

    def read(
        self,
        df: pl.DataFrame | pd.DataFrame,
        *,
        unit: str | None = None,
        start_valid: datetime | None = None,
        end_valid: datetime | None = None,
        start_known: datetime | None = None,
        end_known: datetime | None = None,
        include_updates: bool = False,
        include_knowledge_time: bool = False,
        output: Output = "frame",
        backend: Backend = "polars",
    ) -> (
        pl.DataFrame
        | pd.DataFrame
        | dict[SeriesKey, pl.DataFrame]
        | dict[SeriesKey, pd.DataFrame]
        | dict[EdgeSeriesKey, pl.DataFrame]
        | dict[EdgeSeriesKey, pd.DataFrame]
    ):
        """Bulk read via manifest. Detects edge vs node routing automatically.

        Accepts pandas or polars on input. Output shape:

        * ``output="frame"`` (default): a single DataFrame with columns
          ``(path, data_type, name, valid_time, value, …)`` for node-routed
          reads, or ``(from_path, to_path, edge_type, data_type, name,
          valid_time, value, …)`` for edge-routed reads. ``path`` /
          ``from_path`` / ``to_path`` are ``Utf8`` joined with ``/``.
          Optional columns appear when ``include_knowledge_time`` /
          ``include_updates`` are set.
        * ``output="by_path"``: a ``dict`` keyed by
          :class:`SeriesKey` (node-routed: ``path``, ``data_type``, ``name``)
          or :class:`EdgeSeriesKey` (edge-routed: ``from_path``, ``to_path``,
          ``edge_type``, ``data_type``, ``name``) with per-series DataFrames
          carrying only the data columns (``valid_time``, ``value``, plus
          opt-in time/audit columns). Keys are NamedTuples — positional
          access (``result[(path, dt, name)]``) and attribute access
          (``key.path``) both work. Each sub-frame is sorted by
          ``valid_time`` ascending; secondary sort keys are
          ``knowledge_time`` and/or ``change_time`` when requested.

        ``backend="polars"`` (default) returns polars frames;
        ``backend="pandas"`` converts at the boundary. Internal
        identifiers (``series_id``, ``node_uuid``, ``edge_uuid``) are
        never exposed on the result.
        """
        with profiling._phase(profiling.PHASE_EDB_OUTPUT_CONVERT):
            manifest = to_polars(df)
        result = read_manifest(
            self._pool,
            self.td,
            manifest,
            unit=unit,
            start_valid=start_valid,
            end_valid=end_valid,
            start_known=start_known,
            end_known=end_known,
            include_updates=include_updates,
            include_knowledge_time=include_knowledge_time,
            output=output,
        )
        with profiling._phase(profiling.PHASE_EDB_OUTPUT_CONVERT):
            return to_backend(result, backend)

    def read_relative(
        self,
        df: pl.DataFrame | pd.DataFrame,
        *,
        unit: str | None = None,
        output: Output = "frame",
        backend: Backend = "polars",
        **td_kwargs,
    ) -> (
        pl.DataFrame
        | pd.DataFrame
        | dict[SeriesKey, pl.DataFrame]
        | dict[SeriesKey, pd.DataFrame]
        | dict[EdgeSeriesKey, pl.DataFrame]
        | dict[EdgeSeriesKey, pd.DataFrame]
    ):
        """Bulk relative read via manifest.

        See :meth:`read` for the ``output`` / ``backend`` contract.
        ``**td_kwargs`` are forwarded to :meth:`timedb.TimeDBClient.read_relative`;
        see that signature for accepted arguments (window selectors, etc.).
        """
        with profiling._phase(profiling.PHASE_EDB_OUTPUT_CONVERT):
            manifest = to_polars(df)
        result = read_relative_manifest(
            self._pool,
            self.td,
            manifest,
            unit=unit,
            output=output,
            **td_kwargs,
        )
        with profiling._phase(profiling.PHASE_EDB_OUTPUT_CONVERT):
            return to_backend(result, backend)

    # ------------------------------------------------------------------
    # Runs
    # ------------------------------------------------------------------

    def read_runs_for_series(self, *, series_id: int) -> list[dict[str, Any]]:
        """Return runs that wrote data for a given series_id, latest first."""
        run_ids = self.td.read_run_series(series_id=series_id)
        if not run_ids:
            # Nothing was written for this series; skip the PG round trip.
            return []
        with self._pool.connection() as conn:
            return runs_mod.get_runs(conn, run_ids)

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _sqlalchemy_url(self) -> str:
        """Return the DSN with its scheme replaced by ``postgresql+psycopg``."""
        return f"postgresql+psycopg://{self._dsn.split('://', 1)[-1]}"