Fix sync.
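
Update item and collection modification timestamps whenever content changes (new _item_updated and _collection_updated helpers on Storage), drop the nested transaction in _delete_history_refs and reuse the caller's connection, yield row.name instead of row.href when collecting deleted history references, skip duplicate hrefs when computing the new sync state, fall back to an empty state when the old sync token is unknown, and only insert a collection_state row when none exists yet for the new token name.

For reference, a minimal standalone sketch of that last guarded insert, assuming SQLAlchemy Core and an illustrative collection_state table (column types are simplified here and are not the module's actual schema):

    import json
    import sqlalchemy as sa

    metadata = sa.MetaData()
    collection_state = sa.Table(
        'collection_state', metadata,
        sa.Column('collection_id', sa.String, nullable=False),
        sa.Column('name', sa.String, nullable=False),
        sa.Column('state', sa.LargeBinary, nullable=False),
    )

    engine = sa.create_engine('sqlite://')
    metadata.create_all(engine)

    def store_state(connection, collection_id, token_name, state):
        # Insert the serialized state only if no row exists for this token,
        # so repeating a sync with the same token stays idempotent.
        select_stmt = sa.select(collection_state).where(
            sa.and_(
                collection_state.c.collection_id == collection_id,
                collection_state.c.name == token_name,
            ),
        )
        if connection.execute(select_stmt).one_or_none() is None:
            connection.execute(sa.insert(collection_state).values(
                collection_id=collection_id,
                name=token_name,
                state=json.dumps(state).encode(),
            ))

    with engine.begin() as conn:
        store_state(conn, 'c1', 'token-abc', {'item.ics': 'etag1'})
        store_state(conn, 'c1', 'token-abc', {'item.ics': 'etag1'})  # no duplicate row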

redxef 2022-12-06 23:42:15 +01:00
parent 075c06f428
commit aeda06de59
Signed by: redxef
GPG key ID: 7DAC3AA211CBD921


@@ -137,8 +137,10 @@ class Collection(BaseCollection):
)
if connection.execute(select_stmt).one_or_none() is None:
connection.execute(insert_stmt)
self._storage._collection_updated(self._id, connection=connection)
else:
connection.execute(update_stmt)
self._storage._item_updated(self._id, href, connection=connection)
self._update_history_etag(href, item, connection=connection)
res = list(self._get_multi([href], connection=connection))[0][1]
assert res is not None
@@ -166,6 +168,7 @@ class Collection(BaseCollection):
item_table.c.name == href,
),
)
self._storage._item_updated(self._id, href, connection=connection)
connection.execute(delete_stmt)
def delete(self, href: Optional[str] = None) -> None:
@@ -209,6 +212,7 @@ class Collection(BaseCollection):
).values([dict(collection_id=self._id, key=k, value=v) for k, v in props.items()])
connection.execute(delete_stmt)
connection.execute(insert_stmt)
self._storage._collection_updated(self._id, connection=connection)
def set_meta(self, props: Mapping[str, str]) -> None:
with self._storage._engine.begin() as c:
@@ -255,6 +259,7 @@ class Collection(BaseCollection):
history_etag = binascii.hexlify(os.urandom(16)).decode('ascii')
etag = item.etag if item else ''
if etag != cache_etag:
history_etag = radicale_item.get_etag(history_etag + '/' + etag).strip('\"')
if exists:
upsert = sa.update(
item_history_table,
@@ -297,7 +302,7 @@ class Collection(BaseCollection):
item_table.c.id == None,
)
for row in connection.execute(select_stmt):
yield row.href
yield row.name
def _delete_history_refs(self, *, connection):
item_history_table = self._storage._meta.tables['item_history']
@@ -310,8 +315,7 @@ class Collection(BaseCollection):
item_history_table.c.modified < (datetime.datetime.now() - datetime.timedelta(seconds=self._storage.configuration.get('storage', 'max_sync_token_age')))
),
)
with self._storage._engine.begin() as connection:
connection.execute(delete_stmt)
connection.execute(delete_stmt)
def _sync(self, *, connection, old_token: str = '') -> Tuple[str, Iterable[str]]:
# Parts of this method have been taken from
@@ -333,15 +337,21 @@ class Collection(BaseCollection):
old_token_name = old_token[len(_prefix):]
if not check_token_name(old_token_name):
raise ValueError(f'Malformed token: {old_token}')
state = {}
token_name_hash = sha256()
# compute new state
state = {}
token_name_hash = sha256()
for href, item in itertools.chain(
((item.href, item) for item in self._get_all(connection=connection)),
((href, None) for href in self._get_deleted_history_refs(connection=connection))
):
assert isinstance(href, str)
if href in state:
# we don't want to overwrite states
# this could happen with another storage collection
# which doesn't store the items itself, but
# derives them from another one
continue
history_etag = self._update_history_etag(href, item, connection=connection)
state[href] = history_etag
token_name_hash.update((href + '/' + history_etag).encode())
@@ -365,19 +375,27 @@ class Collection(BaseCollection):
collection_state_table.c.name == old_token_name,
),
)
state_row = connection.execute(select_stmt).one()
state = json.loads(state_row.state.decode())
state_row = connection.execute(select_stmt).one_or_none()
state = json.loads(state_row.state.decode()) if state_row is not None else {}
# store new state
## should never be a duplicate
insert_stmt = sa.insert(
select_new_state = sa.select(
collection_state_table.c,
).select_from(
collection_state_table,
).values(
collection_id=self._id,
name=token_name,
state=json.dumps(state).encode(),
).where(
collection_state_table.c.collection_id == self._id,
collection_state_table.c.name == token_name,
)
connection.execute(insert_stmt)
if connection.execute(select_new_state).one_or_none() is None:
insert_stmt = sa.insert(
collection_state_table,
).values(
collection_id=self._id,
name=token_name,
state=json.dumps(state).encode(),
)
connection.execute(insert_stmt)
changes = []
for href, history_etag in state.items():
@@ -409,9 +427,6 @@ class BdayCollection(Collection):
def __repr__(self) -> str:
return f'BdayCollection(id={self._id}, path={self._path}, birthday_source={self._birthday_source})'
def _sync(self, *, connection, old_token: str = '') -> Tuple[str, Iterable[str]]:
return self._birthday_source_collection._sync(connection=connection, old_token=old_token)
def _to_calendar_entry(self, o: vobject.base.Component) -> Optional[vobject.base.Component]:
def vobj_str2date(content_line):
v = content_line.value
@@ -440,9 +455,11 @@ class BdayCollection(Collection):
if new_vobject is None:
return None
new_vobject.add('uid').value = item.uid
assert item.href is not None
return radicale_item.Item(
collection=self,
href=item.href,
#href=item.href + '.ics',
last_modified=item.last_modified,
text=new_vobject.serialize().strip(),
)
@@ -515,6 +532,30 @@ class Storage(BaseStorage):
# TODO: path
return create_collection(self, id, '', birthday_source=row.birthday_source)
def _collection_updated(self, collection_id, *, connection):
collection_table = self._meta.tables['collection']
connection.execute(sa.update(
collection_table,
).values(
modified=datetime.datetime.now(),
).where(
collection_table.c.id == collection_id,
))
def _item_updated(self, collection_id: uuid.UUID, href: str, *, connection):
item_table = self._meta.tables['item']
item_row = connection.execute(sa.update(
item_table,
).values(
modified=datetime.datetime.now(),
).where(
sa.and_(
item_table.c.collection_id == collection_id,
item_table.c.name == href,
),
).returning(item_table.c)).one()
self._collection_updated(item_row.collection_id, connection=connection)
def _discover(self, path: str, *, connection, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]:
if path == '/':
return [create_collection(self, self._root_collection.id, '', birthday_source=None)]
@@ -590,25 +631,30 @@ class Storage(BaseStorage):
l = []
self_collection = connection.execute(select_stmt).one_or_none()
if self_collection is None:
# None found
return []
if self_collection.type_ != 'collection':
# Item found
return [radicale_item.Item(
collection=self._get_collection(self_collection.parent_id, connection=connection),
href=self_collection.name,
last_modified=self_collection.modified,
text=self_collection.data.decode(),
)]
# collection found
self_collection = create_collection(self, self_collection.id, '/'.join(path_parts), birthday_source=self_collection.birthday_source)
l += [self_collection]
if select_sub_stmt is not None:
# collection should list contents
if depth != "0":
for row in connection.execute(select_sub_stmt):
path = '/'.join(path_parts)
path += '/'
path += row.name
l += [create_collection(self, row.id, path, birthday_source=row.birthday_source)]
l += list(self_collection._get_all(connection=connection))
print(';;;; discovered items')
return l
def discover(self, path: str, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]:
@@ -643,6 +689,8 @@ class Storage(BaseStorage):
)
connection.execute(delete_stmt)
connection.execute(update_stmt)
self._collection_updated(src_collection_id, connection=connection)
self._collection_updated(dst_collection_id, connection=connection)
to_collection._update_history_etag(to_href, item, connection=connection)
assert item.href is not None
item.collection._update_history_etag(item.href, None, connection=connection)