-
vlorentz authored
1. When passed a .yaml path (or any path with a non-whitelisted extension), don't read .yml instead when the .yaml exists. It's an extremely surprising behavior. 2. If the .yaml file does not exist, it will still try alternative extensions in order not to break existing deployments which may rely on it, but it raises a warning now. 3. When given a non-existing path, show an error log, but keep parsing it as an empty config, in order not to break existing deployments.
vlorentz authored: 1. When passed a .yaml path (or any path with a non-whitelisted extension), don't read .yml instead when the .yaml exists. It's an extremely surprising behavior. 2. If the .yaml file does not exist, it will still try alternative extensions in order not to break existing deployments which may rely on it, but it raises a warning now. 3. When given a non-existing path, show an error log, but keep parsing it as an empty config, in order not to break existing deployments.
config.py 8.40 KiB
# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from copy import deepcopy
from itertools import chain
import logging
import os
from typing import Any, Callable, Dict, List, Optional, Tuple
import yaml
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directories that swh_config_paths() joins with a base filename. The
# resulting list is handed to priority_read(), which stops at the first
# existing file, so the order here is the lookup priority. "~" is expanded
# later, by priority_read() (os.path.expanduser).
SWH_CONFIG_DIRECTORIES = [
"~/.config/swh",
"~/.swh",
"/etc/softwareheritage",
]
# File name that load_global_config() looks up in SWH_CONFIG_DIRECTORIES.
SWH_GLOBAL_CONFIG = "global.yml"
# Defaults for the global config, in the {key: (type name, default value)}
# form consumed by read().
# NOTE(review): max_content_size is presumably in bytes (100 MiB) -- confirm.
SWH_DEFAULT_GLOBAL_CONFIG = {
"max_content_size": ("int", 100 * 1024 * 1024),
}
# Extensions that config_path() appends, in order, when the path it was
# given does not exist as-is.
SWH_CONFIG_EXTENSIONS = [
".yml",
]
# Conversion per type: maps a type name (the first item of a default_conf
# entry, see read()) to a callable coercing a raw value into that type.
# read() only calls a converter when the matching checker in _map_check_fn
# rejected the value. Apart from "int", the converters call str methods
# (lower / split) and therefore expect a string.
_map_convert_fn: Dict[str, Callable] = {
"int": int,
# only the string "true" (case-insensitive) converts to True
"bool": lambda x: x.lower() == "true",
# comma-separated string -> list of items stripped of surrounding whitespace
"list[str]": lambda x: [value.strip() for value in x.split(",")],
"list[int]": lambda x: [int(value.strip()) for value in x.split(",")],
}
# Check per type: maps a type name to a predicate telling whether a value
# already has the expected type. Type names missing from this table are
# accepted as-is by read().
# Note: bool is a subclass of int in Python, so the "int" and "list[int]"
# checks also accept booleans.
_map_check_fn: Dict[str, Callable] = {
"int": lambda x: isinstance(x, int),
"bool": lambda x: isinstance(x, bool),
"list[str]": lambda x: (isinstance(x, list) and all(isinstance(y, str) for y in x)),
"list[int]": lambda x: (isinstance(x, list) and all(isinstance(y, int) for y in x)),
}
def exists_accessible(filepath: str) -> bool:
    """Check whether a file exists, and is accessible.

    Args:
        filepath: path of the file to check

    Returns:
        True if the file exists and is readable,
        False if the file does not exist

    Raises:
        PermissionError: if the file (or one of its parent directories)
            cannot be accessed, or if the file exists but is not readable.
    """
    try:
        # A PermissionError raised by stat() (e.g. non-traversable parent
        # directory) is deliberately left to propagate to the caller.
        os.stat(filepath)
    except FileNotFoundError:
        return False

    if not os.access(filepath, os.R_OK):
        # f-string, so that the offending path actually shows up in the message
        raise PermissionError(f"Permission denied: {filepath!r}")
    return True
def read_raw_config(base_config_path: str) -> Dict[str, Any]:
    """Read the raw config corresponding to base_config_path.

    Can read yml files.

    Args:
        base_config_path: path of the configuration file; config_path() may
            resolve it to the same path with a known extension appended.

    Returns:
        The parsed content of the file, or an empty dict when no matching
        file exists or when the file is empty.

    Raises:
        PermissionError: if the file exists but cannot be read.
    """
    yml_file = config_path(base_config_path)
    if yml_file is None:
        # Kept non-fatal in order not to break existing deployments; logged
        # through the module logger like every other message of this module.
        logger.error("Config file %s does not exist, ignoring it.", base_config_path)
        return {}

    logger.debug("Loading config file %s", yml_file)
    with open(yml_file) as f:
        # safe_load() returns None for an empty document; normalize it so the
        # result always honours the Dict return annotation.
        return yaml.safe_load(f) or {}
def config_path(config_path):
    """Resolve a configuration path to an existing, readable file.

    The path is returned unchanged when it exists. Otherwise each extension
    of SWH_CONFIG_EXTENSIONS is appended in turn, and the first existing
    candidate is returned after logging a warning about the substitution.

    Returns:
        the resolved path, or None when no candidate exists

    Raises:
        PermissionError: if a candidate exists but cannot be read
    """
    if exists_accessible(config_path):
        return config_path

    for extension in SWH_CONFIG_EXTENSIONS:
        candidate = config_path + extension
        if not exists_accessible(candidate):
            continue
        logger.warning(
            "%s does not exist, using %s instead",
            config_path,
            candidate,
        )
        return candidate

    return None
def read(
    conf_file: Optional[str] = None,
    default_conf: Optional[Dict[str, Tuple[str, Any]]] = None,
) -> Dict[str, Any]:
    """Read the user's configuration file and complete it with defaults.

    `default_conf` maps each key to a `(type name, default value)` pair,
    for instance::

        DEFAULT_CONF = {
            'a': ('str', '/tmp/swh-loader-git/log'),
            'b': ('str', 'dbname=swhloadergit'),
            'c': ('bool', True),
            'e': ('bool', None),
            'd': ('int', 10),
        }

    Keys that are missing from the file (or set to None) receive their
    default value. Keys that are present but fail the check registered for
    their type name are passed through the matching converter.

    If conf_file is None (or empty), only the defaults are returned.
    """
    if conf_file:
        conf: Dict[str, Any] = read_raw_config(os.path.expanduser(conf_file)) or {}
    else:
        conf = {}

    if not default_conf:
        return conf

    for key, (nature_type, default_value) in default_conf.items():
        current = conf.get(key)
        if current is None:
            # nothing configured: use the default as-is
            conf[key] = default_value
            continue

        has_expected_type = _map_check_fn.get(nature_type, lambda x: True)
        if has_expected_type(current):
            continue

        # configured, but not in the proper format: force the conversion
        convert = _map_convert_fn.get(nature_type, lambda x: x)
        conf[key] = convert(current)

    return conf
def priority_read(
    conf_filenames: List[str], default_conf: Optional[Dict[str, Tuple[str, Any]]] = None
):
    """Read the first existing configuration file among conf_filenames.

    The candidates are tried in order (after `~` expansion); the first one
    that resolves to an existing file is read. When none of them exists,
    the default configuration is returned.

    default_conf has the same specification as it does in read.
    """
    # Lazy generator: candidates after the first match are never probed.
    resolved = (config_path(os.path.expanduser(name)) for name in conf_filenames)
    first_found = next((path for path in resolved if path is not None), None)

    # first_found is None when nothing exists, in which case read() returns
    # the defaults only.
    return read(first_found, default_conf)
def merge_default_configs(base_config, *other_configs):
    """Combine default config dictionaries; later ones override earlier ones.

    The merge is shallow, and base_config itself is left untouched: the
    result is built on a copy of it.
    """
    merged = base_config.copy()
    for overrides in other_configs:
        merged.update(overrides)
    return merged
def merge_configs(base: Optional[Dict[str, Any]], other: Optional[Dict[str, Any]]):
    """Recursively merge two config dictionaries into a new one.

    For every key, with 'val' being anything but a dict, the value found in
    `base` and the one found in `other` are combined as follows:

    - None + type -> type
    - type + None -> None
    - dict + dict -> dict (merged)
    - val + dict -> TypeError
    - dict + val -> TypeError
    - val + val -> val (other)

    Example:

    >>> d1 = {
    ...   'key1': {
    ...     'skey1': 'value1',
    ...     'skey2': {'sskey1': 'value2'},
    ...   },
    ...   'key2': 'value3',
    ... }

    merged with

    >>> d2 = {
    ...   'key1': {
    ...     'skey1': 'value4',
    ...     'skey2': {'sskey2': 'value5'},
    ...   },
    ...   'key3': 'value6',
    ... }

    gives:

    >>> d3 = {
    ...   'key1': {
    ...     'skey1': 'value4',  # <-- note this
    ...     'skey2': {
    ...       'sskey1': 'value2',
    ...       'sskey2': 'value5',
    ...     },
    ...   },
    ...   'key2': 'value3',
    ...   'key3': 'value6',
    ... }
    >>> assert merge_configs(d1, d2) == d3

    Dicts are the only values that get type-checked. Values of the result
    are deep copies, so it shares no mutable state with the inputs.

    Raises:
        TypeError: if `base` or `other` (at any nesting level) is not a dict
    """
    if not (isinstance(base, dict) and isinstance(other, dict)):
        raise TypeError("Cannot merge a %s with a %s" % (type(base), type(other)))

    merged = {}
    for key in set(chain(base.keys(), other.keys())):
        ours = base.get(key)
        theirs = other.get(key)

        if isinstance(theirs, dict):
            # A missing/None value on our side counts as an empty dict. Note
            # that any other falsy value (0, "", ...) does too, whereas a
            # truthy non-dict makes the recursive call raise TypeError.
            merged[key] = merge_configs(ours or {}, theirs)
            continue

        if isinstance(ours, dict) and key in other and other[key] is not None:
            # `theirs` is a non-None, non-dict value here: a truthy one raises
            # TypeError in the recursive call, a falsy one is treated as an
            # empty dict.
            merged[key] = merge_configs(ours, theirs or {})
            continue

        # Plain values: `other` wins whenever it defines the key, even if the
        # value is None.
        merged[key] = deepcopy(theirs if key in other else ours)

    return merged
def swh_config_paths(base_filename: str) -> List[str]:
    """List the candidate paths of `base_filename`, one per directory of
    SWH_CONFIG_DIRECTORIES, in the same order."""
    candidates = []
    for dirname in SWH_CONFIG_DIRECTORIES:
        candidates.append(os.path.join(dirname, base_filename))
    return candidates
def prepare_folders(conf, *keys):
    """Prepare the folder mentioned in config under keys.

    Args:
        conf: configuration mapping
        keys: keys of `conf` whose values are paths of directories to create
            (along with any missing parent directory)

    Raises:
        KeyError: if one of the keys is missing from `conf`
        FileExistsError: if one of the paths exists but is not a directory
    """
    for key in keys:
        # exist_ok avoids the check-then-create race of testing for existence
        # first: another process may create the directory in between.
        os.makedirs(conf[key], exist_ok=True)
def load_global_config():
    """Load the global Software Heritage config.

    The first existing SWH_GLOBAL_CONFIG file of the Software Heritage
    configuration directories is read, and completed with
    SWH_DEFAULT_GLOBAL_CONFIG.
    """
    candidate_paths = swh_config_paths(SWH_GLOBAL_CONFIG)
    return priority_read(candidate_paths, SWH_DEFAULT_GLOBAL_CONFIG)
def load_named_config(name, default_conf=None, global_conf=True):
    """Load the config named `name` from the Software Heritage
    configuration paths.

    When global_conf is True (the default), the global configuration is
    loaded first, and the named configuration is applied on top of it, so
    the named configuration wins on conflicting keys.
    """
    layers = []
    if global_conf:
        layers.append(load_global_config())
    layers.append(priority_read(swh_config_paths(name), default_conf))

    merged = {}
    for layer in layers:
        merged.update(layer)
    return merged
def load_from_envvar(default_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Load configuration yaml file from the environment variable SWH_CONFIG_FILENAME,
    eventually enriched with default configuration key/value from the default_config
    dict if provided.

    Args:
        default_config: default values; the content of the configuration
            file is merged on top of them (see merge_configs)

    Returns:
        Configuration dict

    Raises:
        AssertionError if SWH_CONFIG_FILENAME is undefined
    """
    # Raised explicitly rather than through an `assert` statement, which is
    # stripped when Python runs with -O. The exception type is unchanged for
    # callers that catch it.
    if "SWH_CONFIG_FILENAME" not in os.environ:
        raise AssertionError("SWH_CONFIG_FILENAME environment variable is undefined.")

    cfg_path = os.environ["SWH_CONFIG_FILENAME"]
    # An empty YAML file parses to None, which merge_configs would reject
    # with a TypeError: treat it as an empty configuration instead.
    cfg = read_raw_config(cfg_path) or {}
    return merge_configs(default_config or {}, cfg)