Mercurial > hgrepos > Python > libs > ConfigMix
view configmix/__init__.py @ 654:0d6673d06c2c
Add support for using "tomllib" (in Python's stdlib since 3.11) and "tomli" TOML packages.
They are preferred if they are found to be installed.
But note that the declared dependency for the "toml" extra nevertheless
is the "toml" package, because it is available for all supported Python
versions.
So use Python 3.11+ or install "tomli" manually if you want to use the
alternate packages.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Thu, 19 May 2022 22:10:59 +0200 |
| parents | e73cf5593010 |
| children | 02a21e689fc1 |
line wrap: on
line source
# -*- coding: utf-8 -*- """A library for helping with configuration files. :Author: Franz Glasner :Copyright: (c) 2015–2022, Franz Glasner. All rights reserved. :License: BSD 3-Clause "New" or "Revised" License. See LICENSE.txt for details. :ID: @(#) $Header$ """ from __future__ import division, print_function, absolute_import __version__ = "0.20.5" __revision__ = "|VCSRevision|" __date__ = "|VCSJustDate|" __all__ = ["load", "safe_load", "set_assoc", "get_assoc", "clear_assoc", "get_default_assoc", "Configuration", "try_determine_filemode"] import fnmatch import copy import io import os import re from .compat import u2fs from .config import Configuration, quote, unquote, pathstr2path # noqa: F401 from . import constants def load(*files, **kwargs): """Load the given configuration files, merge them in the given order and return the resulting configuration dictionary. :param files: the filenames of the configuration files to read and merge; if a filename starts with ``<dir>`` then the name is interpreted as directory and all files are loaded in sorted order (non-resursively, ignoring unknown filetypes) :keyword defaults: optional configuration dictionary with some default settings where the settings from `files` are merged into :type defaults: dict-alike or None :keyword extras: optional configuration dictionary that will applied last Use this for example to overwrite configuration file settings from commandline arguments. :type extras: dict-alike or None :keyword strict: enable strict parsing mode for parsers that support it (e.g. 
to prevent duplicate keys) :type strict: bool :returns: the configuration :rtype: ~configmix.config.Configuration """ defaults = kwargs.get("defaults") extras = kwargs.get("extras") strict = kwargs.get("strict", False) if defaults is None: ex = Configuration() else: ex = merge(None, Configuration(defaults)) for f in files: if f.startswith(constants.DIR_PREFIX): for f2 in _get_configuration_files_from_dir(f[5:]): nx = _load_cfg_from_file(f2, ignore_unknown=True, strict=strict) if nx is not None: ex = merge(nx, ex) else: nx = _load_cfg_from_file(f, strict=strict) if nx is not None: ex = merge(nx, ex) if extras: ex = merge(Configuration(extras), ex) return Configuration(ex) def safe_load(*files, **kwargs): """Analogous to :func:`load` but do merging with :func:`safe_merge` instead of :func:`.merge` """ defaults = kwargs.get("defaults") extras = kwargs.get("extras") strict = kwargs.get("strict", False) if defaults is None: ex = Configuration() else: ex = safe_merge(None, Configuration(defaults)) for f in files: if f.startswith(constants.DIR_PREFIX): for f2 in _get_configuration_files_from_dir(f[5:]): nx = _load_cfg_from_file(f2, ignore_unknown=True, strict=strict) if nx is not None: ex = safe_merge(nx, ex) else: nx = _load_cfg_from_file(f, strict=strict) if nx is not None: ex = safe_merge(nx, ex) if extras: ex = safe_merge(Configuration(extras), ex) return Configuration(ex) def _get_configuration_files_from_dir(root): """Returns the sorted list of files within directory `root` """ files = [] if not os.path.isdir(root): return files dirfiles = os.listdir(root) dirfiles.sort() for f in dirfiles: path = os.path.join(root, f) if os.path.isdir(path): # XXX TBD Recurse??? depth-first??? continue files.append(path) return files def _load_yaml(filename, strict=False): from . import yaml with open(u2fs(filename), "rb") as yf: return yaml.safe_load(yf, strict=strict) def _load_json(filename, strict=False): from . 
import json return json.load(filename) def _load_py(filename, strict=False): from . import py return py.load(filename) def _load_ini(filename, strict=False): from . import ini return ini.load(filename) def _load_toml(filename, strict=False): from . import toml return toml.load(filename) def _load_ignore(filename, strict=False): """A loader that returns `None` just to ignore `filename`""" return None EMACS_MODELINE = re.compile(r"-\*-(.*?)-\*-") EMACS_MODE = re.compile(r"(?:\A\s*|;\s*)mode[:=]\s*([-_.a-zA-Z0-9]+)") def try_determine_filemode(filename): """Try to determine an explicitely given filemode from an Emacs-compatible mode declaration (e.g. ``mode=python``). :param str filename: :return: the found mode string or `None` :rtype: str or None Only the first two lines are searched for. Conveniently to be used in calls to :func:`~.set_assoc` to determine the file-mode by content instead of filename extension. """ with io.open(filename, encoding="ascii", errors="replace") as f: idx = 0 for l in f: idx += 1 mo = EMACS_MODELINE.search(l) if mo: mo = EMACS_MODE.search(mo.group(1)) if mo: return mo.group(1) if idx >= 2: break return None DEFAULT_MODE_LOADERS = { "python": _load_py, "yaml": _load_yaml, "conf": _load_ini, "conf-windows": _load_ini, "ini": _load_ini, "toml": _load_toml, "conf-toml": _load_toml, "javascript": _load_json, "json": _load_json, "-*-ignore-*-": _load_ignore, "-*- ignore -*-": _load_ignore, } """Default associations between file modes and loader functions""" DEFAULT_ASSOC = [ ("*.yml", "yaml"), ("*.yaml", "yaml"), ("*.json", "json"), ("*.py", "python"), ("*.ini", "conf"), ("*.toml", "toml"), ] """The builtin default associations of filename extensions with file modes -- in that order. The "mode" part may be a string or a callable with a filename parameter that returns the mode string for the file or `None` if it can not determined. """ USE_DEFAULT_ASSOC = object() """Marker for the default association for an extension. 
To be used in :func:`.set_assoc`. """ def get_default_assoc(pattern): """Return the default file-mode association for the :mod:`fnmatch` pattern `pattern`. :raises: :class:`KeyError` if the `pattern` is not found. """ for pat, fmode in DEFAULT_ASSOC: if pattern == pat: return fmode else: raise KeyError("No loader for pattern %r" % pattern) mode_loaders = {} """All configured associations between file modes and loader functions. See :data:`.DEFAULT_MODE_LOADERS`. """ _extensions = [] """All configured assiciations of filename extensions with file modes. See :data:`DEFAULT_ASSOC` """ def clear_assoc(): """Remove all configured loader associations. The :data:`.DEFAULT_ASSOC` are **not** changed. """ del _extensions[:] def get_assoc(pattern): """Return the default loader for the :mod:`fnmatch` pattern `pattern`. :raises: :class:`KeyError` if the `pattern` is not found. """ for pat, fmode in _extensions: if pattern == pat: return fmode else: raise KeyError("No associated file-mode for pattern %r" % pattern) def set_assoc(fnpattern, mode, append=False): """Associate a :mod:`fnmatch` style pattern `fnpattern` with a file-mode `mode` that determines what will be called when :func:`load` encounters a file argument that matches `fnpattern`. :param str fnpattern: the :mod:`fnmatch` pattern to associate a loader with :param mode: a mode string or a callable that accepts a `filename` argument and returns a file-mode for the given file (or `None`) :type mode: str or callable :keyword bool append: If `False` (which is the default) then this function inserts the given pattern at the head position of the currently defined associations, if `True` the pattern will be appended The OS specific case-sensitivity behaviour of :func:`fnmatch.fnmatch` applies (i.e. :func:`os.path.normpath` will be called for both arguments). If `loader` is :data:`.USE_DEFAULT_ASSOC` then the default association from :data:`.DEFAULT_ASSOC` will be used -- if any. 
""" if mode is USE_DEFAULT_ASSOC: for p, m in DEFAULT_ASSOC: if p == fnpattern: if append: _extensions.append((fnpattern, m)) else: _extensions.insert(0, (fnpattern, m)) break else: raise ValueError("no DEFAULT mode for pattern: %r" % fnpattern) else: if append: _extensions.append((fnpattern, mode)) else: _extensions.insert(0, (fnpattern, mode)) def del_assoc(fnpattern): """Remove all associations for `fnpattern`. :param str fnpattern: the :mod:`fnmatch` pattern to associate a loader with """ while True: for i in range(len(_extensions)): pat, fmode = _extensions[i] if fnpattern == pat: del _extensions[i] break # restart else: return # nothing deleted -> done def _load_cfg_from_file(filename, ignore_unknown=False, strict=False): """Determine the loader for file `filename` and return the loaded configuration dict. If `ignore_unknown` is `True` then unknown extensions are ignored. Otherwise a :exc:`ValueError` exception is raised. Can return `None` is the file should be ignored by the caller. """ for p, m in _extensions: if fnmatch.fnmatch(filename, p): if callable(m): m = m(filename) if m is None: continue return mode_loaders[m](filename, strict=strict) else: if ignore_unknown: return None else: raise ValueError("Unknown configuration file type for filename " "%r" % filename) if 0: # # From: https://github.com/jet9/python-yconfig/blob/master/yconfig.py # License: BSD License # def dict_merge(a, b): """Recursively merges dict's. not just simple a['key'] = b['key'], if both a and bhave a key who's value is a dict then dict_merge is called on both values and the result stored in the returned dictionary.""" if not isinstance(b, dict): return b result = deepcopy(a) # noqa for k, v in b.iteritems(): if k in result and isinstance(result[k], dict): result[k] = dict_merge(result[k], v) else: result[k] = deepcopy(v) # noqa return result def merge(user, default, filter_comments=True): """Logically merge the configuration in `user` into `default`. 
:param ~configmix.config.Configuration user: the new configuration that will be logically merged into `default` :param ~configmix.config.Configuration default: the base configuration where `user` is logically merged into :param bool filter_comments: flag whether to filter comment keys that start with any of the items in :data:`.COMMENTS` :returns: `user` with the necessary amendments from `default`. If `user` is ``None`` then `default` is returned. .. note:: The configuration in `user` is augmented/changed **inplace**. The configuration in `default` will be changed **inplace** when filtering out comments (which is the default). If a value in `user` is equal to :data:`.constants.DEL_VALUE` (``{{::DEL::}}``) the corresponding key will be deleted from the merged output. From http://stackoverflow.com/questions/823196/yaml-merge-in-python """ if user is None: if filter_comments: _filter_comments(default) _filter_deletions(default) return default if filter_comments: _filter_comments(user) if isinstance(user, dict) and isinstance(default, dict): for k, v in default.items(): if filter_comments and _is_comment(k): continue if k in user: if isinstance(user, Configuration): ukv = user.getitem_ns(k) else: ukv = user[k] if ukv == constants.DEL_VALUE: # do not copy del user[k] else: user[k] = _merge(ukv, v, filter_comments) else: user[k] = v _filter_deletions(user) return user def _merge(user, default, filter_comments): """Recursion helper for :func:`.merge` """ if isinstance(user, dict) and isinstance(default, dict): for k, v in default.items(): if filter_comments and _is_comment(k): continue if k in user: if isinstance(user, Configuration): ukv = user.getitem_ns(k) else: ukv = user[k] if ukv == constants.DEL_VALUE: # do not copy del user[k] else: user[k] = _merge(ukv, v, filter_comments) else: user[k] = v return user def safe_merge(user, default, filter_comments=True): """A more safe version of :func:`.merge` that makes deep copies of the returned container objects. 
Contrary to :func:`.merge` no given argument is ever changed inplace. Every object from `default` is decoupled from the result -- so changing the `default` configuration later does not propagate into a merged configuration later. """ if user is None: if filter_comments: _filter_comments(default) _filter_deletions(default) return copy.deepcopy(default) user = copy.deepcopy(user) if filter_comments: _filter_comments(user) if isinstance(user, dict) and isinstance(default, dict): for k, v in default.items(): if filter_comments and _is_comment(k): continue if k in user: if isinstance(user, Configuration): ukv = user.getitem_ns(k) else: ukv = user[k] if ukv == constants.DEL_VALUE: # do not copy del user[k] else: user[k] = _safe_merge(ukv, v, filter_comments) else: user[k] = copy.deepcopy(v) _filter_deletions(user) return user def _safe_merge(user, default, filter_comments): """Recursion helper for :func:`safe_merge` """ if isinstance(user, dict) and isinstance(default, dict): for k, v in default.items(): if filter_comments and _is_comment(k): continue if k in user: if isinstance(user, Configuration): ukv = user.getitem_ns(k) else: ukv = user[k] if ukv == constants.DEL_VALUE: # do not copy del user[k] else: user[k] = _safe_merge(ukv, v, filter_comments) else: user[k] = copy.deepcopy(v) return user def _filter_comments(d): """Recursively filter comments keys in the dict `d`. Comment keys are keys that start with any of the items in :data:`.constants.COMMENTS`. 
""" if not isinstance(d, dict): return # use a copy of the keys because we change `d` while iterating if isinstance(d, Configuration): for k in list(d.keys()): if _is_comment(k): del d[k] else: dk = d.getitem_ns(k) if isinstance(dk, dict): _filter_comments(dk) else: for k in list(d.keys()): if _is_comment(k): del d[k] else: if isinstance(d[k], dict): _filter_comments(d[k]) def _is_comment(k): for i in constants.COMMENTS: try: if k.startswith(i): return True except AttributeError: # non-string key return False return False def _filter_deletions(d): """Recursively filter deletions in the dict `d`. Deletions have values that equal :data:`.constants.DEL_VALUE`. """ if not isinstance(d, dict): return # use a copy of the items because we change `d` while iterating if isinstance(d, Configuration): for k, v in list(d.items()): if v == constants.DEL_VALUE: del d[k] else: dk = d.getitem_ns(k) if isinstance(dk, dict): _filter_deletions(dk) else: for k, v in list(d.items()): if v == constants.DEL_VALUE: del d[k] else: if isinstance(d[k], dict): _filter_deletions(d[k]) # # Init loader defaults: mode->loader and extension->mode # mode_loaders.update(DEFAULT_MODE_LOADERS) for _pattern, _mode in DEFAULT_ASSOC: set_assoc(_pattern, _mode)
