# pylint: disable=line-too-long
from abc import abstractmethod
from contextlib import contextmanager
from contextvars import ContextVar, Token
from typing import Any, Generator, Type, TypeVar, Union, overload
from web3 import AsyncWeb3, Web3
from credmark.cmf.engine.web3.batch_fallback import Web3Batch, Web3BatchFallback
from credmark.cmf.engine.web3.registry import Web3Registry
from credmark.cmf.types import BlockNumber, Network
from credmark.dto import DTOType
from .errors import ModelNoContextError
from .ledger import Ledger
from .models import Models
from .utils.historical_util import HistoricalUtil
DTOT = TypeVar('DTOT')
MaybeModelContext = Union['ModelContext', None]
[docs]class ModelContext:
Model contexts class. It holds the current context (chain id
and block number) as well as helpers for getting ledger data,
running other models both individually and historically over a
series of blocks, looking up contracts, and accessing a web3
Current context uses contextvars to ensure setting and getting
context is safe when using concurrent code. It can be used with
either threads or asyncio.
You can access an instance of this class from a model
as ``self.context``.
__current_context: ContextVar[MaybeModelContext] = ContextVar(
'context', default=None)
[docs] @classmethod
def get_current_context(cls) -> MaybeModelContext:
Get the current context, which could be None.
Normally you should use current_context() instead.
return cls.__current_context.get()
[docs] @classmethod
def set_current_context(cls, context: MaybeModelContext) -> Token[MaybeModelContext]:
Set the current context, which could be None.
Normally you should not use this method.
return cls.__current_context.set(context)
[docs] @classmethod
def reset_current_context(cls, token: Token[MaybeModelContext]):
Reset the context to the value it had before
`ModelContext.set_current_context` was called.
Pass the token returned by `ModelContext.set_current_context`
[docs] @classmethod
def current_context(cls) -> 'ModelContext':
Get the current context and raise a ModelNoContextError
exception if there is no current context.
context = cls.get_current_context()
if context is None:
raise ModelNoContextError("No current ModelContext")
return context
def __init__(self, chain_id: int, block_number: BlockNumber,
web3_registry: Web3Registry):
self._chain_id = chain_id
self._block_number = block_number
self._web3_registry = web3_registry
self._web3 = None
self._web3_async = None
self._web3_batch = None
self._ledger = None
self._historical_util = None
self._models = None
[docs] @contextmanager
def fork(self,
chain_id: Union[int, None] = None,
block_number: Union[BlockNumber, int, None] = None
) -> Generator['ModelContext', Any, None]:
The ``context.forked`` method can be use to create a temporary context.
It inherits all the properties from the `current_context`. Some properties
like `block_number` and `chain_id` can be changed.
if isinstance(block_number, int):
block_number = BlockNumber(block_number)
yield ModelContext(self.chain_id if chain_id is None else chain_id,
self.block_number if block_number is None else block_number,
def models(self) -> Models:
The ``context.models`` attribute can be used to run models with a method
call, with any ``-`` in the model slug replaced with ``_``.
For example:
- ``context.run_model('example.model')`` becomes ``context.models.example.model()``
- ``context.run_model('example.ledger-blocks')`` becomes
- ``context.run_model('var-model')`` becomes ``context.models.var_model()``
The input that you pass to ``context.run_model()`` can be
passed to the method call as keyword (named) args, for example::
output = context.models.rpc.get_blocknumber(timestamp=1438270017)
Or as an input ``DTO`` or dict::
output = context.models.example.model(input_dto)
You can use the run output to create an output DTO::
output = EchoDto(**context.models.example.model(input_dto))
You can run a model with a context of a block number (it must be lower than the
block number of the current context) by calling the ``models`` instance with a
``block_number`` arg::
output = context.models(block_number=123).example.model(input_dto)
if self._models is None:
# The models instance can be used to run models like a method
# We don't pass the block_number so it uses the default
# (our context) block number.
self._models = Models(self)
return self._models
def _model_manifests(self, underscore_slugs=False) -> dict:
# Context implementation will override this to return
# a dict of slug to manifest dict containing available models.
def _class_for_model(self, slug: str, version: Union[str, None] = None):
# Context implementation will override this to return
# the model class for a slug. If the model is not available locally
# it will return None.
def chain_id(self) -> int:
Context chain id as an integer
return self._chain_id
def network(self) -> Network:
Context chain id as an integer
return Network(self._chain_id)
def block_number(self) -> BlockNumber:
Context block number. A credmark.cmf.types.BlockNumber instance.
return self._block_number
def block_number(self, block_number: int):
if block_number != self._block_number:
self._block_number = BlockNumber(block_number)
self._web3 = None
self._web3_async = None
self._ledger = None
self._historical_util = None
def web3(self) -> Web3:
A configured web3 instance
if self._web3 is None:
self._web3 = self._web3_registry.web3_for_chain_id(self.chain_id)
self._web3.eth.default_block = self.block_number if self.block_number is not None else 'latest'
return self._web3
def web3_async(self) -> AsyncWeb3:
A configured web3 instance
if self._web3_async is None:
self._web3_async = self._web3_registry.async_web3_for_chain_id(self.chain_id)
self._web3_async.eth.default_block = self.block_number if self.block_number is not None else 'latest'
return self._web3_async
def ledger(self) -> Ledger:
A :class:`~credmark.cmf.model.ledger.Ledger` instance which can be
used to query the ledger for data.
if self._ledger is None:
self._ledger = Ledger()
return self._ledger
def web3_batch(self) -> Web3Batch:
A :class:`~credmark.cmf.engine.web3.web3_multicall.Web3Multicall` instance which can be
used to batch query web3 using multicall.
if self._web3_batch is None:
self._web3_batch = Web3BatchFallback()
return self._web3_batch
def historical(self) -> HistoricalUtil:
A :class:`~credmark.cmf.model.utils.historical_util.HistoricalUtil`
instance which can be used to run a model over a series of blocks based
on time or block intervals.
if self._historical_util is None:
self._historical_util = HistoricalUtil()
return self._historical_util
def run_model(self,
slug: str,
input: Union[dict, DTOType],
return_type: Type[DTOT],
block_number: Union[int, None] = None,
version: Union[str, None] = None,
local: bool = False
) -> DTOT: ...
def run_model(self,
slug: str,
input: Union[dict, DTOType],
return_type: Union[Type[dict], None] = None,
block_number: Union[int, None] = None,
version: Union[str, None] = None,
local: bool = False
) -> dict: ...
[docs] @abstractmethod
def run_model(self, # type: ignore
local: bool = False
) -> Any:
Run a model by slug and optional version.
In order for models to be consistently deterministic, the **ONLY** type
of error a model should catch and handle from a call to ``run_model()``
is a ``ModelDataError``, which is considered a permanent error for the
given context. All other errors are considered transient, coding errors,
or conditions that may change in the future.
slug (str): the slug of the model
input (dict | DTO): an optional dictionary or DTO instance of
input data that will be passed to the model when it is run.
block_number (int | None): optional block number to use as context.
If None, the block_number of the current context will be used.
version (str | None): optional version of the model.
If version is None, the latest version of
the model is used. Use of this parameter is NOT recommended.
return_type (DTO Type | None): optional class to use for the
returned output data. If not specified, returned value is a dict.
If a DTO specified, the returned value will be an instance
of that class if the output data is compatible with it. If its not,
an exception will be raised.
The output returned by the model's run() method as a dict
or a DTO instance if return_type is specified.
ModelDataError: A catch-able permanent error.
ModelRunError: A non-permanent run error. Should not be caught.
ModelNotFoundError: Requested model was not found. Should not be caught.
ModelBaseError: other subclasses of ``ModelBaseError`` that should not be caught.