Compare commits

..

2 commits

Author SHA1 Message Date
a5ab43d4df
Add simple birthday calendar. 2022-12-06 21:13:56 +01:00
a4fff82636
Silence linter warnings. 2022-12-06 19:07:19 +01:00

View file

@ -2,6 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os import os
import re
import binascii import binascii
import datetime import datetime
import uuid import uuid
@ -17,6 +18,7 @@ import radicale.types
from radicale.storage import BaseStorage, BaseCollection from radicale.storage import BaseStorage, BaseCollection
from radicale.log import logger from radicale.log import logger
from radicale import item as radicale_item from radicale import item as radicale_item
import vobject
import sqlalchemy as sa import sqlalchemy as sa
from . import db from . import db
@ -27,6 +29,10 @@ PLUGIN_CONFIG_SCHEMA = {
'value': '', 'value': '',
'type': str, 'type': str,
}, },
'generate_birthday_calendars': {
'value': "False",
'type': bool,
},
}, },
} }
@ -387,6 +393,91 @@ class Collection(BaseCollection):
with self._storage._engine.begin() as c: with self._storage._engine.begin() as c:
return self._sync(connection=c, old_token=old_token) return self._sync(connection=c, old_token=old_token)
class BdayCollection(Collection):
R_FMT = (
(re.compile('^[0-9]{8}$'), '%Y%m%d'),
(re.compile('^--[0-9]{4}$'), '--%m%d'),
(re.compile('^[0-9]{4}-[0-9]{2}-[0-9]{2}$'), '%Y-%m-%d')
)
def __init__(self, storage: "Storage", id: uuid.UUID, path: str, birthday_source: uuid.UUID):
super().__init__(storage, id, path)
self._birthday_source = birthday_source
self._birthday_source_collection = Collection(storage, birthday_source, '') # TODO: ugly hack, get the correct path
def __repr__(self) -> str:
return f'BdayCollection(id={self._id}, path={self._path}, birthday_source={self._birthday_source})'
def _sync(self, *, connection, old_token: str = '') -> Tuple[str, Iterable[str]]:
return self._birthday_source_collection._sync(connection=connection, old_token=old_token)
def _to_calendar_entry(self, o: vobject.base.Component) -> Optional[vobject.base.Component]:
def vobj_str2date(content_line):
v = content_line.value
for r, f in self.R_FMT:
if r.match(v):
return datetime.datetime.strptime(v, f)
raise ValueError(f'cannot parse specified string {v}')
cal = vobject.iCalendar()
if 'bday' not in o.contents:
return None
name = o.fn.value
date = vobj_str2date(o.bday)
if date.year <= 1900:
date = date.replace(year=datetime.datetime.now().year)
date_end = date + datetime.timedelta(days=1)
cal.add('vevent')
cal.vevent_list[-1].add('summary').value = name
cal.vevent_list[-1].add('dtstart').value = date.date()
cal.vevent_list[-1].add('dtend').value = date_end.date()
cal.vevent_list[-1].add('rrule').value = 'FREQ=YEARLY'
return cal
def _item_transform(self, item: "radicale_item.Item") -> Optional["radicale_item.Item"]:
new_vobject = self._to_calendar_entry(item.vobject_item)
if new_vobject is None:
return None
new_vobject.add('uid').value = item.uid
return radicale_item.Item(
collection=self,
href=item.href,
last_modified=item.last_modified,
text=new_vobject.serialize().strip(),
)
def _get_multi(self, hrefs: Iterable[str], *, connection) -> Iterable[Tuple[str, Optional["radicale_item.Item"]]]:
l = []
for href, i in self._birthday_source_collection._get_multi(hrefs, connection=connection):
l += [(href, self._item_transform(i) if i is not None else None)]
return l
def _get_all(self, *, connection) -> Iterator["radicale_item.Item"]:
for i in self._birthday_source_collection._get_all(connection=connection):
ni = self._item_transform(i)
if ni is not None:
yield ni
def _upload(self, href: str, item: "radicale_item.Item", *, connection) -> "radicale_item.Item":
raise NotImplementedError
def _delete(self, *, connection, href: Optional[str] = None) -> None:
if href is not None:
raise NotImplementedError
super()._delete(connection=connection, href=href)
def _last_modified(self, *, connection) -> str:
return self._birthday_source_collection._last_modified(connection=connection)
def create_collection(*args, birthday_source: Optional[uuid.UUID], **kwargs):
if birthday_source is not None:
c = BdayCollection
kwargs['birthday_source'] = birthday_source
else:
c = Collection
return c(*args, **kwargs)
class Storage(BaseStorage): class Storage(BaseStorage):
def __init__(self, configuration: "radicale.config.Configuration"): def __init__(self, configuration: "radicale.config.Configuration"):
@ -404,31 +495,30 @@ class Storage(BaseStorage):
def _discover(self, path: str, *, connection, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]: def _discover(self, path: str, *, connection, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]:
if path == '/': if path == '/':
return [Collection(self, self._root_collection.id, '')] return [create_collection(self, self._root_collection.id, '', birthday_source=None)]
path_parts = self._split_path(path) path_parts = self._split_path(path)
collection_table = self._meta.tables['collection'] collection_table = self._meta.tables['collection']
collection_metadata_table = self._meta.tables['collection_metadata']
item_table = self._meta.tables['item'] item_table = self._meta.tables['item']
# TODO: rename variable, it only selects collections now
select_collection_or_item = sa.select( select_collection_or_item = sa.select(
collection_table.c.id, collection_table.c.id,
collection_table.c.parent_id.label('parent_id'), collection_table.c.parent_id.label('parent_id'),
collection_table.c.modified, collection_table.c.modified,
collection_table.c.name, collection_table.c.name,
sa.literal(None, sa.LargeBinary()).label('data'), collection_metadata_table.c.value.cast(sa.Uuid()).label('birthday_source'),
sa.literal('collection', sa.String(16)).label('type_'),
).select_from( ).select_from(
collection_table collection_table.join(
).union_all(sa.select( collection_metadata_table,
item_table.c.id, sa.and_(
item_table.c.collection_id.label('parent_id'), collection_table.c.id == collection_metadata_table.c.collection_id,
item_table.c.modified, collection_metadata_table.c.key == 'birthday_source',
item_table.c.name, ),
item_table.c.data, isouter=True,
sa.literal('item', sa.String(16)).label('type_'), )
).select_from( )
item_table
))
i = 0 i = 0
select_from = select_collection_or_item.alias('data') select_from = select_collection_or_item.alias('data')
aliases = [select_from] aliases = [select_from]
@ -467,18 +557,15 @@ class Storage(BaseStorage):
self_collection = connection.execute(select_stmt).one_or_none() self_collection = connection.execute(select_stmt).one_or_none()
if self_collection is None: if self_collection is None:
return [] return []
self_collection = Collection(self, self_collection.id, '/'.join(path_parts)) self_collection = create_collection(self, self_collection.id, '/'.join(path_parts), birthday_source=self_collection.birthday_source)
l += [self_collection] l += [self_collection]
if select_sub_stmt is not None: if select_sub_stmt is not None:
for row in connection.execute(select_sub_stmt): for row in connection.execute(select_sub_stmt):
path = '/'.join(path_parts) path = '/'.join(path_parts)
path += '/' path += '/'
path += row.name path += row.name
if row.type_ == 'collection': l += [create_collection(self, row.id, path, birthday_source=row.birthday_source)]
l += [Collection(self, row.id, path)] l += list(self_collection._get_all(connection=connection))
elif row.type_ == 'item':
assert self_collection is not None
l += [self_collection._row_to_item(row)]
return l return l
def discover(self, path: str, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]: def discover(self, path: str, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]:
@ -518,6 +605,23 @@ class Storage(BaseStorage):
with self._engine.begin() as c: with self._engine.begin() as c:
return self._move(item, to_collection, to_href, connection=c) return self._move(item, to_collection, to_href, connection=c)
def _create_bday_calendar(
self,
href: str,
*,
connection,
address_book: "BaseCollection",
address_props: Optional[Mapping[str, str]]=None,
) -> "BaseCollection":
assert isinstance(address_book, Collection)
calendar_props = {
**(address_props or {}),
'birthday_source': str(address_book._id),
'tag': 'VCALENDAR',
}
calendar = self._create_collection(href, connection=connection, props=calendar_props)
return calendar
def _create_collection( def _create_collection(
self, self,
href: str, href: str,
@ -526,6 +630,7 @@ class Storage(BaseStorage):
items: Optional[Iterable["radicale_item.Item"]]=None, items: Optional[Iterable["radicale_item.Item"]]=None,
props: Optional[Mapping[str, str]]=None, props: Optional[Mapping[str, str]]=None,
) -> "BaseCollection": ) -> "BaseCollection":
logger.debug('create_collection: %s, %s, %s', href, items, props)
path = self._split_path(href) path = self._split_path(href)
parent_id = self._root_collection.id parent_id = self._root_collection.id
collection_table = self._meta.tables['collection'] collection_table = self._meta.tables['collection']
@ -581,10 +686,19 @@ class Storage(BaseStorage):
).values([dict(collection_id=parent_id, key=k, value=v) for k, v in props.items()]) ).values([dict(collection_id=parent_id, key=k, value=v) for k, v in props.items()])
connection.execute(insert_stmt) connection.execute(insert_stmt)
c = Collection(self, parent_id, '/'.join(path)) c = Collection(self, parent_id, '/'.join(path))
if props is not None and 'key' in props and items is not None: if self.configuration.get('storage', 'generate_birthday_calendars') \
and props is not None and props.get('tag') == 'VADDRESSBOOK':
bday_href = '/'.join(path[:-1] + [str(uuid.uuid4())])
bday_href = f'/{bday_href}/'
self._create_bday_calendar(bday_href, connection=connection, address_book=c, address_props=props)
if props is not None and 'tag' in props and items is not None:
suffix = '.bin'
if props['tag'] == 'VADDRESSBOOK':
suffix = '.vcf'
elif props['tag'] == 'VCALENDAR':
suffix = '.ics'
for i in items: for i in items:
assert i.href is not None c._upload(i.uid + suffix, i, connection=connection)
c._upload(i.href, i, connection=connection)
return c return c
def create_collection( def create_collection(
@ -599,6 +713,7 @@ class Storage(BaseStorage):
@radicale.types.contextmanager @radicale.types.contextmanager
def acquire_lock(self, mod: str, user: str = "") -> Iterator[None]: def acquire_lock(self, mod: str, user: str = "") -> Iterator[None]:
_ = mod, user
yield yield
def _verify(self, *, connection) -> bool: def _verify(self, *, connection) -> bool: