Compare commits
2 commits
072feed372
...
9d023ffda0
Author | SHA1 | Date | |
---|---|---|---|
9d023ffda0 | |||
1906897159 |
2 changed files with 129 additions and 25 deletions
|
@ -5,6 +5,7 @@ import os
|
|||
import re
|
||||
import binascii
|
||||
import datetime
|
||||
import zoneinfo
|
||||
import uuid
|
||||
import string
|
||||
import itertools
|
||||
|
@ -36,6 +37,13 @@ PLUGIN_CONFIG_SCHEMA = {
|
|||
},
|
||||
}
|
||||
|
||||
class Item(radicale_item.Item):
|
||||
|
||||
def __init__(self, *args, last_modified: Optional[Union[str, datetime.datetime]]=None, **kwargs):
|
||||
if last_modified is not None and isinstance(last_modified, datetime.datetime):
|
||||
last_modified = last_modified.astimezone(tz=zoneinfo.ZoneInfo('GMT')).strftime('%a, %d %b %Y %H:%M:%S GMT')
|
||||
super().__init__(*args, last_modified=last_modified, **kwargs)
|
||||
|
||||
class Collection(BaseCollection):
|
||||
|
||||
def __init__(self, storage: "Storage", id: uuid.UUID, path: str):
|
||||
|
@ -50,8 +58,8 @@ class Collection(BaseCollection):
|
|||
def path(self) -> str:
|
||||
return self._path
|
||||
|
||||
def _row_to_item(self, row):
|
||||
return radicale_item.Item(
|
||||
def _row_to_item(self, row) -> "radicale_item.Item":
|
||||
return Item(
|
||||
collection=self,
|
||||
href=row.name,
|
||||
last_modified=row.modified,
|
||||
|
@ -95,7 +103,6 @@ class Collection(BaseCollection):
|
|||
).where(
|
||||
item_table.c.collection_id == self._id,
|
||||
)
|
||||
with self._storage._engine.begin() as connection:
|
||||
for row in connection.execute(select_stmt):
|
||||
yield self._row_to_item(row)
|
||||
|
||||
|
@ -299,7 +306,10 @@ class Collection(BaseCollection):
|
|||
isouter=True,
|
||||
),
|
||||
).where(
|
||||
sa.and_(
|
||||
item_history_table.c.collection_id == self._id,
|
||||
item_table.c.id == None,
|
||||
),
|
||||
)
|
||||
for row in connection.execute(select_stmt):
|
||||
yield row.name
|
||||
|
@ -322,6 +332,7 @@ class Collection(BaseCollection):
|
|||
# https://github.com/Kozea/Radicale/blob/6a56a6026f6ec463d6eb77da29e03c48c0c736c6/radicale/storage/multifilesystem/sync.py
|
||||
_prefix = 'http://radicale.org/ns/sync/'
|
||||
collection_state_table = self._storage._meta.tables['collection_state']
|
||||
|
||||
def check_token_name(token_name: str) -> bool:
|
||||
if len(token_name) != 64:
|
||||
return False
|
||||
|
@ -436,19 +447,18 @@ class BdayCollection(Collection):
|
|||
if r.match(v):
|
||||
return datetime.datetime.strptime(v, f)
|
||||
raise ValueError(f'cannot parse specified string {v}')
|
||||
|
||||
cal = vobject.iCalendar()
|
||||
if 'bday' not in o.contents:
|
||||
return None
|
||||
name = o.fn.value
|
||||
date = vobj_str2date(o.bday)
|
||||
if date.year <= 1900:
|
||||
date = date.replace(year=datetime.datetime.now().year)
|
||||
date_end = date + datetime.timedelta(days=1)
|
||||
|
||||
cal.add('vevent')
|
||||
cal.vevent_list[-1].add('uid').value = o.uid.value
|
||||
cal.vevent_list[-1].add('dtstamp').value = vobj_str2date(o.rev)
|
||||
cal.vevent_list[-1].add('summary').value = name
|
||||
cal.vevent_list[-1].add('summary').value = o.fn.value
|
||||
cal.vevent_list[-1].add('dtstart').value = date.date()
|
||||
cal.vevent_list[-1].add('dtend').value = date_end.date()
|
||||
cal.vevent_list[-1].add('rrule').value = 'FREQ=YEARLY'
|
||||
|
@ -459,7 +469,7 @@ class BdayCollection(Collection):
|
|||
if new_vobject is None:
|
||||
return None
|
||||
assert item.href is not None
|
||||
return radicale_item.Item(
|
||||
return Item(
|
||||
collection=self,
|
||||
href=item.href,
|
||||
#href=item.href + '.ics',
|
||||
|
@ -618,29 +628,15 @@ class Storage(BaseStorage):
|
|||
).where(
|
||||
aliases[-1].c.parent_id == None,
|
||||
)
|
||||
select_sub_stmt = None
|
||||
if depth != "0":
|
||||
aliased = select_collection_or_item.alias('data_list')
|
||||
select_sub_stmt = sa.select(
|
||||
aliased.c,
|
||||
).select_from(
|
||||
aliased.join(
|
||||
select_from,
|
||||
aliased.c.parent_id == aliases[0].c.id,
|
||||
),
|
||||
).where(
|
||||
aliases[-1].c.parent_id == None,
|
||||
)
|
||||
|
||||
l = []
|
||||
self_collection = connection.execute(select_stmt).one_or_none()
|
||||
|
||||
if self_collection is None:
|
||||
# None found
|
||||
return []
|
||||
if self_collection.type_ != 'collection':
|
||||
# Item found
|
||||
return [radicale_item.Item(
|
||||
return [Item(
|
||||
collection=self._get_collection(self_collection.parent_id, connection=connection),
|
||||
href=self_collection.name,
|
||||
last_modified=self_collection.modified,
|
||||
|
@ -652,6 +648,17 @@ class Storage(BaseStorage):
|
|||
l += [self_collection]
|
||||
# collection should list contents
|
||||
if depth != "0":
|
||||
sub_stmt_select_from = select_collection_or_item.alias()
|
||||
select_sub_stmt = sa.select(
|
||||
sub_stmt_select_from.c,
|
||||
).select_from(
|
||||
sub_stmt_select_from,
|
||||
).where(
|
||||
sa.and_(
|
||||
sub_stmt_select_from.c.parent_id == self_collection._id,
|
||||
sub_stmt_select_from.c.type_ == 'collection',
|
||||
),
|
||||
)
|
||||
for row in connection.execute(select_sub_stmt):
|
||||
path = '/'.join(path_parts)
|
||||
path += '/'
|
||||
|
|
97
test/test.py
Executable file
97
test/test.py
Executable file
|
@ -0,0 +1,97 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import uuid
|
||||
import asyncio
|
||||
import datetime
|
||||
import caldav
|
||||
import caldav.lib
|
||||
|
||||
_BASE_URL = 'http://localhost:5232/'
|
||||
_PASSWORD = 'test'
|
||||
_USER = 'test'
|
||||
|
||||
def test_caldav():
|
||||
client0 = None
|
||||
try:
|
||||
|
||||
# create calendars
|
||||
client0 = caldav.DAVClient(url=_BASE_URL, username='test', password='test')
|
||||
principal0 = client0.principal()
|
||||
calendar0 = principal0.make_calendar(name=f'test-calendar-{uuid.uuid4()}')
|
||||
print(f'calendar url = {calendar0.url}')
|
||||
|
||||
print(calendar0.events())
|
||||
calendar0_events = set([x.url for x in calendar0.events()])
|
||||
assert calendar0_events == set()
|
||||
|
||||
# create event and store it
|
||||
calendar0_events |= {calendar0.save_event(
|
||||
dtstart=datetime.datetime.now(),
|
||||
dtend=datetime.datetime.now() + datetime.timedelta(hours=1),
|
||||
summary='event0',
|
||||
).url}
|
||||
|
||||
# obtain sync token for first event
|
||||
calendar0_updates = calendar0.objects_by_sync_token()
|
||||
calendar0_token = calendar0_updates.sync_token
|
||||
assert set([x.url for x in calendar0_updates]) == calendar0_events, (set(calendar0_updates), set(calendar0_events))
|
||||
|
||||
# get changes with sync token (should give no difference)
|
||||
# do this for both calendars
|
||||
calendar0_updates = calendar0.objects_by_sync_token(calendar0_token)
|
||||
assert set(calendar0_updates) == set()
|
||||
|
||||
# add another event to the calendar
|
||||
calendar0_events |= {calendar0.save_event(
|
||||
dtstart=datetime.datetime.now(),
|
||||
dtend=datetime.datetime.now() + datetime.timedelta(hours=1),
|
||||
summary='event0',
|
||||
).url}
|
||||
|
||||
calendar0_updates = calendar0.objects_by_sync_token(calendar0_token)
|
||||
assert len(set(calendar0_updates)) == 1
|
||||
calendar0_token = calendar0_updates.sync_token
|
||||
|
||||
# check that sync token returns 0 updates
|
||||
calendar0_updates = calendar0.objects_by_sync_token(calendar0_token)
|
||||
assert set(calendar0_updates) == set()
|
||||
|
||||
# update event
|
||||
calendar0_any_event = calendar0.event_by_url(list(calendar0_events)[0])
|
||||
calendar0_any_event.load()
|
||||
calendar0_any_event.vobject_instance.vevent_list[0].summary.value = 'event0-edit0'
|
||||
calendar0_any_event.save()
|
||||
|
||||
# check that we get the edited event
|
||||
calendar0_updates = calendar0.objects_by_sync_token(calendar0_token)
|
||||
assert len(set(calendar0_updates)) == 1
|
||||
calendar0_token = calendar0_updates.sync_token
|
||||
|
||||
# delete this event
|
||||
calendar0_any_event = calendar0.event_by_url(list(calendar0_events)[0])
|
||||
calendar0_any_event.delete()
|
||||
|
||||
calendar0_updates = calendar0.objects_by_sync_token(calendar0_token)
|
||||
assert len(list(calendar0_updates)) == 1
|
||||
#assert list(calendar0_updates)[0].is_deleted, list(calendar0_updates)[0]
|
||||
try:
|
||||
list(calendar0_updates)[0].load()
|
||||
except caldav.lib.error.NotFoundError:
|
||||
pass
|
||||
else:
|
||||
assert False
|
||||
|
||||
#changes = calendar.objects_by_sync_token(load_objects=True)
|
||||
#token = changes.sync_token
|
||||
except:
|
||||
print('failed')
|
||||
raise
|
||||
else:
|
||||
print('success')
|
||||
finally:
|
||||
if client0 is not None:
|
||||
client0.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_caldav()
|
Loading…
Reference in a new issue