view configmix/__init__.py @ 174:e2505f524ab9

Use the "@(#)" sigil in the package documentation header
author Franz Glasner <f.glasner@feldmann-mg.com>
date Fri, 26 Apr 2019 18:11:03 +0200
parents b3ba2b0265b5
children 327032bb0f6b
line wrap: on
line source

# -*- coding: utf-8 -*-
"""A library for helping with configuration files.

:Author:    Franz Glasner
:Copyright: (c) 2015–2019, Franz Glasner.
            All rights reserved.
:License:   3-clause BSD License.
            See LICENSE.txt for details.
:ID:        @(#) $Header$

"""

from __future__ import division, print_function, absolute_import


__version__ = "0.7.dev1"

__revision__ = "|VCSRevision|"
__date__ = "|VCSJustDate|"


import fnmatch
import copy

from .compat import u, u2fs
from .config import Configuration


__all__ = ["load", "safe_load",
           "set_loader", "get_loader",
           "get_default_loader",
           "Configuration"]


COMMENTS = [u("__comment"),
            u("__doc"),
]
"""Prefixes for comment configuration keys that are to be handled as
comments

"""


def load(*files, **kwargs):
    """Load the given configuration files, merge them in the given order
    and return the resulting configuration dictionary.

    :param files: the filenames of the configuration files to read and merge
    :keyword defaults: optional configuration dictionary with some default
                       settings where the settings from `files` are merged
                       into
    :type defaults: a configuration dictionary or `None`
    :returns: the configuration
    :rtype: ~configmix.config.Configuration

    """
    defaults = kwargs.get("defaults")
    if not files:
        if defaults is None:
            return Configuration()
        else:
            return Configuration(defaults)
    else:
        if defaults is None:
            start = 1
            ex = merge(None, _load_cfg_from_file(files[0]))
        else:
            start = 0
            ex = merge(None, defaults)
        for f in files[start:]:
            ex = merge(_load_cfg_from_file(f), ex)
        return Configuration(ex)


def safe_load(*files, **kwargs):
    """Analogous to :func:`load` but do merging with :func:`safe_merge`
    instead of :func:`merge`

    """
    defaults = kwargs.get("defaults")
    if not files:
        if defaults is None:
            return Configuration()
        else:
            return Configuration(defaults)
    else:
        if defaults is None:
            start = 1
            ex = merge(None, _load_cfg_from_file(files[0]))
        else:
            start = 0
            ex = merge(None, defaults)
        ex = safe_merge(None, _load_cfg_from_file(files[0]))
        for f in files[start:]:
            ex = safe_merge(_load_cfg_from_file(f), ex)
        return Configuration(ex)


def _load_yaml(filename):
    from . import yaml
    with open(u2fs(filename), "rb") as yf:
        return yaml.safe_load(yf)


def _load_json(filename):
    from . import json
    return json.load(filename)


def _load_py(filename):
    from . import py
    return py.load(filename)


def _load_ini(filename):
    from . import ini
    return ini.load(filename)


_default_loaders = [
    ("*.yml", _load_yaml),
    ("*.yaml", _load_yaml),
    ("*.json", _load_json),
    ("*.py", _load_py),
    ("*.ini", _load_ini),
]
"""The builtin default associations of extensions with loaders -- in that
order

"""


DEFAULT_LOADER = object()
"""Marker for the default loader for an extension.

To be used in :func:`set_loader`.
"""

def get_default_loader(pattern):
    """Return the default loader for the :mod:`fnmatch` pattern `pattern`.

    :raises: :class:`KeyError` if the `pattern` is not found.

    """
    for pat, loader in _default_loaders:
        if pattern == pat:
            return loader
    else:
        raise KeyError("No loader for pattern %r" % pattern)


_loaders = []
"""All configured loader functions"""


def clear_loader():
    """Remove all configured loader associations.

    The :data:`_default_loaders` are **not** changed.

    """
    del _loaders[:]


def get_loader(pattern):
    """Return the default loader for the :mod:`fnmatch` pattern `pattern`.

    :raises: :class:`KeyError` if the `pattern` is not found.

    """
    for pat, loader in _loaders:
        if pattern == pat:
            return loader
    else:
        raise KeyError("No loader for pattern %r" % pattern)


def set_loader(fnpattern, loader):
    """Associate a :mod:`fnmatch` pattern `fnpattern` with a callable
    `loader` that will be called when :func:`load` encounters a file
    argument that matches `fnpattern`.

    :param str fnpattern: the :mod:`fnmatch` pattern to associate a loader with
    :param callable loader: a callable that accepts a `filename` argument and
                            returns a parsed configuration from a given file

    This function prepends to the given pattern to the currently defined
    associations.

    The OS specific case-sensitivity behaviour of
    :func:`fnmatch.fnmatch` apply (i.e. :func:`os.path.normpath` will
    be called for both arguments).

    If `loader` is :data:`DEFAULT_LOADER` then the default association
    from :data:`_default_loaders` will be used -- if any.

    """
    if loader is DEFAULT_LOADER:
        for _p, _l in _default_loaders:
            if _p == fnpattern:
                _loaders.insert(0, (fnpattern, _l))
                break
        else:
            raise ValueError("no DEFAULT loader for pattern: %r" % fnpattern)
    else:
        _loaders.insert(0, (fnpattern, loader))


def _load_cfg_from_file(filename):
    for _p, _l in _loaders:
        if fnmatch.fnmatch(filename, _p):
            return _l(filename)
    else:
        raise ValueError("Unknown configuration file type for filename "
                         "%r" % filename)


if 0:
    #
    # From: https://github.com/jet9/python-yconfig/blob/master/yconfig.py
    # License: BSD License
    #
    def dict_merge(a, b):
        """Recursively merges dict's. not just simple a['key'] = b['key'], if
        both a and bhave a key who's value is a dict then dict_merge is called
        on both values and the result stored in the returned dictionary."""

        if not isinstance(b, dict):
            return b
        result = deepcopy(a)
        for k, v in b.iteritems():
            if k in result and isinstance(result[k], dict):
                result[k] = dict_merge(result[k], v)
            else:
                result[k] = deepcopy(v)
        return result


def merge(user, default, filter_comments=True):
    """Logically merge the configuration in `user` into `default`.

    :param ~configmix.config.Configuration user:
                the new configuration that will be logically merged
                into `default`
    :param ~configmix.config.Configuration default:
                the base configuration where `user` is logically merged into
    :param bool filter_comments: flag whether to filter comment keys that
                   start with any of the items in :data:`COMMENTS`
    :returns: `user` with the necessary amendments from `default`.
              If `user` is ``None`` then `default` is returned.

    .. note:: The configuration in `user` is augmented/changed
              **inplace**.

              The configuration in `default` will be changed **inplace**
              when filtering out comments (which is the default).

    From http://stackoverflow.com/questions/823196/yaml-merge-in-python

    """
    if user is None:
        if filter_comments:
            _filter_comments(default)
        return default
    if filter_comments:
        _filter_comments(user)
    if isinstance(user, dict) and isinstance(default, dict):
        for k, v in default.items():
            if filter_comments and _is_comment(k):
                continue
            if k not in user:
                user[k] = v
            else:
                user[k] = _merge(user[k], v, filter_comments)
    return user


def _merge(user, default, filter_comments):
    """Recursion helper for :meth:`merge`

    """
    if isinstance(user, dict) and isinstance(default, dict):
        for k, v in default.items():
            if filter_comments and _is_comment(k):
                continue
            if k not in user:
                user[k] = v
            else:
                user[k] = _merge(user[k], v, filter_comments)
    return user


def safe_merge(user, default, filter_comments=True):
    """A more safe version of :func:`merge` that makes deep copies of
    the returned container objects.

    Contrary to :func:`merge` no given argument is ever changed
    inplace. Every object from `default` is decoupled from the result
    -- so changing the `default` configuration later does not propagate
    into a merged configuration later.

    """
    if user is None:
        if filter_comments:
            _filter_comments(default)
        return copy.deepcopy(default)
    user = copy.deepcopy(user)
    if filter_comments:
        _filter_comments(user)
    if isinstance(user, dict) and isinstance(default, dict):
        for k, v in default.items():
            if filter_comments and _is_comment(k):
                continue
            if k not in user:
                user[k] = copy.deepcopy(v)
            else:
                user[k] = _safe_merge(user[k], v, filter_comments)
    return user


def _safe_merge(user, default, filter_comments):
    """Recursion helper for :meth:`safe_merge`

    """
    if isinstance(user, dict) and isinstance(default, dict):
        for k, v in default.items():
            if filter_comments and _is_comment(k):
                continue
            if k not in user:
                user[k] = copy.deepcopy(v)
            else:
                user[k] = _safe_merge(user[k], v, filter_comments)
    return user


def _filter_comments(d):
    """Recursively filter comments keys in the dict `d`.

    Comment keys are keys that start with any of the items in
    :data:`COMMENTS`.

    """
    if not isinstance(d, dict):
        return
    # use a copy of the keys because we change `d` while iterating
    for k in list(d.keys()):
        if _is_comment(k):
            del d[k]
        else:
            if isinstance(d[k], dict):
                _filter_comments(d[k])


def _is_comment(k):
    for i in COMMENTS:
        if k.startswith(i):
            return True
    return False


#
# Init loader defaults
#
for _pattern, _ld in _default_loaders:
    set_loader(_pattern, _ld)