Source code for credmark.cmf.types.ledger_query

# pylint:disable=no-member, line-too-long

import contextlib
from enum import Enum
from typing import Optional, Tuple, Union

import credmark.cmf.model

from .ledger import ColumnField, JoinAllTypes, LedgerAggregate, LedgerJoin, LedgerModelOutput
from .ledger_errors import InvalidQueryException


# pylint: disable=locally-disabled,invalid-name, too-many-arguments
class LedgerQueryBase(contextlib.AbstractContextManager):
    """
    Common base for ledger query objects.

    Instances can be used in a ``with`` statement (the instance itself is
    yielded) and know how to turn ``select``-style arguments into the input
    dictionary sent to a ledger model.

    ``_gen_model_input`` reads ``self.columns`` and ``self.bigint_cols`` and
    calls ``self._validate_columns``. None of these is defined in this class,
    so the concrete query class has to provide them.
    """

    # Duplicated from credmark/cmf/types/ledger.py for easy access from context
    class JoinType(str, Enum):
        """Join kinds, with the string value used for each of them."""
        INNER = 'inner'
        LEFT_OUTER = 'leftOuter'
        RIGHT_OUTER = 'rightOuter'
        FULL_OUTER = 'fullOuter'
        CROSS = 'cross'
        NATURAL = 'natural'

    def __init__(self):
        """Nothing to set up at this level."""

    def __enter__(self):
        """Entering the context hands back the query object itself."""
        return self

    def __exit__(self, *_exc):
        """Leaving the context does no cleanup and never suppresses exceptions."""
        return None

    @classmethod
    def field(cls, value):
        """Wrap ``value`` in a ``ColumnField``."""
        return ColumnField(value)

    # pylint: disable=too-many-locals, too-many-branches
    def _gen_model_input(self,
                         model_slug: str,
                         originator: str,
                         columns: Optional[Union[list[str], list[ColumnField]]] = None,
                         joins: Optional[list[JoinAllTypes]] = None,
                         where: Optional[str] = None,
                         group_by: Optional[Union[list[str], list[ColumnField]]] = None,
                         order_by: Optional[Union[str, ColumnField]] = None,
                         limit: Optional[int] = None,
                         offset: Optional[int] = None,
                         aggregates: Optional[list[Tuple[str, str]]] = None,
                         having: Optional[str] = None,
                         analytics_mode: Optional[bool] = None) -> dict:
        """
        Validate the query arguments and build the ledger model input.

        Parameters:
            model_slug: Slug reported in error messages.
            originator: Stored unchanged under the ``originator`` key.
            columns: Columns to select. Must be ``None`` or ``[]`` when
                ``group_by`` is given; it is then derived from ``group_by``.
            joins: Sequences of ``(table, on)`` or ``(type, table, on)``.
            where: Stored unchanged under the ``where`` key.
            group_by: Columns to group by, sent as a comma-joined string.
            order_by: Stored unchanged under the ``orderBy`` key.
            limit: ``None`` or an integer >= 0, sent as a string.
            offset: ``None`` or an integer >= 0, sent as a string.
            aggregates: ``(expression, name)`` pairs.
            having: Stored unchanged under the ``having`` key.
            analytics_mode: Stored unchanged under the ``analyticsMode`` key.

        Returns:
            The dictionary to use as input of the ledger model.

        Raises:
            InvalidQueryException: when ``columns`` is combined with
                ``group_by``, when there is neither a column nor an aggregate,
                when ``columns`` is not a list, when a non-aggregate query has
                neither ``where`` nor ``limit``, or when ``limit`` / ``offset``
                is not an integer >= 0.
        """
        # NOTE(review): this collects the aggregate expressions (agg[0]), not the
        # names they are exposed as (agg[1]) - confirm which one group_by entries
        # are expected to be compared against.
        aggregates_names = [agg[0] for agg in aggregates] if aggregates else []

        if group_by is not None:
            # With group_by the caller may not pick columns: only None or [] pass.
            caller_chose_columns = columns is not None and not (columns == [])
            if caller_chose_columns:
                raise InvalidQueryException(
                    model_slug,
                    (f'{model_slug} call with group_by will need the columns to be '
                     'empty [] or None.'))
            # Select the grouped columns that are known to this query object and
            # are not listed among the aggregates.
            columns = [
                grouped for grouped in group_by
                if grouped in self.columns  # type: ignore
                and grouped not in aggregates_names
            ]

        if not columns and not aggregates:
            raise InvalidQueryException(
                model_slug,
                f'{model_slug} call must have at least one column or aggregate.')

        if columns is None:
            columns = []
        elif isinstance(columns, list):
            self._validate_columns(model_slug, columns)  # type: ignore
        else:
            raise InvalidQueryException(
                model_slug,
                f'{columns=} needs to be a list of string.')

        if where is None and limit is None and not aggregates:
            raise InvalidQueryException(
                model_slug,
                f'{model_slug} call must have a where or limit value for non-aggregate queries.')

        if limit is not None and not (isinstance(limit, int) and limit >= 0):
            raise InvalidQueryException(
                model_slug,
                f'{limit=} needs to be a positive integer.')

        if offset is not None and not (isinstance(offset, int) and offset >= 0):
            raise InvalidQueryException(
                model_slug,
                f'{offset=} needs to be a positive integer.')

        # Fix for contract ledger
        # Customized columns needs to be converted to string to avoid losing precision.
        # https://github.com/credmark/credmark-model-runner-api/issues/50
        # Big-integer columns leave the plain column list and are requested as
        # "<col>::TEXT" aggregates named after the column instead.
        text_cast_aggregates = []
        plain_columns = []
        for col in columns:
            if col in self.bigint_cols:  # type: ignore
                text_cast_aggregates.append((f'{col}::TEXT', col))
            else:
                plain_columns.append(col)

        requested_aggregates = [] if aggregates is None else aggregates
        every_aggregate = requested_aggregates + text_cast_aggregates
        if every_aggregate:
            aggregates_value = [
                LedgerAggregate(expression=expression, asName=name)
                for expression, name in (
                    (entry[0], entry[1]) for entry in every_aggregate)
            ]
        else:
            aggregates_value = None

        if joins is None:
            joins_value = None
        else:
            joins_value = []
            for join_spec in joins:
                # A join is (table, on) or (type, table, on): anything in front
                # of the last two items is taken as the join type.
                *leading, table, on_clause = join_spec
                joins_value.append(
                    LedgerJoin(tableKey=table.table_key,
                               alias=table.alias,
                               on=on_clause,
                               type=(leading[0] if leading else None)))  # type: ignore

        group_by_value = ','.join(group_by) if group_by is not None else None
        limit_value = None if limit is None else str(limit)
        offset_value = None if offset is None else str(offset)

        return {
            'alias': getattr(self, 'alias', None),
            'columns': plain_columns,
            'joins': joins_value,
            'aggregates': aggregates_value,
            'where': where,
            'groupBy': group_by_value,
            'having': having,
            'orderBy': order_by,
            'limit': limit_value,
            'offset': offset_value,
            'originator': originator,
            'analyticsMode': analytics_mode,
        }
# pylint: disable=too-many-arguments, protected-access
class LedgerQuery(LedgerQueryBase):
    """
    Ledger query bound to one query model.

    The slug of the model to run is taken from the ``cwgo_query_table``
    keyword argument given at construction time.
    """

    def __init__(self, **kwargs):
        """
        Parameters:
            **kwargs: must contain ``cwgo_query_table``, the slug of the model
                run by ``select``. A missing key raises ``KeyError``.
        """
        super().__init__()
        self._cwgo_query = kwargs['cwgo_query_table']

    def select(self,
               columns: Optional[Union[list[str], list[ColumnField]]] = None,
               joins: Optional[list[JoinAllTypes]] = None,
               where: Optional[str] = None,
               group_by: Optional[Union[list[str], list[ColumnField]]] = None,
               order_by: Optional[Union[str, ColumnField]] = None,
               limit: Optional[int] = None,
               offset: Optional[int] = None,
               aggregates: Optional[list[tuple[str, str]]] = None,
               having: Optional[str] = None,
               bigint_cols: Optional[list[str]] = None,
               analytics_mode: Optional[bool] = None) -> LedgerModelOutput:
        """
        Query data from the table.

        The query arguments are validated and packed by ``_gen_model_input``,
        then the model named at construction time is run in the current model
        context.

        Parameters:
            columns, joins, where, group_by, order_by, limit, offset,
            aggregates, having, analytics_mode: passed on unchanged to
                ``_gen_model_input``.
            bigint_cols: extra column names appended to ``self.bigint_cols``
                when calling ``set_bigint_cols`` on the output. They are not
                part of the model input.

        Returns:
            The ``LedgerModelOutput`` returned by the model run.
        """
        context = credmark.cmf.model.ModelContext.current_context()

        query_slug = self._cwgo_query
        # The caller's slug is read straight from the context's instance dict.
        caller_slug = context.__dict__['slug']

        model_input = self._gen_model_input(
            model_slug=query_slug,
            originator=caller_slug,
            columns=columns,
            joins=joins,
            where=where,
            group_by=group_by,
            order_by=order_by,
            limit=limit,
            offset=offset,
            aggregates=aggregates,
            having=having,
            analytics_mode=analytics_mode)

        ledger_out = context.run_model(
            slug=self._cwgo_query,
            input=model_input,
            return_type=LedgerModelOutput)

        # Tell the output about the table's own big-integer columns plus any
        # extra ones named by the caller.
        if bigint_cols is None:
            extra_bigint_cols = []
        else:
            extra_bigint_cols = bigint_cols
        ledger_out.set_bigint_cols(
            self.bigint_cols + extra_bigint_cols)  # type: ignore
        return ledger_out