view configmix/variables.py @ 691:4643f801379e

Changelog
author Franz Glasner <fzglas.hg@dom66.de>
date Sat, 10 Jun 2023 22:33:11 +0200
parents f454889e41fa
children 57fe110c50c8
line wrap: on
line source

# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2015-2022, Franz Glasner. All rights reserved.
# :License:   BSD-3-Clause. See LICENSE.txt for details.
# :-
"""Variable interpolation: implementation of namespaces and filters

"""

from __future__ import division, absolute_import, print_function


__all__ = []


import os
import platform
from functools import wraps

from .compat import PY2, native_os_str_to_text, text_to_native_os_str, u
from .constants import REF_NAMESPACE, NONE_FILTER, EMPTY_FILTER


_MARKER = object()


def _envlookup(name, default=_MARKER):
    """Lookup an environment variable"""
    try:
        return native_os_str_to_text(os.environ[name])
    except KeyError:
        if default is _MARKER:
            raise
        else:
            return default


def _oslookup(name, default=_MARKER):
    """Lookup some process and/or OS state """
    if name == "cwd":
        return native_os_str_to_text(os.getcwd())
    elif name == "node":
        return native_os_str_to_text(platform.node())
    else:
        if default is _MARKER:
            raise KeyError("key %r not found in the namespace" % name)
        else:
            return default


def _pylookup(name, default=_MARKER):
    """Lookup Python specific information"""
    if name == "version":
        return u(platform.python_version())
    elif name == "implementation":
        return u(platform.python_implementation())
    elif name == "version_maj_min":
        t = platform.python_version_tuple()
        return u('.'.join(t[:2]))
    elif name == "version_maj":
        t = platform.python_version_tuple()
        return u(t[0])
    else:
        if default is _MARKER:
            raise KeyError("variable %r not found in namespace PY" % name)
        else:
            return default


_varns_registry = {}
"""Namespace registry"""


def add_varns(name, fn):
    """Register a new variable namespace `name` and it's implementing
    function `fn`

    ..note:: This function checks that `name` is not the special
             namespace :data:`~configmix.constants.REF_NAMESPACE`.

    """
    if name == REF_NAMESPACE:
        raise ValueError(
            "the special namespace `%s' is not allowed here" % REF_NAMESPACE)
    _varns_registry[name] = fn


def lookup_varns(name):
    """Lookup the variable namespace `name` and return it's implementing
    function

    :param str name: the namespace name
    :returns: the implementing function
    :exception KeyError: if the namespace `name` doesn't exist

    ..note:: This function checks that `name` is not the special
             namespace :data:`~configmix.constants.REF_NAMESPACE`.

    """
    if name == REF_NAMESPACE:
        raise ValueError(
            "the special namespace `%s' is not allowed here" % REF_NAMESPACE)
    return _varns_registry[_normalized(name)]


_filter_registry = {}
"""Filter registry"""


def add_filter(name, fn):
    """Register a variable filter function with name `name` and
    implementation `fn`

    """
    _filter_registry[_normalized(name)] = fn


def lookup_filter(name):
    """Lookup a variable filter with name `name` and return it's
    implementation function

    :param str name: the logical filter name
    :returns: the implementing filter function
    :exception KeyError: if the filter cannot be found

    """
    return _filter_registry[_normalized(name)]


def filter(name):
    """Decorator for a filter function.

    Example usage::

       @filter("myfilter")
       def myfilter_impl(appconfig, variable_value):
           filtered_value = ...
           return filtered_value


    """

    def _decorator(f):

        @wraps(f)
        def _f(appconfig, v):
            return f(appconfig, v)

        add_filter(name, _f)
        return _f

    return _decorator


def _normalized(name):
    return name.replace('-', '_')


#
# Some pre-defined filter functions
#
if PY2:

    @filter("urlquote")
    def urlquote(config, v):
        """Filter function to replace all special characters in string `v`
        using the ``%xx`` escape

        """
        from urllib import quote
        return quote(v.encode("utf-8"), safe=b"").decode("utf-8")


    @filter("urlquote_plus")
    def urlquote_plus(config, v):
        """Filter function to replace all special characters (including
        spaces) in string `v` using the ``%xx`` escape

        """
        from urllib import quote_plus
        return quote_plus(v.encode("utf-8"), safe=b"").decode("utf-8")

else:

    @filter("urlquote")
    def urlquote(config, v):
        """Filter function to replace all special characters in string `v`
        using the ``%xx`` escape

        """
        from urllib.parse import quote
        return quote(v, safe="")


    @filter("urlquote_plus")
    def urlquote_plus(config, v):
        """Filter function to replace all special characters (including
        spaces) in string `v` using the ``%xx`` escape

        """
        from urllib.parse import quote_plus
        return quote_plus(v, safe="")


@filter("saslprep")
def saslprep(config, v):
    """Filter function to perform a `SASLprep` according to :rfc:`4013` on
    `v`.

    This is a Stringprep Profile for usernames and passwords

    """
    import passlib.utils
    return passlib.utils.saslprep(v)


@filter("normpath")
def normpath_impl(config, v):
    """Implementation of the `normpath` filter function"""
    return os.path.normpath(v)


@filter("abspath")
def abspath_impl(config, v):
    """Implementation of the `abspath` filter function"""
    return os.path.abspath(v)


@filter("posixpath")
def posixpath_impl(config, v):
    """Implementation of the `posixpath` filter function"""
    return v.replace(u('\\'), u('/'))


@filter("lower")
def lower_impl(config, v):
    """Implementation of the `lower` filter function"""
    return v.lower()


@filter("upper")
def upper_impl(config, v):
    """Implementation of the `upper` filter function"""
    return v.upper()


@filter(text_to_native_os_str(NONE_FILTER, encoding="ascii"))
def None_filter_impl(config, v):
    """Identity.

    The `None` filter is just a marker to not throw :exc:`KeyError`
    but return `None`. It is a no-op within the filter-chain itself.

    """
    return v


@filter(text_to_native_os_str(EMPTY_FILTER, encoding="ascii"))
def Empty_filter_impl(config, v):
    """Identity.

    The `Empty` filter is just a marker to not throw :exc:`KeyError`
    but return the empty string. It is a no-op within the filter-chain itself.

    """
    return v


# Register the default namespaces
add_varns("ENV", _envlookup)
add_varns("OS", _oslookup)
add_varns("PY", _pylookup)
try:
    from .extras import aws
except ImportError:
    pass
else:
    add_varns("AWS", aws._awslookup)