view configmix/yaml.py @ 676:e1fd2fca8694

>>>>> Tag v0.21.2 for changeset 2ca35f4b03a5
author Franz Glasner <fzglas.hg@dom66.de>
date Wed, 12 Apr 2023 09:27:20 +0200
parents f454889e41fa
children 80d203ed3556
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2015-2022, Franz Glasner. All rights reserved.
# :License:   BSD-3-Clause. See LICENSE.txt for details.
# :-
"""Simple wrapper for :mod:`yaml` to support all-unicode strings when
loading configuration files.

"""

from __future__ import division, print_function, absolute_import


__all__ = ["safe_load", "safe_load_all", "load", "load_all"]


try:
    from collections import OrderedDict
except ImportError:
    try:
        from ordereddict import OrderedDict
    except ImportError:
        OrderedDict = None
import yaml
import yaml.constructor

from .compat import u


# Mapping type used for the constructed YAML mappings: an ordered dict
# if an implementation could be imported above, a plain `dict` otherwise.
DictImpl = OrderedDict or dict


class ConfigLoader(yaml.Loader):

    """A YAML loader that constructs all ``!!str`` scalars as Unicode.
    Standard PyYAML does this only in the non-ASCII case.

    If an `OrderedDict` implementation is available then all "map" and
    "omap" nodes are constructed as `OrderedDict`.
    This is against YAML specs but within configuration files it seems
    more natural.

    The additional keyword argument `strict` (default `False`) makes
    the loader reject duplicate keys within a mapping.

    """

    def __init__(self, *args, **kwds):
        # `strict` is an option of this class only: take it out of the
        # keywords before delegating to the base class
        strict = kwds.pop("strict", False)
        self.__allow_duplicate_keys = not strict
        yaml.Loader.__init__(self, *args, **kwds)

    def construct_yaml_str(self, node):
        """Return the scalar value of `node` without further conversion."""
        return self.construct_scalar(node)

    #
    # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1
    # (MIT License)
    #

    def construct_yaml_map(self, node):
        """Construct a `DictImpl` from the mapping `node`.

        Written as a generator: the still empty container is yielded
        first and is populated from `node` when the generator is
        resumed.

        """
        data = DictImpl()
        yield data
        value = self.construct_mapping(node)
        data.update(value)

    def construct_mapping(self, node, deep=False):
        """Build a `DictImpl` from the key/value pairs of `node`.

        :param node: the node to convert; must be a `yaml.MappingNode`
        :param deep: handed through to `construct_object` for every key
                     and every value
        :returns: the constructed mapping
        :raises yaml.constructor.ConstructorError:
            if `node` is not a mapping node, if a key is not hashable,
            or -- in strict mode only -- if a key occurs more than once

        """
        if isinstance(node, yaml.MappingNode):
            self.flatten_mapping(node)
        else:
            raise yaml.constructor.ConstructorError(
                None,
                None,
                'expected a mapping node, but found %s' % node.id,
                node.start_mark)

        mapping = DictImpl()
        for key_node, value_node in node.value:
            key = self.construct_object(key_node, deep=deep)
            try:
                hash(key)
            except TypeError as err:
                # The message has a single placeholder; the position of
                # the key goes in as the separate problem mark argument.
                raise yaml.constructor.ConstructorError(
                    'while constructing a mapping', node.start_mark,
                    'found unacceptable key (%s)' % (err, ),
                    key_node.start_mark)
            value = self.construct_object(value_node, deep=deep)
            if not self.__allow_duplicate_keys and key in mapping:
                raise yaml.constructor.ConstructorError(
                    'while constructing a mapping', node.start_mark,
                    'found duplicate key %r (%s)' % (key,
                                                     key_node.start_mark)
                )
            # In non-strict mode a later occurrence of a key silently
            # replaces the value of an earlier one
            mapping[key] = value
        return mapping


# Route the standard string and mapping tags to the customized
# constructors of ConfigLoader.
for _tag, _constructor in (
        (u("tag:yaml.org,2002:str"), ConfigLoader.construct_yaml_str),
        (u("tag:yaml.org,2002:map"), ConfigLoader.construct_yaml_map),
        (u("tag:yaml.org,2002:omap"), ConfigLoader.construct_yaml_map)):
    ConfigLoader.add_constructor(_tag, _constructor)
# Do not leave the loop helpers behind in the module namespace
del _tag, _constructor


class ConfigSafeLoader(yaml.SafeLoader):

    """A safe YAML loader that constructs all ``!!str`` scalars as
    Unicode.
    Standard PyYAML does this only in the non-ASCII case.

    If an `OrderedDict` implementation is available then all "map" and
    "omap" nodes are constructed as `OrderedDict`.
    This is against YAML specs but within configuration files it seems
    more natural.

    The additional keyword argument `strict` (default `False`) makes
    the loader reject duplicate keys within a mapping.

    """

    def __init__(self, *args, **kwds):
        # `strict` is an option of this class only: take it out of the
        # keywords before delegating to the base class
        strict = kwds.pop("strict", False)
        self.__allow_duplicate_keys = not strict
        yaml.SafeLoader.__init__(self, *args, **kwds)

    def construct_yaml_str(self, node):
        """Return the scalar value of `node` without further conversion."""
        return self.construct_scalar(node)

    #
    # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1
    # (MIT License)
    #

    def construct_yaml_map(self, node):
        """Construct a `DictImpl` from the mapping `node`.

        Written as a generator: the still empty container is yielded
        first and is populated from `node` when the generator is
        resumed.

        """
        data = DictImpl()
        yield data
        value = self.construct_mapping(node)
        data.update(value)

    def construct_mapping(self, node, deep=False):
        """Build a `DictImpl` from the key/value pairs of `node`.

        :param node: the node to convert; must be a `yaml.MappingNode`
        :param deep: handed through to `construct_object` for every key
                     and every value
        :returns: the constructed mapping
        :raises yaml.constructor.ConstructorError:
            if `node` is not a mapping node, if a key is not hashable,
            or -- in strict mode only -- if a key occurs more than once

        """
        if isinstance(node, yaml.MappingNode):
            self.flatten_mapping(node)
        else:
            raise yaml.constructor.ConstructorError(
                None,
                None,
                'expected a mapping node, but found %s' % node.id,
                node.start_mark)

        mapping = DictImpl()
        for key_node, value_node in node.value:
            key = self.construct_object(key_node, deep=deep)
            try:
                hash(key)
            except TypeError as err:
                # The message has a single placeholder; the position of
                # the key goes in as the separate problem mark argument.
                raise yaml.constructor.ConstructorError(
                    'while constructing a mapping', node.start_mark,
                    'found unacceptable key (%s)' % (err, ),
                    key_node.start_mark)
            value = self.construct_object(value_node, deep=deep)
            if not self.__allow_duplicate_keys and key in mapping:
                raise yaml.constructor.ConstructorError(
                    'while constructing a mapping', node.start_mark,
                    'found duplicate key %r (%s)' % (key,
                                                     key_node.start_mark)
                )
            # In non-strict mode a later occurrence of a key silently
            # replaces the value of an earlier one
            mapping[key] = value
        return mapping


# Route the standard string and mapping tags to the customized
# constructors of ConfigSafeLoader.
for _tag, _constructor in (
        (u("tag:yaml.org,2002:str"), ConfigSafeLoader.construct_yaml_str),
        (u("tag:yaml.org,2002:map"), ConfigSafeLoader.construct_yaml_map),
        (u("tag:yaml.org,2002:omap"), ConfigSafeLoader.construct_yaml_map)):
    ConfigSafeLoader.add_constructor(_tag, _constructor)
# Do not leave the loop helpers behind in the module namespace
del _tag, _constructor


def config_loader_factory(strict=False):
    """Return a callable that creates `ConfigLoader` instances with the
    given `strict` setting.

    The returned callable hands all its arguments through to
    `ConfigLoader`; a `strict` keyword given to it is overwritten with
    the value given here.

    """
    def _make_loader(*args, **kwds):
        # The setting of the factory always takes precedence
        kwds.update(strict=strict)
        return ConfigLoader(*args, **kwds)

    return _make_loader


def config_safe_loader_factory(strict=False):
    """Return a callable that creates `ConfigSafeLoader` instances with
    the given `strict` setting.

    The returned callable hands all its arguments through to
    `ConfigSafeLoader`; a `strict` keyword given to it is overwritten
    with the value given here.

    """
    def _make_loader(*args, **kwds):
        # The setting of the factory always takes precedence
        kwds.update(strict=strict)
        return ConfigSafeLoader(*args, **kwds)

    return _make_loader


def load(stream, Loader=None, strict=False):
    """Parse the given `stream` and return the mapping constructed from
    the first document in the stream.

    An empty document yields an empty mapping.

    If `strict` is `True` then duplicate mapping keys within a YAML
    document are detected and prevented. If a `Loader` is given then
    `strict` does not apply.

    Without an explicit `Loader` a loader derived from the full
    `yaml.Loader` is used; prefer `safe_load` for input that is not
    trusted.

    :raises TypeError: if the root object of the document is not an
                       instance of `DictImpl`

    """
    loader = Loader
    if loader is None:
        loader = config_loader_factory(strict=strict)
    document = yaml.load(stream, loader)
    if document is None:
        # An empty document is mapped to an empty dict
        return DictImpl()
    if isinstance(document, DictImpl):
        return document
    raise TypeError("YAML root object must be a mapping")


def load_all(stream, Loader=None, strict=False):
    """Parse the given `stream` and return the list of mappings
    corresponding to the documents in the `stream`.

    Every empty document yields an empty mapping.

    If `strict` is `True` then duplicate mapping keys within a YAML
    document are detected and prevented. If a `Loader` is given then
    `strict` does not apply.

    Without an explicit `Loader` a loader derived from the full
    `yaml.Loader` is used; prefer `safe_load_all` for input that is not
    trusted.

    :raises TypeError: if the root object of a document is not an
                       instance of `DictImpl`

    """
    loader = Loader
    if loader is None:
        loader = config_loader_factory(strict=strict)
    documents = []
    for document in yaml.load_all(stream, loader):
        if document is None:
            # An empty document is mapped to an empty dict
            document = DictImpl()
        elif not isinstance(document, DictImpl):
            raise TypeError("YAML root object must be a mapping")
        documents.append(document)
    return documents


def safe_load(stream, strict=False):
    """Parse the given `stream` and return the mapping constructed from
    the first document in the stream.

    Recognizes only standard YAML tags and cannot construct an
    arbitrary Python object. An empty document yields an empty mapping.

    If `strict` is `True` then duplicate mapping keys within a YAML document
    are detected and prevented.

    :raises TypeError: if the root object of the document is not an
                       instance of `DictImpl`

    """
    loader = config_safe_loader_factory(strict=strict)
    document = yaml.load(stream, Loader=loader)
    if document is None:
        # An empty document is mapped to an empty dict
        return DictImpl()
    if isinstance(document, DictImpl):
        return document
    raise TypeError("YAML root object must be a mapping")


def safe_load_all(stream, strict=False):
    """Return the list of all decoded YAML documents in the file `stream`.

    Recognizes only standard YAML tags and cannot construct an
    arbitrary Python object. Every empty document yields an empty
    mapping.

    If `strict` is `True` then duplicate mapping keys within a YAML document
    are detected and prevented.

    :returns: a list with one mapping for every document in `stream`
    :raises TypeError: if the root object of a document is not an
                       instance of `DictImpl`

    """
    data_all = yaml.load_all(stream,
                             Loader=config_safe_loader_factory(strict=strict))
    rdata = []
    for data in data_all:
        if data is None:
            # An empty document is mapped to an empty dict
            rdata.append(DictImpl())
        else:
            if not isinstance(data, DictImpl):
                raise TypeError("YAML root object must be a mapping")
            rdata.append(data)
    # Return the collected list: `data_all` is fully consumed by the
    # loop above and would not hand out any document to the caller.
    return rdata