Compare commits

...

3 commits

5 changed files with 175 additions and 44 deletions

View file

@ -3,47 +3,36 @@
import tempfile import tempfile
import datetime import datetime
import logging
import os.path import os.path
import json import json
from google.auth.transport.requests import Request from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials #from google.oauth2.credentials import Credentials
from google.oauth2.service_account import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build from googleapiclient.discovery import build
from googleapiclient.errors import HttpError from googleapiclient.errors import HttpError
logger = logging.getLogger(f"wp_cal.{__name__}")
# If modifying these scopes, delete the file token.json. # If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/calendar'] SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']
class Google(): class Google():
def __init__(self, calendar_id, *, credentials, token_file): def __init__(self, calendar_id, *, credentials, token_file):
_, credentials_file = tempfile.mkstemp()
with open(credentials_file, 'w') as fp:
json.dump(credentials, fp)
self.calendar_id = calendar_id self.calendar_id = calendar_id
self.credentials_file = credentials_file self.credentials_info = credentials
self.token_file = token_file
self.credentials = None self.credentials = None
def login(self): def login(self):
if os.path.exists(self.token_file): self.credentials = Credentials.from_service_account_info(self.credentials_info, scopes=SCOPES)
self.credentials = Credentials.from_authorized_user_file(self.token_file, SCOPES) if not self.credentials.valid:
if self.credentials is None or not self.credentials.valid:
if self.credentials is not None \
and self.credentials.expired \
and self.credentials.refresh_token:
self.credentials.refresh(Request()) self.credentials.refresh(Request())
else: logger.debug('credentials.valid = %s', repr(self.credentials.valid))
flow = InstalledAppFlow.from_client_secrets_file(
self.credentials_file,
SCOPES,
)
self.credentials = flow.run_local_server(port=0)
with open(self.token_file, 'w') as token:
token.write(self.credentials.to_json())
def get_events(self, start: datetime.datetime | None=None, until: datetime.timedelta | None=None, limit=None): def get_events(self, start: datetime.datetime | None=None, until: datetime.timedelta | None=None, limit=None):
until = until if until else datetime.timedelta(days=365) until = until if until else datetime.timedelta(days=365)
@ -59,7 +48,7 @@ class Google():
maxResults=limit, maxResults=limit,
).execute() ).execute()
except HttpError: except HttpError:
print('an error occured') logger.exception("a http error occured")
raise raise
events = events.get('items', []) events = events.get('items', [])
return events return events

View file

@ -1,6 +1,10 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import logging
logger = logging.getLogger(f"wp_cal.{__name__}")
def dict_merge(d0, d1): def dict_merge(d0, d1):
for k, v in d1.items(): for k, v in d1.items():
if (k in d0 and isinstance(d0[k], dict) and isinstance(d1[k], dict)): if (k in d0 and isinstance(d0[k], dict) and isinstance(d1[k], dict)):

View file

@ -5,10 +5,11 @@ import requests
import urllib.parse import urllib.parse
import json import json
import datetime import datetime
import logging
from . import utils from . import utils
logger = logging.getLogger(f"wp_cal.{__name__}")
class CalendarMetadata(): class CalendarMetadata():

48
ci/pipeline.yaml Normal file
View file

@ -0,0 +1,48 @@
---
resources:
- name: every-day
type: time
source:
interval: 24h
- name: script
type: git
source:
uri: https://gitea.redxef.at/redxef/wp-cal-integration
branch: test
jobs:
- name: update-calendar
plan:
- get: every-day
trigger: true
- get: script
- task: run-update
config:
platform: linux
image_resource:
type: registry-image
source:
repository: alpine
inputs:
- name: script
path: .
params:
VAULT_TOKEN: ((vault_token.token))
run:
path: sh
args:
- -c
- |
#!/usr/bin/env sh
apk --no-cache --update add \
libcap vault jq \
python3 py3-pip py3-yaml \
py3-google-api-python-client \
py3-google-auth-httplib2 \
&& python3 -m pip install google-auth-oauthlib
setcap cap_ipc_lock= "$(command -v vault)"
vault read -address https://vault.redxef.at -format json secret/data/scouts/google/website/wp-cal-integration \
| jq .data.data \
| ./main.py

127
main.py
View file

@ -1,17 +1,72 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os
import sys import sys
import yaml import yaml
import logging
try: try:
from yaml import CLoader as Loader from yaml import CLoader as Loader, CDumper as Dumper
except ImportError: except ImportError:
from yaml import Loader from yaml import Loader, Dumper
import click
from adapters import * from adapters import *
class Config(): logger = logging.getLogger(f'wp_cal.{__name__}')
class BaseConfig(dict):
def __init__(self, file, defaults, *args, **kwargs):
self._ex = None
self.file = file
self.defaults = defaults
super().__init__(*args, **kwargs)
try:
self._load()
except FileNotFoundError as ex:
self._ex = ex
def __repr__(self):
return super().__repr__()
def __str__(self):
return super().__str__()
def _load(self):
try:
if self.file == '-':
config = yaml.load(sys.stdin, Loader=Loader)
else:
with open(self.file) as fp:
config = yaml.load(fp, Loader=Loader)
if config is None:
config = {}
for k, v in config.items():
self[k] = v
finally:
for keylist, value in self.defaults:
d = self
for i, key in enumerate(keylist):
repl = value if (i == len(keylist) - 1) else {}
d[key] = d.get(key, repl)
d = d[key]
def _save(self):
with open(self.file, 'w') as fp:
yaml.dump(self, fp, Dumper=Dumper)
def exception(self):
return self._ex
def load(self):
return self._load()
def save(self):
return self._save()
class Config(BaseConfig):
""" """
The default configuration. The default configuration.
@ -26,31 +81,60 @@ class Config():
.wordpress.calendar.translations: a dictionary of language <-> translation pairs (example: {"en": "Reservations"}) .wordpress.calendar.translations: a dictionary of language <-> translation pairs (example: {"en": "Reservations"})
.wordpress.credentials.user: the user as which to log into wordpress .wordpress.credentials.user: the user as which to log into wordpress
.wordpress.credentials.password: the users password .wordpress.credentials.password: the users password
.logging.level: one or more log levels of the form <module.submodule>:<level> seperated by a `,` (comma)
""" """
def __init__(self, file): def __init__(self, file, defaults, *args, **kwargs):
self.file = file defaults += [
self.config: dict | None = None ('google.calendar_id', '#TODO insert google calendar id'),
('google.credentials', {}),
('google.token_file', os.path.join(os.environ['HOME'], '.wp-cal-google-token')),
('wordpress.url', '#TODO insert url to wordpress site'),
('wordpress.calendar.id', '#TODO insert wp-booking-system calendar id'),
('wordpress.calendar.name', '#TODO insert calendar name'),
('wordpress.calendar.translations', {'en': '#TODO insert english translation'}),
('wordpress.credentials.user', '#TODO insert wordpress username'),
('wordpress.credentials.password', '#TODO insert wordpress password'),
]
defaults = [(x[0].split('.'), x[1]) for x in defaults]
super().__init__(file, defaults, *args, **kwargs)
def load(self): def init_logging(level: str):
if self.file == '-': allowed_values = {'NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'}
config = yaml.load(sys.stdin, Loader=Loader)
log_levels = [x.split(':') for x in level.split(',')]
for module, level in log_levels:
module = module.strip() if module else None
level = level.strip().upper()
if level not in allowed_values:
raise ValueError(f'invalid log level, allowed values: {repr(allowed_values)}')
level = getattr(logging, level)
logging.getLogger(module).setLevel(level)
@click.command()
@click.option('--level', '-l', envvar='WP_CAL_LEVEL', default=':WARNING,wp_cal:INFO', help='The log level for the application')
@click.option('--config', '-c', envvar='WP_CAL_CONFIG', default='-', help='The configuration file')
def main(level, config):
logging.basicConfig()
init_logging(level)
config = Config(config, [('logging.level', ':WARNING,wp_cal:INFO')])
if config.exception():
logger.info('config not found, trying to generate template')
try:
config.save()
except Exception:
logger.exception('failed to generate template')
else: else:
with open(self.file) as fp: logger.info('generated config at "%s"', config.file)
config = yaml.load(fp, Loader=Loader) return
self.config = config
def __getitem__(self, name): init_logging(config['logging']['level'])
assert self.config is not None
return self.config[name]
if __name__ == '__main__':
config = Config('-')
config.load()
g = Google( g = Google(
config['google']['calendar_id'], config['google']['calendar_id'],
credentials=config['google']['credentials'], credentials=config['google']['credentials'],
token_file=config['google'].get('token_file', '~/.wp-cal-integration-google-token.json') token_file=config['google']['token_file'],
) )
w = Wordpress( w = Wordpress(
config['wordpress']['url'], config['wordpress']['url'],
@ -63,5 +147,10 @@ if __name__ == '__main__':
) )
g.login() g.login()
events = g.get_events() events = g.get_events()
logger.info("syncing %d events", len(events))
w.login() w.login()
w.post_events(events) w.post_events(events)
logger.info("done")
if __name__ == '__main__':
main()