view configmix/__init__.py @ 602:a2fff0d93d83

Split up fast_pathstr2path into internal implementation and a simple wrapper
author Franz Glasner <fzglas.hg@dom66.de>
date Tue, 11 Jan 2022 00:52:56 +0100
parents 238e94aacef6
children 0db860d649d1
line wrap: on
line source

# -*- coding: utf-8 -*-
"""A library for helping with configuration files.

:Author:    Franz Glasner
:Copyright: (c) 2015–2022, Franz Glasner.
            All rights reserved.
:License:   BSD 3-Clause "New" or "Revised" License.
            See LICENSE.txt for details.
:ID:        @(#) $Header$

"""

from __future__ import division, print_function, absolute_import


__version__ = "0.20.1"

__revision__ = "|VCSRevision|"
__date__ = "|VCSJustDate|"

__all__ = ["load", "safe_load",
           "set_assoc", "get_assoc", "clear_assoc",
           "get_default_assoc",
           "Configuration",
           "try_determine_filemode"]


import fnmatch
import copy
import io
import os
import re

from .compat import u2fs
from .config import Configuration, quote, unquote, pathstr2path  # noqa: F401
from . import constants


def load(*files, **kwargs):
    """Load the given configuration files, merge them in the given order
    and return the resulting configuration dictionary.

    :param files: the filenames of the configuration files to read and merge;
                  if a filename starts with the directory prefix
                  :data:`.constants.DIR_PREFIX` (``<dir>``) then the rest
                  of the name is interpreted as directory and all files
                  in it are loaded in sorted order (non-recursively,
                  ignoring unknown filetypes)
    :keyword defaults: optional configuration dictionary with some default
                       settings where the settings from `files` are merged
                       into
    :type defaults: dict-alike or None
    :keyword extras: optional configuration dictionary that will be applied
                     last

                     Use this for example to overwrite configuration file
                     settings from commandline arguments.
    :type extras: dict-alike or None
    :keyword strict: enable strict parsing mode for parsers that support it
                     (e.g. to prevent duplicate keys)
    :type strict: bool
    :returns: the configuration
    :rtype: ~configmix.config.Configuration
    :raises ValueError: if an explicitly given file (i.e. not one found
                        via a directory) has an unknown file type

    Other keyword arguments are silently ignored.

    """
    defaults = kwargs.get("defaults")
    extras = kwargs.get("extras")
    strict = kwargs.get("strict", False)
    # Strip exactly the prefix that was tested for instead of relying on
    # a hard-coded length
    dir_prefix = constants.DIR_PREFIX
    if defaults is None:
        ex = Configuration()
    else:
        # merging with `None` just filters comments and deletions
        ex = merge(None, Configuration(defaults))
    for f in files:
        if f.startswith(dir_prefix):
            dirname = f[len(dir_prefix):]
            for f2 in _get_configuration_files_from_dir(dirname):
                nx = _load_cfg_from_file(
                    f2, ignore_unknown=True, strict=strict)
                if nx is not None:
                    ex = merge(nx, ex)
        else:
            nx = _load_cfg_from_file(f, strict=strict)
            if nx is not None:
                ex = merge(nx, ex)
    if extras:
        ex = merge(Configuration(extras), ex)
    return Configuration(ex)


def safe_load(*files, **kwargs):
    """Analogous to :func:`load` but do merging with :func:`safe_merge`
    instead of :func:`.merge`

    The keyword arguments `defaults`, `extras` and `strict` are handled
    as in :func:`load`; other keyword arguments are silently ignored.

    :returns: the configuration
    :rtype: ~configmix.config.Configuration
    :raises ValueError: if an explicitly given file (i.e. not one found
                        via a directory) has an unknown file type

    """
    defaults = kwargs.get("defaults")
    extras = kwargs.get("extras")
    strict = kwargs.get("strict", False)
    # Strip exactly the prefix that was tested for instead of relying on
    # a hard-coded length
    dir_prefix = constants.DIR_PREFIX
    if defaults is None:
        ex = Configuration()
    else:
        # merging with `None` just filters comments and deletions
        ex = safe_merge(None, Configuration(defaults))
    for f in files:
        if f.startswith(dir_prefix):
            dirname = f[len(dir_prefix):]
            for f2 in _get_configuration_files_from_dir(dirname):
                nx = _load_cfg_from_file(
                    f2, ignore_unknown=True, strict=strict)
                if nx is not None:
                    ex = safe_merge(nx, ex)
        else:
            nx = _load_cfg_from_file(f, strict=strict)
            if nx is not None:
                ex = safe_merge(nx, ex)
    if extras:
        ex = safe_merge(Configuration(extras), ex)
    return Configuration(ex)


def _get_configuration_files_from_dir(root):
    """Return the paths of all non-directory entries directly within
    the directory `root`, sorted by entry name.

    Every returned path is `root` joined with the entry name.
    If `root` is not a directory an empty list is returned.

    """
    if not os.path.isdir(root):
        return []

    candidates = [os.path.join(root, entry)
                  for entry in sorted(os.listdir(root))]
    # XXX TBD Recurse??? depth-first???
    return [path for path in candidates if not os.path.isdir(path)]


def _load_yaml(filename, strict=False):
    """Load the YAML file `filename` with the package's :mod:`.yaml`
    module and return the result of its ``safe_load``.

    `strict` is passed through to ``yaml.safe_load``.

    """
    # the format specific module is imported at call time, not at
    # import time of this module
    from . import yaml
    # the file is opened in binary mode; `u2fs` presumably converts the
    # name for the filesystem -- see :mod:`.compat`
    with open(u2fs(filename), "rb") as yf:
        return yaml.safe_load(yf, strict=strict)


def _load_json(filename, strict=False):
    """Load the JSON file `filename` with the package's :mod:`.json`
    module.

    `strict` is accepted because all loaders are called with this
    keyword; it is not forwarded to ``json.load``.

    """
    # imported at call time, not at import time of this module
    from . import json
    return json.load(filename)


def _load_py(filename, strict=False):
    """Load the Python configuration file `filename` with the package's
    :mod:`.py` module.

    `strict` is accepted because all loaders are called with this
    keyword; it is not forwarded to ``py.load``.

    """
    # imported at call time, not at import time of this module
    from . import py
    return py.load(filename)


def _load_ini(filename, strict=False):
    """Load the INI-style file `filename` with the package's :mod:`.ini`
    module.

    `strict` is accepted because all loaders are called with this
    keyword; it is not forwarded to ``ini.load``.

    """
    # imported at call time, not at import time of this module
    from . import ini
    return ini.load(filename)


def _load_toml(filename, strict=False):
    """Load the TOML file `filename` with the package's :mod:`.toml`
    module.

    `strict` is accepted because all loaders are called with this
    keyword; it is not forwarded to ``toml.load``.

    """
    # imported at call time, not at import time of this module
    from . import toml
    return toml.load(filename)


def _load_ignore(filename, strict=False):
    """A loader that returns `None` just to ignore `filename`.

    Both arguments are unused; they exist only because all loaders are
    called as ``loader(filename, strict=...)``.  The file is not opened.

    """
    return None


# Finds a ``-*- ... -*-`` section within a line and captures the text
# between the two markers
EMACS_MODELINE = re.compile(r"-\*-(.*?)-\*-")
# Finds a ``mode:`` or ``mode=`` setting at the very start of the captured
# text or after a ``;`` and captures the mode name
EMACS_MODE = re.compile(r"(?:\A\s*|;\s*)mode[:=]\s*([-_.a-zA-Z0-9]+)")


def try_determine_filemode(filename):
    """Try to determine an explicitly given filemode from an
    Emacs-compatible mode declaration (e.g. ``-*- mode=python -*-``).

    :param str filename: the name of the file to inspect
    :return: the found mode string or `None`
    :rtype: str or None

    Only the first two lines of the file are searched.  The file is
    read as ASCII; undecodable bytes are replaced instead of raising
    an error.

    Conveniently to be used in calls to :func:`~.set_assoc` to determine
    the file-mode by content instead of filename extension.

    """
    with io.open(filename, encoding="ascii", errors="replace") as fp:
        for lineno, line in enumerate(fp, 1):
            modeline = EMACS_MODELINE.search(line)
            if modeline is not None:
                mode = EMACS_MODE.search(modeline.group(1))
                if mode is not None:
                    return mode.group(1)
            if lineno >= 2:
                # do not look beyond the second line
                break
    return None


# Keys are file-mode names, values are the private loader functions
# defined above; several mode names share one loader.
DEFAULT_MODE_LOADERS = {
    "python": _load_py,
    "yaml": _load_yaml,
    "conf": _load_ini,
    "conf-windows": _load_ini,
    "ini": _load_ini,
    "toml": _load_toml,
    "conf-toml": _load_toml,
    "javascript": _load_json,
    "json": _load_json,
    # these two modes map to the loader that just returns `None`
    "-*-ignore-*-": _load_ignore,
    "-*- ignore -*-": _load_ignore,
}
"""Default associations between file modes and loader functions"""


# List of ``(fnmatch pattern, file-mode)`` tuples; the mode names used
# here are keys of DEFAULT_MODE_LOADERS.
DEFAULT_ASSOC = [
    ("*.yml", "yaml"),
    ("*.yaml", "yaml"),
    ("*.json", "json"),
    ("*.py", "python"),
    ("*.ini", "conf"),
    ("*.toml", "toml"),
]
"""The builtin default associations of filename extensions with
file modes -- in that order.

The "mode" part may be a string or a callable with a filename
parameter that returns the mode string for the file or `None` if it
cannot be determined.

"""


# A unique sentinel object: set_assoc() recognizes it by identity
# (``mode is USE_DEFAULT_ASSOC``), so it cannot clash with a mode string.
USE_DEFAULT_ASSOC = object()
"""Marker for the default association for an extension.

To be used in :func:`.set_assoc`.
"""


def get_default_assoc(pattern):
    """Look up the builtin file-mode that :data:`.DEFAULT_ASSOC`
    associates with the :mod:`fnmatch` pattern `pattern`.

    The pattern is compared for equality with the patterns in
    :data:`.DEFAULT_ASSOC` -- it is not matched against them.  The mode
    of the first equal entry is returned.

    :raises: :class:`KeyError` if the `pattern` is not found.

    """
    for default_pattern, default_mode in DEFAULT_ASSOC:
        if pattern == default_pattern:
            return default_mode
    raise KeyError("No loader for pattern %r" % pattern)


# Starts empty and is filled from DEFAULT_MODE_LOADERS by the
# initialization code at the end of this module.
mode_loaders = {}
"""All configured associations between file modes and loader functions.

See :data:`.DEFAULT_MODE_LOADERS`.

"""

# A list of ``(fnmatch pattern, mode)`` tuples that is searched front to
# back when a file is loaded; changed only in place by the *_assoc()
# functions.
_extensions = []
"""All configured associations of filename extensions with file modes.

See :data:`DEFAULT_ASSOC`

"""


def clear_assoc():
    """Drop every configured pattern/file-mode association.

    The list of associations is emptied in place.
    The :data:`.DEFAULT_ASSOC` are **not** changed.

    """
    _extensions[:] = []


def get_assoc(pattern):
    """Return the currently configured file-mode for the :mod:`fnmatch`
    pattern `pattern`.

    The pattern is compared for equality with the configured patterns
    -- it is not matched against them.  The mode of the first equal entry
    is returned; it is a mode string or a callable, just as given to
    :func:`.set_assoc`.

    :raises: :class:`KeyError` if the `pattern` is not found.

    """
    for configured_pattern, configured_mode in _extensions:
        if pattern == configured_pattern:
            return configured_mode
    raise KeyError("No associated file-mode for pattern %r" % pattern)


def set_assoc(fnpattern, mode, append=False):
    """Associate a :mod:`fnmatch` style pattern `fnpattern` with a
    file-mode `mode` that determines which loader will be called when
    :func:`load` encounters a file argument that matches `fnpattern`.

    :param str fnpattern: the :mod:`fnmatch` pattern to associate a
                          file-mode with
    :param mode: a mode string or a callable that accepts a `filename`
                 argument and returns a file-mode for the given file
                 (or `None`)
    :type mode: str or callable

    :keyword bool append: If `False` (which is the default) then this
        function inserts the given pattern at the head position of the
        currently defined associations, if `True` the pattern will be
        appended
    :raises ValueError: if `mode` is :data:`.USE_DEFAULT_ASSOC` and
        :data:`.DEFAULT_ASSOC` has no entry for `fnpattern`

    The OS specific case-sensitivity behaviour of
    :func:`fnmatch.fnmatch` applies (i.e. :func:`os.path.normcase`
    will be called for both arguments).

    If `mode` is :data:`.USE_DEFAULT_ASSOC` then the default association
    from :data:`.DEFAULT_ASSOC` will be used.

    An existing association for `fnpattern` is neither replaced nor
    removed; the new one is just added.

    """
    if mode is USE_DEFAULT_ASSOC:
        # replace the marker by the builtin mode for this very pattern
        for default_pattern, default_mode in DEFAULT_ASSOC:
            if default_pattern == fnpattern:
                mode = default_mode
                break
        else:
            raise ValueError("no DEFAULT mode for pattern: %r" % fnpattern)
    entry = (fnpattern, mode)
    if append:
        _extensions.append(entry)
    else:
        _extensions.insert(0, entry)


def del_assoc(fnpattern):
    """Remove all associations for `fnpattern`.

    :param str fnpattern: the :mod:`fnmatch` pattern whose associations
                          are to be removed; it is compared for equality
                          with the configured patterns (it is not matched
                          against them)

    It is not an error if there is no association for `fnpattern`.
    The relative order of the remaining associations is kept.

    """
    # One pass instead of rescanning from the start after every deletion.
    # The slice assignment changes the existing list object in place.
    _extensions[:] = [
        entry for entry in _extensions if not fnpattern == entry[0]]


def _load_cfg_from_file(filename, ignore_unknown=False, strict=False):
    """Find the loader that is responsible for the file `filename`, call
    it and return the loaded configuration dict.

    The configured associations are tried in order.  The first one
    whose pattern matches `filename` and that yields a file-mode
    determines the loader (by lookup of the mode in
    :data:`.mode_loaders`).  `strict` is handed over to the loader.

    If `ignore_unknown` is `True` then files without any applicable
    association are ignored and `None` is returned.
    Otherwise a :exc:`ValueError` exception is raised.

    Can return `None` if the file should be ignored by the caller.

    """
    for pattern, mode in _extensions:
        if not fnmatch.fnmatch(filename, pattern):
            continue
        if callable(mode):
            mode = mode(filename)
            if mode is None:
                # the callable could not decide: try the next association
                continue
        return mode_loaders[mode](filename, strict=strict)
    if ignore_unknown:
        return None
    raise ValueError("Unknown configuration file type for filename "
                     "%r" % filename)


if 0:
    #
    # Disabled code: because of the ``if 0`` this definition is never
    # executed, so `dict_merge` does not exist at runtime.
    #
    # NOTE(review): it would not run as is -- `deepcopy` is not imported
    # under this name in the visible part of this module and
    # `dict.iteritems` exists on Python 2 only.
    #
    # From: https://github.com/jet9/python-yconfig/blob/master/yconfig.py
    # License: BSD License
    #
    def dict_merge(a, b):
        """Recursively merge dicts.

        Not just a simple ``a['key'] = b['key']``: if the value in `a`
        for a key of `b` is a dict then `dict_merge` is called on both
        values and the result is stored in the returned dictionary.

        If `b` is not a dict then `b` is returned as is. Otherwise the
        result is a deep copy of `a` that is updated from `b`.

        """

        if not isinstance(b, dict):
            return b
        result = deepcopy(a)                     # noqa
        for k, v in b.iteritems():
            if k in result and isinstance(result[k], dict):
                result[k] = dict_merge(result[k], v)
            else:
                result[k] = deepcopy(v)          # noqa
        return result


def merge(user, default, filter_comments=True):
    """Logically merge the configuration in `user` into `default`.

    :param ~configmix.config.Configuration user:
                the new configuration that will be logically merged
                into `default`
    :param ~configmix.config.Configuration default:
                the base configuration where `user` is logically merged into
    :param bool filter_comments: flag whether to filter comment keys that
                   start with any of the items in :data:`.COMMENTS`
    :returns: `user` with the necessary amendments from `default`.
              If `user` is ``None`` then `default` is returned.

    .. note:: The configuration in `user` is augmented/changed
              **inplace**. Values that are taken over from `default`
              are not copied.

              If `user` is ``None`` the configuration in `default` is
              changed **inplace** by filtering out deletions and -- if
              `filter_comments` is true -- comments.

    If a value in `user` is equal to :data:`.constants.DEL_VALUE`
    (``{{::DEL::}}``) the corresponding key will be deleted from the
    merged output.

    From http://stackoverflow.com/questions/823196/yaml-merge-in-python

    """
    if user is None:
        target = default
        if filter_comments:
            _filter_comments(target)
    else:
        target = user
        if filter_comments:
            _filter_comments(target)
        # The top-level merge step is the same as the one on nested
        # levels: the helper does nothing if one of both is not a dict.
        _merge(target, default, filter_comments)
    _filter_deletions(target)
    return target


def _merge(user, default, filter_comments):
    """Recursion helper for :func:`.merge`

    Amend the dict `user` in place with the items of the dict `default`
    and return `user`:

    - keys that are missing in `user` get the value from `default`
      (not copied)
    - keys whose value in `user` equals :data:`.constants.DEL_VALUE`
      are removed from `user`
    - all other common keys are merged recursively

    If `filter_comments` is true then comment keys in `default` are
    skipped.  If `user` or `default` is not a dict then `user` is returned
    unchanged.

    """
    if not (isinstance(user, dict) and isinstance(default, dict)):
        return user
    user_is_configuration = isinstance(user, Configuration)
    for key, default_value in default.items():
        if filter_comments and _is_comment(key):
            continue
        if key not in user:
            user[key] = default_value
        elif user[key] == constants.DEL_VALUE:
            # do not copy
            del user[key]
        else:
            if user_is_configuration:
                current = user.getitem_ns(key)
            else:
                current = user[key]
            user[key] = _merge(current, default_value, filter_comments)
    return user


def safe_merge(user, default, filter_comments=True):
    """A more safe version of :func:`.merge` that makes deep copies of
    the returned container objects.

    Contrary to :func:`.merge` no given argument is ever changed
    inplace. Every object from `default` is decoupled from the result
    -- so changing the `default` configuration later does not propagate
    into a merged configuration later.

    :param user: the new configuration that will be logically merged
                 into `default` -- or `None`
    :param default: the base configuration
    :param bool filter_comments: flag whether to filter comment keys
    :returns: a deep copy of `user` with the necessary amendments from
              `default`.
              If `user` is ``None`` then a filtered deep copy of `default`
              is returned.

    If a value in `user` is equal to :data:`.constants.DEL_VALUE`
    the corresponding key will be deleted from the merged output.

    """
    if user is None:
        # Copy *before* filtering: filtering changes its argument in
        # place and `default` must be left untouched.
        result = copy.deepcopy(default)
        if filter_comments:
            _filter_comments(result)
        _filter_deletions(result)
        return result
    result = copy.deepcopy(user)
    if filter_comments:
        _filter_comments(result)
    # The top-level merge step is the same as the one on nested levels:
    # the helper does nothing if one of both is not a dict.
    _safe_merge(result, default, filter_comments)
    _filter_deletions(result)
    return result


def _safe_merge(user, default, filter_comments):
    """Recursion helper for :func:`safe_merge`

    Amend the dict `user` in place with the items of the dict `default`
    and return `user`:

    - keys that are missing in `user` get a deep copy of the value
      from `default`
    - keys whose value in `user` equals :data:`.constants.DEL_VALUE`
      are removed from `user`
    - all other common keys are merged recursively

    If `filter_comments` is true then comment keys in `default` are
    skipped.  If `user` or `default` is not a dict then `user` is returned
    unchanged.

    """
    if not (isinstance(user, dict) and isinstance(default, dict)):
        return user
    user_is_configuration = isinstance(user, Configuration)
    for key, default_value in default.items():
        if filter_comments and _is_comment(key):
            continue
        if key not in user:
            user[key] = copy.deepcopy(default_value)
        elif user[key] == constants.DEL_VALUE:
            # do not copy
            del user[key]
        else:
            if user_is_configuration:
                current = user.getitem_ns(key)
            else:
                current = user[key]
            user[key] = _safe_merge(current, default_value, filter_comments)
    return user


def _filter_comments(d):
    """Recursively remove comment keys from the dict `d` -- in place.

    Comment keys are keys that start with any of the items in
    :data:`.constants.COMMENTS`.

    Nothing is done if `d` is not a dict.  Only nested dicts are
    descended into.  The values of a
    :class:`~configmix.config.Configuration` are fetched with its
    `getitem_ns` method.

    """
    if not isinstance(d, dict):
        return
    is_configuration = isinstance(d, Configuration)
    # use a copy of the keys because we change `d` while iterating
    for key in list(d.keys()):
        if _is_comment(key):
            del d[key]
            continue
        if is_configuration:
            child = d.getitem_ns(key)
        else:
            child = d[key]
        if isinstance(child, dict):
            _filter_comments(child)


def _is_comment(k):
    """Return whether the key `k` starts with any of the items in
    :data:`.constants.COMMENTS`.

    A key without a `startswith` attribute (a non-string key) is never
    a comment.

    """
    prefixes = constants.COMMENTS
    try:
        return any(k.startswith(prefix) for prefix in prefixes)
    except AttributeError:
        # non-string key
        return False


def _filter_deletions(d):
    """Recursively remove deletion markers from the dict `d` -- in place.

    Deletions have values that equal :data:`.constants.DEL_VALUE`.

    Nothing is done if `d` is not a dict.  Only nested dicts are
    descended into.  The nested values of a
    :class:`~configmix.config.Configuration` are fetched with its
    `getitem_ns` method.

    """
    if not isinstance(d, dict):
        return
    is_configuration = isinstance(d, Configuration)
    # use a copy of the items because we change `d` while iterating
    for key, value in list(d.items()):
        if value == constants.DEL_VALUE:
            del d[key]
            continue
        if is_configuration:
            child = d.getitem_ns(key)
        else:
            child = d[key]
        if isinstance(child, dict):
            _filter_deletions(child)


#
# Init loader defaults: mode->loader and extension->mode
#
# This runs once when the module is imported.
#
# NOTE: set_assoc() inserts at the head position by default. So after
#       this loop the configured associations hold the entries of
#       DEFAULT_ASSOC in reverse order.
#
mode_loaders.update(DEFAULT_MODE_LOADERS)
for _pattern, _mode in DEFAULT_ASSOC:
    set_assoc(_pattern, _mode)