view configmix/__init__.py @ 654:0d6673d06c2c

Add support for using "tomllib" (in Python's stdlib since 3.11) and "tomli" TOML packages. They are preferred if they are found to be installed. But note that the declared dependency for the "toml" extra nevertheless is the "toml" package, because it is available for all supported Python versions. So use Python 3.11+ or install "tomli" manually if you want to use the alternate packages.
author Franz Glasner <fzglas.hg@dom66.de>
date Thu, 19 May 2022 22:10:59 +0200
parents e73cf5593010
children 02a21e689fc1
line wrap: on
line source

# -*- coding: utf-8 -*-
"""A library for helping with configuration files.

:Author:    Franz Glasner
:Copyright: (c) 2015–2022, Franz Glasner.
            All rights reserved.
:License:   BSD 3-Clause "New" or "Revised" License.
            See LICENSE.txt for details.
:ID:        @(#) $Header$

"""

from __future__ import division, print_function, absolute_import


# The version string of the package
__version__ = "0.20.5"

# NOTE(review): "|VCSRevision|" and "|VCSJustDate|" look like placeholders
# that are substituted by some release/VCS tooling -- confirm.
__revision__ = "|VCSRevision|"
__date__ = "|VCSJustDate|"

# The names exported by a star-import of this package.
# NOTE(review): `del_assoc`, `merge` and `safe_merge` are defined in this
# module too but are not listed here -- confirm that this is intended.
__all__ = ["load", "safe_load",
           "set_assoc", "get_assoc", "clear_assoc",
           "get_default_assoc",
           "Configuration",
           "try_determine_filemode"]


import fnmatch
import copy
import io
import os
import re

from .compat import u2fs
from .config import Configuration, quote, unquote, pathstr2path  # noqa: F401
from . import constants


def load(*files, **kwargs):
    """Load the given configuration files, merge them in the given order
    and return the resulting configuration dictionary.

    :param files: the filenames of the configuration files to read and merge;
                  if a filename starts with ``<dir>``
                  (:data:`.constants.DIR_PREFIX`) then the rest of the name
                  is interpreted as directory and all files in it are
                  loaded in sorted order (non-recursively, ignoring
                  unknown filetypes)
    :keyword defaults: optional configuration dictionary with some default
                       settings where the settings from `files` are merged
                       into
    :type defaults: dict-alike or None
    :keyword extras: optional configuration dictionary that will be applied
                     last

                     Use this for example to overwrite configuration file
                     settings from commandline arguments.
    :type extras: dict-alike or None
    :keyword strict: enable strict parsing mode for parsers that support it
                     (e.g. to prevent duplicate keys)
    :type strict: bool
    :returns: the configuration
    :rtype: ~configmix.config.Configuration
    :raises ValueError: if an explicitly given file (i.e. not one found
                        via a ``<dir>`` argument) has an unknown type

    Keyword arguments other than the ones listed above are ignored.

    """
    defaults = kwargs.get("defaults")
    extras = kwargs.get("extras")
    strict = kwargs.get("strict", False)
    if defaults is None:
        ex = Configuration()
    else:
        # merging into `None` just filters comments and deletions
        ex = merge(None, Configuration(defaults))
    # Strip exactly the marker that was tested for instead of relying on
    # a hard-coded length that must be kept in sync with the constant.
    prefix_len = len(constants.DIR_PREFIX)
    for f in files:
        if f.startswith(constants.DIR_PREFIX):
            for f2 in _get_configuration_files_from_dir(f[prefix_len:]):
                # files of unknown type within a directory are skipped
                nx = _load_cfg_from_file(f2, ignore_unknown=True, strict=strict)
                if nx is not None:
                    ex = merge(nx, ex)
        else:
            nx = _load_cfg_from_file(f, strict=strict)
            if nx is not None:
                ex = merge(nx, ex)
    if extras:
        ex = merge(Configuration(extras), ex)
    return Configuration(ex)


def safe_load(*files, **kwargs):
    """Analogous to :func:`load` but do merging with :func:`safe_merge`
    instead of :func:`.merge`

    :param files: the filenames of the configuration files to read and merge;
                  a name that starts with ``<dir>``
                  (:data:`.constants.DIR_PREFIX`) denotes a directory
                  whose files are loaded in sorted order
                  (non-recursively, ignoring unknown filetypes)
    :keyword defaults: optional configuration dictionary with default
                       settings where the settings from `files` are merged
                       into
    :type defaults: dict-alike or None
    :keyword extras: optional configuration dictionary that will be applied
                     last
    :type extras: dict-alike or None
    :keyword strict: enable strict parsing mode for parsers that support it
    :type strict: bool
    :returns: the configuration
    :rtype: ~configmix.config.Configuration
    :raises ValueError: if an explicitly given file (i.e. not one found
                        via a ``<dir>`` argument) has an unknown type

    Keyword arguments other than the ones listed above are ignored.

    """
    defaults = kwargs.get("defaults")
    extras = kwargs.get("extras")
    strict = kwargs.get("strict", False)
    if defaults is None:
        ex = Configuration()
    else:
        # merging into `None` just filters comments and deletions
        ex = safe_merge(None, Configuration(defaults))
    # Strip exactly the marker that was tested for instead of relying on
    # a hard-coded length that must be kept in sync with the constant.
    prefix_len = len(constants.DIR_PREFIX)
    for f in files:
        if f.startswith(constants.DIR_PREFIX):
            for f2 in _get_configuration_files_from_dir(f[prefix_len:]):
                # files of unknown type within a directory are skipped
                nx = _load_cfg_from_file(f2, ignore_unknown=True, strict=strict)
                if nx is not None:
                    ex = safe_merge(nx, ex)
        else:
            nx = _load_cfg_from_file(f, strict=strict)
            if nx is not None:
                ex = safe_merge(nx, ex)
    if extras:
        ex = safe_merge(Configuration(extras), ex)
    return Configuration(ex)


def _get_configuration_files_from_dir(root):
    """Return the paths of all non-directory entries directly within `root`.

    The entries are sorted by name and each one is joined with `root`.
    Sub-directories are skipped (there is no recursion).  If `root` is
    not a directory an empty list is returned.

    """
    if not os.path.isdir(root):
        return []

    # XXX TBD Recurse??? depth-first???
    candidates = (os.path.join(root, name)
                  for name in sorted(os.listdir(root)))
    return [path for path in candidates if not os.path.isdir(path)]


def _load_yaml(filename, strict=False):
    """Loader for file-mode ``yaml``.

    Opens `filename` in binary mode and hands the open file together with
    `strict` to ``safe_load`` of the package's ``yaml`` submodule.

    """
    # function-level import: the submodule is imported only when this
    # loader is really called
    from . import yaml
    # NOTE(review): `u2fs` presumably converts the name into the form
    # needed for filesystem calls -- confirm in `.compat`
    with open(u2fs(filename), "rb") as yf:
        return yaml.safe_load(yf, strict=strict)


def _load_json(filename, strict=False):
    """Loader for the file-modes ``json`` and ``javascript``.

    Delegates to ``load`` of the package's ``json`` submodule.
    `strict` is accepted for a uniform loader signature but is not
    forwarded.

    """
    # function-level import: the submodule is imported only when this
    # loader is really called
    from . import json
    return json.load(filename)


def _load_py(filename, strict=False):
    """Loader for file-mode ``python``.

    Delegates to ``load`` of the package's ``py`` submodule.
    `strict` is accepted for a uniform loader signature but is not
    forwarded.

    """
    # function-level import: the submodule is imported only when this
    # loader is really called
    from . import py
    return py.load(filename)


def _load_ini(filename, strict=False):
    """Loader for the file-modes ``ini``, ``conf`` and ``conf-windows``.

    Delegates to ``load`` of the package's ``ini`` submodule.
    `strict` is accepted for a uniform loader signature but is not
    forwarded.

    """
    # function-level import: the submodule is imported only when this
    # loader is really called
    from . import ini
    return ini.load(filename)


def _load_toml(filename, strict=False):
    """Loader for the file-modes ``toml`` and ``conf-toml``.

    Delegates to ``load`` of the package's ``toml`` submodule.
    `strict` is accepted for a uniform loader signature but is not
    forwarded.

    """
    # function-level import: the submodule is imported only when this
    # loader is really called
    from . import toml
    return toml.load(filename)


def _load_ignore(filename, strict=False):
    """A loader that returns `None` just to ignore `filename`.

    Neither `filename` nor `strict` is used; the file is not opened.
    The callers in :func:`load` and :func:`safe_load` skip merging when
    a loader result is `None`.

    """
    return None


# Matches a complete ``-*- ... -*-`` declaration and captures its content
EMACS_MODELINE = re.compile(r"-\*-(.*?)-\*-")
# Matches a ``mode:``/``mode=`` setting within the content of a declaration
EMACS_MODE = re.compile(r"(?:\A\s*|;\s*)mode[:=]\s*([-_.a-zA-Z0-9]+)")


def try_determine_filemode(filename):
    """Try to determine an explicitly given filemode from an Emacs-compatible
    mode declaration (e.g. ``mode=python``) within the file `filename`.

    :param str filename: the name of the file to inspect
    :return: the found mode string or `None`
    :rtype: str or None

    At most the first two lines of the file are inspected.  The file is
    read as ASCII; bytes that cannot be decoded are replaced.

    Conveniently to be used in calls to :func:`~.set_assoc` to determine
    the file-mode by content instead of filename extension.

    """
    with io.open(filename, encoding="ascii", errors="replace") as stream:
        for lineno, line in enumerate(stream, 1):
            declaration = EMACS_MODELINE.search(line)
            if declaration:
                mode = EMACS_MODE.search(declaration.group(1))
            else:
                mode = None
            if mode:
                return mode.group(1)
            if lineno >= 2:
                # never look beyond the second line
                break
    return None


# Maps a file-mode string to the loader function for that mode.
# The loaders are called as ``loader(filename, strict=...)``.
# The two "ignore" modes map to a loader that always returns `None`.
DEFAULT_MODE_LOADERS = {
    "python": _load_py,
    "yaml": _load_yaml,
    "conf": _load_ini,
    "conf-windows": _load_ini,
    "ini": _load_ini,
    "toml": _load_toml,
    "conf-toml": _load_toml,
    "javascript": _load_json,
    "json": _load_json,
    "-*-ignore-*-": _load_ignore,
    "-*- ignore -*-": _load_ignore,
}
"""Default associations between file modes and loader functions"""


DEFAULT_ASSOC = [
    ("*.yml", "yaml"),
    ("*.yaml", "yaml"),
    ("*.json", "json"),
    ("*.py", "python"),
    ("*.ini", "conf"),
    ("*.toml", "toml"),
]
"""The builtin default associations of filename extensions with
file modes -- in that order.

The "mode" part may be a string or a callable with a filename
parameter that returns the mode string for the file or `None` if it
can not determined.

"""


USE_DEFAULT_ASSOC = object()
"""Marker for the default association for an extension.

To be used in :func:`.set_assoc`.
"""


def get_default_assoc(pattern):
    """Look up the builtin file-mode for the :mod:`fnmatch` pattern
    `pattern` in :data:`.DEFAULT_ASSOC`.

    The pattern is compared for equality -- it is not matched against
    anything.

    :returns: the file-mode of the first entry with an equal pattern
    :raises: :class:`KeyError` if the `pattern` is not found.

    """
    for known_pattern, file_mode in DEFAULT_ASSOC:
        if pattern == known_pattern:
            return file_mode
    raise KeyError("No loader for pattern %r" % pattern)


# Maps a file-mode string to a loader callable.  It starts out empty and
# is filled from DEFAULT_MODE_LOADERS at the end of this module.
mode_loaders = {}
"""All configured associations between file modes and loader functions.

See :data:`.DEFAULT_MODE_LOADERS`.

"""

# An ordered list of ``(fnmatch-pattern, mode)`` tuples.  It is searched
# from the first to the last item when the loader for a file is determined.
_extensions = []
"""All configured assiciations of filename extensions with file modes.

See :data:`DEFAULT_ASSOC`

"""


def clear_assoc():
    """Drop every configured pattern to file-mode association.

    The list of associations is emptied in place.
    The :data:`.DEFAULT_ASSOC` are **not** changed.

    """
    _extensions[:] = []


def get_assoc(pattern):
    """Return the currently configured file-mode for the :mod:`fnmatch`
    pattern `pattern`.

    The pattern is compared for equality with the configured patterns;
    the mode of the first equal one is returned.

    :raises: :class:`KeyError` if the `pattern` is not found.

    """
    for configured_pattern, file_mode in _extensions:
        if pattern == configured_pattern:
            return file_mode
    raise KeyError("No associated file-mode for pattern %r" % pattern)


def set_assoc(fnpattern, mode, append=False):
    """Associate a :mod:`fnmatch` style pattern `fnpattern` with a
    file-mode `mode` that determines what will be called when
    :func:`load` encounters a file argument that matches `fnpattern`.

    :param str fnpattern: the :mod:`fnmatch` pattern to associate a
                          file-mode with
    :param mode: a mode string or a callable that accepts a `filename`
                 argument and returns a file-mode for the given file
                 (or `None`)
    :type mode: str or callable

    :keyword bool append: If `False` (which is the default) then this
        function inserts the given pattern at the head position of the
        currently defined associations, if `True` the pattern will be appended

    :raises ValueError: if `mode` is :data:`.USE_DEFAULT_ASSOC` and
                        :data:`.DEFAULT_ASSOC` has no entry for `fnpattern`

    The OS specific case-sensitivity behaviour of
    :func:`fnmatch.fnmatch` applies (i.e. :func:`os.path.normcase`
    will be called for both arguments).

    If `mode` is :data:`.USE_DEFAULT_ASSOC` then the default association
    from :data:`.DEFAULT_ASSOC` will be used -- if any.

    """
    if mode is USE_DEFAULT_ASSOC:
        # replace the marker by the builtin mode of the very same pattern
        for default_pattern, default_mode in DEFAULT_ASSOC:
            if default_pattern == fnpattern:
                mode = default_mode
                break
        else:
            raise ValueError("no DEFAULT mode for pattern: %r" % fnpattern)

    association = (fnpattern, mode)
    if append:
        _extensions.append(association)
    else:
        _extensions.insert(0, association)


def del_assoc(fnpattern):
    """Remove all associations for `fnpattern`.

    :param str fnpattern: the :mod:`fnmatch` pattern whose associations
                          are to be removed; it is compared for equality
                          with the configured patterns

    The order of the remaining associations is retained.  It is not an
    error if there is no association for `fnpattern`.

    """
    # A single pass with an in-place slice assignment: the list object
    # itself is kept, so every other reference to it stays valid.
    _extensions[:] = [assoc for assoc in _extensions
                      if fnpattern != assoc[0]]


def _load_cfg_from_file(filename, ignore_unknown=False, strict=False):
    """Find the loader that is responsible for `filename`, call it and
    return its result.

    The configured associations are tried in order.  The first one whose
    pattern matches `filename` determines the file-mode.  If the
    associated mode is a callable it is called with `filename`; a result
    of `None` means "try the next association".

    If no association yields a mode: return `None` if `ignore_unknown`
    is `True`, otherwise raise a :exc:`ValueError`.

    Can return `None` if the file should be ignored by the caller.

    """
    for pattern, mode in _extensions:
        if not fnmatch.fnmatch(filename, pattern):
            continue
        if callable(mode):
            mode = mode(filename)
            if mode is None:
                # the callable could not decide: try the next association
                continue
        loader = mode_loaders[mode]
        return loader(filename, strict=strict)

    if ignore_unknown:
        return None
    raise ValueError("Unknown configuration file type for filename "
                     "%r" % filename)


# Dead code: the condition is constant false, so `dict_merge` is never
# defined at runtime.
# Note that it would not work as is: `deepcopy` is not a name imported by
# this module (only the module `copy` is) and it uses `dict.iteritems()`.
if 0:
    #
    # From: https://github.com/jet9/python-yconfig/blob/master/yconfig.py
    # License: BSD License
    #
    def dict_merge(a, b):
        """Recursively merges dict's. not just simple a['key'] = b['key'], if
        both a and b have a key who's value is a dict then dict_merge is
        called on both values and the result stored in the returned
        dictionary."""

        if not isinstance(b, dict):
            return b
        result = deepcopy(a)                     # noqa
        for k, v in b.iteritems():
            if k in result and isinstance(result[k], dict):
                result[k] = dict_merge(result[k], v)
            else:
                result[k] = deepcopy(v)          # noqa
        return result


def merge(user, default, filter_comments=True):
    """Logically merge the configuration in `user` into `default`.

    :param ~configmix.config.Configuration user:
                the new configuration that will be logically merged
                into `default`
    :param ~configmix.config.Configuration default:
                the base configuration where `user` is logically merged into
    :param bool filter_comments: flag whether to filter comment keys that
                   start with any of the items in :data:`.COMMENTS`
    :returns: `user` with the necessary amendments from `default`.
              If `user` is ``None`` then `default` is returned.

    .. note:: The configuration in `user` is augmented/changed
              **inplace**.

              If `user` is ``None`` the configuration in `default` is
              changed **inplace**: deletion markers and -- by default --
              comments are filtered out.

              Values that are taken over from `default` are not copied.

    If a value in `user` is equal to :data:`.constants.DEL_VALUE`
    (``{{::DEL::}}``) the corresponding key will be deleted from the
    merged output.

    From http://stackoverflow.com/questions/823196/yaml-merge-in-python

    """
    if user is None:
        # nothing to merge: clean up `default` and hand it back as is
        if filter_comments:
            _filter_comments(default)
        _filter_deletions(default)
        return default

    if filter_comments:
        _filter_comments(user)
    # The recursion helper does the key-wise merge; it leaves `user`
    # alone if `user` and `default` are not both dicts.
    _merge(user, default, filter_comments)
    _filter_deletions(user)
    return user


def _merge(user, default, filter_comments):
    """Recursion helper for :func:`.merge`

    Amends `user` in place with the items from `default` and returns
    `user`.  If `user` and `default` are not both dicts then `user` is
    returned unchanged.

    """
    if not (isinstance(user, dict) and isinstance(default, dict)):
        return user

    # NOTE(review): `getitem_ns` presumably is the item access of
    # Configuration that does no interpolation -- confirm in `.config`
    is_configuration = isinstance(user, Configuration)
    for key, default_value in default.items():
        if filter_comments and _is_comment(key):
            continue
        if key not in user:
            # only in `default`: take it over (no copy)
            user[key] = default_value
            continue
        if is_configuration:
            user_value = user.getitem_ns(key)
        else:
            user_value = user[key]
        if user_value == constants.DEL_VALUE:
            # do not copy
            del user[key]
        else:
            user[key] = _merge(user_value, default_value, filter_comments)
    return user


def safe_merge(user, default, filter_comments=True):
    """A more safe version of :func:`.merge` that makes deep copies of
    the returned container objects.

    :param user: the new configuration that will be logically merged
                 into `default`
    :param default: the base configuration where `user` is logically
                    merged into
    :param bool filter_comments: flag whether to filter comment keys that
                   start with any of the items in :data:`.COMMENTS`
    :returns: a new merged configuration; if `user` is ``None`` a
              filtered deep copy of `default`

    Contrary to :func:`.merge` no given argument is ever changed
    inplace. Every object from `default` is decoupled from the result
    -- so changing the `default` configuration later does not propagate
    into a merged configuration later.

    If a value in `user` is equal to :data:`.constants.DEL_VALUE` the
    corresponding key will be deleted from the merged output.

    """
    if user is None:
        # Copy first and filter the copy: filtering `default` itself
        # would change the caller's object in place.
        result = copy.deepcopy(default)
        if filter_comments:
            _filter_comments(result)
        _filter_deletions(result)
        return result

    # all changes below are done on a private copy of `user`
    user = copy.deepcopy(user)
    if filter_comments:
        _filter_comments(user)
    # The recursion helper does the key-wise merge (with deep copies of
    # the values from `default`); it leaves `user` alone if `user` and
    # `default` are not both dicts.
    _safe_merge(user, default, filter_comments)
    _filter_deletions(user)
    return user


def _safe_merge(user, default, filter_comments):
    """Recursion helper for :func:`safe_merge`

    Amends `user` in place with deep copies of the items from `default`
    and returns `user`.  If `user` and `default` are not both dicts then
    `user` is returned unchanged.

    """
    if not (isinstance(user, dict) and isinstance(default, dict)):
        return user

    # NOTE(review): `getitem_ns` presumably is the item access of
    # Configuration that does no interpolation -- confirm in `.config`
    is_configuration = isinstance(user, Configuration)
    for key, default_value in default.items():
        if filter_comments and _is_comment(key):
            continue
        if key not in user:
            # only in `default`: take over a decoupled copy
            user[key] = copy.deepcopy(default_value)
            continue
        if is_configuration:
            user_value = user.getitem_ns(key)
        else:
            user_value = user[key]
        if user_value == constants.DEL_VALUE:
            # do not copy
            del user[key]
        else:
            user[key] = _safe_merge(user_value, default_value,
                                    filter_comments)
    return user


def _filter_comments(d):
    """Remove all comment keys from the dict `d` and -- recursively --
    from all dicts that are values in `d`.

    Comment keys are keys that start with any of the items in
    :data:`.constants.COMMENTS`.

    `d` is changed in place.  Nothing is done if `d` is not a dict.

    """
    if not isinstance(d, dict):
        return
    is_configuration = isinstance(d, Configuration)
    # iterate over a snapshot of the keys because `d` is changed meanwhile
    for key in list(d.keys()):
        if _is_comment(key):
            del d[key]
            continue
        if is_configuration:
            child = d.getitem_ns(key)
        else:
            child = d[key]
        if isinstance(child, dict):
            _filter_comments(child)


def _is_comment(k):
    """Check whether the key `k` starts with any of the items in
    :data:`.constants.COMMENTS`.

    Keys without a `startswith` attribute (i.e. non-string keys) are
    never comments.

    """
    prefixes = constants.COMMENTS
    try:
        return any(k.startswith(prefix) for prefix in prefixes)
    except AttributeError:
        # non-string key
        return False


def _filter_deletions(d):
    """Remove all deletion markers from the dict `d` and -- recursively --
    from all dicts that are values in `d`.

    Deletions have values that equal :data:`.constants.DEL_VALUE`.

    `d` is changed in place.  Nothing is done if `d` is not a dict.

    """
    if not isinstance(d, dict):
        return
    is_configuration = isinstance(d, Configuration)
    # iterate over a snapshot of the items because `d` is changed meanwhile
    for key, value in list(d.items()):
        if value == constants.DEL_VALUE:
            del d[key]
            continue
        if is_configuration:
            child = d.getitem_ns(key)
        else:
            child = d[key]
        if isinstance(child, dict):
            _filter_deletions(child)


#
# Init loader defaults: mode->loader and extension->mode
#
# This runs once when the module is imported.
#
# NOTE: set_assoc() inserts at the head position by default.  So the
#       patterns end up in `_extensions` in the reverse order of
#       DEFAULT_ASSOC.
#
mode_loaders.update(DEFAULT_MODE_LOADERS)
for _pattern, _mode in DEFAULT_ASSOC:
    set_assoc(_pattern, _mode)