Mercurial > hgrepos > Python > libs > ConfigMix
view configmix/__init__.py @ 657:213f0ec3bbbc
Test list access with negative indexes
| author | Franz Glasner <f.glasner@feldmann-mg.com> |
|---|---|
| date | Mon, 30 May 2022 13:19:19 +0200 |
| parents | e73cf5593010 |
| children | 02a21e689fc1 |
line wrap: on
line source
# -*- coding: utf-8 -*- """A library for helping with configuration files. :Author: Franz Glasner :Copyright: (c) 2015–2022, Franz Glasner. All rights reserved. :License: BSD 3-Clause "New" or "Revised" License. See LICENSE.txt for details. :ID: @(#) $Header$ """ from __future__ import division, print_function, absolute_import __version__ = "0.20.5" __revision__ = "|VCSRevision|" __date__ = "|VCSJustDate|" __all__ = ["load", "safe_load", "set_assoc", "get_assoc", "clear_assoc", "get_default_assoc", "Configuration", "try_determine_filemode"] import fnmatch import copy import io import os import re from .compat import u2fs from .config import Configuration, quote, unquote, pathstr2path # noqa: F401 from . import constants def load(*files, **kwargs): """Load the given configuration files, merge them in the given order and return the resulting configuration dictionary. :param files: the filenames of the configuration files to read and merge; if a filename starts with ``<dir>`` then the name is interpreted as directory and all files are loaded in sorted order (non-resursively, ignoring unknown filetypes) :keyword defaults: optional configuration dictionary with some default settings where the settings from `files` are merged into :type defaults: dict-alike or None :keyword extras: optional configuration dictionary that will applied last Use this for example to overwrite configuration file settings from commandline arguments. :type extras: dict-alike or None :keyword strict: enable strict parsing mode for parsers that support it (e.g. to prevent duplicate keys) :type strict: bool :returns: the configuration :rtype: ~configmix.config.Configuration """ defaults = kwargs.get("defaults") extras = kwargs.get("extras") strict = kwargs.get("strict", False) if defaults is None: ex = Configuration() else: ex = merge(None, Configuration(defaults)) for f in files: if f.startswith(constants.DIR_PREFIX): for f2 in _get_configuration_files_from_dir(f[5:]): nx = _load_cfg_from_file(f2, ignore_unknown=True, strict=strict) if nx is not None: ex = merge(nx, ex) else: nx = _load_cfg_from_file(f, strict=strict) if nx is not None: ex = merge(nx, ex) if extras: ex = merge(Configuration(extras), ex) return Configuration(ex) def safe_load(*files, **kwargs): """Analogous to :func:`load` but do merging with :func:`safe_merge` instead of :func:`.merge` """ defaults = kwargs.get("defaults") extras = kwargs.get("extras") strict = kwargs.get("strict", False) if defaults is None: ex = Configuration() else: ex = safe_merge(None, Configuration(defaults)) for f in files: if f.startswith(constants.DIR_PREFIX): for f2 in _get_configuration_files_from_dir(f[5:]): nx = _load_cfg_from_file(f2, ignore_unknown=True, strict=strict) if nx is not None: ex = safe_merge(nx, ex) else: nx = _load_cfg_from_file(f, strict=strict) if nx is not None: ex = safe_merge(nx, ex) if extras: ex = safe_merge(Configuration(extras), ex) return Configuration(ex) def _get_configuration_files_from_dir(root): """Returns the sorted list of files within directory `root` """ files = [] if not os.path.isdir(root): return files dirfiles = os.listdir(root) dirfiles.sort() for f in dirfiles: path = os.path.join(root, f) if os.path.isdir(path): # XXX TBD Recurse??? depth-first??? continue files.append(path) return files def _load_yaml(filename, strict=False): from . import yaml with open(u2fs(filename), "rb") as yf: return yaml.safe_load(yf, strict=strict) def _load_json(filename, strict=False): from . import json return json.load(filename) def _load_py(filename, strict=False): from . import py return py.load(filename) def _load_ini(filename, strict=False): from . import ini return ini.load(filename) def _load_toml(filename, strict=False): from . import toml return toml.load(filename) def _load_ignore(filename, strict=False): """A loader that returns `None` just to ignore `filename`""" return None EMACS_MODELINE = re.compile(r"-\*-(.*?)-\*-") EMACS_MODE = re.compile(r"(?:\A\s*|;\s*)mode[:=]\s*([-_.a-zA-Z0-9]+)") def try_determine_filemode(filename): """Try to determine an explicitely given filemode from an Emacs-compatible mode declaration (e.g. ``mode=python``). :param str filename: :return: the found mode string or `None` :rtype: str or None Only the first two lines are searched for. Conveniently to be used in calls to :func:`~.set_assoc` to determine the file-mode by content instead of filename extension. """ with io.open(filename, encoding="ascii", errors="replace") as f: idx = 0 for l in f: idx += 1 mo = EMACS_MODELINE.search(l) if mo: mo = EMACS_MODE.search(mo.group(1)) if mo: return mo.group(1) if idx >= 2: break return None DEFAULT_MODE_LOADERS = { "python": _load_py, "yaml": _load_yaml, "conf": _load_ini, "conf-windows": _load_ini, "ini": _load_ini, "toml": _load_toml, "conf-toml": _load_toml, "javascript": _load_json, "json": _load_json, "-*-ignore-*-": _load_ignore, "-*- ignore -*-": _load_ignore, } """Default associations between file modes and loader functions""" DEFAULT_ASSOC = [ ("*.yml", "yaml"), ("*.yaml", "yaml"), ("*.json", "json"), ("*.py", "python"), ("*.ini", "conf"), ("*.toml", "toml"), ] """The builtin default associations of filename extensions with file modes -- in that order. The "mode" part may be a string or a callable with a filename parameter that returns the mode string for the file or `None` if it can not determined. """ USE_DEFAULT_ASSOC = object() """Marker for the default association for an extension. To be used in :func:`.set_assoc`. """ def get_default_assoc(pattern): """Return the default file-mode association for the :mod:`fnmatch` pattern `pattern`. :raises: :class:`KeyError` if the `pattern` is not found. """ for pat, fmode in DEFAULT_ASSOC: if pattern == pat: return fmode else: raise KeyError("No loader for pattern %r" % pattern) mode_loaders = {} """All configured associations between file modes and loader functions. See :data:`.DEFAULT_MODE_LOADERS`. """ _extensions = [] """All configured assiciations of filename extensions with file modes. See :data:`DEFAULT_ASSOC` """ def clear_assoc(): """Remove all configured loader associations. The :data:`.DEFAULT_ASSOC` are **not** changed. """ del _extensions[:] def get_assoc(pattern): """Return the default loader for the :mod:`fnmatch` pattern `pattern`. :raises: :class:`KeyError` if the `pattern` is not found. """ for pat, fmode in _extensions: if pattern == pat: return fmode else: raise KeyError("No associated file-mode for pattern %r" % pattern) def set_assoc(fnpattern, mode, append=False): """Associate a :mod:`fnmatch` style pattern `fnpattern` with a file-mode `mode` that determines what will be called when :func:`load` encounters a file argument that matches `fnpattern`. :param str fnpattern: the :mod:`fnmatch` pattern to associate a loader with :param mode: a mode string or a callable that accepts a `filename` argument and returns a file-mode for the given file (or `None`) :type mode: str or callable :keyword bool append: If `False` (which is the default) then this function inserts the given pattern at the head position of the currently defined associations, if `True` the pattern will be appended The OS specific case-sensitivity behaviour of :func:`fnmatch.fnmatch` applies (i.e. :func:`os.path.normpath` will be called for both arguments). If `loader` is :data:`.USE_DEFAULT_ASSOC` then the default association from :data:`.DEFAULT_ASSOC` will be used -- if any. """ if mode is USE_DEFAULT_ASSOC: for p, m in DEFAULT_ASSOC: if p == fnpattern: if append: _extensions.append((fnpattern, m)) else: _extensions.insert(0, (fnpattern, m)) break else: raise ValueError("no DEFAULT mode for pattern: %r" % fnpattern) else: if append: _extensions.append((fnpattern, mode)) else: _extensions.insert(0, (fnpattern, mode)) def del_assoc(fnpattern): """Remove all associations for `fnpattern`. :param str fnpattern: the :mod:`fnmatch` pattern to associate a loader with """ while True: for i in range(len(_extensions)): pat, fmode = _extensions[i] if fnpattern == pat: del _extensions[i] break # restart else: return # nothing deleted -> done def _load_cfg_from_file(filename, ignore_unknown=False, strict=False): """Determine the loader for file `filename` and return the loaded configuration dict. If `ignore_unknown` is `True` then unknown extensions are ignored. Otherwise a :exc:`ValueError` exception is raised. Can return `None` is the file should be ignored by the caller. """ for p, m in _extensions: if fnmatch.fnmatch(filename, p): if callable(m): m = m(filename) if m is None: continue return mode_loaders[m](filename, strict=strict) else: if ignore_unknown: return None else: raise ValueError("Unknown configuration file type for filename " "%r" % filename) if 0: # # From: https://github.com/jet9/python-yconfig/blob/master/yconfig.py # License: BSD License # def dict_merge(a, b): """Recursively merges dict's. not just simple a['key'] = b['key'], if both a and bhave a key who's value is a dict then dict_merge is called on both values and the result stored in the returned dictionary.""" if not isinstance(b, dict): return b result = deepcopy(a) # noqa for k, v in b.iteritems(): if k in result and isinstance(result[k], dict): result[k] = dict_merge(result[k], v) else: result[k] = deepcopy(v) # noqa return result def merge(user, default, filter_comments=True): """Logically merge the configuration in `user` into `default`. :param ~configmix.config.Configuration user: the new configuration that will be logically merged into `default` :param ~configmix.config.Configuration default: the base configuration where `user` is logically merged into :param bool filter_comments: flag whether to filter comment keys that start with any of the items in :data:`.COMMENTS` :returns: `user` with the necessary amendments from `default`. If `user` is ``None`` then `default` is returned. .. note:: The configuration in `user` is augmented/changed **inplace**. The configuration in `default` will be changed **inplace** when filtering out comments (which is the default). If a value in `user` is equal to :data:`.constants.DEL_VALUE` (``{{::DEL::}}``) the corresponding key will be deleted from the merged output. From http://stackoverflow.com/questions/823196/yaml-merge-in-python """ if user is None: if filter_comments: _filter_comments(default) _filter_deletions(default) return default if filter_comments: _filter_comments(user) if isinstance(user, dict) and isinstance(default, dict): for k, v in default.items(): if filter_comments and _is_comment(k): continue if k in user: if isinstance(user, Configuration): ukv = user.getitem_ns(k) else: ukv = user[k] if ukv == constants.DEL_VALUE: # do not copy del user[k] else: user[k] = _merge(ukv, v, filter_comments) else: user[k] = v _filter_deletions(user) return user def _merge(user, default, filter_comments): """Recursion helper for :func:`.merge` """ if isinstance(user, dict) and isinstance(default, dict): for k, v in default.items(): if filter_comments and _is_comment(k): continue if k in user: if isinstance(user, Configuration): ukv = user.getitem_ns(k) else: ukv = user[k] if ukv == constants.DEL_VALUE: # do not copy del user[k] else: user[k] = _merge(ukv, v, filter_comments) else: user[k] = v return user def safe_merge(user, default, filter_comments=True): """A more safe version of :func:`.merge` that makes deep copies of the returned container objects. Contrary to :func:`.merge` no given argument is ever changed inplace. Every object from `default` is decoupled from the result -- so changing the `default` configuration later does not propagate into a merged configuration later. """ if user is None: if filter_comments: _filter_comments(default) _filter_deletions(default) return copy.deepcopy(default) user = copy.deepcopy(user) if filter_comments: _filter_comments(user) if isinstance(user, dict) and isinstance(default, dict): for k, v in default.items(): if filter_comments and _is_comment(k): continue if k in user: if isinstance(user, Configuration): ukv = user.getitem_ns(k) else: ukv = user[k] if ukv == constants.DEL_VALUE: # do not copy del user[k] else: user[k] = _safe_merge(ukv, v, filter_comments) else: user[k] = copy.deepcopy(v) _filter_deletions(user) return user def _safe_merge(user, default, filter_comments): """Recursion helper for :func:`safe_merge` """ if isinstance(user, dict) and isinstance(default, dict): for k, v in default.items(): if filter_comments and _is_comment(k): continue if k in user: if isinstance(user, Configuration): ukv = user.getitem_ns(k) else: ukv = user[k] if ukv == constants.DEL_VALUE: # do not copy del user[k] else: user[k] = _safe_merge(ukv, v, filter_comments) else: user[k] = copy.deepcopy(v) return user def _filter_comments(d): """Recursively filter comments keys in the dict `d`. Comment keys are keys that start with any of the items in :data:`.constants.COMMENTS`. """ if not isinstance(d, dict): return # use a copy of the keys because we change `d` while iterating if isinstance(d, Configuration): for k in list(d.keys()): if _is_comment(k): del d[k] else: dk = d.getitem_ns(k) if isinstance(dk, dict): _filter_comments(dk) else: for k in list(d.keys()): if _is_comment(k): del d[k] else: if isinstance(d[k], dict): _filter_comments(d[k]) def _is_comment(k): for i in constants.COMMENTS: try: if k.startswith(i): return True except AttributeError: # non-string key return False return False def _filter_deletions(d): """Recursively filter deletions in the dict `d`. Deletions have values that equal :data:`.constants.DEL_VALUE`. """ if not isinstance(d, dict): return # use a copy of the items because we change `d` while iterating if isinstance(d, Configuration): for k, v in list(d.items()): if v == constants.DEL_VALUE: del d[k] else: dk = d.getitem_ns(k) if isinstance(dk, dict): _filter_deletions(dk) else: for k, v in list(d.items()): if v == constants.DEL_VALUE: del d[k] else: if isinstance(d[k], dict): _filter_deletions(d[k]) # # Init loader defaults: mode->loader and extension->mode # mode_loaders.update(DEFAULT_MODE_LOADERS) for _pattern, _mode in DEFAULT_ASSOC: set_assoc(_pattern, _mode)
