170 lines
5.5 KiB
Python
170 lines
5.5 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import requests
|
|
import urllib.parse
|
|
import json
|
|
import datetime
|
|
import logging
|
|
|
|
from schema import Use, Schema
|
|
|
|
from . import utils
|
|
from .abc import Sink, Adapter
|
|
|
|
# Module-level logger; its name is this module's ``__name__`` prefixed with "wp_cal.".
logger = logging.getLogger(f"wp_cal.{__name__}")
|
|
|
|
class CalendarMetadata:
    """Plain holder for a calendar's name, id and name translations.

    ``translations`` is a mapping whose keys become suffixes of the
    ``calendar_name_translation_<key>`` form fields (presumably language
    codes -- confirm against the configuration that feeds this class).
    """

    def __init__(self, name: str, id: int, translations: dict):
        self.name = name
        self.id = id
        self.translations = translations

    def to_dict(self):
        """Return the metadata flattened into a single form-field dict.

        The result holds ``calendar_name`` and ``calendar_id`` first,
        followed by one ``calendar_name_translation_<key>`` entry per item
        in ``self.translations`` (in that mapping's iteration order).
        """
        fields = {
            'calendar_name': self.name,
            'calendar_id': self.id,
        }
        for key, translated_name in self.translations.items():
            fields[f'calendar_name_translation_{key}'] = translated_name
        return fields
|
|
|
|
class Wordpress(Sink, Adapter):
    """Sink that pushes calendar events to a WordPress site.

    Logs in through ``wp-login.php`` and posts per-day calendar data to
    ``wp-admin/admin-ajax.php`` using the ``wpbs_save_calendar_data`` action.
    """

    # Expected configuration layout; every leaf is coerced with ``Use``.
    # NOTE(review): ``calendar.id`` is coerced to ``str`` here while
    # ``CalendarMetadata.__init__`` annotates ``id`` as ``int`` -- confirm
    # which type the endpoint expects.
    schema = Schema({
        'url': Use(str),
        'calendar': {
            'id': Use(str),
            'name': Use(str),
            'translations': Use(dict),
        },
        'credentials': {
            'user': Use(str),
            'password': Use(str),
        },
    })

    def __init__(self, *args, **kwargs):
        """Read URL, credentials and calendar metadata from ``self.config``.

        ``self.config`` is presumably populated by the base-class
        constructor from the validated configuration -- confirm in
        ``Adapter``.
        """
        super().__init__(*args, **kwargs)
        self.base_url = self.config['url']
        self.credentials = self.config['credentials']
        self.calendar_metadata = CalendarMetadata(**self.config['calendar'])

        # runtime data: one HTTP session shared by login and later requests
        self.session = requests.Session()

    def login(self):
        """Post the configured credentials to ``wp-login.php``.

        Returns:
            ``True`` when the response's ``Set-Cookie`` header, split on
            commas, yields more than one part (a heuristic for "the server
            set several cookies"); ``False`` otherwise, including when the
            header is missing altogether.
        """
        login_request = self.session.post(
            f'{self.base_url}/wp-login.php',
            data={
                'log': self.credentials['user'],
                'pwd': self.credentials['password'],
                'wp-submit': 'Anmelden',
                'redirect_to': f'{self.base_url}/wp-admin/',
                'testcookie': 1,
            },
            # Inspect the direct response instead of following the redirect.
            allow_redirects=False,
        )
        logger.debug('login request return headers = %s', login_request.headers)
        # A response without any Set-Cookie header is treated as a failed
        # login instead of raising KeyError.
        set_cookie = login_request.headers.get('Set-Cookie', '')
        login_request_cookies = [x.strip() for x in set_cookie.split(',')]
        if len(login_request_cookies) > 1:
            logger.debug('login seems to be ok, cookies = %s', login_request_cookies)
            return True
        return False

    def _datestr_to_date(self, s, tz=None):
        """Convert an ISO-8601 date or datetime string to a local ``date``.

        Args:
            s: ISO-8601 string; a trailing ``Z`` is understood as UTC.
            tz: Accepted for interface compatibility; currently not used.

        Returns:
            The calendar date of ``s`` in the system's local timezone.
            Strings without an offset are interpreted as local time.
        """
        if s.endswith('Z'):
            # 'Z' means UTC. Simply dropping it would make the value naive,
            # and ``astimezone()`` would then treat it as local time.
            # ``+00:00`` is also accepted by ``fromisoformat`` on Python
            # versions that do not parse 'Z'.
            s = s[:-1] + '+00:00'
        return datetime.datetime.fromisoformat(s).astimezone().date()

    def _generate_data_single(self, event, item_id=2):
        """Expand one event into per-day calendar entries.

        Args:
            event: Mapping with ``'start'``, ``'end'`` and ``'summary'``;
                ``start``/``end`` each hold either a ``'date'`` or a
                ``'dateTime'`` entry.
            item_id: Value stored as ``legend_item_id`` for every covered
                day (presumably the legend entry for occupied days --
                confirm against the calendar's legend).

        Returns:
            Nested dict ``{year: {month: {day: {...}}}}`` where each day
            maps to ``{'description': summary, 'legend_item_id': item_id}``.

        Raises:
            ValueError: If ``start`` or ``end`` has neither ``'date'`` nor
                ``'dateTime'``.
        """
        start = event['start']
        end = event['end']
        summary = event['summary']

        day_increment = datetime.timedelta(days=1)

        if 'date' in start:
            start = self._datestr_to_date(start['date'])
        elif 'dateTime' in start:
            # 'timeZone' may be absent; do not fail on a value that is
            # only passed through.
            start = self._datestr_to_date(start['dateTime'], tz=start.get('timeZone'))
        else:
            raise ValueError('Cannot process event')

        if 'date' in end:
            # A plain end date is used as an exclusive bound by the loop below.
            end = self._datestr_to_date(end['date'])
        elif 'dateTime' in end:
            end = self._datestr_to_date(end['dateTime'], tz=end.get('timeZone'))
            # If it is a time on a day, add one day so that this day is
            # also included in the update.
            end += day_increment
        else:
            raise ValueError('Cannot process event')

        flow = start
        dictionary = {}
        while flow < end:
            month_data = dictionary.setdefault(flow.year, {}).setdefault(flow.month, {})
            month_data[flow.day] = {
                'description': summary,
                'legend_item_id': item_id,
            }
            flow += day_increment
        return dictionary

    def _fill_empty(self, d, *, start: datetime.datetime, until: datetime.timedelta):
        """Ensure every day in ``[start, start + until)`` has an entry in ``d``.

        Days already present keep their values; missing days (or missing
        keys of existing days) get ``legend_item_id`` 1 and an empty
        description. ``d`` is modified in place and also returned.
        """
        day_increment = datetime.timedelta(days=1)
        stop = start + until
        flow = start
        while flow < stop:
            day_data = (
                d.setdefault(flow.year, {})
                .setdefault(flow.month, {})
                .setdefault(flow.day, {})
            )
            day_data.setdefault('legend_item_id', 1)
            day_data.setdefault('description', '')
            flow += day_increment
        return d

    def _generate_data(
        self,
        events,
        start: datetime.datetime | None = None,
        until: datetime.timedelta | None = None,
    ):
        """Build the complete per-day data structure for ``events``.

        Args:
            events: Iterable of event mappings (see
                ``_generate_data_single``).
            start: First day of the window padded with empty days;
                defaults to the current local time.
            until: Length of that window; defaults to 365 days. An explicit
                zero-length ``timedelta`` is respected (no padding).

        Returns:
            Nested ``{year: {month: {day: {...}}}}`` dict with all event
            days merged and the window padded with empty days.
        """
        if start is None:
            # ``utcnow().astimezone()`` would reinterpret the UTC wall clock
            # as local time; take the local "now" directly instead.
            start = datetime.datetime.now().astimezone()
        if until is None:
            until = datetime.timedelta(days=365)

        final_dict = {}
        for event in events:
            data = self._generate_data_single(event)
            utils.dict_merge(final_dict, data)
        final_dict = self._fill_empty(
            final_dict,
            start=start,
            until=until,
        )
        return final_dict

    def post_events(
        self,
        events,
        start: datetime.datetime | None = None,
        until: datetime.timedelta | None = None,
    ):
        """Send the generated calendar data to the WordPress AJAX endpoint.

        Args:
            events: Iterable of event mappings to publish.
            start: Forwarded to ``_generate_data``.
            until: Forwarded to ``_generate_data``.

        Raises:
            Exception: If the response body does not contain
                ``wpbs_message=calendar_update_success``.
        """
        metadata = self.calendar_metadata.to_dict()
        data = self._generate_data(events, start=start, until=until)
        update_request = self.session.post(
            f'{self.base_url}/wp-admin/admin-ajax.php',
            auth=(self.credentials['user'], self.credentials['password']),
            data={
                'action': 'wpbs_save_calendar_data',
                # The metadata travels as one URL-encoded string field.
                'form_data': urllib.parse.urlencode(metadata),
                'calendar_data': json.dumps(data),
            },
        )
        succeeded = 'wpbs_message=calendar_update_success' in update_request.text
        if not succeeded:
            raise Exception(f'failed to post events, got answer {update_request.text}')
|
|
|
|
|