Compare commits

..

2 commits

Author SHA1 Message Date
9d023ffda0
Add integrationtest script. 2022-12-07 17:54:42 +01:00
1906897159
Fix return deleted items of other collections in sync.
Fix invalid type (must be str, was datetime.datetime) of last_modified.
2022-12-07 17:53:49 +01:00
2 changed files with 129 additions and 25 deletions

View file

@ -5,6 +5,7 @@ import os
import re
import binascii
import datetime
import zoneinfo
import uuid
import string
import itertools
@ -36,6 +37,13 @@ PLUGIN_CONFIG_SCHEMA = {
},
}
class Item(radicale_item.Item):
def __init__(self, *args, last_modified: Optional[Union[str, datetime.datetime]]=None, **kwargs):
if last_modified is not None and isinstance(last_modified, datetime.datetime):
last_modified = last_modified.astimezone(tz=zoneinfo.ZoneInfo('GMT')).strftime('%a, %d %b %Y %H:%M:%S GMT')
super().__init__(*args, last_modified=last_modified, **kwargs)
class Collection(BaseCollection):
def __init__(self, storage: "Storage", id: uuid.UUID, path: str):
@ -50,8 +58,8 @@ class Collection(BaseCollection):
def path(self) -> str:
return self._path
def _row_to_item(self, row):
return radicale_item.Item(
def _row_to_item(self, row) -> "radicale_item.Item":
return Item(
collection=self,
href=row.name,
last_modified=row.modified,
@ -95,9 +103,8 @@ class Collection(BaseCollection):
).where(
item_table.c.collection_id == self._id,
)
with self._storage._engine.begin() as connection:
for row in connection.execute(select_stmt):
yield self._row_to_item(row)
for row in connection.execute(select_stmt):
yield self._row_to_item(row)
def get_all(self) -> Iterator["radicale_item.Item"]:
with self._storage._engine.begin() as c:
@ -299,7 +306,10 @@ class Collection(BaseCollection):
isouter=True,
),
).where(
item_table.c.id == None,
sa.and_(
item_history_table.c.collection_id == self._id,
item_table.c.id == None,
),
)
for row in connection.execute(select_stmt):
yield row.name
@ -322,6 +332,7 @@ class Collection(BaseCollection):
# https://github.com/Kozea/Radicale/blob/6a56a6026f6ec463d6eb77da29e03c48c0c736c6/radicale/storage/multifilesystem/sync.py
_prefix = 'http://radicale.org/ns/sync/'
collection_state_table = self._storage._meta.tables['collection_state']
def check_token_name(token_name: str) -> bool:
if len(token_name) != 64:
return False
@ -436,19 +447,18 @@ class BdayCollection(Collection):
if r.match(v):
return datetime.datetime.strptime(v, f)
raise ValueError(f'cannot parse specified string {v}')
cal = vobject.iCalendar()
if 'bday' not in o.contents:
return None
name = o.fn.value
date = vobj_str2date(o.bday)
if date.year <= 1900:
date = date.replace(year=datetime.datetime.now().year)
date_end = date + datetime.timedelta(days=1)
cal.add('vevent')
cal.vevent_list[-1].add('uid').value = o.uid.value
cal.vevent_list[-1].add('dtstamp').value = vobj_str2date(o.rev)
cal.vevent_list[-1].add('summary').value = name
cal.vevent_list[-1].add('summary').value = o.fn.value
cal.vevent_list[-1].add('dtstart').value = date.date()
cal.vevent_list[-1].add('dtend').value = date_end.date()
cal.vevent_list[-1].add('rrule').value = 'FREQ=YEARLY'
@ -459,7 +469,7 @@ class BdayCollection(Collection):
if new_vobject is None:
return None
assert item.href is not None
return radicale_item.Item(
return Item(
collection=self,
href=item.href,
#href=item.href + '.ics',
@ -618,29 +628,15 @@ class Storage(BaseStorage):
).where(
aliases[-1].c.parent_id == None,
)
select_sub_stmt = None
if depth != "0":
aliased = select_collection_or_item.alias('data_list')
select_sub_stmt = sa.select(
aliased.c,
).select_from(
aliased.join(
select_from,
aliased.c.parent_id == aliases[0].c.id,
),
).where(
aliases[-1].c.parent_id == None,
)
l = []
self_collection = connection.execute(select_stmt).one_or_none()
if self_collection is None:
# None found
return []
if self_collection.type_ != 'collection':
# Item found
return [radicale_item.Item(
return [Item(
collection=self._get_collection(self_collection.parent_id, connection=connection),
href=self_collection.name,
last_modified=self_collection.modified,
@ -652,6 +648,17 @@ class Storage(BaseStorage):
l += [self_collection]
# collection should list contents
if depth != "0":
sub_stmt_select_from = select_collection_or_item.alias()
select_sub_stmt = sa.select(
sub_stmt_select_from.c,
).select_from(
sub_stmt_select_from,
).where(
sa.and_(
sub_stmt_select_from.c.parent_id == self_collection._id,
sub_stmt_select_from.c.type_ == 'collection',
),
)
for row in connection.execute(select_sub_stmt):
path = '/'.join(path_parts)
path += '/'

97
test/test.py Executable file
View file

@ -0,0 +1,97 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import uuid
import asyncio
import datetime
import caldav
import caldav.lib
_BASE_URL = 'http://localhost:5232/'
_PASSWORD = 'test'
_USER = 'test'
def test_caldav():
client0 = None
try:
# create calendars
client0 = caldav.DAVClient(url=_BASE_URL, username='test', password='test')
principal0 = client0.principal()
calendar0 = principal0.make_calendar(name=f'test-calendar-{uuid.uuid4()}')
print(f'calendar url = {calendar0.url}')
print(calendar0.events())
calendar0_events = set([x.url for x in calendar0.events()])
assert calendar0_events == set()
# create event and store it
calendar0_events |= {calendar0.save_event(
dtstart=datetime.datetime.now(),
dtend=datetime.datetime.now() + datetime.timedelta(hours=1),
summary='event0',
).url}
# obtain sync token for first event
calendar0_updates = calendar0.objects_by_sync_token()
calendar0_token = calendar0_updates.sync_token
assert set([x.url for x in calendar0_updates]) == calendar0_events, (set(calendar0_updates), set(calendar0_events))
# get changes with sync token (should give no difference)
# do this for both calendars
calendar0_updates = calendar0.objects_by_sync_token(calendar0_token)
assert set(calendar0_updates) == set()
# add another event to the calendar
calendar0_events |= {calendar0.save_event(
dtstart=datetime.datetime.now(),
dtend=datetime.datetime.now() + datetime.timedelta(hours=1),
summary='event0',
).url}
calendar0_updates = calendar0.objects_by_sync_token(calendar0_token)
assert len(set(calendar0_updates)) == 1
calendar0_token = calendar0_updates.sync_token
# check that sync token returns 0 updates
calendar0_updates = calendar0.objects_by_sync_token(calendar0_token)
assert set(calendar0_updates) == set()
# update event
calendar0_any_event = calendar0.event_by_url(list(calendar0_events)[0])
calendar0_any_event.load()
calendar0_any_event.vobject_instance.vevent_list[0].summary.value = 'event0-edit0'
calendar0_any_event.save()
# check that we get the edited event
calendar0_updates = calendar0.objects_by_sync_token(calendar0_token)
assert len(set(calendar0_updates)) == 1
calendar0_token = calendar0_updates.sync_token
# delete this event
calendar0_any_event = calendar0.event_by_url(list(calendar0_events)[0])
calendar0_any_event.delete()
calendar0_updates = calendar0.objects_by_sync_token(calendar0_token)
assert len(list(calendar0_updates)) == 1
#assert list(calendar0_updates)[0].is_deleted, list(calendar0_updates)[0]
try:
list(calendar0_updates)[0].load()
except caldav.lib.error.NotFoundError:
pass
else:
assert False
#changes = calendar.objects_by_sync_token(load_objects=True)
#token = changes.sync_token
except:
print('failed')
raise
else:
print('success')
finally:
if client0 is not None:
client0.close()
if __name__ == '__main__':
test_caldav()