Source code for credmark.cmf.types.contract_web3

# pylint:disable=too-many-locals, too-many-arguments, invalid-name


import asyncio
import math
from typing import Optional, Sequence, Union, cast

from web3 import AsyncWeb3, Web3
from web3._utils.events import get_event_data
from web3._utils.filters import construct_event_filter_params
from web3.contract.base_contract import BaseContractEvent
from web3.types import ABIEvent, FilterParams
from web3.utils.abi import get_abi_input_names

# from web3._utils.abi import get_constructor_abi, merge_args_and_kwargs
# from web3._utils.contracts import encode_abi


[docs]async def fetch_events_async(async_web3: AsyncWeb3, event_filter_params: FilterParams, from_block: int, to_block: int, num_of_chunk: int = 10): """ events = asyncio.run(fetch_events_async(async_web3, event_filter_params, from_block, to_block, num_of_chunk)) # type: ignore """ size_per_chunk = (to_block - from_block + 1) // num_of_chunk if size_per_chunk > 0: tasks = [] for i in range(num_of_chunk): _from_block = from_block + i * size_per_chunk if i == num_of_chunk - 1: _to_block = to_block else: _to_block = _from_block + size_per_chunk - 1 event_filter = event_filter_params.copy() event_filter['fromBlock'] = _from_block event_filter['toBlock'] = _to_block tasks.append(async_web3.eth.get_logs(event_filter)) else: event_filter = event_filter_params.copy() event_filter['fromBlock'] = from_block event_filter['toBlock'] = to_block tasks = [async_web3.eth.get_logs(event_filter)] events = [] for result in asyncio.as_completed(tasks): evts = await result events.extend(evts) return events
# pylint: disable=too-many-branches
[docs]def fetch_events( event: BaseContractEvent, argument_filters=None, from_block=None, to_block=None, address=None, topics=None, contract_address=None, argument_names: Optional[Sequence[str]] = None, by_range: Optional[int] = None, async_web3: Optional[AsyncWeb3] = None, async_worker: int = 10): """Get events using eth_getLogs API. This is a stateless method, as opposite to createFilter and works with stateless nodes like QuickNode and Infura. :param event: Event instance from your contract.events :param argument_filters: :param from_block: Start block. Use 0 for all history/ :param to_block: Fetch events until this contract :param address: :param topics: :return: """ if from_block is None: raise TypeError( "Missing mandatory keyword argument to getLogs: from_Block") w3: Union[Web3, AsyncWeb3] = event.w3 if isinstance(w3, AsyncWeb3): raise TypeError( "Contract was initiailized with async_web3. fetch_events only support web3." ) if to_block is None: to_block = w3.eth.block_number # TODO: existing code reports error with multiple events with same name, different input # abi = event._get_event_abi() # pylint:disable=protected-access # TODO: Use manual filtering now: below code is adapted from web3 # from web3._utils.contracts import find_matching_event_abi event_abis = [abi for abi in event.contract_abi if 'name' in abi and abi['name'] == event.event_name and (argument_names is None or set(get_abi_input_names(abi)) == set(argument_names))] if len(event_abis) > 1: raise ValueError( 'Multiple events found with same name and argument names') event_abi = cast(ABIEvent, event_abis[0]) abi_codec = w3.codec # Set up any indexed event filters if needed argument_filters = {} if argument_filters is None else argument_filters _filters = {**argument_filters} _data_filter_set, event_filter_params = construct_event_filter_params( event_abi, abi_codec, contract_address=event.address if contract_address is None else contract_address, argument_filters=_filters, fromBlock=from_block, toBlock=to_block, address=address, topics=topics, ) if async_web3 is not None: events = asyncio.run(fetch_events_async(async_web3, event_filter_params, from_block, to_block, async_worker)) # type: ignore events = sorted(events, key=lambda x: (x.blockNumber, x.logIndex)) for entry in events: data = {**get_event_data(abi_codec, event_abi, entry)} args = data['args'] yield {**data, **args} # type: ignore else: if by_range is None: # Call node over JSON-RPC API events = w3.eth.get_logs(event_filter_params) # Convert raw binary event data to easily manipulable Python objects for entry in events: data = {**get_event_data(abi_codec, event_abi, entry)} args = data['args'] yield {**data, **args} # type: ignore else: n_range_upper = int(math.ceil((int(to_block) - int(from_block) + 1) / by_range)) for n_range in range(n_range_upper): _from_block = from_block + n_range * by_range if n_range == n_range_upper - 1: _to_block = to_block else: _to_block = _from_block + by_range - 1 event_filter_params |= {'fromBlock': _from_block, 'toBlock': _to_block} # Call node over JSON-RPC API events = w3.eth.get_logs(event_filter_params) # Convert raw binary event data to easily manipulable Python objects for entry in events: data = {**get_event_data(abi_codec, event_abi, entry)} args = data['args'] yield {**data, **args} # type: ignore