From 464e7b5c054035c88d8629bf4b94a9499c6ce4e5 Mon Sep 17 00:00:00 2001 From: redxef Date: Tue, 6 Dec 2022 18:54:21 +0100 Subject: [PATCH] Add explicity transaction kwargs to all methods. --- radicale_sql/__init__.py | 383 ++++++++++++++++++++++----------------- 1 file changed, 212 insertions(+), 171 deletions(-) diff --git a/radicale_sql/__init__.py b/radicale_sql/__init__.py index 8ac9ba4..92e935c 100644 --- a/radicale_sql/__init__.py +++ b/radicale_sql/__init__.py @@ -52,7 +52,7 @@ class Collection(BaseCollection): text=row.data.decode(), ) - def get_multi(self, hrefs: Iterable[str]) -> Iterable[Tuple[str, Optional["radicale_item.Item"]]]: + def _get_multi(self, hrefs: Iterable[str], *, connection) -> Iterable[Tuple[str, Optional["radicale_item.Item"]]]: item_table = self._storage._meta.tables['item'] hrefs_ = list(hrefs) #hrefs_ = [(x,) for x in hrefs] @@ -69,15 +69,18 @@ class Collection(BaseCollection): ), ) l = [] - with self._storage._engine.begin() as connection: - for row in connection.execute(select_stmt): - l += [(row.name, self._row_to_item(row))] + for row in connection.execute(select_stmt): + l += [(row.name, self._row_to_item(row))] hrefs_set = set(hrefs_) hrefs_set_have = set([x[0] for x in l]) l += [(x, None) for x in (hrefs_set - hrefs_set_have)] return l - def get_all(self) -> Iterator["radicale_item.Item"]: + def get_multi(self, hrefs: Iterable[str]) -> Iterable[Tuple[str, Optional["radicale_item.Item"]]]: + with self._storage._engine.begin() as c: + return self._get_multi(hrefs=hrefs, connection=c) + + def _get_all(self, *, connection) -> Iterator["radicale_item.Item"]: item_table = self._storage._meta.tables['item'] select_stmt = sa.select( item_table.c, @@ -90,7 +93,13 @@ class Collection(BaseCollection): for row in connection.execute(select_stmt): yield self._row_to_item(row) - def upload(self, href: str, item: "radicale_item.Item") -> "radicale_item.Item": + def get_all(self) -> Iterator["radicale_item.Item"]: + with self._storage._engine.begin() as c: + for i in self._get_all(connection=c): + yield i + + + def _upload(self, href: str, item: "radicale_item.Item", *, connection) -> "radicale_item.Item": item_table = self._storage._meta.tables['item'] item_serialized = item.serialize().encode() @@ -121,16 +130,19 @@ class Collection(BaseCollection): item_table.c.name == href, ), ) - with self._storage._engine.begin() as connection: - if connection.execute(select_stmt).one_or_none() is None: - connection.execute(insert_stmt) - else: - connection.execute(update_stmt) - res = list(self.get_multi([href]))[0][1] + if connection.execute(select_stmt).one_or_none() is None: + connection.execute(insert_stmt) + else: + connection.execute(update_stmt) + res = list(self._get_multi([href], connection=connection))[0][1] assert res is not None return res - def delete(self, href: Optional[str] = None) -> None: + def upload(self, href: str, item: "radicale_item.Item") -> "radicale_item.Item": + with self._storage._engine.begin() as c: + return self._upload(href, item, connection=c) + + def _delete(self, *, connection, href: Optional[str] = None) -> None: collection_table = self._storage._meta.tables['collection'] item_table = self._storage._meta.tables['item'] if href is None: @@ -148,10 +160,13 @@ class Collection(BaseCollection): item_table.c.name == href, ), ) - with self._storage._engine.begin() as connection: - connection.execute(delete_stmt) + connection.execute(delete_stmt) - def get_meta(self, key: Optional[str] = None) -> Union[Mapping[str, str], Optional[str]]: + def delete(self, href: Optional[str] = None) -> None: + with self._storage._engine.begin() as c: + return self._delete(connection=c, href=href) + + def _get_meta(self, *, connection, key: Optional[str] = None) -> Union[Mapping[str, str], Optional[str]]: collection_metadata = self._storage._meta.tables['collection_metadata'] select_meta = sa.select( collection_metadata.c.key, @@ -166,14 +181,17 @@ class Collection(BaseCollection): collection_metadata.c.key == key, ) metadata = {} - with self._storage._engine.begin() as connection: - for row in connection.execute(select_meta): - metadata[row.key] = row.value + for row in connection.execute(select_meta): + metadata[row.key] = row.value if key is not None: return metadata.get(key) return metadata - def set_meta(self, props: Mapping[str, str]) -> None: + def get_meta(self, key: Optional[str] = None) -> Union[Mapping[str, str], Optional[str]]: + with self._storage._engine.begin() as c: + return self._get_meta(connection=c, key=key) + + def _set_meta(self, props: Mapping[str, str], *, connection) -> None: collection_metadata = self._storage._meta.tables['collection_metadata'] delete_stmt = sa.delete( collection_metadata, @@ -183,12 +201,14 @@ class Collection(BaseCollection): insert_stmt = sa.insert( collection_metadata, ).values([dict(collection_id=self._id, key=k, value=v) for k, v in props.items()]) - with self._storage._engine.begin() as connection: - connection.execute(delete_stmt) - connection.execute(insert_stmt) + connection.execute(delete_stmt) + connection.execute(insert_stmt) - @property - def last_modified(self) -> str: + def set_meta(self, props: Mapping[str, str]) -> None: + with self._storage._engine.begin() as c: + return self._set_meta(props, connection=c) + + def _last_modified(self, *, connection) -> str: collection = self._storage._meta.tables['collection'] select_stmt = sa.select( collection.c.modified, @@ -197,11 +217,15 @@ class Collection(BaseCollection): ).where( collection.c.id == self._id, ) - with self._storage._engine.begin() as connection: - c = connection.execute(select_stmt).one() + c = connection.execute(select_stmt).one() return c.modified.strftime('%a, %d %b %Y %H:%M:%S GMT') + + @property + def last_modified(self): + with self._storage._engine.begin() as c: + return self._last_modified(connection=c) - def _update_history_etag(self, href: str, item: Optional["radicale_item.Item"]) -> str: + def _update_history_etag(self, href: str, item: Optional["radicale_item.Item"], *, connection) -> str: item_history_table = self._storage._meta.tables['item_history'] select_etag_stmt = sa.select( item_history_table.c, @@ -214,43 +238,42 @@ class Collection(BaseCollection): ), ) exists: bool - with self._storage._engine.begin() as connection: - item_history = connection.execute(select_etag_stmt).one_or_none() - if item_history is not None: - exists = True - cache_etag = item_history.etag, - history_etag = item_history.history_etag + item_history = connection.execute(select_etag_stmt).one_or_none() + if item_history is not None: + exists = True + cache_etag = item_history.etag, + history_etag = item_history.history_etag + else: + exists = False + cache_etag = '' + history_etag = binascii.hexlify(os.urandom(16)).decode('ascii') + etag = item.etag if item else '' + if etag != cache_etag: + if exists: + upsert = sa.update( + item_history_table, + ).values( + etag=etag, + history_etag=history_etag, + ).where( + sa.and_( + item_history_table.c.collection_id == self._id, + item_history_table.c.name == href, + ), + ) else: - exists = False - cache_etag = '' - history_etag = binascii.hexlify(os.urandom(16)).decode('ascii') - etag = item.etag if item else '' - if etag != cache_etag: - if exists: - upsert = sa.update( - item_history_table, - ).values( - etag=etag, - history_etag=history_etag, - ).where( - sa.and_( - item_history_table.c.collection_id == self._id, - item_history_table.c.name == href, - ), - ) - else: - upsert = sa.insert( - item_history_table, - ).values( - collection_id=self._id, - name=href, - etag=etag, - history_etag=history_etag, - ) - connection.execute(upsert) + upsert = sa.insert( + item_history_table, + ).values( + collection_id=self._id, + name=href, + etag=etag, + history_etag=history_etag, + ) + connection.execute(upsert) return history_etag - def _get_deleted_history_refs(self): + def _get_deleted_history_refs(self, *, connection): item_table = self._storage._meta.tables['item'] item_history_table = self._storage._meta.tables['item_history'] select_stmt = sa.select( @@ -267,17 +290,16 @@ class Collection(BaseCollection): ).where( item_table.c.id == None, ) - with self._storage._engine.begin() as connection: - for row in connection.execute(select_stmt): - yield row.href + for row in connection.execute(select_stmt): + yield row.href - def _delete_history_refs(self): + def _delete_history_refs(self, *, connection): item_history_table = self._storage._meta.tables['item_history'] delete_stmt = sa.delete( item_history_table, ).where( sa.and_( - item_history_table.c.href.in_(list(self._get_deleted_history_refs())), + item_history_table.c.href.in_(list(self._get_deleted_history_refs(connection=connection))), item_history_table.c.collection_id == self._id, item_history_table.c.modified < (datetime.datetime.now() - datetime.timedelta(seconds=self._storage.configuration.get('storage', 'max_sync_token_age'))) ), @@ -285,12 +307,10 @@ class Collection(BaseCollection): with self._storage._engine.begin() as connection: connection.execute(delete_stmt) - def sync(self, old_token: str = '') -> Tuple[str, Iterable[str]]: + def _sync(self, *, connection, old_token: str = '') -> Tuple[str, Iterable[str]]: _prefix = 'http://radicale.org/ns/sync/' collection_state_table = self._storage._meta.tables['collection_state'] def check_token_name(token_name: str) -> bool: - print(token_name) - print(len(token_name)) if len(token_name) != 64: return False for c in token_name: @@ -311,10 +331,10 @@ class Collection(BaseCollection): # compute new state for href, item in itertools.chain( ((item.href, item) for item in self.get_all()), - ((href, None) for href in self._get_deleted_history_refs()) + ((href, None) for href in self._get_deleted_history_refs(connection=connection)) ): assert isinstance(href, str) - history_etag = self._update_history_etag(href, item) + history_etag = self._update_history_etag(href, item, connection=connection) state[href] = history_etag token_name_hash.update((href + '/' + history_etag).encode()) token_name = token_name_hash.hexdigest() @@ -326,31 +346,30 @@ class Collection(BaseCollection): # load old state old_state = {} - with self._storage._engine.begin() as connection: - if old_token_name: - select_stmt = sa.select( - collection_state_table.c, - ).select_from( - collection_state_table, - ).where( - sa.and_( - collection_state_table.c.collection_id == self._id, - collection_state_table.c.name == old_token_name, - ), - ) - state_row = connection.execute(select_stmt).one() - state = json.loads(state_row.state.decode()) - - # store new state - ## should never be a duplicate - insert_stmt = sa.insert( + if old_token_name: + select_stmt = sa.select( + collection_state_table.c, + ).select_from( collection_state_table, - ).values( - collection_id=self._id, - name=token_name, - state=json.dumps(state).encode(), + ).where( + sa.and_( + collection_state_table.c.collection_id == self._id, + collection_state_table.c.name == old_token_name, + ), ) - connection.execute(insert_stmt) + state_row = connection.execute(select_stmt).one() + state = json.loads(state_row.state.decode()) + + # store new state + ## should never be a duplicate + insert_stmt = sa.insert( + collection_state_table, + ).values( + collection_id=self._id, + name=token_name, + state=json.dumps(state).encode(), + ) + connection.execute(insert_stmt) changes = [] for href, history_etag in state.items(): @@ -362,6 +381,10 @@ class Collection(BaseCollection): return token, changes + def sync(self, old_token: str = '') -> Tuple[str, Iterable[str]]: + with self._storage._engine.begin() as c: + return self._sync(connection=c, old_token=old_token) + class Storage(BaseStorage): def __init__(self, configuration: "radicale.config.Configuration"): @@ -377,7 +400,7 @@ class Storage(BaseStorage): path_parts = path_parts[:-1] return path_parts - def discover(self, path: str, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]: + def _discover(self, path: str, *, connection, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]: logger.info("path = %s, depth = %s", path, depth) if path == '/': return [Collection(self, self._root_collection.id, '')] @@ -440,26 +463,28 @@ class Storage(BaseStorage): ) l = [] - with self._engine.begin() as connection: - self_collection = connection.execute(select_stmt).one_or_none() - if self_collection is None: - return [] - self_collection = Collection(self, self_collection.id, '/'.join(path_parts)) - l += [self_collection] - if select_sub_stmt is not None: - for row in connection.execute(select_sub_stmt): - path = '/'.join(path_parts) - path += '/' - path += row.name - if row.type_ == 'collection': - l += [Collection(self, row.id, path)] - elif row.type_ == 'item': - assert self_collection is not None - l += [self_collection._row_to_item(row)] + self_collection = connection.execute(select_stmt).one_or_none() + if self_collection is None: + return [] + self_collection = Collection(self, self_collection.id, '/'.join(path_parts)) + l += [self_collection] + if select_sub_stmt is not None: + for row in connection.execute(select_sub_stmt): + path = '/'.join(path_parts) + path += '/' + path += row.name + if row.type_ == 'collection': + l += [Collection(self, row.id, path)] + elif row.type_ == 'item': + assert self_collection is not None + l += [self_collection._row_to_item(row)] return l + def discover(self, path: str, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]: + with self._engine.begin() as c: + return self._discover(path, connection=c, depth=depth) - def move(self, item: "radicale_item.Item", to_collection: "BaseCollection", to_href: str) -> None: + def _move(self, item: "radicale_item.Item", to_collection: "BaseCollection", to_href: str, *, connection) -> None: assert isinstance(item.collection, Collection) assert isinstance(to_collection, Collection) src_collection_id = item.collection._id @@ -485,13 +510,18 @@ class Storage(BaseStorage): item_table.c.name == item.href, ) ) - with self._engine.begin() as connection: - connection.execute(delete_stmt) - connection.execute(update_stmt) + connection.execute(delete_stmt) + connection.execute(update_stmt) - def create_collection( + def move(self, item: "radicale_item.Item", to_collection: "BaseCollection", to_href: str) -> None: + with self._engine.begin() as c: + return self._move(item, to_collection, to_href, connection=c) + + def _create_collection( self, href: str, + *, + connection, items: Optional[Iterable["radicale_item.Item"]]=None, props: Optional[Mapping[str, str]]=None, ) -> "BaseCollection": @@ -503,68 +533,79 @@ class Storage(BaseStorage): collection_metadata_table = self._meta.tables['collection_metadata'] item_table = self._meta.tables['item'] - with self._engine.begin() as connection: - for p in path: - select_stmt = sa.select( - collection_table.c, - ).select_from( - collection_table, - ).where( - sa.and_( - collection_table.c.parent_id == parent_id, - collection_table.c.name == p, - ), - ) - c = connection.execute(select_stmt).one_or_none() - if c is None: - insert_stmt = sa.insert( - collection_table, - ).values( - parent_id=parent_id, - name=p, - ).returning( - collection_table.c, - ) - c = connection.execute(insert_stmt).one() - parent_id = c.id - if items is not None or props is not None: - # drop all subcollections and items - delete_collections_stmt = sa.delete( - collection_table, - ).where( + for p in path: + select_stmt = sa.select( + collection_table.c, + ).select_from( + collection_table, + ).where( + sa.and_( collection_table.c.parent_id == parent_id, - ) - delete_meta_stmt = sa.delete( - collection_metadata_table, - ).where( - collection_metadata_table.c.collection_id == parent_id, - ) - delete_items_stmt = sa.delete( - item_table, - ).where( - item_table.c.collection_id == parent_id, - ) - connection.execute(delete_collections_stmt) - connection.execute(delete_meta_stmt) - connection.execute(delete_items_stmt) - if props is not None: + collection_table.c.name == p, + ), + ) + c = connection.execute(select_stmt).one_or_none() + if c is None: insert_stmt = sa.insert( - collection_metadata_table, - ).values([dict(collection_id=parent_id, key=k, value=v) for k, v in props.items()]) - connection.execute(insert_stmt) - if props is not None and 'key' in props and items is not None: - print(items) - # TODO insert items + collection_table, + ).values( + parent_id=parent_id, + name=p, + ).returning( + collection_table.c, + ) + c = connection.execute(insert_stmt).one() + parent_id = c.id + if items is not None or props is not None: + # drop all subcollections and items + delete_collections_stmt = sa.delete( + collection_table, + ).where( + collection_table.c.parent_id == parent_id, + ) + delete_meta_stmt = sa.delete( + collection_metadata_table, + ).where( + collection_metadata_table.c.collection_id == parent_id, + ) + delete_items_stmt = sa.delete( + item_table, + ).where( + item_table.c.collection_id == parent_id, + ) + connection.execute(delete_collections_stmt) + connection.execute(delete_meta_stmt) + connection.execute(delete_items_stmt) + if props is not None: + insert_stmt = sa.insert( + collection_metadata_table, + ).values([dict(collection_id=parent_id, key=k, value=v) for k, v in props.items()]) + connection.execute(insert_stmt) + if props is not None and 'key' in props and items is not None: + print(items) + # TODO insert items c = Collection(self, parent_id, '/'.join(path)) - print(c) return c + def create_collection( + self, + href: str, + items: Optional[Iterable["radicale_item.Item"]]=None, + props: Optional[Mapping[str, str]]=None, + ) -> "BaseCollection": + with self._engine.begin() as c: + return self._create_collection(href, connection=c, items=items, props=props) @radicale.types.contextmanager - def acquire_lock(self, mode: str, user: str = "") -> Iterator[None]: - # locking happens on a db level + def acquire_lock(self, mod: str, user: str = "") -> Iterator[None]: yield - def verify(self) -> bool: + def _verify(self, *, connection) -> bool: + _ = connection return True + + def verify(self): + with self._engine.begin() as c: + return self._verify(connection=c) +