wp-cal-integration/adapters/wordpress.py

181 lines
5.9 KiB
Python
Raw Permalink Normal View History

2022-09-28 15:59:12 +02:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2022-09-28 15:52:52 +02:00
import requests
import urllib.parse
import json
import datetime
2022-10-13 00:16:05 +02:00
import logging
2022-09-28 15:52:52 +02:00
from schema import Use, Schema
2023-11-14 15:40:53 +01:00
from bs4 import BeautifulSoup
2022-09-28 15:52:52 +02:00
from . import utils
from .abc import Sink, Adapter
2022-09-28 15:52:52 +02:00
2022-10-13 00:16:05 +02:00
logger = logging.getLogger(f"wp_cal.{__name__}")
2022-09-28 15:52:52 +02:00
class CalendarMetadata():
def __init__(
self,
name: str,
id: int,
translations: dict,
):
self.name = name
self.id = id
self.translations = translations
def to_dict(self):
translations = {
f'calendar_name_translation_{k}': v for k, v in self.translations.items()
}
return {
'calendar_name': self.name,
'calendar_id': self.id,
**translations,
}
class Wordpress(Sink, Adapter):
schema = Schema({
'url': Use(str),
'calendar': {
'id': Use(str),
'name': Use(str),
'translations': Use(dict),
},
'credentials': {
'user': Use(str),
'password': Use(str),
},
})
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.base_url = self.config['url']
self.credentials = self.config['credentials']
self.calendar_metadata = CalendarMetadata(**self.config['calendar'])
# runtime data
2022-09-28 15:52:52 +02:00
self.session = requests.Session()
def login(self):
login_request = self.session.post(
f'{self.base_url}/wp-login.php',
data={
'log': self.credentials['user'],
'pwd': self.credentials['password'],
'wp-submit': 'Anmelden',
'redirect_to': f'{self.base_url}/wp-admin/',
'testcookie': 1,
2022-10-13 11:06:44 +02:00
},
allow_redirects=False,
2022-09-28 15:52:52 +02:00
)
2022-10-13 11:06:44 +02:00
logger.debug('login request return headers = %s', login_request.headers)
login_request_cookies = [x.strip() for x in login_request.headers['Set-Cookie'].split(',')]
if len(login_request_cookies) > 1:
logger.debug('login seems to be ok, cookies = %s', login_request_cookies)
return True
return False
2022-09-28 15:52:52 +02:00
def _datestr_to_date(self, s, tz=None):
if s.endswith('Z'):
s = s[:-1]
return datetime.datetime.fromisoformat(s).astimezone().date()
def _generate_data_single(self, event, item_id=2):
start = event['start']
end = event['end']
summary = event['summary']
day_increment = datetime.timedelta(days=1)
if 'date' in start:
start = self._datestr_to_date(start['date'])
elif 'dateTime' in start:
start = self._datestr_to_date(start['dateTime'], tz=start['timeZone'])
else:
raise ValueError('Cannot process event')
if 'date' in end:
end = self._datestr_to_date(end['date'])
elif 'dateTime' in end:
end = self._datestr_to_date(end['dateTime'], tz=end['timeZone'])
end += day_increment # if its a time on a day, we want to add one, in order to also include it in the update
else:
raise ValueError('Cannot process event')
flow = start
dictionary = {}
while flow < end:
dictionary.setdefault(flow.year, {})
dictionary[flow.year].setdefault(flow.month, {})
dictionary[flow.year][flow.month].setdefault(flow.day, {})
dictionary[flow.year][flow.month][flow.day] = {
'description': summary,
'legend_item_id': item_id,
}
flow += day_increment
return dictionary
def _fill_empty(self, d, *, start: datetime.datetime, until: datetime.timedelta):
day_increment = datetime.timedelta(days=1)
flow = start
while flow < start + until:
d.setdefault(flow.year, {})
d[flow.year].setdefault(flow.month, {})
d[flow.year][flow.month].setdefault(flow.day, {})
d[flow.year][flow.month][flow.day].setdefault('legend_item_id', 1)
d[flow.year][flow.month][flow.day].setdefault('description', '')
flow += day_increment
return d
def _generate_data(
self,
events,
start: datetime.datetime | None=None,
until: datetime.timedelta | None=None
):
start = start if start else datetime.datetime.utcnow().astimezone()
2022-10-13 11:06:44 +02:00
until = until if until else datetime.timedelta(days=365)
2022-09-28 15:52:52 +02:00
final_dict = {}
for event in events:
data = self._generate_data_single(event)
utils.dict_merge(final_dict, data)
final_dict = self._fill_empty(
final_dict,
start=start,
until=until,
)
return final_dict
2023-11-14 15:40:53 +01:00
def get_nonce(self):
r = self.session.get(
f'{self.base_url}/wp-admin/admin.php?page=wpbs-calendars&subpage=edit-calendar&calendar_id=1',
)
soup = BeautifulSoup(r.text, 'html.parser')
nonce = soup.find_all('input', {'id': 'wpbs_token'})[0]
return nonce['value']
2022-10-13 11:06:44 +02:00
def post_events(self, events, start: datetime.datetime | None=None, until: datetime.timedelta | None=None):
metadata = self.calendar_metadata.to_dict()
2022-10-13 11:06:44 +02:00
data = self._generate_data(events, start=start, until=until)
2022-09-28 15:52:52 +02:00
update_request = self.session.post(
f'{self.base_url}/wp-admin/admin-ajax.php',
auth=(self.credentials['user'], self.credentials['password']),
data={
'action': 'wpbs_save_calendar_data',
'form_data': urllib.parse.urlencode(metadata),
'calendar_data': json.dumps(data),
2023-11-14 15:40:53 +01:00
'wpbs_token': self.get_nonce(),
2022-09-28 15:52:52 +02:00
},
)
r = 'wpbs_message=calendar_update_success' in update_request.text
if not r:
raise Exception(f'failed to post events, got answer {update_request.text}')
2022-09-28 15:52:52 +02:00