wp-cal-integration/adapters/google.py

73 lines
2.3 KiB
Python
Raw Normal View History

2022-09-28 15:59:12 +02:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
2022-09-28 15:52:52 +02:00
import datetime
import logging
import os
2022-09-28 15:52:52 +02:00
from google.auth.transport.requests import Request
#from google.oauth2.credentials import Credentials
from google.oauth2.service_account import Credentials
2022-09-28 15:52:52 +02:00
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from schema import Use, Optional, Schema
2022-09-28 15:52:52 +02:00
# Module-level logger; its name is "wp_cal." followed by this module's __name__.
logger = logging.getLogger(f"wp_cal.{__name__}")
from .abc import Source, Adapter
2022-09-28 15:52:52 +02:00
# OAuth scopes requested for the credentials: read-only calendar access.
# NOTE(review): the hint below appears to stem from Google's quickstart for the
# user OAuth flow; no token.json is read or written in this module -- confirm
# whether it still applies.
# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']
2022-09-28 15:52:52 +02:00
class Google(Source, Adapter):
    """Calendar source backed by the Google Calendar API (v3).

    Authenticates with service-account credentials taken from the adapter
    configuration and lists the events of one calendar.
    """

    # Configuration schema:
    #   calendar_id - id of the calendar to query (coerced to ``str``)
    #   credentials - service-account info (coerced to ``dict``)
    #   token_file  - optional path (coerced to ``str``); defaults to
    #                 ``.wp-cal-google-token`` in the user's home directory
    schema = Schema({
        'calendar_id': Use(str),
        'credentials': Use(dict),
        Optional(
            'token_file',
            # expanduser() rather than os.environ["HOME"]: this expression is
            # evaluated when the class is defined, and a missing HOME variable
            # would otherwise make the whole module fail to import.
            default=os.path.join(
                os.path.expanduser('~'),
                '.wp-cal-google-token',
            ),
        ): Use(str),
    })

    def __init__(self, *args, **kwargs):
        """Copy the validated configuration values onto the instance."""
        super().__init__(*args, **kwargs)
        self.calendar_id = self.config['calendar_id']
        self.credentials_info = self.config['credentials']
        # Stored for completeness; not read anywhere else in this class.
        self.token_file = self.config['token_file']

        # runtime data
        self.credentials = None

    def login(self) -> bool:
        """Create service-account credentials and fetch a token if needed.

        Returns:
            The ``valid`` flag of the credentials after the optional refresh.
        """
        self.credentials = Credentials.from_service_account_info(
            self.credentials_info,
            scopes=SCOPES,
        )
        if not self.credentials.valid:
            self.credentials.refresh(Request())
        logger.debug('credentials.valid = %s', repr(self.credentials.valid))
        return self.credentials.valid

    def get_events(
        self,
        start: datetime.datetime | None = None,
        until: datetime.timedelta | None = None,
        limit=None,
    ):
        """List events of the configured calendar inside a time window.

        Args:
            start: Beginning of the window. Defaults to the current time.
                A naive datetime is interpreted as local time.
            until: Length of the window, counted from ``start``. Defaults to
                365 days (also used when a zero-length delta is passed).
            limit: Forwarded to the API as ``maxResults``.

        Returns:
            The ``items`` list of the API response, or an empty list when the
            response has no such key.

        Raises:
            HttpError: Re-raised after being logged.
        """
        span = until if until else datetime.timedelta(days=365)

        if start is None:
            # Take an aware "now" directly. The previous
            # utcnow().astimezone() reinterpreted the naive UTC value as
            # local time, shifting the window by the local UTC offset.
            window_start = datetime.datetime.now(datetime.timezone.utc)
        elif start.utcoffset() is None:
            # timeMin/timeMax must carry a UTC offset; attach the local zone
            # to naive input instead of sending an offset-less timestamp.
            window_start = start.astimezone()
        else:
            window_start = start
        window_end = window_start + span

        try:
            service = build('calendar', 'v3', credentials=self.credentials)
            # NOTE(review): only a single response page is fetched here; a
            # nextPageToken in the response is not followed -- confirm that
            # this is sufficient for the calendars in use.
            response = service.events().list(
                calendarId=self.calendar_id,
                timeMin=window_start.isoformat(),
                timeMax=window_end.isoformat(),
                maxResults=limit,
            ).execute()
        except HttpError:
            logger.exception("a http error occured")
            raise

        return response.get('items', [])