ovhddns/main.py

158 lines
4.8 KiB
Python
Raw Normal View History

2023-02-19 01:01:41 +01:00
#!/usr/bin/env python3
import itertools
import json
import os
import sys
import logging
import yaml
import schema
import click
import ovh
import dns.resolver
import dns.rdatatype
from schema import And, Optional, Use, Schema
# Application logger; its name matches the 'ovhdns' entry of the default
# level specification below.
logger = logging.getLogger('ovhdns')

# Default level specification in the 'name:LEVEL,...' form parsed by
# set_logging_level (an empty name addresses the root logger).
_DEFAULT_LOG_LEVEL = ':WARNING,ovhdns:INFO'

# Shape of the configuration document accepted by load_config.
config_schema = Schema({
    # Expanded as keyword arguments into ovh.Client by main().
    'ovh': {
        'application_key': Use(str),
        'application_secret': Use(str),
        'consumer_key': Use(str),
        'endpoint': Use(str),
    },
    'zone': Use(str),
    'subdomain': Use(str),
    # NOTE(review): RdataType.make is passed as a plain callable (not wrapped
    # in Use), so per the schema library it should only validate the name and
    # leave the upper-cased string as the resulting value -- confirm.
    Optional('field_type', default='A'): And(
        Use(str),
        Use(str.upper),
        dns.rdatatype.RdataType.make,
    ),
    Optional('logging', default={'level': _DEFAULT_LOG_LEVEL}): {
        Optional('level', default=_DEFAULT_LOG_LEVEL): Use(str),
    },
    # A missing or null ttl stays None; anything else is coerced to int.
    Optional('ttl', default=None): Use(lambda i: int(i) if i is not None else None),
})
def get_opendns_ips():
    """Return the addresses of the OpenDNS resolvers as text.

    Resolves the A and AAAA records of resolver1.opendns.com and
    resolver2.opendns.com with dnspython's default resolver and returns all
    answers in one flat list (host by host, A before AAAA).
    """
    hosts = ['resolver1.opendns.com', 'resolver2.opendns.com']
    record_types = ['A', 'AAAA']
    return [
        answer.to_text()
        for host, record_type in itertools.product(hosts, record_types)
        for answer in dns.resolver.resolve(host, record_type)
    ]
def get_my_ips():
    """Return the answers for 'myip.opendns.com' as a list of text addresses.

    The query is sent through a resolver that ignores the system
    configuration and uses only the addresses from get_opendns_ips() as
    nameservers.  No record type is passed to resolve(), so the library
    default applies.
    """
    opendns = dns.resolver.Resolver(configure=False)
    opendns.nameservers = get_opendns_ips()
    answers = opendns.resolve('myip.opendns.com')
    return [answer.to_text() for answer in answers]
def get_records(client, config):
    """Fetch the full details of every record in the configured zone.

    Args:
        client: object whose ``get(path)`` performs the API request.
        config: mapping providing the zone name under ``'zone'``.

    Returns:
        A list with one ``client.get`` result per record id, in the order
        the ids were returned by the listing request.
    """
    zone = config['zone']
    listing_path = f'/domain/zone/{zone}/record'
    # The listing only yields ids; each record needs its own request.
    record_ids = client.get(listing_path)
    return [
        client.get(f'{listing_path}/{record_id}')
        for record_id in record_ids
    ]
def init_logging():
    """Attach a default ``logging.StreamHandler`` to the root logger."""
    root_logger = logging.getLogger()
    stream_handler = logging.StreamHandler()
    root_logger.addHandler(stream_handler)
def set_logging_level(level: str):
    """Apply a comma-separated list of per-logger log levels.

    Each entry has the form ``name:LEVEL``.  An empty name (``:LEVEL``) or an
    entry without any colon (``LEVEL``) addresses the root logger.  Level
    names are case-insensitive and surrounding whitespace is ignored.
    Entries are applied in order, so entries preceding an invalid one have
    already taken effect when the error is raised.

    Args:
        level: the specification, e.g. ``':WARNING,ovhdns:INFO'`` or
            ``'DEBUG'``.

    Raises:
        ValueError: if an entry names an unknown log level.
    """
    levels = {
        'NOTSET': logging.NOTSET,
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'WARN': logging.WARNING,
        'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL,
    }
    for entry in level.split(','):
        # rpartition yields an empty name for a bare 'DEBUG' instead of
        # failing to unpack, so a plain level can be given on the command line.
        logger_name, _, level_name = entry.rpartition(':')
        # None selects the root logger in logging.getLogger.
        logger_name = logger_name.strip() or None
        level_name = level_name.strip().upper()
        if level_name not in levels:
            raise ValueError(f'invalid log level, allowed values: {repr(set(levels.keys()))}')
        logging.getLogger(logger_name).setLevel(levels[level_name])
def load_config(file):
    """Load the YAML configuration and validate it against ``config_schema``.

    Args:
        file: path of the configuration file, or ``'-'`` to read the
            document from standard input (no permission check is done then).

    Returns:
        The result of ``config_schema.validate`` for the parsed document.

    Raises:
        PermissionError: if the file has any permission bit set other than
            owner read/write.
        OSError: if the file cannot be opened.
    """
    if file == '-':
        config = yaml.safe_load(sys.stdin)
    else:
        with open(file) as fp:
            # Inspect the descriptor that is actually read rather than the
            # path, so the file cannot be swapped between check and open.
            mode = os.fstat(fp.fileno()).st_mode
            # Any bit outside owner read/write (e.g. group/other access or
            # owner execute) makes the file unacceptable.
            if mode & 0o777 & ~0o600:
                raise PermissionError(
                    'refusing to load insecure configuration file, file must have permission 0o600'
                )
            config = yaml.safe_load(fp)
    return config_schema.validate(config)
@click.command()
@click.option('--config', '-c', envvar='OVHDNS_CONFIG', default='-', help='The configuration file')
@click.option('--level', '-l', envvar='OVHDNS_LEVEL', default=None, help='The log level for the application')
@click.option(
    '--force-refresh', '-f',
    envvar='OVHDNS_FORCE_REFRESH',
    is_flag=True,
    help='Force zone refresh even if deemed not needed',
)
def main(config, level, force_refresh):
    """Point the configured OVH DNS record at the current public IP.

    Creates the record if it does not exist, updates it if its target
    differs, and refreshes the zone when something changed (or when forced).
    Exits with status 1 if more than one matching record exists.
    """
    init_logging()
    config = load_config(config)
    if level:
        # A level given on the command line or via OVHDNS_LEVEL overrides
        # the one from the configuration file.
        config['logging']['level'] = level
    config = config_schema.validate(config)
    set_logging_level(config['logging']['level'])

    ovh_client = ovh.Client(
        **config['ovh'],
    )
    zone = config['zone']
    # Query the configured zone (not a fixed one); otherwise the filter below
    # never matches for other zones and a new record is created on every run.
    records = [
        r
        for r in get_records(ovh_client, config)
        if r['fieldType'] == config['field_type']
        and r['zone'] == zone
        and r['subDomain'] == config['subdomain']
    ]
    logger.info('found matching records: %s', records)
    if len(records) > 1:
        logger.error('found more than one record, don\'t know which to pick')
        sys.exit(1)

    # NOTE(review): get_my_ips() queries without an explicit record type, so
    # with field_type 'AAAA' the target may still be an IPv4 address -- confirm.
    my_ip = get_my_ips()[0]
    need_refresh = False
    if records:
        record = records[0]
        logger.info('record already present, updating if needed')
        if my_ip == record['target']:
            logger.info('ip is up-to-date, skipping update')
        else:
            need_refresh = True
            ovh_client.put(
                f'/domain/zone/{zone}/record/{record["id"]}',
                target=my_ip,
                ttl=config['ttl'],
            )
            logger.info('updated record, new ip is %s', my_ip)
    else:
        logger.info('record not already present, creating new one')
        need_refresh = True
        ovh_client.post(
            f'/domain/zone/{zone}/record/',
            subDomain=config['subdomain'],
            target=my_ip,
            fieldType=config['field_type'],
            ttl=config['ttl'],
        )
        logger.info('created new record, ip is %s', my_ip)

    # Record changes only take effect once the zone is refreshed.
    if need_refresh or force_refresh:
        ovh_client.post(f'/domain/zone/{zone}/refresh')
        logger.info('refreshed zone')
    else:
        logger.info('zone refresh skipped')
    sys.exit(0)
if __name__ == '__main__':
    # Run the click command only when executed as a script, not on import.
    main()