ovhddns/main.py

171 lines
5.5 KiB
Python
Executable file

#!/usr/bin/env python3
import itertools
import os
import sys
import typing
import logging
import yaml
import click
import ovh
import dns.resolver
import dns.rdatatype
from schema import And, Optional, Use, Schema
logger = logging.getLogger('ovhdns')
def str_or_list(i: typing.Union[str, typing.List[str]]) -> typing.List[str]:
if isinstance(i, str):
return [i]
return i
config_schema = Schema({
'ovh': {
'application_key': Use(str),
'application_secret': Use(str),
'consumer_key': Use(str),
'endpoint': Use(str),
},
'zone': Use(str),
Optional('subdomain', default=''): Use(str_or_list),
Optional('field_type', default='A'): And(Use(str), Use(str.upper), dns.rdatatype.RdataType.make),
Optional('logging', default={'level': ':WARNING,ovhdns:INFO'}): {
Optional('level', default=':WARNING,ovhdns:INFO'): Use(str),
},
Optional('ttl', default=60): Use(lambda i: int(i) if i is not None else 60),
})
def get_opendns_ips():
ips = []
for addr, typ in itertools.product(
['resolver1.opendns.com', 'resolver2.opendns.com'],
['A', 'AAAA'],
):
ips += [r.to_text() for r in dns.resolver.resolve(addr, typ)]
return ips
def get_my_ips():
resolver = dns.resolver.Resolver(configure=False)
resolver.nameservers = get_opendns_ips()
return [r.to_text() for r in resolver.resolve('myip.opendns.com')]
def get_records(client, config):
zone = config['zone']
records = client.get(f'/domain/zone/{zone}/record')
records = [
client.get(f'/domain/zone/{zone}/record/{id}')
for id in records
]
return records
def init_logging():
logging.getLogger().addHandler(
logging.StreamHandler(),
)
def set_logging_level(level: str):
levels = {
'NOTSET': logging.NOTSET,
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'WARN': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL,
}
log_levels = [x.split(':') for x in level.split(',')]
for module, level in log_levels:
module = module.strip() if module else None
level = level.strip().upper()
if level not in levels:
raise ValueError(f'invalid log level, allowed values: {repr(set(levels.keys()))}')
logging.getLogger(module).setLevel(levels[level])
def load_config(file):
if file == '-':
config = yaml.safe_load(sys.stdin)
else:
if os.stat(file).st_mode & 0o777 & ~0o600:
raise Exception('refusing to load insecure configuration file, file must have permission 0o600')
with open(file) as fp:
config = yaml.safe_load(fp)
return config_schema.validate(config)
@click.command()
@click.option('--config', '-c', envvar='OVHDNS_CONFIG', default='-', help='The configuration file')
@click.option('--level', '-l', envvar='OVHDNS_LEVEL', default=None, help='The log level for the application')
@click.option('--force-refresh', '-f', envvar='OVHDNS_FORCE_REFRESH', is_flag=True, help='Force zone refresh even if deemed not needed')
def main(config, level, force_refresh):
init_logging()
config = load_config(config)
if level:
config['logging']['level'] = level
config = config_schema.validate(config)
set_logging_level(config['logging']['level'])
ovh_client = ovh.Client(
**config['ovh'],
)
records = get_records(ovh_client, {'zone': config['zone']})
logger.debug('Found records %d:', len(records))
for r in records:
logger.debug(' - %s', repr(r))
logging.info('matching against subdomains %s', config['subdomain'])
records_dict = {}
for subdomain in config['subdomain']:
records_dict[subdomain] = [
r
for r in records
if r['fieldType'] == config['field_type']
and r['zone'] == config['zone']
and r['subDomain'] == subdomain
]
logger.info('found matching records: %s', records_dict)
for v in records_dict.values():
if len(v) > 1:
logger.error('found more than one record, don\'t know which to pick')
sys.exit(1)
my_ips = get_my_ips()
need_refresh = False
for subdomain, records in records_dict.items():
if len(records) == 1:
logger.info('record already present, updating if needed')
if my_ips[0] == records[0]['target']:
logger.info('ip is up-to-date, skipping update')
else:
need_refresh = True
ovh_client.put(
f'/domain/zone/{config["zone"]}/record/{records[0]["id"]}',
target=my_ips[0],
ttl=config['ttl'],
)
logger.info('updated record, new ip is %s', my_ips[0])
else:
logger.info('record not already present, creating new one')
need_refresh = True
ovh_client.post(
f'/domain/zone/{config["zone"]}/record/',
subDomain=subdomain,
target=my_ips[0],
fieldType=config['field_type'],
ttl=config['ttl'],
)
logger.info('created new record, ip is %s', my_ips[0])
if need_refresh or force_refresh:
ovh_client.post(f'/domain/zone/{config["zone"]}/refresh')
logger.info('refreshed zone')
else:
logger.info('zone refresh skipped')
sys.exit(0)
if __name__ == '__main__':
main()