view configmix/yaml.py @ 428:090a25f36a3d

FIX: Allow jailed configurations to correctly use base configurations that use a different "default" marker object. Jailed configurations assumed that their "default" marker object is identical to the "default" marker object in the unjailed base configuration. This is not always true, especially if "_JailedConfiguration.rebind()" is used. Removed the explicit "default" keyword argument and passed the complete keywords argument dictionary to the base instead. This triggers correct default handling in the base.
author Franz Glasner <f.glasner@feldmann-mg.com>
date Thu, 09 Dec 2021 13:02:17 +0100
parents eed16a1ec8f3
children f454889e41fa
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2015-2021, Franz Glasner. All rights reserved.
# :License:   BSD-3-Clause. See LICENSE.txt for details.
# :-
"""Simple wrapper for :mod:`yaml` to support all-unicode strings when
loading configuration files.

"""

from __future__ import division, print_function, absolute_import


__all__ = ["safe_load", "safe_load_all", "load", "load_all"]


try:
    from collections import OrderedDict
except ImportError:
    try:
        from ordereddict import OrderedDict
    except ImportError:
        OrderedDict = None
import yaml
import yaml.constructor

from .compat import u


DictImpl = OrderedDict or dict


class ConfigLoader(yaml.Loader):

    """A YAML loader, which makes all ``!!str`` strings to Unicode.
    Standard PyYAML does this only in the non-ASCII case.

    If an `OrderedDict` implementation is available then all "map" and
    "omap" nodes are constructed as `OrderedDict`.
    This is against YAML specs but within configuration files it seems
    more natural.

    The constructor accepts an additional keyword argument `strict`
    (default `False`). It is consumed here and not forwarded to
    :class:`yaml.Loader`. If it is true then duplicate keys within a
    mapping raise a :class:`yaml.constructor.ConstructorError`.

    """

    def __init__(self, *args, **kwds):
        # "strict" is an option of this class only: remove it before
        # delegating to the PyYAML loader
        strict = kwds.pop("strict", False)
        self.__allow_duplicate_keys = not strict
        yaml.Loader.__init__(self, *args, **kwds)

    def construct_yaml_str(self, node):
        """Construct a ``!!str`` node from its plain scalar value."""
        return self.construct_scalar(node)

    #
    # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1
    # (MIT License)
    #

    def construct_yaml_map(self, node):
        """Generator-style constructor for "map" and "omap" nodes.

        The (still empty) mapping is yielded first and is populated
        from `node` only when the generator is resumed.

        """
        data = DictImpl()
        yield data
        value = self.construct_mapping(node)
        data.update(value)

    def construct_mapping(self, node, deep=False):
        """Build a `DictImpl` from the mapping node `node`.

        :param node: the YAML node to construct from
        :param bool deep: passed on to `construct_object()` for every
                          key and every value
        :returns: the constructed mapping
        :raises yaml.constructor.ConstructorError:
            if `node` is not a mapping node, if a key is not hashable
            or -- in strict mode only -- if a key occurs more than once

        """
        if not isinstance(node, yaml.MappingNode):
            raise yaml.constructor.ConstructorError(
                None,
                None,
                'expected a mapping node, but found %s' % node.id,
                node.start_mark)
        # PyYAML's handling of merge keys ("<<")
        self.flatten_mapping(node)

        mapping = DictImpl()
        for key_node, value_node in node.value:
            key = self.construct_object(key_node, deep=deep)
            try:
                hash(key)
            except TypeError as err:
                # The position of the key is given as the separate
                # "problem_mark" argument. Putting it into the argument
                # tuple of a format string with a single "%s" would
                # itself fail with a TypeError and hide the real error.
                raise yaml.constructor.ConstructorError(
                    'while constructing a mapping', node.start_mark,
                    'found unacceptable key (%s)' % (err,),
                    key_node.start_mark)
            value = self.construct_object(value_node, deep=deep)
            if not self.__allow_duplicate_keys and key in mapping:
                raise yaml.constructor.ConstructorError(
                    'while constructing a mapping', node.start_mark,
                    'found duplicate key %r (%s)' % (key,
                                                     key_node.start_mark)
                )
            # In non-strict mode a later duplicate overwrites the value
            mapping[key] = value
        return mapping


# Register the customized constructors: plain strings and both mapping
# flavours ("map" and "omap") are handled by ConfigLoader's own methods.
for _tag, _constructor in (
        ("tag:yaml.org,2002:str", ConfigLoader.construct_yaml_str),
        ("tag:yaml.org,2002:map", ConfigLoader.construct_yaml_map),
        ("tag:yaml.org,2002:omap", ConfigLoader.construct_yaml_map)):
    ConfigLoader.add_constructor(u(_tag), _constructor)
# Do not leave the loop helpers behind in the module namespace
del _tag, _constructor


class ConfigSafeLoader(yaml.SafeLoader):

    """A safe YAML loader, which makes all ``!!str`` strings to Unicode.
    Standard PyYAML does this only in the non-ASCII case.

    If an `OrderedDict` implementation is available then all "map" and
    "omap" nodes are constructed as `OrderedDict`.
    This is against YAML specs but within configuration files it seems
    more natural.

    The constructor accepts an additional keyword argument `strict`
    (default `False`). It is consumed here and not forwarded to
    :class:`yaml.SafeLoader`. If it is true then duplicate keys within
    a mapping raise a :class:`yaml.constructor.ConstructorError`.

    """

    def __init__(self, *args, **kwds):
        # "strict" is an option of this class only: remove it before
        # delegating to the PyYAML loader
        strict = kwds.pop("strict", False)
        self.__allow_duplicate_keys = not strict
        yaml.SafeLoader.__init__(self, *args, **kwds)

    def construct_yaml_str(self, node):
        """Construct a ``!!str`` node from its plain scalar value."""
        return self.construct_scalar(node)

    #
    # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1
    # (MIT License)
    #

    def construct_yaml_map(self, node):
        """Generator-style constructor for "map" and "omap" nodes.

        The (still empty) mapping is yielded first and is populated
        from `node` only when the generator is resumed.

        """
        data = DictImpl()
        yield data
        value = self.construct_mapping(node)
        data.update(value)

    def construct_mapping(self, node, deep=False):
        """Build a `DictImpl` from the mapping node `node`.

        :param node: the YAML node to construct from
        :param bool deep: passed on to `construct_object()` for every
                          key and every value
        :returns: the constructed mapping
        :raises yaml.constructor.ConstructorError:
            if `node` is not a mapping node, if a key is not hashable
            or -- in strict mode only -- if a key occurs more than once

        """
        if not isinstance(node, yaml.MappingNode):
            raise yaml.constructor.ConstructorError(
                None,
                None,
                'expected a mapping node, but found %s' % node.id,
                node.start_mark)
        # PyYAML's handling of merge keys ("<<")
        self.flatten_mapping(node)

        mapping = DictImpl()
        for key_node, value_node in node.value:
            key = self.construct_object(key_node, deep=deep)
            try:
                hash(key)
            except TypeError as err:
                # The position of the key is given as the separate
                # "problem_mark" argument. Putting it into the argument
                # tuple of a format string with a single "%s" would
                # itself fail with a TypeError and hide the real error.
                raise yaml.constructor.ConstructorError(
                    'while constructing a mapping', node.start_mark,
                    'found unacceptable key (%s)' % (err,),
                    key_node.start_mark)
            value = self.construct_object(value_node, deep=deep)
            if not self.__allow_duplicate_keys and key in mapping:
                raise yaml.constructor.ConstructorError(
                    'while constructing a mapping', node.start_mark,
                    'found duplicate key %r (%s)' % (key,
                                                     key_node.start_mark)
                )
            # In non-strict mode a later duplicate overwrites the value
            mapping[key] = value
        return mapping


# Register the customized constructors: plain strings and both mapping
# flavours ("map" and "omap") are handled by ConfigSafeLoader's own
# methods.
for _tag, _constructor in (
        ("tag:yaml.org,2002:str", ConfigSafeLoader.construct_yaml_str),
        ("tag:yaml.org,2002:map", ConfigSafeLoader.construct_yaml_map),
        ("tag:yaml.org,2002:omap", ConfigSafeLoader.construct_yaml_map)):
    ConfigSafeLoader.add_constructor(u(_tag), _constructor)
# Do not leave the loop helpers behind in the module namespace
del _tag, _constructor


def config_loader_factory(strict=False):
    """Return a callable that creates :class:`ConfigLoader` instances
    with the given `strict` setting.

    The returned callable passes all its arguments on to
    :class:`ConfigLoader`; a "strict" keyword given at call time is
    always replaced by the `strict` value given here.

    """
    def _make_loader(*args, **kwds):
        # The factory's setting wins over a caller-supplied "strict"
        return ConfigLoader(*args, **dict(kwds, strict=strict))

    return _make_loader


def config_safe_loader_factory(strict=False):
    """Return a callable that creates :class:`ConfigSafeLoader`
    instances with the given `strict` setting.

    The returned callable passes all its arguments on to
    :class:`ConfigSafeLoader`; a "strict" keyword given at call time is
    always replaced by the `strict` value given here.

    """
    def _make_loader(*args, **kwds):
        # The factory's setting wins over a caller-supplied "strict"
        return ConfigSafeLoader(*args, **dict(kwds, strict=strict))

    return _make_loader


def load(stream, Loader=None, strict=False):
    """Parse the given `stream` and return the Python object constructed
    from the first document in the stream.

    An empty document yields an empty mapping. Any other root object
    that is not a mapping is rejected with a :class:`TypeError`.

    If `strict` is `True` then duplicate mapping keys within a YAML
    document are detected and prevented. If a `Loader` is given then
    `strict` does not apply.

    .. note:: Without an explicit `Loader` a loader based on
              :class:`ConfigLoader` (PyYAML's full, non-safe loader) is
              used. Prefer :func:`safe_load` for input that is not
              trusted.

    """
    if Loader is not None:
        loader = Loader
    else:
        loader = config_loader_factory(strict=strict)
    document = yaml.load(stream, Loader=loader)
    if document is None:
        # Map an empty document to an empty dict
        return DictImpl()
    if isinstance(document, DictImpl):
        return document
    raise TypeError("YAML root object must be a mapping")


def load_all(stream, Loader=None, strict=False):
    """Parse the given `stream` and return the list of Python objects
    corresponding to the documents in the `stream`.

    All documents are loaded before the list is returned. An empty
    document is represented by an empty mapping; any other root object
    that is not a mapping is rejected with a :class:`TypeError`.

    If `strict` is `True` then duplicate mapping keys within a YAML
    document are detected and prevented. If a `Loader` is given then
    `strict` does not apply.

    """
    if Loader is not None:
        loader = Loader
    else:
        loader = config_loader_factory(strict=strict)
    documents = []
    for document in yaml.load_all(stream, Loader=loader):
        if document is None:
            # Map an empty document to an empty dict
            document = DictImpl()
        elif not isinstance(document, DictImpl):
            raise TypeError("YAML root object must be a mapping")
        documents.append(document)
    return documents


def safe_load(stream, strict=False):
    """Parse the given `stream` and return the Python object constructed
    from the first document in the stream.

    Recognizes only standard YAML tags and cannot construct an
    arbitrary Python object.

    An empty document yields an empty mapping. Any other root object
    that is not a mapping is rejected with a :class:`TypeError`.

    If `strict` is `True` then duplicate mapping keys within a YAML document
    are detected and prevented.

    """
    loader = config_safe_loader_factory(strict=strict)
    document = yaml.load(stream, Loader=loader)
    if document is None:
        # Map an empty document to an empty dict
        return DictImpl()
    if isinstance(document, DictImpl):
        return document
    raise TypeError("YAML root object must be a mapping")


def safe_load_all(stream, strict=False):
    """Return the list of all decoded YAML documents in the file `stream`.

    Recognizes only standard YAML tags and cannot construct an
    arbitrary Python object.

    All documents are loaded before the list is returned. An empty
    document is represented by an empty mapping; any other root object
    that is not a mapping is rejected with a :class:`TypeError`.

    If `strict` is `True` then duplicate mapping keys within a YAML document
    are detected and prevented.

    """
    data_all = yaml.load_all(stream,
                             Loader=config_safe_loader_factory(strict=strict))
    rdata = []
    for data in data_all:
        if data is None:
            # Map an empty document to an empty dict
            rdata.append(DictImpl())
        else:
            if not isinstance(data, DictImpl):
                raise TypeError("YAML root object must be a mapping")
            rdata.append(data)
    # Return the collected list: `data_all` is a generator that has been
    # completely consumed by the loop above
    return rdata