#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Google Calendar source/adapter that reads events with a service account."""

import datetime
import logging
import os

from google.auth.transport.requests import Request
# from google.oauth2.credentials import Credentials  # user-OAuth variant, unused
from google.oauth2.service_account import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from schema import Use, Optional, Schema

from .abc import Source, Adapter

logger = logging.getLogger(f"wp_cal.{__name__}")

# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']


class Google(Source, Adapter):
    """Read-only access to a single Google Calendar.

    Configuration keys (see ``schema``):

    * ``calendar_id``  -- identifier of the calendar to query.
    * ``credentials``  -- service-account key info as a mapping.
    * ``token_file``   -- optional path; defaults to ``~/.wp-cal-google-token``.
      NOTE(review): none of the methods in this file read ``token_file``;
      it is only stored on the instance.
    """

    schema = Schema({
        'calendar_id': Use(str),
        'credentials': Use(dict),
        Optional(
            'token_file',
            # expanduser() instead of os.environ["HOME"]: the latter raises
            # KeyError at import time when HOME is not set.
            default=os.path.join(
                os.path.expanduser('~'),
                '.wp-cal-google-token',
            ),
        ): Use(str),
    })

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base classes and cache config values.

        NOTE(review): ``self.config`` is presumably populated by the base
        class from ``schema`` -- confirm in ``.abc``.
        """
        super().__init__(*args, **kwargs)

        self.calendar_id = self.config['calendar_id']
        self.credentials_info = self.config['credentials']
        self.token_file = self.config['token_file']

        # runtime data
        self.credentials = None

    def login(self) -> bool:
        """Create service-account credentials and fetch an access token.

        Returns:
            ``True`` when the resulting credentials report themselves valid.
        """
        self.credentials = Credentials.from_service_account_info(
            self.credentials_info,
            scopes=SCOPES,
        )
        if not self.credentials.valid:
            self.credentials.refresh(Request())
        logger.debug('credentials.valid = %s', repr(self.credentials.valid))
        return self.credentials.valid

    def get_events(
        self,
        start: datetime.datetime | None = None,
        until: datetime.timedelta | None = None,
        limit: int | None = None,
    ) -> list:
        """Return the calendar's events inside a time window.

        Args:
            start: Beginning of the window. Defaults to the current time.
                A naive datetime is interpreted as local time so that the
                serialised value carries the UTC offset the API requires.
            until: Length of the window. A missing (or zero) value falls
                back to 365 days.
            limit: Maximum number of events to return; ``None`` means no
                limit.

        Returns:
            The ``items`` entries of the API responses, concatenated over
            all result pages.

        Raises:
            HttpError: Re-raised after being logged when the API call fails.
        """
        window = until if until else datetime.timedelta(days=365)

        if start is None:
            # utcnow() returns a *naive* datetime, which astimezone() would
            # misread as local time; ask for an aware UTC timestamp instead.
            begin = datetime.datetime.now(datetime.timezone.utc)
        elif start.tzinfo is None:
            begin = start.astimezone()
        else:
            begin = start
        end = begin + window

        items = []
        page_token = None
        try:
            service = build('calendar', 'v3', credentials=self.credentials)
            while True:
                # Only ask for what is still missing so the total never
                # exceeds ``limit``; None lets the API use its default.
                remaining = None if limit is None else limit - len(items)
                response = service.events().list(
                    calendarId=self.calendar_id,
                    timeMin=begin.isoformat(),
                    timeMax=end.isoformat(),
                    maxResults=remaining,
                    pageToken=page_token,
                ).execute()
                items.extend(response.get('items', []))

                page_token = response.get('nextPageToken')
                if not page_token:
                    break
                if limit is not None and len(items) >= limit:
                    break
        except HttpError:
            logger.exception("a http error occured")
            raise

        return items