view configmix/__init__.py @ 167:d8155c429171 v0.6

+++++ v0.6
author Franz Glasner <fzglas.hg@dom66.de>
date Thu, 14 Mar 2019 09:37:18 +0100
parents b5ce9a8461bf
children c247a5dc35ed
line wrap: on
line source

# -*- coding: utf-8 -*-
"""A library for helping with configuration files.

:Author:    Franz Glasner
:Copyright: (c) 2015–2019, Franz Glasner.
            All rights reserved.
:License:   3-clause BSD License.
            See LICENSE.txt for details.
:ID:        $Header$

"""

from __future__ import division, print_function, absolute_import


__version__ = "0.6"

__revision__ = "|VCSRevision|"
__date__ = "|VCSJustDate|"


import copy

from .compat import u, u2fs
from .config import Configuration


__all__ = ["load", "safe_load",
           "set_loader", "default_loaders",
           "Configuration"]


COMMENTS = [u("__comment"),
            u("__doc"),
]
"""Prefixes for comment configuration keys that are to be handled as
comments

"""


def load(*files):
    """Load the given configuration files, merge them in the given order
    and return the resulting configuration dictionary.

    :param files: the filenames of the configuration files to read and merge
    :returns: the configuration
    :rtype: ~configmix.config.Configuration

    """
    if not files:
        return Configuration()
    else:
        ex = merge(None, _load_cfg_from_file(files[0]))
        for f in files[1:]:
            ex = merge(_load_cfg_from_file(f), ex)
        return Configuration(ex)


def safe_load(*files):
    """Analogous to :func:`load` but do merging with :func:`safe_merge`
    instead of :func:`merge`

    """
    if not files:
        return Configuration()
    else:
        ex = safe_merge(None, _load_cfg_from_file(files[0]))
        for f in files[1:]:
            ex = safe_merge(_load_cfg_from_file(f), ex)
        return Configuration(ex)


def _load_yaml(filename):
    from . import yaml
    with open(u2fs(filename), "rb") as yf:
        return yaml.safe_load(yf)


def _load_json(filename):
    from . import json
    return json.load(filename)


def _load_py(filename):
    from . import py
    return py.load(filename)


def _load_ini(filename):
    from . import ini
    return ini.load(filename)


default_loaders = {
    ".yml": _load_yaml,
    ".yaml": _load_yaml,
    ".json": _load_json,
    ".py": _load_py,
    ".ini": _load_ini
}
"""The builtin default associations of extensions with loaders"""


DEFAULT_LOADER = object()
"""Marker for the default loader for an extension.

To be used in :func:`set_loader`.
"""

_loaders = {}
"""All configured loader functions"""


def clear_loader():
    """Remove all configured loader associations.

    The :data:`default_loaders` are **not** changed.

    """
    _loaders.clear()


def set_loader(extension, loader):
    """Associate a filename trailer `extension` with a callable `loader` that
    will be called when :func:`load` encounters a file argument that ends
    with `extension`.

    :param str extension: the extension to associate a loader with
    :param callable loader: a callable that accepts a `filename` argument and
                            returns a parsed configuration from a given file

    `extension` should be all lowercase because lookup in the loader database
    is *case-insensitive*.

    If `loader` is :data:`DEFAULT_LOADER` then the default association
    from :data:`default_loaders` will be used -- if any.

    """
    if loader is DEFAULT_LOADER:
        try:
            _loaders[extension] = default_loaders[extension]
        except KeyError:
            raise ValueError("no DEFAULT loader for extension: %r" % extension)
    else:
        _loaders[extension] = loader


def _load_cfg_from_file(filename):
    fnl = filename.lower()
    extensions = list(_loaders.keys())
    extensions.sort(key=lambda x: len(x), reverse=True)
    for ext in extensions:
        if fnl.endswith(ext):
            return _loaders[ext](filename)
    else:
        raise ValueError("Unknown configuration file type for filename "
                         "%r" % filename)


if 0:
    #
    # From: https://github.com/jet9/python-yconfig/blob/master/yconfig.py
    # License: BSD License
    #
    def dict_merge(a, b):
        """Recursively merges dict's. not just simple a['key'] = b['key'], if
        both a and bhave a key who's value is a dict then dict_merge is called
        on both values and the result stored in the returned dictionary."""

        if not isinstance(b, dict):
            return b
        result = deepcopy(a)
        for k, v in b.iteritems():
            if k in result and isinstance(result[k], dict):
                result[k] = dict_merge(result[k], v)
            else:
                result[k] = deepcopy(v)
        return result


def merge(user, default, filter_comments=True):
    """Logically merge the configuration in `user` into `default`.

    :param ~configmix.config.Configuration user:
                the new configuration that will be logically merged
                into `default`
    :param ~configmix.config.Configuration default:
                the base configuration where `user` is logically merged into
    :param bool filter_comments: flag whether to filter comment keys that
                   start with any of the items in :data:`COMMENTS`
    :returns: `user` with the necessary amendments from `default`.
              If `user` is ``None`` then `default` is returned.

    .. note:: The configuration in `user` is augmented/changed
              **inplace**.

              The configuration in `default` will be changed **inplace**
              when filtering out comments (which is the default).

    From http://stackoverflow.com/questions/823196/yaml-merge-in-python

    """
    if user is None:
        if filter_comments:
            _filter_comments(default)
        return default
    if filter_comments:
        _filter_comments(user)
    if isinstance(user, dict) and isinstance(default, dict):
        for k, v in default.items():
            if filter_comments and _is_comment(k):
                continue
            if k not in user:
                user[k] = v
            else:
                user[k] = _merge(user[k], v, filter_comments)
    return user


def _merge(user, default, filter_comments):
    """Recursion helper for :meth:`merge`

    """
    if isinstance(user, dict) and isinstance(default, dict):
        for k, v in default.items():
            if filter_comments and _is_comment(k):
                continue
            if k not in user:
                user[k] = v
            else:
                user[k] = _merge(user[k], v, filter_comments)
    return user


def safe_merge(user, default, filter_comments=True):
    """A more safe version of :func:`merge` that makes deep copies of
    the returned container objects.

    Contrary to :func:`merge` no given argument is ever changed
    inplace. Every object from `default` is decoupled from the result
    -- so changing the `default` configuration later does not propagate
    into a merged configuration later.

    """
    if user is None:
        if filter_comments:
            _filter_comments(default)
        return copy.deepcopy(default)
    user = copy.deepcopy(user)
    if filter_comments:
        _filter_comments(user)
    if isinstance(user, dict) and isinstance(default, dict):
        for k, v in default.items():
            if filter_comments and _is_comment(k):
                continue
            if k not in user:
                user[k] = copy.deepcopy(v)
            else:
                user[k] = _safe_merge(user[k], v, filter_comments)
    return user


def _safe_merge(user, default, filter_comments):
    """Recursion helper for :meth:`safe_merge`

    """
    if isinstance(user, dict) and isinstance(default, dict):
        for k, v in default.items():
            if filter_comments and _is_comment(k):
                continue
            if k not in user:
                user[k] = copy.deepcopy(v)
            else:
                user[k] = _safe_merge(user[k], v, filter_comments)
    return user


def _filter_comments(d):
    """Recursively filter comments keys in the dict `d`.

    Comment keys are keys that start with any of the items in
    :data:`COMMENTS`.

    """
    if not isinstance(d, dict):
        return
    # use a copy of the keys because we change `d` while iterating
    for k in list(d.keys()):
        if _is_comment(k):
            del d[k]
        else:
            if isinstance(d[k], dict):
                _filter_comments(d[k])


def _is_comment(k):
    for i in COMMENTS:
        if k.startswith(i):
            return True
    return False


#
# Init loader defaults
#
for _extension in default_loaders:
    set_loader(_extension, DEFAULT_LOADER)