# pylint: disable=line-too-long
import inspect
import logging
import re
from abc import abstractmethod
from copy import deepcopy
from typing import List, Literal, Tuple, Type, Union
from credmark.cmf.engine.cache import CachePolicy
from credmark.cmf.types import BlockNumber
from credmark.cmf.types.series import BlockSeries, ImmutableOutput
from credmark.dto import DTOType, DTOTypesTuple, EmptyInput
from credmark.dto.transform import transform_data_for_dto
from .context import ModelContext
from .errors import ModelBaseError, ModelDataErrorDTO, ModelDefinitionError
class MissingModelBaseClass(ModelDefinitionError):
    """
    Raised by the ``describe`` decorator when the decorated class does not
    inherit from the base model class required by its cache policy.
    """
class WrongModelMethodSignature(ModelDefinitionError):
    """
    Raised by the ``describe`` decorator when the decorated class does not
    implement an abstract method declared by its base model class.
    """
class InvalidModelCacheKey(ModelDefinitionError):
    """
    Model definition error related to cache keys.

    NOTE(review): not raised anywhere in this module; presumably used by
    callers elsewhere in the package -- confirm before removing.
    """
class InvalidModelSlug(ModelDefinitionError):
    """
    Raised by ``validate_model_slug()`` when a slug is missing its required
    prefix, is malformed, or is too long.
    """
# Generic JSON schema placed in the manifest when a model's input or output
# type is not a DTO class (for example a plain ``dict``) or is ``None``.
DICT_SCHEMA = {"title": "Object", "type": "object", "properties": {}}

# We could probably get the input and output DTOs from the run() method
# signature instead of passing them as params to the decorator, but model
# authors may have reasons to use different types, and the DTOs are also
# used to document the schema.

# Maximum number of characters allowed in a model slug.
MAX_SLUG_LENGTH = 64

# A slug is an optional prefix terminated by a dot, followed by a name.
# Both the prefix and the name are one or more alphanumeric groups joined
# by single hyphens, so they start and end with a letter or number.
model_slug_re = re.compile(
    r'^(([A-Za-z0-9]+\-)*[A-Za-z0-9]+\.)?([A-Za-z0-9]+\-)*[A-Za-z0-9]+$')
class ModelErrorDesc:
    """
    Base class for descriptions of the errors a model can raise.

    Subclasses are expected to override ``schema()``.
    """

    # NOTE: this class does not use ``ABCMeta``, so ``@abstractmethod`` only
    # marks the method; it does not prevent instantiation.
    @abstractmethod
    def schema(self, slug: str, include_definitions=True) -> dict:
        """
        Return the JSON schema describing this error for the model ``slug``.

        Args:
            slug: Slug of the model the error belongs to.
            include_definitions: Whether the returned schema should keep its
                ``definitions`` section.

        The base implementation does nothing and returns ``None``.
        """
class ModelDataErrorDesc(ModelErrorDesc):
    """
    Describes the ModelDataErrors that a model may raise.

    Holds a description of the error and the sorted list of possible
    ``(code, code_description)`` pairs.
    """

    def __init__(self, description: Union[str, None] = None, code: Union[str, None] = None,
                 code_desc: Union[str, None] = None,
                 codes: Union[List[Tuple[str, str]], List[str], None] = None):
        """
        Create a description of a ModelDataError.

        Args:
            description: Description of the error. Defaults to
                ``'ModelDataError'`` when not given.
            code: A single error code, added after the entries of ``codes``.
            code_desc: Description of ``code``; the code itself is used as
                the description when this is ``None``.
            codes: List of tuples ``(code, code_description)``. Entries that
                are not tuples are treated as codes that describe themselves.
        """
        self.description = 'ModelDataError' if description is None else description

        collected: List[Tuple[str, str]] = []
        if codes is not None:
            collected.extend(entry if isinstance(entry, tuple) else (entry, entry)
                             for entry in codes)
        if code is not None:
            collected.append((code, code if code_desc is None else code_desc))
        collected.sort()
        self.codes: List[Tuple[str, str]] = collected

    def schema(self, slug: str, include_definitions=True):
        """
        Return the ``ModelDataErrorDTO`` schema specialised for this error.

        The title is derived from ``slug``, the ``code`` property is made
        required and restricted to the known codes, and the ``definitions``
        section is removed when ``include_definitions`` is false.
        """
        result = deepcopy(ModelDataErrorDTO.schema())

        slug_id = slug.replace(".", "_").replace("-", "_")
        result['title'] = f'ModelDataError_{slug_id}'
        result['description'] = self.description

        properties = result['properties']
        properties['type']['description'] = "ModelDataError"

        # The code has no default here: it must be one of the listed values.
        code_prop = properties['code']
        del code_prop['default']
        code_prop['enum'] = [entry[0] for entry in self.codes]
        described = [f"'{entry[0]}' - {entry[1]}" for entry in self.codes]
        code_prop['description'] = 'Code values: ' + "; ".join(described)

        required_fields = result['required']
        if 'code' not in required_fields:
            required_fields.append('code')

        if not include_definitions:
            del result['definitions']
        return result
def create_error_schema_for_error_descs(slug: str,
                                        errors: Union[List[ModelErrorDesc], ModelErrorDesc, None]):
    """
    Build the error schema for a model's manifest.

    Args:
        slug: Slug of the model; used to derive schema titles.
        errors: A single error description, a list of them, or ``None``.

    Returns:
        The base error schema when no error descriptions are given.
        Otherwise a ``oneOf`` schema that references the general error plus
        one definition per error description.

    Raises:
        ModelDefinitionError: If anything goes wrong while building the schema.
    """
    try:
        base_schema = ModelBaseError.base_error_schema()
        error_descs = [errors] if isinstance(errors, ModelErrorDesc) else errors

        if error_descs is None or len(error_descs) == 0:
            return base_schema

        # Work on a copy and move the general error into its own definitions
        # so that it can be referenced like every other error.
        general_error = deepcopy(base_schema)
        definitions = general_error.pop('definitions')
        general_title = general_error['title']
        definitions[general_title] = general_error

        slug_id = slug.replace(".", "_").replace("-", "_")
        one_of = [{'$ref': f'#/definitions/{general_title}'}]

        for position, err_desc in enumerate(error_descs, start=1):
            err_schema = err_desc.schema(slug, False)
            title = err_schema["title"]
            if title in definitions:
                # handle name clash
                title = f'{title}_{position}'
                err_schema['title'] = title
            one_of.append({'$ref': f'#/definitions/{title}'})
            definitions[title] = err_schema

        return {
            'title': f'ModelErrors_{slug_id}',
            'oneOf': one_of,
            'definitions': definitions
        }
    except Exception as err:
        raise ModelDefinitionError(
            f'Exception processing "errors" in model {slug} describe(): {err}') from None
def validate_model_slug(slug: str, prefix: Union[str, None] = None):
    """
    Validate a model slug.

    Args:
        slug: The slug to validate.
        prefix: If given, the slug must start with this string
            (for example ``"contrib."``).

    Raises:
        InvalidModelSlug: If the slug does not start with ``prefix``, does
            not match ``model_slug_re``, or is longer than
            ``MAX_SLUG_LENGTH`` characters.
    """
    if prefix is not None and not slug.startswith(prefix):
        raise InvalidModelSlug(
            f'Slug for model {slug} must start with "{prefix}"')

    if model_slug_re.match(slug) is None:
        # Choose the lead-in first: a conditional expression placed between
        # adjacent string literals would swallow the literals around it.
        if prefix:
            lead_in = f'Following the prefix "{prefix}"'
        else:
            lead_in = 'Following a prefix and dot'
        raise InvalidModelSlug(
            f'Invalid model slug "{slug}". '
            f'{lead_in}'
            ', slugs must start and end with a letter or number and may contain hyphens.')

    # Checked separately so that over-long slugs get their own message.
    if len(slug) > MAX_SLUG_LENGTH:
        raise InvalidModelSlug(
            f'Invalid model slug "{slug}". '
            f'Slugs must be not more than {MAX_SLUG_LENGTH} characters.')
# pylint: disable=too-many-arguments
def _describe(*,
              slug: str,
              version: str,
              display_name: Union[str, None] = None,
              description: Union[str, None] = None,
              developer: Union[str, None] = None,
              category: Union[str, None] = None,
              subcategory: Union[str, None] = None,
              tags: Union[list[str], None] = None,
              cache: CachePolicy = CachePolicy.FULL,
              input: Union[Type[DTOType], Type[dict]] = EmptyInput,
              output: Union[Type[DTOType], Type[dict], None] = None,
              errors: Union[List[ModelErrorDesc], ModelErrorDesc, None] = None):
    """
    Create the class decorator used by the ``describe()`` class methods.

    The returned decorator checks that the decorated class inherits from the
    base class matching ``cache``, validates ``slug``, checks that the base
    class's abstract methods are implemented, and then attaches the manifest
    together with ``slug``, ``version``, ``inputDTO`` and ``outputDTO`` to
    the class.

    Raises (when the decorator is applied):
        MissingModelBaseClass: The class does not inherit from the required base.
        InvalidModelSlug: The slug is not valid.
        WrongModelMethodSignature: An abstract method is not implemented.
        ModelDefinitionError: The ``errors`` argument could not be processed.
    """

    def wrapper(cls_in):  # pylint:disable=too-many-branches
        def is_parent(child, parent):
            # True when ``parent`` appears anywhere above ``child`` in the
            # inheritance tree (``child`` itself does not count).
            if parent in child.__bases__:
                return True
            return any(is_parent(base, parent) for base in child.__bases__)

        # 1. inherits from the base class that matches the cache policy
        if cache is CachePolicy.INCREMENTAL:
            base_cls = IncrementalModel
            missing_base_msg = ("A model with incremental cache policy should inherit "
                                "from IncrementalModel imported from "
                                "credmark.cmf.model.")
        elif cache is CachePolicy.IMMUTABLE:
            base_cls = ImmutableModel
            missing_base_msg = ("A model with immutable cache policy should inherit "
                                "from ImmutableModel imported from "
                                "credmark.cmf.model.")
        else:
            base_cls = Model
            missing_base_msg = ("A model should inherit from Model "
                                "imported from credmark.cmf.model.")
        if not is_parent(cls_in, base_cls):
            raise MissingModelBaseClass(missing_base_msg)

        model_cls = cls_in
        module_path = model_cls.__dict__['__module__']

        # Models whose second module component is "contrib" must use the
        # "contrib." slug prefix.
        module_parts = module_path.split('.')
        in_contrib = len(module_parts) > 1 and module_parts[1] == 'contrib'
        validate_model_slug(slug, 'contrib.' if in_contrib else None)

        # 2. has abstract functions as defined in the Model class
        base_members = base_cls.__dict__
        abstract_methods = []
        for member_name in dir(base_cls):
            if member_name.startswith('__') or member_name not in base_members:
                continue
            member = base_members[member_name]
            if '__isabstractmethod__' in member.__dict__ and member.__isabstractmethod__:
                abstract_methods.append(member_name)

        for member_name in abstract_methods:
            # Since the run method is overloaded, the signatures won't match exactly.
            # TODO: Add parameter and return value type compatibility check
            implemented = any(
                mro_cls is not base_cls and member_name in mro_cls.__dict__
                for mro_cls in model_cls.__mro__)
            if not implemented:
                raise WrongModelMethodSignature(
                    f'Model {model_cls.__name__} misses a method {member_name}() with signature {inspect.signature(base_members[member_name])}')  # pylint: disable=line-too-long

        # Fall back to the class doc string when no description was given.
        if description is not None:
            model_desc = description
        elif model_cls.__doc__ is not None:
            model_desc = model_cls.__doc__.strip()
        else:
            model_desc = None

        def schema_for(dto_type):
            # DTO classes provide their own schema; anything else is a generic object.
            if dto_type is not None and issubclass(dto_type, DTOTypesTuple):
                return dto_type.schema()
            return DICT_SCHEMA

        manifest = {
            'credmarkModelManifest': 'v1',
            'model': {
                'slug': slug,
                'version': version,
                'displayName': display_name,
                'description': model_desc,
                'developer': '' if developer is None else developer,
                'category': category,
                'subcategory': subcategory,
                'tags': tags,
                'cache': cache,
                'input': schema_for(input),
                'output': schema_for(output),
                'error': create_error_schema_for_error_descs(slug, errors),
                'class': module_path + '.' + model_cls.__name__,
            }
        }

        # Expose the manifest as a read-only ``_manifest`` property that is
        # backed by the ``__manifest`` class attribute.
        property_name = '_manifest'
        storage_name = '_' + property_name

        def read_manifest(self):
            return getattr(self, storage_name)

        setattr(model_cls, property_name, property(read_manifest, None))
        setattr(model_cls, storage_name, manifest)

        model_cls.slug = slug
        model_cls.version = version
        model_cls.inputDTO = input
        model_cls.outputDTO = output
        return model_cls

    return wrapper
class BaseModel:
    """
    Common base of all model classes.

    Stores the model context, sets up a per-model logger and offers a
    helper for converting dicts to DTO instances.
    """

    # These class variables will be set automatically by
    # the loader or decorator
    slug: str
    version: str
    _manifest: dict
    inputDTO: Union[Type[DTOType], None]
    outputDTO: Union[Type[DTOType], None]

    def __init__(self, context: "ModelContext"):
        """
        Store ``context``, configure the logger and call ``init()``.
        """
        self.context = context

        # Configure our logger, named after the model slug.
        logger_name = f'credmark.cmf.model.{self.slug}'
        self.logger = logging.getLogger(logger_name)

        self.init()

    def init(self):
        """
        Hook for subclasses that need to initialise the model instance.
        Called at the end of ``__init__``; does nothing by default.
        """

    # pylint: disable=too-many-arguments
    def convert_dict_to_dto(self,
                            data: dict,
                            dto_class: Type[DTOType]):
        """
        Convert a dict of data in a known format into an instance of
        ``dto_class``. Models can call this on data they have gathered.
        """
        converted = transform_data_for_dto(data, dto_class, self.slug, 'transform')
        return converted
class Model(BaseModel):
    """
    The base model class.

    Models should subclass this class and override the
    run() method. They may also override init().

    Available instance variables:

    - ``logger`` - a logger for messages related to the model
    - ``context`` - a ``ModelContext`` instance
    """

    # pylint: disable=too-many-arguments
    @classmethod
    def describe(cls,
                 *,
                 slug: str,
                 version: str,
                 display_name: Union[str, None] = None,
                 description: Union[str, None] = None,
                 developer: Union[str, None] = None,
                 category: Union[str, None] = None,
                 subcategory: Union[str, None] = None,
                 tags: Union[list[str], None] = None,
                 cache: Literal[CachePolicy.FULL,
                                CachePolicy.SKIP,
                                CachePolicy.IGNORE_BLOCK,
                                CachePolicy.OFF_CHAIN] = CachePolicy.FULL,
                 input: Union[Type[DTOType], Type[dict]] = EmptyInput,
                 output: Union[Type[DTOType], Type[dict], None] = None,
                 errors: Union[List[ModelErrorDesc],
                               ModelErrorDesc, None] = None):
        """
        Decorator for credmark.cmf.model.Model subclasses to describe the model.

        Example usage::

            from credmark.cmf.model import Model

            @Model.describe(slug='example.echo',
                            version='1.0',
                            display_name='Echo',
                            description="A test model to echo the message property sent in input.",
                            developer="my_username",
                            category="financial",
                            input=EchoDto,
                            output=EchoDto)
            class EchoModel(Model):
                def run(self, input: EchoDto) -> EchoDto:
                    ...

        Parameters:
            slug: slug (short unique name) for the model. A slug is an optional
                prefix and dot followed by a name; both may contain letters,
                numbers and hyphens and must start and end with a letter or
                number. Contributor slugs must start with ``"contrib."``
                Once submitted, the model slug cannot be changed.
            version: version string, ex. ``"1.0"``. The version number can be incremented
                when the model code is updated.
            display_name: Name for the model
            description: Description of the model. If description is not set,
                the doc string (``__doc__``) of the model class is used instead.
            developer: Name or nickname of the developer
            category: Category of the model (ex. "financial", "protocol" etc.)
            subcategory: Optional subcategory (ex. "aave")
            tags: optional list of string tags describing the model
            cache: Cache policy for the model; one of ``CachePolicy.FULL``,
                ``CachePolicy.SKIP``, ``CachePolicy.IGNORE_BLOCK`` or
                ``CachePolicy.OFF_CHAIN``. Defaults to ``CachePolicy.FULL``.
            input: Type that model uses as input; a ``DTO`` subclass or dict.
                Defaults to ``EmptyInput`` object.
            output: Type that the model run returns; a ``DTO`` subclass or dict.
                Defaults to None, in which case a generic object schema is
                used for the output.
            errors: If the model raises ``ModelDataError``, set this to a configured
                ``ModelDataErrorDesc`` instance (or a list of instances) describing the
                errors. Defaults to None.
        """
        decorator = _describe(
            slug=slug,
            version=version,
            display_name=display_name,
            description=description,
            developer=developer,
            category=category,
            subcategory=subcategory,
            tags=tags,
            cache=cache,
            input=input,
            output=output,
            errors=errors,
        )
        return decorator

    @abstractmethod
    def run(self, input: Union[dict, DTOType]) -> Union[dict, DTOType]:
        """
        Perform the work of running the model.

        Subclasses **must** override this method.

        Model instances may be reused, so run() may be called multiple
        times. If you are using global data structures, make sure they
        are reset or cleared after each model run.
        """
class IncrementalModel(BaseModel):
    """
    Use `IncrementalModel` for models that only append to their output as
    new blocks arrive.

    Behind the scenes it uses the `CachePolicy.INCREMENTAL` cache.
    These models should always return a BlockSeries. Use the additional
    `from_block` arg of the run method to only return the BlockSeries for
    `from_block` -> `context.block`.
    """

    # pylint: disable=too-many-arguments
    @classmethod
    def describe(cls,
                 *,
                 slug: str,
                 version: str,
                 display_name: Union[str, None] = None,
                 description: Union[str, None] = None,
                 developer: Union[str, None] = None,
                 category: Union[str, None] = None,
                 subcategory: Union[str, None] = None,
                 tags: Union[list[str], None] = None,
                 input: Union[Type[DTOType], Type[dict]] = EmptyInput,
                 output: Union[Type[BlockSeries], None] = BlockSeries,
                 errors: Union[List[ModelErrorDesc],
                               ModelErrorDesc, None] = None):
        """
        Decorator for credmark.cmf.model.IncrementalModel subclasses to describe the model.

        The model shall return a parameterized class of `BlockSeries` that meets
        the requirements below:

        1. Results with block number less than or equal to the current block number
        2. Results with block number greater than or equal to `from_block`.
        3. blockNumber >= 0

        A BlockSeries object has a method `.to_range(from_block, to_block)` to
        filter for [from_block, to_block].

        Example usage::

            from credmark.cmf.model import IncrementalModel
            from credmark.cmf.types.series import BlockSeries

            @IncrementalModel.describe(slug='example.echo-inc',
                                       version='1.0',
                                       display_name='Echo',
                                       description="A test model to echo the message property sent in input.",
                                       developer="my_username",
                                       category="financial",
                                       input=EchoDto,
                                       output=BlockSeries[int])
            class EchoModel(IncrementalModel):
                def run(self, input: EchoDto, from_block: BlockNumber) -> BlockSeries[int]:
                    return BlockSeries(series=[
                        BlockSeriesRow(
                            blockNumber=0,
                            blockTimestamp=0,
                            sampleTimestamp=0,
                            output=1
                        ),
                        BlockSeriesRow(
                            blockNumber=1,
                            blockTimestamp=1,
                            sampleTimestamp=1,
                            output=2
                        ),
                    ])

        Parameters:
            slug: slug (short unique name) for the model. A slug is an optional
                prefix and dot followed by a name; both may contain letters,
                numbers and hyphens and must start and end with a letter or
                number. Contributor slugs must start with ``"contrib."``
                Once submitted, the model slug cannot be changed.
            version: version string, ex. ``"1.0"``. The version number can be incremented
                when the model code is updated.
            display_name: Name for the model
            description: Description of the model. If description is not set,
                the doc string (``__doc__``) of the model class is used instead.
            developer: Name or nickname of the developer
            category: Category of the model (ex. "financial", "protocol" etc.)
            subcategory: Optional subcategory (ex. "aave")
            tags: optional list of string tags describing the model
            input: Type that model uses as input; a ``DTO`` subclass or dict.
                Defaults to ``EmptyInput`` object.
            output: Type that the model run returns; a ``BlockSeries`` subclass.
                Defaults to ``BlockSeries``.
            errors: If the model raises ``ModelDataError``, set this to a configured
                ``ModelDataErrorDesc`` instance (or a list of instances) describing the
                errors. Defaults to None.
        """
        # The cache policy is fixed for incremental models.
        decorator = _describe(
            slug=slug,
            version=version,
            display_name=display_name,
            description=description,
            developer=developer,
            category=category,
            subcategory=subcategory,
            tags=tags,
            cache=CachePolicy.INCREMENTAL,
            input=input,
            output=output,
            errors=errors,
        )
        return decorator

    @abstractmethod
    def run(self, input: Union[dict, DTOType], from_block: BlockNumber) -> BlockSeries[DTOType]:
        """
        Run the model and return the series of results starting at
        ``from_block``. Subclasses must override this method.
        """
class ImmutableModel(BaseModel):
    """
    Subclass `ImmutableModel` for models whose result is only available after
    a particular block and will not change for future block numbers.

    The model shall return a derived class of `ImmutableOutput` with the field
    `firstResultBlockNumber` set. For blocks before which data is unavailable,
    the model should throw ModelDataError.

    Behind the scenes it uses the `CachePolicy.IMMUTABLE` cache.
    """

    # pylint: disable=too-many-arguments
    @classmethod
    def describe(cls,
                 *,
                 slug: str,
                 version: str,
                 display_name: Union[str, None] = None,
                 description: Union[str, None] = None,
                 developer: Union[str, None] = None,
                 category: Union[str, None] = None,
                 subcategory: Union[str, None] = None,
                 tags: Union[list[str], None] = None,
                 input: Union[Type[DTOType], Type[dict]] = EmptyInput,
                 output: Union[Type[ImmutableOutput], None] = ImmutableOutput,
                 errors: Union[List[ModelErrorDesc],
                               ModelErrorDesc, None] = None):
        """
        Decorator for credmark.cmf.model.ImmutableModel subclasses to describe the model.

        1. An immutable model shall raise an error if the result is not yet
           available at the requested block::

               if self.context.block_number < 100:
                   raise ModelDataError('Block number shall be greater than 100')

        2. An immutable model shall return the type of ImmutableOutput with the
           field `firstResultBlockNumber`.

        Example usage::

            from credmark.cmf.model import ImmutableModel
            from credmark.cmf.types.series import ImmutableOutput

            @ImmutableModel.describe(slug='example.echo-imm',
                                     version='1.0',
                                     display_name='Echo',
                                     description="A test model to echo the message property sent in input.",
                                     developer="my_username",
                                     category="financial",
                                     input=EchoDto,
                                     output=ImmutableOutput)
            class EchoModel(ImmutableModel):
                def run(self, input: EchoDto) -> ImmutableOutput:
                    return ImmutableOutput(
                        ...

        Parameters:
            slug: slug (short unique name) for the model. A slug is an optional
                prefix and dot followed by a name; both may contain letters,
                numbers and hyphens and must start and end with a letter or
                number. Contributor slugs must start with ``"contrib."``
                Once submitted, the model slug cannot be changed.
            version: version string, ex. ``"1.0"``. The version number can be incremented
                when the model code is updated.
            display_name: Name for the model
            description: Description of the model. If description is not set,
                the doc string (``__doc__``) of the model class is used instead.
            developer: Name or nickname of the developer
            category: Category of the model (ex. "financial", "protocol" etc.)
            subcategory: Optional subcategory (ex. "aave")
            tags: optional list of string tags describing the model
            input: Type that model uses as input; a ``DTO`` subclass or dict.
                Defaults to ``EmptyInput`` object.
            output: Type that the model run returns; an ``ImmutableOutput`` subclass.
                Defaults to ``ImmutableOutput``.
            errors: If the model raises ``ModelDataError``, set this to a configured
                ``ModelDataErrorDesc`` instance (or a list of instances) describing the
                errors. Defaults to None.
        """
        # The cache policy is fixed for immutable models.
        decorator = _describe(
            slug=slug,
            version=version,
            display_name=display_name,
            description=description,
            developer=developer,
            category=category,
            subcategory=subcategory,
            tags=tags,
            cache=CachePolicy.IMMUTABLE,
            input=input,
            output=output,
            errors=errors,
        )
        return decorator

    @abstractmethod
    def run(self, input: Union[dict, DTOType]) -> ImmutableOutput:
        """
        Run the model and return its immutable result.
        Subclasses must override this method.
        """