Source code for credmark.cmf.types.ledger_query

# pylint:disable=no-member, line-too-long

import contextlib
from enum import Enum
from typing import Optional, Tuple, Union

import credmark.cmf.model

from .ledger import ColumnField, JoinAllTypes, LedgerAggregate, LedgerJoin, LedgerModelOutput
from .ledger_errors import InvalidQueryException


# pylint: disable=locally-disabled,invalid-name, too-many-arguments
[docs]class LedgerQueryBase(contextlib.AbstractContextManager): # Duplicated from credmark/cmf/types/ledger.py for easy access from context class JoinType(str, Enum): INNER = "inner" LEFT_OUTER = "leftOuter" RIGHT_OUTER = "rightOuter" FULL_OUTER = "fullOuter" CROSS = "cross" NATURAL = "natural" def __enter__(self): return self def __exit__(self, *_exc): return None def __init__(self): pass @classmethod def field(cls, value): return ColumnField(value) # pylint: disable=too-many-locals def _gen_model_input( self, model_slug: str, originator: str, columns: Optional[Union[list[str], list[ColumnField]]] = None, joins: Optional[list[JoinAllTypes]] = None, where: Optional[str] = None, group_by: Optional[Union[list[str], list[ColumnField]]] = None, order_by: Optional[Union[str, ColumnField]] = None, limit: Optional[int] = None, offset: Optional[int] = None, aggregates: Optional[list[Tuple[str, str]]] = None, having: Optional[str] = None, analytics_mode: Optional[bool] = None, omit_group_by_columns: Optional[bool] = None, ) -> dict: aggregates_names = [agg[0] for agg in aggregates] if aggregates else [] if group_by is not None and not omit_group_by_columns: if columns == []: columns = None if columns is not None: raise InvalidQueryException( model_slug, ( f"{model_slug} call with group_by will need the columns to be " "empty [] or None." ), ) columns = [c for c in group_by if c in self.columns and c not in aggregates_names] # type: ignore if not columns and not aggregates: raise InvalidQueryException( model_slug, f"{model_slug} call must have at least one column or aggregate." ) if columns is None: columns = [] elif not isinstance(columns, list): raise InvalidQueryException(model_slug, f"{columns=} needs to be a list of string.") else: self._validate_columns( # type: ignore model_slug, columns ) if where is None and limit is None and not aggregates: raise InvalidQueryException( model_slug, f"{model_slug} call must have a where or limit value for non-aggregate queries.", ) if limit is not None and (not isinstance(limit, int) or limit < 0): raise InvalidQueryException(model_slug, f"{limit=} needs to be a positive integer.") if offset is not None and (not isinstance(offset, int) or offset < 0): raise InvalidQueryException(model_slug, f"{offset=} needs to be a positive integer.") # Fix for contract ledger # Customized columns needs to be converted to string to avoid losing precision. # https://github.com/credmark/credmark-model-runner-api/issues/50 cols_customized = [(f"{c}::TEXT", c) for c in columns if c in self.bigint_cols] # type: ignore columns = [c for c in columns if c not in self.bigint_cols] # type: ignore aggregates_list = ([] if aggregates is None else aggregates) + cols_customized aggregates_value = ( None if len(aggregates_list) == 0 else [LedgerAggregate(expression=agg[0], asName=agg[1]) for agg in aggregates_list] ) joins_value = ( [ LedgerJoin( tableKey=table.table_key, alias=table.alias, on=on, type=(type_list[0] if type_list else None), ) # type: ignore for (*type_list, table, on) in joins ] if joins is not None else None ) return { "alias": getattr(self, "alias", None), "columns": columns, "joins": joins_value, "aggregates": aggregates_value, "where": where, "groupBy": ",".join(group_by) if group_by is not None else None, "having": having, "orderBy": order_by, "limit": str(limit) if limit is not None else None, "offset": str(offset) if offset is not None else None, "originator": originator, "analyticsMode": analytics_mode, }
# pylint: disable=too-many-arguments, protected-access
[docs]class LedgerQuery(LedgerQueryBase): def __init__(self, **kwargs): super().__init__() self._cwgo_query = kwargs["cwgo_query_table"]
[docs] def select( self, columns: Optional[Union[list[str], list[ColumnField]]] = None, joins: Optional[list[JoinAllTypes]] = None, where: Optional[str] = None, group_by: Optional[Union[list[str], list[ColumnField]]] = None, order_by: Optional[Union[str, ColumnField]] = None, limit: Optional[int] = None, offset: Optional[int] = None, aggregates: Optional[list[tuple[str, str]]] = None, having: Optional[str] = None, bigint_cols: Optional[list[str]] = None, analytics_mode: Optional[bool] = None, omit_group_by_columns: Optional[bool] = None, ) -> LedgerModelOutput: """ Query data from the table. """ context = credmark.cmf.model.ModelContext.current_context() model_input = self._gen_model_input( model_slug=self._cwgo_query, originator=context.__dict__["slug"], columns=columns, joins=joins, where=where, group_by=group_by, order_by=order_by, limit=limit, offset=offset, aggregates=aggregates, having=having, analytics_mode=analytics_mode, omit_group_by_columns=omit_group_by_columns, ) ledger_out = context.run_model( slug=self._cwgo_query, input=model_input, return_type=LedgerModelOutput ) ledger_out.set_bigint_cols( self.bigint_cols # type: ignore + ([] if bigint_cols is None else bigint_cols) ) return ledger_out