From 4debf553e780308803cf435045902340bf82b0d5 Mon Sep 17 00:00:00 2001 From: redxef Date: Fri, 2 Dec 2022 02:38:21 +0100 Subject: [PATCH] Initial commit. --- .gitignore | 4 + README.md | 8 + radicale_sql/__init__.py | 543 +++++++++++++++++++++++++++++++++++++++ radicale_sql/db.py | 192 ++++++++++++++ setup.py | 9 + 5 files changed, 756 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 radicale_sql/__init__.py create mode 100644 radicale_sql/db.py create mode 100644 setup.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..69cb2fd --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +build/ + +*.db +*.egg-info diff --git a/README.md b/README.md new file mode 100644 index 0000000..72943f8 --- /dev/null +++ b/README.md @@ -0,0 +1,8 @@ +# Radicale SQL storage plugin + +## TODO + +- ensure all database operations run in one transaction +- implement caching +- write unit tests +- run cleanup of `item_history` and `collection_state` tables diff --git a/radicale_sql/__init__.py b/radicale_sql/__init__.py new file mode 100644 index 0000000..41e1bda --- /dev/null +++ b/radicale_sql/__init__.py @@ -0,0 +1,543 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import binascii +import datetime +import uuid +import string +import itertools +import json +from hashlib import sha256 +from typing import Optional, Union, Tuple, Iterable, Iterator, Mapping + +import radicale +import radicale.config +import radicale.types +from radicale.storage import BaseStorage, BaseCollection +from radicale.log import logger +from radicale import item as radicale_item +import sqlalchemy as sa + +from . import db + +PLUGIN_CONFIG_SCHEMA = { + 'storage': { + 'db_url': { + 'value': '', + 'type': str, + }, + }, +} + +class Collection(BaseCollection): + + def __init__(self, storage: "Storage", id: uuid.UUID, path: str): + self._storage = storage + self._id = id + self._path = path + + def __repr__(self) -> str: + return f'Collection(id={self._id}, path={self._path})' + + @property + def path(self) -> str: + return self._path + + def _row_to_item(self, row): + return radicale_item.Item( + collection=self, + href=row.name, + last_modified=row.modified, + text=row.data.decode(), + ) + + def get_multi(self, hrefs: Iterable[str]) -> Iterable[Tuple[str, Optional["radicale_item.Item"]]]: + item_table = self._storage._meta.tables['item'] + hrefs_ = list(hrefs) + #hrefs_ = [(x,) for x in hrefs] + if not hrefs_: + return [] + select_stmt = sa.select( + item_table.c, + ).select_from( + item_table, + ).where( + sa.and_( + item_table.c.collection_id == self._id, + item_table.c.name.in_(hrefs_) + ), + ) + l = [] + with self._storage._engine.begin() as connection: + for row in connection.execute(select_stmt): + l += [(row.name, self._row_to_item(row))] + hrefs_set = set(hrefs_) + hrefs_set_have = set([x[0] for x in l]) + l += [(x, None) for x in (hrefs_set - hrefs_set_have)] + return l + + def get_all(self) -> Iterator["radicale_item.Item"]: + item_table = self._storage._meta.tables['item'] + select_stmt = sa.select( + item_table.c, + ).select_from( + item_table, + ).where( + item_table.c.collection_id == self._id, + ) + with self._storage._engine.begin() as connection: + for row in connection.execute(select_stmt): + yield self._row_to_item(row) + + def upload(self, href: str, item: "radicale_item.Item") -> "radicale_item.Item": + item_table = self._storage._meta.tables['item'] + + item_serialized = item.serialize().encode() + select_stmt = sa.select( + item_table.c, + ).select_from( + item_table, + ).where( + sa.and_( + item_table.c.collection_id == self._id, + item_table.c.name == href, + ), + ) + insert_stmt = sa.insert( + item_table, + ).values( + collection_id=self._id, + name=href, + data=item_serialized, + ) + update_stmt = sa.update( + item_table, + ).values( + data=item_serialized, + ).where( + sa.and_( + item_table.c.collection_id == self._id, + item_table.c.name == href, + ), + ) + with self._storage._engine.begin() as connection: + if connection.execute(select_stmt).one_or_none() is None: + connection.execute(insert_stmt) + else: + connection.execute(update_stmt) + res = list(self.get_multi([href]))[0][1] + assert res is not None + return res + + def delete(self, href: Optional[str] = None) -> None: + collection_table = self._storage._meta.tables['collection'] + item_table = self._storage._meta.tables['item'] + if href is None: + delete_stmt = sa.delete( + collection_table, + ).where( + collection_table.c.id == self._id, + ) + else: + delete_stmt = sa.delete( + item_table, + ).where( + sa.and_( + item_table.c.collection_id == self._id, + item_table.c.name == href, + ), + ) + with self._storage._engine.begin() as connection: + connection.execute(delete_stmt) + + def get_meta(self, key: Optional[str] = None) -> Union[Mapping[str, str], Optional[str]]: + collection_metadata = self._storage._meta.tables['collection_metadata'] + select_meta = sa.select( + collection_metadata.c.key, + collection_metadata.c.value, + ).select_from( + collection_metadata, + ).where( + collection_metadata.c.collection_id == self._id, + ) + if key is not None: + select_meta = select_meta.where( + collection_metadata.c.key == key, + ) + metadata = {} + with self._storage._engine.begin() as connection: + for row in connection.execute(select_meta): + metadata[row.key] = row.value + if key is not None: + return metadata.get(key) + return metadata + + def set_meta(self, props: Mapping[str, str]) -> None: + collection_metadata = self._storage._meta.tables['collection_metadata'] + delete_stmt = sa.delete( + collection_metadata, + ).where( + collection_metadata.c.collection_id == self._id, + ) + insert_stmt = sa.insert( + collection_metadata, + ).values([dict(collection_id=self._id, key=k, value=v) for k, v in props.items()]) + with self._storage._engine.begin() as connection: + connection.execute(delete_stmt) + connection.execute(insert_stmt) + + @property + def last_modified(self) -> str: + collection = self._storage._meta.tables['collection'] + select_stmt = sa.select( + collection.c.modified, + ).select_from( + collection, + ).where( + collection.c.id == self._id, + ) + with self._storage._engine.begin() as connection: + c = connection.execute(select_stmt).one() + return c.modified.strftime('%a, %d %b %Y %H:%M:%S GMT') + + def _update_history_etag(self, href: str, item: Optional["radicale_item.Item"]) -> str: + item_history_table = self._storage._meta.tables['item_history'] + select_etag_stmt = sa.select( + item_history_table.c, + ).select_from( + item_history_table, + ).where( + sa.and_( + item_history_table.c.collection_id == self._id, + item_history_table.c.name == href, + ), + ) + exists: bool + with self._storage._engine.begin() as connection: + item_history = connection.execute(select_etag_stmt).one_or_none() + if item_history is not None: + exists = True + cache_etag = item_history.etag, + history_etag = item_history.history_etag + else: + exists = False + cache_etag = '' + history_etag = binascii.hexlify(os.urandom(16)).decode('ascii') + etag = item.etag if item else '' + if etag != cache_etag: + if exists: + upsert = sa.update( + item_history_table, + ).values( + etag=etag, + history_etag=history_etag, + ).where( + sa.and_( + item_history_table.c.collection_id == self._id, + item_history_table.c.name == href, + ), + ) + else: + upsert = sa.insert( + item_history_table, + ).values( + collection_id=self._id, + name=href, + etag=etag, + history_etag=history_etag, + ) + connection.execute(upsert) + return history_etag + + def _get_deleted_history_refs(self): + item_table = self._storage._meta.tables['item'] + item_history_table = self._storage._meta.tables['item_history'] + select_stmt = sa.select( + item_history_table.c.name, + ).select_from( + item_history_table.join( + item_table, + sa.and_( + item_history_table.c.collection_id == item_table.c.collection_id, + item_history_table.c.name == item_table.c.name, + ), + isouter=True, + ), + ).where( + item_table.c.id == None, + ) + with self._storage._engine.begin() as connection: + for row in connection.execute(select_stmt): + yield row.href + + def _delete_history_refs(self): + item_history_table = self._storage._meta.tables['item_history'] + delete_stmt = sa.delete( + item_history_table, + ).where( + sa.and_( + item_history_table.c.href.in_(list(self._get_deleted_history_refs())), + item_history_table.c.collection_id == self._id, + item_history_table.c.modified < (datetime.datetime.now() - datetime.timedelta(seconds=self._storage.configuration.get('storage', 'max_sync_token_age'))) + ), + ) + with self._storage._engine.begin() as connection: + connection.execute(delete_stmt) + + def sync(self, old_token: str = '') -> Tuple[str, Iterable[str]]: + _prefix = 'http://radicale.org/ns/sync/' + collection_state_table = self._storage._meta.tables['collection_state'] + def check_token_name(token_name: str) -> bool: + print(token_name) + print(len(token_name)) + if len(token_name) != 64: + return False + for c in token_name: + if c not in string.hexdigits.lower(): + return False + return True + + old_token_name = '' + if old_token: + if not old_token.startswith(_prefix): + raise ValueError(f'Malformed token: {old_token}') + old_token_name = old_token[len(_prefix):] + if not check_token_name(old_token_name): + raise ValueError(f'Malformed token: {old_token}') + state = {} + token_name_hash = sha256() + + # compute new state + for href, item in itertools.chain( + ((item.href, item) for item in self.get_all()), + ((href, None) for href in self._get_deleted_history_refs()) + ): + assert isinstance(href, str) + history_etag = self._update_history_etag(href, item) + state[href] = history_etag + token_name_hash.update((href + '/' + history_etag).encode()) + token_name = token_name_hash.hexdigest() + token = _prefix + token_name + + # if new state hasn't changed: dont send any updates + if token_name == old_token_name: + return token, () + + # load old state + old_state = {} + with self._storage._engine.begin() as connection: + if old_token_name: + select_stmt = sa.select( + collection_state_table.c, + ).select_from( + collection_state_table, + ).where( + sa.and_( + collection_state_table.c.collection_id == self._id, + collection_state_table.c.name == old_token_name, + ), + ) + state_row = connection.execute(select_stmt).one() + state = json.loads(state_row.state.decode()) + + # store new state + ## should never be a duplicate + insert_stmt = sa.insert( + collection_state_table, + ).values( + collection_id=self._id, + name=token_name, + state=json.dumps(state).encode(), + ) + connection.execute(insert_stmt) + + changes = [] + for href, history_etag in state.items(): + if history_etag != old_state.get(href): + changes += [href] + for href, history_etag in old_state.items(): + if href not in state: + changes += [href] + + return token, changes + +class Storage(BaseStorage): + + def __init__(self, configuration: "radicale.config.Configuration"): + super().__init__(configuration) + self._meta = db.create_meta() + self._engine, self._root_collection = db.create(self.configuration.get('storage', 'url'), self._meta) + + def _split_path(self, path: str): + path_parts = path.split('/') + if path_parts[0] == '': + path_parts = path_parts[1:] + if path_parts[-1] == '': + path_parts = path_parts[:-1] + return path_parts + + def discover(self, path: str, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]: + logger.info("path = %s, depth = %s", path, depth) + if path == '/': + return [Collection(self, self._root_collection.id, '')] + path_parts = self._split_path(path) + + collection_table = self._meta.tables['collection'] + item_table = self._meta.tables['item'] + + select_collection_or_item = sa.select( + collection_table.c.id, + collection_table.c.parent_id.label('parent_id'), + collection_table.c.modified, + collection_table.c.name, + sa.literal(None, sa.LargeBinary()).label('data'), + sa.literal('collection', sa.String(16)).label('type_'), + ).select_from( + collection_table + ).union_all(sa.select( + item_table.c.id, + item_table.c.collection_id.label('parent_id'), + item_table.c.modified, + item_table.c.name, + item_table.c.data, + sa.literal('item', sa.String(16)).label('type_'), + ).select_from( + item_table + )) + i = 0 + select_from = select_collection_or_item.alias('data') + aliases = [select_from] + for path in path_parts[::-1]: + aliases += [select_collection_or_item.alias(f't{i}')] + i += 1 + select_from = select_from.join( + aliases[-1], + sa.and_( + aliases[-2].c.parent_id == aliases[-1].c.id, + aliases[-2].c.name == path, + ), + ) + select_stmt = sa.select( + aliases[0].c, + ).select_from( + select_from + ).where( + aliases[-1].c.parent_id == None, + ) + select_sub_stmt = None + if depth != "0": + aliased = select_collection_or_item.alias('data_list') + select_sub_stmt = sa.select( + aliased.c, + ).select_from( + aliased.join( + select_from, + aliased.c.parent_id == aliases[0].c.id, + ), + ).where( + aliases[-1].c.parent_id == None, + ) + + l = [] + with self._engine.begin() as connection: + self_collection = connection.execute(select_stmt).one_or_none() + if self_collection is None: + return [] + self_collection = Collection(self, self_collection.id, '/'.join(path_parts)) + l += [self_collection] + if select_sub_stmt is not None: + for row in connection.execute(select_sub_stmt): + path = '/'.join(path_parts) + path += '/' + path += row.name + if row.type_ == 'collection': + l += [Collection(self, row.id, path)] + elif row.type_ == 'item': + assert self_collection is not None + l += [self_collection._row_to_item(row)] + return l + + + def move(self, item: "radicale_item.Item", to_collection: "BaseCollection", to_href: str) -> None: + pass + + def create_collection( + self, + href: str, + items: Optional[Iterable["radicale_item.Item"]]=None, + props: Optional[Mapping[str, str]]=None, + ) -> "BaseCollection": + print('creating collection') + print(f'href={href}, items={items}, props={props}') + path = self._split_path(href) + parent_id = self._root_collection.id + collection_table = self._meta.tables['collection'] + collection_metadata_table = self._meta.tables['collection_metadata'] + item_table = self._meta.tables['item'] + + with self._engine.begin() as connection: + for p in path: + select_stmt = sa.select( + collection_table.c, + ).select_from( + collection_table, + ).where( + sa.and_( + collection_table.c.parent_id == parent_id, + collection_table.c.name == p, + ), + ) + c = connection.execute(select_stmt).one_or_none() + if c is None: + insert_stmt = sa.insert( + collection_table, + ).values( + parent_id=parent_id, + name=p, + ).returning( + collection_table.c, + ) + c = connection.execute(insert_stmt).one() + parent_id = c.id + if items is not None or props is not None: + # drop all subcollections and items + delete_collections_stmt = sa.delete( + collection_table, + ).where( + collection_table.c.parent_id == parent_id, + ) + delete_meta_stmt = sa.delete( + collection_metadata_table, + ).where( + collection_metadata_table.c.collection_id == parent_id, + ) + delete_items_stmt = sa.delete( + item_table, + ).where( + item_table.c.collection_id == parent_id, + ) + connection.execute(delete_collections_stmt) + connection.execute(delete_meta_stmt) + connection.execute(delete_items_stmt) + if props is not None: + insert_stmt = sa.insert( + collection_metadata_table, + ).values([dict(collection_id=parent_id, key=k, value=v) for k, v in props.items()]) + connection.execute(insert_stmt) + if props is not None and 'key' in props and items is not None: + print(items) + # TODO insert items + c = Collection(self, parent_id, '/'.join(path)) + print(c) + return c + + + + @radicale.types.contextmanager + def acquire_lock(self, mode: str, user: str = "") -> Iterator[None]: + # locking happens on a db level + yield + + def verify(self) -> bool: + return True diff --git a/radicale_sql/db.py b/radicale_sql/db.py new file mode 100644 index 0000000..889c44e --- /dev/null +++ b/radicale_sql/db.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import uuid +import datetime +from typing import Tuple +import sqlalchemy as sa + + +def create_meta() -> sa.MetaData: + meta = sa.MetaData() + + sa.Table( + 'collection', + meta, + sa.Column( + 'id', + sa.Uuid(), + default=uuid.uuid4, + primary_key=True, + ), + sa.Column( + 'parent_id', + sa.Uuid(), + sa.ForeignKey("collection.id"), + index=True, + nullable=True, + ), + sa.Column( + 'modified', + sa.DateTime(), + default=datetime.datetime.now, + onupdate=datetime.datetime.now, + nullable=False, + ), + sa.Column( + 'name', + sa.String(128), + index=True, + nullable=True, + ), + sa.UniqueConstraint('parent_id', 'name'), + ) + + sa.Table( + 'collection_metadata', + meta, + sa.Column( + 'collection_id', + sa.Uuid(), + sa.ForeignKey('collection.id', ondelete='CASCADE'), + primary_key=True, + ), + sa.Column( + 'key', + sa.String(length=128), + primary_key=True, + ), + sa.Column( + 'value', + sa.Text(), + ), + sa.UniqueConstraint('collection_id', 'key'), + ) + + sa.Table( + 'collection_state', + meta, + sa.Column( + 'collection_id', + sa.Uuid(), + sa.ForeignKey('collection.id', ondelete='CASCADE'), + index=True, + nullable=False, + ), + sa.Column( + 'name', + sa.String(length=128), # could be only 64 long + index=True, + nullable=False, + ), + sa.Column( + 'state', + sa.LargeBinary(), + nullable=False, + ), + ) + + sa.Table( + 'item', + meta, + sa.Column( + 'id', + sa.Uuid(), + default=uuid.uuid4, + primary_key=True, + ), + sa.Column( + 'collection_id', + sa.Uuid(), + sa.ForeignKey("collection.id", ondelete='CASCADE'), + index=True, + nullable=False, + ), + sa.Column( + 'modified', + sa.DateTime(), + default=datetime.datetime.now, + onupdate=datetime.datetime.now, + nullable=False, + ), + sa.Column( + 'name', + sa.String(128), + index=True, + nullable=True, + ), + sa.Column( + 'data', + sa.LargeBinary(), + ), + sa.UniqueConstraint('collection_id', 'name'), + ) + + sa.Table( + 'item_history', + meta, + sa.Column( + 'id', + sa.Uuid(), + default=uuid.uuid4, + primary_key=True, + ), + sa.Column( + 'collection_id', + sa.Uuid(), + sa.ForeignKey("collection.id", ondelete='CASCADE'), + index=True, + nullable=False, + ), + sa.Column( + 'modified', + sa.DateTime(), + default=datetime.datetime.now, + onupdate=datetime.datetime.now, + nullable=False, + ), + sa.Column( + 'name', + sa.String(128), + index=True, + nullable=True, + ), + sa.Column( + 'etag', + sa.String(1024), + nullable=False, + ), + sa.Column( + 'history_etag', + sa.String(1024), + nullable=True, + ) + ) + + return meta + +def create(url: str, meta: sa.MetaData) -> Tuple[sa.engine.Engine, sa.engine.Row]: + engine = sa.create_engine(url) + meta.create_all(engine) + + collection = meta.tables['collection'] + with engine.begin() as connection: + select_root_collection = sa.select( + collection.c + ).select_from( + collection + ).where( + collection.c.parent_id == None, + ) + root_collection = connection.execute(select_root_collection).one_or_none() + if root_collection is None: + insert_root_collection = sa.insert( + collection, + ).values(parent_id=None).returning(collection.c) + root_collection = connection.execute(insert_root_collection).one() + + return engine, root_collection + +if __name__ == '__main__': + meta = create_meta() + engine = create('sqlite:///test.db', meta) diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..8de0d39 --- /dev/null +++ b/setup.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from distutils.core import setup + +setup( + name='radicale-sql', + packages=['radicale_sql'], +)