wp-cal-integration/adapters/wordpress.py

155 lines
5 KiB
Python
Raw Normal View History

2022-09-28 15:59:12 +02:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2022-09-28 15:52:52 +02:00
import requests
import urllib.parse
import json
import datetime
2022-10-13 00:16:05 +02:00
import logging
2022-09-28 15:52:52 +02:00
from . import utils
2022-10-13 00:16:05 +02:00
logger = logging.getLogger(f"wp_cal.{__name__}")
2022-09-28 15:52:52 +02:00
class CalendarMetadata():
def __init__(
self,
name: str,
id: int,
translations: dict,
):
self.name = name
self.id = id
self.translations = translations
def to_dict(self):
translations = {
f'calendar_name_translation_{k}': v for k, v in self.translations.items()
}
return {
'calendar_name': self.name,
'calendar_id': self.id,
**translations,
}
class Wordpress():
def __init__(
self,
base_url: str,
*,
calendar_metadata: CalendarMetadata,
credentials: dict,
):
self.base_url = base_url
self.session = requests.Session()
self.credentials = credentials
self.calendar_meadata = calendar_metadata
def login(self):
login_request = self.session.post(
f'{self.base_url}/wp-login.php',
data={
'log': self.credentials['user'],
'pwd': self.credentials['password'],
'wp-submit': 'Anmelden',
'redirect_to': f'{self.base_url}/wp-admin/',
'testcookie': 1,
2022-10-13 11:06:44 +02:00
},
allow_redirects=False,
2022-09-28 15:52:52 +02:00
)
2022-10-13 11:06:44 +02:00
logger.debug('login request return headers = %s', login_request.headers)
login_request_cookies = [x.strip() for x in login_request.headers['Set-Cookie'].split(',')]
if len(login_request_cookies) > 1:
logger.debug('login seems to be ok, cookies = %s', login_request_cookies)
return True
return False
2022-09-28 15:52:52 +02:00
def _datestr_to_date(self, s, tz=None):
if s.endswith('Z'):
s = s[:-1]
return datetime.datetime.fromisoformat(s).astimezone().date()
def _generate_data_single(self, event, item_id=2):
start = event['start']
end = event['end']
summary = event['summary']
day_increment = datetime.timedelta(days=1)
if 'date' in start:
start = self._datestr_to_date(start['date'])
elif 'dateTime' in start:
start = self._datestr_to_date(start['dateTime'], tz=start['timeZone'])
else:
raise ValueError('Cannot process event')
if 'date' in end:
end = self._datestr_to_date(end['date'])
elif 'dateTime' in end:
end = self._datestr_to_date(end['dateTime'], tz=end['timeZone'])
end += day_increment # if its a time on a day, we want to add one, in order to also include it in the update
else:
raise ValueError('Cannot process event')
flow = start
dictionary = {}
while flow < end:
dictionary.setdefault(flow.year, {})
dictionary[flow.year].setdefault(flow.month, {})
dictionary[flow.year][flow.month].setdefault(flow.day, {})
dictionary[flow.year][flow.month][flow.day] = {
'description': summary,
'legend_item_id': item_id,
}
flow += day_increment
return dictionary
def _fill_empty(self, d, *, start: datetime.datetime, until: datetime.timedelta):
day_increment = datetime.timedelta(days=1)
flow = start
while flow < start + until:
d.setdefault(flow.year, {})
d[flow.year].setdefault(flow.month, {})
d[flow.year][flow.month].setdefault(flow.day, {})
d[flow.year][flow.month][flow.day].setdefault('legend_item_id', 1)
d[flow.year][flow.month][flow.day].setdefault('description', '')
flow += day_increment
return d
def _generate_data(
self,
events,
start: datetime.datetime | None=None,
until: datetime.timedelta | None=None
):
start = start if start else datetime.datetime.utcnow().astimezone()
2022-10-13 11:06:44 +02:00
until = until if until else datetime.timedelta(days=365)
2022-09-28 15:52:52 +02:00
final_dict = {}
for event in events:
data = self._generate_data_single(event)
utils.dict_merge(final_dict, data)
final_dict = self._fill_empty(
final_dict,
start=start,
until=until,
)
return final_dict
2022-10-13 11:06:44 +02:00
def post_events(self, events, start: datetime.datetime | None=None, until: datetime.timedelta | None=None):
2022-09-28 15:52:52 +02:00
metadata = self.calendar_meadata.to_dict()
2022-10-13 11:06:44 +02:00
data = self._generate_data(events, start=start, until=until)
2022-09-28 15:52:52 +02:00
update_request = self.session.post(
f'{self.base_url}/wp-admin/admin-ajax.php',
auth=(self.credentials['user'], self.credentials['password']),
data={
'action': 'wpbs_save_calendar_data',
'form_data': urllib.parse.urlencode(metadata),
'calendar_data': json.dumps(data),
},
)
2022-10-13 11:06:44 +02:00
return 'wpbs_message=calendar_update_success' in update_request.text
2022-09-28 15:52:52 +02:00