import abc import typing import datetime import schema class Adapter(abc.ABC): schema: schema.Schema def __init__(self, config, *args, **kwargs): _ = args, kwargs self.config = config @classmethod def new(cls, config: dict, *args, **kwargs): return cls(cls.schema.validate(config) , *args, **kwargs) @abc.abstractmethod def login(self) -> bool: raise NotImplementedError() class Source(abc.ABC): @abc.abstractmethod def get_events( self, start: datetime.datetime | None=None, until: datetime.timedelta | None=None, limit=None, ) -> typing.Iterable[dict]: raise NotImplementedError() class Sink(abc.ABC): @abc.abstractmethod def post_events( self, events, start: datetime.datetime | None=None, until: datetime.timedelta | None=None, ) -> bool: raise NotImplementedError()