Compare commits

..

No commits in common. "1213e469f644bf699b804168d511f30ee9ab374a" and "4debf553e780308803cf435045902340bf82b0d5" have entirely different histories.

View file

@ -52,7 +52,7 @@ class Collection(BaseCollection):
text=row.data.decode(), text=row.data.decode(),
) )
def _get_multi(self, hrefs: Iterable[str], *, connection) -> Iterable[Tuple[str, Optional["radicale_item.Item"]]]: def get_multi(self, hrefs: Iterable[str]) -> Iterable[Tuple[str, Optional["radicale_item.Item"]]]:
item_table = self._storage._meta.tables['item'] item_table = self._storage._meta.tables['item']
hrefs_ = list(hrefs) hrefs_ = list(hrefs)
#hrefs_ = [(x,) for x in hrefs] #hrefs_ = [(x,) for x in hrefs]
@ -69,18 +69,15 @@ class Collection(BaseCollection):
), ),
) )
l = [] l = []
for row in connection.execute(select_stmt): with self._storage._engine.begin() as connection:
l += [(row.name, self._row_to_item(row))] for row in connection.execute(select_stmt):
l += [(row.name, self._row_to_item(row))]
hrefs_set = set(hrefs_) hrefs_set = set(hrefs_)
hrefs_set_have = set([x[0] for x in l]) hrefs_set_have = set([x[0] for x in l])
l += [(x, None) for x in (hrefs_set - hrefs_set_have)] l += [(x, None) for x in (hrefs_set - hrefs_set_have)]
return l return l
def get_multi(self, hrefs: Iterable[str]) -> Iterable[Tuple[str, Optional["radicale_item.Item"]]]: def get_all(self) -> Iterator["radicale_item.Item"]:
with self._storage._engine.begin() as c:
return self._get_multi(hrefs=hrefs, connection=c)
def _get_all(self, *, connection) -> Iterator["radicale_item.Item"]:
item_table = self._storage._meta.tables['item'] item_table = self._storage._meta.tables['item']
select_stmt = sa.select( select_stmt = sa.select(
item_table.c, item_table.c,
@ -93,13 +90,7 @@ class Collection(BaseCollection):
for row in connection.execute(select_stmt): for row in connection.execute(select_stmt):
yield self._row_to_item(row) yield self._row_to_item(row)
def get_all(self) -> Iterator["radicale_item.Item"]: def upload(self, href: str, item: "radicale_item.Item") -> "radicale_item.Item":
with self._storage._engine.begin() as c:
for i in self._get_all(connection=c):
yield i
def _upload(self, href: str, item: "radicale_item.Item", *, connection) -> "radicale_item.Item":
item_table = self._storage._meta.tables['item'] item_table = self._storage._meta.tables['item']
item_serialized = item.serialize().encode() item_serialized = item.serialize().encode()
@ -130,19 +121,16 @@ class Collection(BaseCollection):
item_table.c.name == href, item_table.c.name == href,
), ),
) )
if connection.execute(select_stmt).one_or_none() is None: with self._storage._engine.begin() as connection:
connection.execute(insert_stmt) if connection.execute(select_stmt).one_or_none() is None:
else: connection.execute(insert_stmt)
connection.execute(update_stmt) else:
res = list(self._get_multi([href], connection=connection))[0][1] connection.execute(update_stmt)
res = list(self.get_multi([href]))[0][1]
assert res is not None assert res is not None
return res return res
def upload(self, href: str, item: "radicale_item.Item") -> "radicale_item.Item": def delete(self, href: Optional[str] = None) -> None:
with self._storage._engine.begin() as c:
return self._upload(href, item, connection=c)
def _delete(self, *, connection, href: Optional[str] = None) -> None:
collection_table = self._storage._meta.tables['collection'] collection_table = self._storage._meta.tables['collection']
item_table = self._storage._meta.tables['item'] item_table = self._storage._meta.tables['item']
if href is None: if href is None:
@ -160,13 +148,10 @@ class Collection(BaseCollection):
item_table.c.name == href, item_table.c.name == href,
), ),
) )
connection.execute(delete_stmt) with self._storage._engine.begin() as connection:
connection.execute(delete_stmt)
def delete(self, href: Optional[str] = None) -> None: def get_meta(self, key: Optional[str] = None) -> Union[Mapping[str, str], Optional[str]]:
with self._storage._engine.begin() as c:
return self._delete(connection=c, href=href)
def _get_meta(self, *, connection, key: Optional[str] = None) -> Union[Mapping[str, str], Optional[str]]:
collection_metadata = self._storage._meta.tables['collection_metadata'] collection_metadata = self._storage._meta.tables['collection_metadata']
select_meta = sa.select( select_meta = sa.select(
collection_metadata.c.key, collection_metadata.c.key,
@ -181,17 +166,14 @@ class Collection(BaseCollection):
collection_metadata.c.key == key, collection_metadata.c.key == key,
) )
metadata = {} metadata = {}
for row in connection.execute(select_meta): with self._storage._engine.begin() as connection:
metadata[row.key] = row.value for row in connection.execute(select_meta):
metadata[row.key] = row.value
if key is not None: if key is not None:
return metadata.get(key) return metadata.get(key)
return metadata return metadata
def get_meta(self, key: Optional[str] = None) -> Union[Mapping[str, str], Optional[str]]: def set_meta(self, props: Mapping[str, str]) -> None:
with self._storage._engine.begin() as c:
return self._get_meta(connection=c, key=key)
def _set_meta(self, props: Mapping[str, str], *, connection) -> None:
collection_metadata = self._storage._meta.tables['collection_metadata'] collection_metadata = self._storage._meta.tables['collection_metadata']
delete_stmt = sa.delete( delete_stmt = sa.delete(
collection_metadata, collection_metadata,
@ -201,14 +183,12 @@ class Collection(BaseCollection):
insert_stmt = sa.insert( insert_stmt = sa.insert(
collection_metadata, collection_metadata,
).values([dict(collection_id=self._id, key=k, value=v) for k, v in props.items()]) ).values([dict(collection_id=self._id, key=k, value=v) for k, v in props.items()])
connection.execute(delete_stmt) with self._storage._engine.begin() as connection:
connection.execute(insert_stmt) connection.execute(delete_stmt)
connection.execute(insert_stmt)
def set_meta(self, props: Mapping[str, str]) -> None: @property
with self._storage._engine.begin() as c: def last_modified(self) -> str:
return self._set_meta(props, connection=c)
def _last_modified(self, *, connection) -> str:
collection = self._storage._meta.tables['collection'] collection = self._storage._meta.tables['collection']
select_stmt = sa.select( select_stmt = sa.select(
collection.c.modified, collection.c.modified,
@ -217,15 +197,11 @@ class Collection(BaseCollection):
).where( ).where(
collection.c.id == self._id, collection.c.id == self._id,
) )
c = connection.execute(select_stmt).one() with self._storage._engine.begin() as connection:
c = connection.execute(select_stmt).one()
return c.modified.strftime('%a, %d %b %Y %H:%M:%S GMT') return c.modified.strftime('%a, %d %b %Y %H:%M:%S GMT')
@property
def last_modified(self):
with self._storage._engine.begin() as c:
return self._last_modified(connection=c)
def _update_history_etag(self, href: str, item: Optional["radicale_item.Item"], *, connection) -> str: def _update_history_etag(self, href: str, item: Optional["radicale_item.Item"]) -> str:
item_history_table = self._storage._meta.tables['item_history'] item_history_table = self._storage._meta.tables['item_history']
select_etag_stmt = sa.select( select_etag_stmt = sa.select(
item_history_table.c, item_history_table.c,
@ -238,42 +214,43 @@ class Collection(BaseCollection):
), ),
) )
exists: bool exists: bool
item_history = connection.execute(select_etag_stmt).one_or_none() with self._storage._engine.begin() as connection:
if item_history is not None: item_history = connection.execute(select_etag_stmt).one_or_none()
exists = True if item_history is not None:
cache_etag = item_history.etag, exists = True
history_etag = item_history.history_etag cache_etag = item_history.etag,
else: history_etag = item_history.history_etag
exists = False
cache_etag = ''
history_etag = binascii.hexlify(os.urandom(16)).decode('ascii')
etag = item.etag if item else ''
if etag != cache_etag:
if exists:
upsert = sa.update(
item_history_table,
).values(
etag=etag,
history_etag=history_etag,
).where(
sa.and_(
item_history_table.c.collection_id == self._id,
item_history_table.c.name == href,
),
)
else: else:
upsert = sa.insert( exists = False
item_history_table, cache_etag = ''
).values( history_etag = binascii.hexlify(os.urandom(16)).decode('ascii')
collection_id=self._id, etag = item.etag if item else ''
name=href, if etag != cache_etag:
etag=etag, if exists:
history_etag=history_etag, upsert = sa.update(
) item_history_table,
connection.execute(upsert) ).values(
etag=etag,
history_etag=history_etag,
).where(
sa.and_(
item_history_table.c.collection_id == self._id,
item_history_table.c.name == href,
),
)
else:
upsert = sa.insert(
item_history_table,
).values(
collection_id=self._id,
name=href,
etag=etag,
history_etag=history_etag,
)
connection.execute(upsert)
return history_etag return history_etag
def _get_deleted_history_refs(self, *, connection): def _get_deleted_history_refs(self):
item_table = self._storage._meta.tables['item'] item_table = self._storage._meta.tables['item']
item_history_table = self._storage._meta.tables['item_history'] item_history_table = self._storage._meta.tables['item_history']
select_stmt = sa.select( select_stmt = sa.select(
@ -290,16 +267,17 @@ class Collection(BaseCollection):
).where( ).where(
item_table.c.id == None, item_table.c.id == None,
) )
for row in connection.execute(select_stmt): with self._storage._engine.begin() as connection:
yield row.href for row in connection.execute(select_stmt):
yield row.href
def _delete_history_refs(self, *, connection): def _delete_history_refs(self):
item_history_table = self._storage._meta.tables['item_history'] item_history_table = self._storage._meta.tables['item_history']
delete_stmt = sa.delete( delete_stmt = sa.delete(
item_history_table, item_history_table,
).where( ).where(
sa.and_( sa.and_(
item_history_table.c.href.in_(list(self._get_deleted_history_refs(connection=connection))), item_history_table.c.href.in_(list(self._get_deleted_history_refs())),
item_history_table.c.collection_id == self._id, item_history_table.c.collection_id == self._id,
item_history_table.c.modified < (datetime.datetime.now() - datetime.timedelta(seconds=self._storage.configuration.get('storage', 'max_sync_token_age'))) item_history_table.c.modified < (datetime.datetime.now() - datetime.timedelta(seconds=self._storage.configuration.get('storage', 'max_sync_token_age')))
), ),
@ -307,10 +285,12 @@ class Collection(BaseCollection):
with self._storage._engine.begin() as connection: with self._storage._engine.begin() as connection:
connection.execute(delete_stmt) connection.execute(delete_stmt)
def _sync(self, *, connection, old_token: str = '') -> Tuple[str, Iterable[str]]: def sync(self, old_token: str = '') -> Tuple[str, Iterable[str]]:
_prefix = 'http://radicale.org/ns/sync/' _prefix = 'http://radicale.org/ns/sync/'
collection_state_table = self._storage._meta.tables['collection_state'] collection_state_table = self._storage._meta.tables['collection_state']
def check_token_name(token_name: str) -> bool: def check_token_name(token_name: str) -> bool:
print(token_name)
print(len(token_name))
if len(token_name) != 64: if len(token_name) != 64:
return False return False
for c in token_name: for c in token_name:
@ -331,10 +311,10 @@ class Collection(BaseCollection):
# compute new state # compute new state
for href, item in itertools.chain( for href, item in itertools.chain(
((item.href, item) for item in self.get_all()), ((item.href, item) for item in self.get_all()),
((href, None) for href in self._get_deleted_history_refs(connection=connection)) ((href, None) for href in self._get_deleted_history_refs())
): ):
assert isinstance(href, str) assert isinstance(href, str)
history_etag = self._update_history_etag(href, item, connection=connection) history_etag = self._update_history_etag(href, item)
state[href] = history_etag state[href] = history_etag
token_name_hash.update((href + '/' + history_etag).encode()) token_name_hash.update((href + '/' + history_etag).encode())
token_name = token_name_hash.hexdigest() token_name = token_name_hash.hexdigest()
@ -346,30 +326,31 @@ class Collection(BaseCollection):
# load old state # load old state
old_state = {} old_state = {}
if old_token_name: with self._storage._engine.begin() as connection:
select_stmt = sa.select( if old_token_name:
collection_state_table.c, select_stmt = sa.select(
).select_from( collection_state_table.c,
).select_from(
collection_state_table,
).where(
sa.and_(
collection_state_table.c.collection_id == self._id,
collection_state_table.c.name == old_token_name,
),
)
state_row = connection.execute(select_stmt).one()
state = json.loads(state_row.state.decode())
# store new state
## should never be a duplicate
insert_stmt = sa.insert(
collection_state_table, collection_state_table,
).where( ).values(
sa.and_( collection_id=self._id,
collection_state_table.c.collection_id == self._id, name=token_name,
collection_state_table.c.name == old_token_name, state=json.dumps(state).encode(),
),
) )
state_row = connection.execute(select_stmt).one() connection.execute(insert_stmt)
state = json.loads(state_row.state.decode())
# store new state
## should never be a duplicate
insert_stmt = sa.insert(
collection_state_table,
).values(
collection_id=self._id,
name=token_name,
state=json.dumps(state).encode(),
)
connection.execute(insert_stmt)
changes = [] changes = []
for href, history_etag in state.items(): for href, history_etag in state.items():
@ -381,10 +362,6 @@ class Collection(BaseCollection):
return token, changes return token, changes
def sync(self, old_token: str = '') -> Tuple[str, Iterable[str]]:
with self._storage._engine.begin() as c:
return self._sync(connection=c, old_token=old_token)
class Storage(BaseStorage): class Storage(BaseStorage):
def __init__(self, configuration: "radicale.config.Configuration"): def __init__(self, configuration: "radicale.config.Configuration"):
@ -400,7 +377,8 @@ class Storage(BaseStorage):
path_parts = path_parts[:-1] path_parts = path_parts[:-1]
return path_parts return path_parts
def _discover(self, path: str, *, connection, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]: def discover(self, path: str, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]:
logger.info("path = %s, depth = %s", path, depth)
if path == '/': if path == '/':
return [Collection(self, self._root_collection.id, '')] return [Collection(self, self._root_collection.id, '')]
path_parts = self._split_path(path) path_parts = self._split_path(path)
@ -462,128 +440,27 @@ class Storage(BaseStorage):
) )
l = [] l = []
self_collection = connection.execute(select_stmt).one_or_none() with self._engine.begin() as connection:
if self_collection is None: self_collection = connection.execute(select_stmt).one_or_none()
return [] if self_collection is None:
self_collection = Collection(self, self_collection.id, '/'.join(path_parts)) return []
l += [self_collection] self_collection = Collection(self, self_collection.id, '/'.join(path_parts))
if select_sub_stmt is not None: l += [self_collection]
for row in connection.execute(select_sub_stmt): if select_sub_stmt is not None:
path = '/'.join(path_parts) for row in connection.execute(select_sub_stmt):
path += '/' path = '/'.join(path_parts)
path += row.name path += '/'
if row.type_ == 'collection': path += row.name
l += [Collection(self, row.id, path)] if row.type_ == 'collection':
elif row.type_ == 'item': l += [Collection(self, row.id, path)]
assert self_collection is not None elif row.type_ == 'item':
l += [self_collection._row_to_item(row)] assert self_collection is not None
l += [self_collection._row_to_item(row)]
return l return l
def discover(self, path: str, depth: str = "0") -> Iterable["radicale.types.CollectionOrItem"]:
with self._engine.begin() as c:
return self._discover(path, connection=c, depth=depth)
def _move(self, item: "radicale_item.Item", to_collection: "BaseCollection", to_href: str, *, connection) -> None:
assert isinstance(item.collection, Collection)
assert isinstance(to_collection, Collection)
src_collection_id = item.collection._id
dst_collection_id = to_collection._id
item_table = self._meta.tables['item']
delete_stmt = sa.delete(
item_table,
).where(
sa.and_(
item_table.c.collection_id == dst_collection_id,
item_table.c.name == to_href,
)
)
update_stmt = sa.update(
item_table,
).values(
collection_id=dst_collection_id,
name=to_href,
).where(
sa.and_(
item_table.c.collection_id == src_collection_id,
item_table.c.name == item.href,
)
)
connection.execute(delete_stmt)
connection.execute(update_stmt)
def move(self, item: "radicale_item.Item", to_collection: "BaseCollection", to_href: str) -> None: def move(self, item: "radicale_item.Item", to_collection: "BaseCollection", to_href: str) -> None:
with self._engine.begin() as c: pass
return self._move(item, to_collection, to_href, connection=c)
def _create_collection(
self,
href: str,
*,
connection,
items: Optional[Iterable["radicale_item.Item"]]=None,
props: Optional[Mapping[str, str]]=None,
) -> "BaseCollection":
path = self._split_path(href)
parent_id = self._root_collection.id
collection_table = self._meta.tables['collection']
collection_metadata_table = self._meta.tables['collection_metadata']
item_table = self._meta.tables['item']
for p in path:
select_stmt = sa.select(
collection_table.c,
).select_from(
collection_table,
).where(
sa.and_(
collection_table.c.parent_id == parent_id,
collection_table.c.name == p,
),
)
c = connection.execute(select_stmt).one_or_none()
if c is None:
insert_stmt = sa.insert(
collection_table,
).values(
parent_id=parent_id,
name=p,
).returning(
collection_table.c,
)
c = connection.execute(insert_stmt).one()
parent_id = c.id
if items is not None or props is not None:
# drop all subcollections and items
delete_collections_stmt = sa.delete(
collection_table,
).where(
collection_table.c.parent_id == parent_id,
)
delete_meta_stmt = sa.delete(
collection_metadata_table,
).where(
collection_metadata_table.c.collection_id == parent_id,
)
delete_items_stmt = sa.delete(
item_table,
).where(
item_table.c.collection_id == parent_id,
)
connection.execute(delete_collections_stmt)
connection.execute(delete_meta_stmt)
connection.execute(delete_items_stmt)
if props is not None:
insert_stmt = sa.insert(
collection_metadata_table,
).values([dict(collection_id=parent_id, key=k, value=v) for k, v in props.items()])
connection.execute(insert_stmt)
c = Collection(self, parent_id, '/'.join(path))
if props is not None and 'key' in props and items is not None:
for i in items:
assert i.href is not None
c._upload(i.href, i, connection=connection)
return c
def create_collection( def create_collection(
self, self,
@ -591,19 +468,76 @@ class Storage(BaseStorage):
items: Optional[Iterable["radicale_item.Item"]]=None, items: Optional[Iterable["radicale_item.Item"]]=None,
props: Optional[Mapping[str, str]]=None, props: Optional[Mapping[str, str]]=None,
) -> "BaseCollection": ) -> "BaseCollection":
with self._engine.begin() as c: print('creating collection')
return self._create_collection(href, connection=c, items=items, props=props) print(f'href={href}, items={items}, props={props}')
path = self._split_path(href)
parent_id = self._root_collection.id
collection_table = self._meta.tables['collection']
collection_metadata_table = self._meta.tables['collection_metadata']
item_table = self._meta.tables['item']
with self._engine.begin() as connection:
for p in path:
select_stmt = sa.select(
collection_table.c,
).select_from(
collection_table,
).where(
sa.and_(
collection_table.c.parent_id == parent_id,
collection_table.c.name == p,
),
)
c = connection.execute(select_stmt).one_or_none()
if c is None:
insert_stmt = sa.insert(
collection_table,
).values(
parent_id=parent_id,
name=p,
).returning(
collection_table.c,
)
c = connection.execute(insert_stmt).one()
parent_id = c.id
if items is not None or props is not None:
# drop all subcollections and items
delete_collections_stmt = sa.delete(
collection_table,
).where(
collection_table.c.parent_id == parent_id,
)
delete_meta_stmt = sa.delete(
collection_metadata_table,
).where(
collection_metadata_table.c.collection_id == parent_id,
)
delete_items_stmt = sa.delete(
item_table,
).where(
item_table.c.collection_id == parent_id,
)
connection.execute(delete_collections_stmt)
connection.execute(delete_meta_stmt)
connection.execute(delete_items_stmt)
if props is not None:
insert_stmt = sa.insert(
collection_metadata_table,
).values([dict(collection_id=parent_id, key=k, value=v) for k, v in props.items()])
connection.execute(insert_stmt)
if props is not None and 'key' in props and items is not None:
print(items)
# TODO insert items
c = Collection(self, parent_id, '/'.join(path))
print(c)
return c
@radicale.types.contextmanager @radicale.types.contextmanager
def acquire_lock(self, mod: str, user: str = "") -> Iterator[None]: def acquire_lock(self, mode: str, user: str = "") -> Iterator[None]:
# locking happens on a db level
yield yield
def _verify(self, *, connection) -> bool: def verify(self) -> bool:
_ = connection
return True return True
def verify(self):
with self._engine.begin() as c:
return self._verify(connection=c)