Mercurial > hgrepos > Python > libs > ConfigMix
view configmix/config.py @ 708:e692216f8756
Allow also "," characters to act as a separator within a filter-chain.
The first filter is introduced by a "|", optional other ones may be introduced
by a "|" or a ",".
| author | Franz Glasner <f.glasner@feldmann-mg.com> |
|---|---|
| date | Tue, 15 Aug 2023 11:14:51 +0200 |
| parents | 10fbc23b4dba |
| children | 6557cf9ecea5 |
line wrap: on
line source
# -*- coding: utf-8 -*- # :- # :Copyright: (c) 2015-2022, Franz Glasner. All rights reserved. # :License: BSD-3-Clause. See LICENSE.txt for details. # :- """The unified configuration dictionary with attribute support or variable substitution. """ from __future__ import division, absolute_import, print_function __all__ = ["Configuration"] import warnings try: from collections import OrderedDict as ConfigurationBase except ImportError: try: from ordereddict import OrderedDict as ConfigurationBase except ImportError: ConfigurationBase = dict try: from urllib.parse import urlsplit except ImportError: from urlparse import urlsplit from .variables import lookup_varns, lookup_filter from .compat import u, uchr, n, str_and_u, PY2 from .constants import REF_NAMESPACE, NONE_FILTER, EMPTY_FILTER, DEL_VALUE try: from ._speedups import (fast_unquote, fast_quote, fast_pathstr2path, _fast_split_ns, _fast_split_filters, _fast_getvarl, _fast_getvarl_s, _fast_getvar, _fast_getvar_s, _fast_interpolate_variables, _sync_MISSING, _sync_MARKER) except ImportError: fast_unquote = None fast_quote = None fast_pathstr2path = None _fast_split_ns = None _fast_split_filters = None _fast_getvarl = None _fast_getvarl_s = None _fast_getvar = None _fast_getvar_s = None _fast_interpolate_variables = None _sync_MISSING = None _MARKER = object() _MISSING = object() class _AttributeDict(ConfigurationBase): def __getattr__(self, name): try: v = self[name] except KeyError: raise AttributeError("%s has no attribute %r" % (type(self), name)) else: # Wrap a dict into another dict with attribute access support if isinstance(v, dict): return _AttributeDict(v) else: return v class CoercingMethodsMixin(object): """Mixin to provide some common implementations for retrieval methods that convert return values to a fixed type (int, bool, float). Both :class:`~.Configuration` and :class:`~._JailedConfiguration` use this mixin. """ def getintvarl_s(self, *path, **kwds): """Get a (possibly substituted) variable and coerce text to a number. """ s = self.getvarl_s(*path, **kwds) if isinstance(s, _TEXTTYPE): return int(s, 0) else: return s def getfirstintvarl_s(self, *paths, **kwds): """Get a (possibly substituted) variable and coerce text to a number. """ s = self.getfirstvarl_s(*paths, **kwds) if isinstance(s, _TEXTTYPE): return int(s, 0) else: return s def getintvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and coerce text to a number. """ s = self.getvar_s(varname, default=default) if isinstance(s, _TEXTTYPE): return int(s, 0) else: return s def getfirstintvar_s(self, *varnames, **kwds): """A variant of :meth:`~.getintvar_s` that returns the first found variable in the list of given variables in `varnames`. """ s = self.getfirstvar_s(*varnames, **kwds) if isinstance(s, _TEXTTYPE): return int(s, 0) else: return s def getboolvarl_s(self, *path, **kwds): """Get a (possibly substituted) variable and convert text to a boolean """ s = self.getvarl_s(*path, **kwds) if isinstance(s, _TEXTTYPE): sl = s.strip().lower() if sl not in self._BOOL_CVT: raise ValueError("Not a boolean: %r" % (s, )) return self._BOOL_CVT[sl] else: return s def getfirstboolvarl_s(self, *paths, **kwds): """Get a (possibly substituted) variable and convert text to a boolean """ s = self.getfirstvarl_s(*paths, **kwds) if isinstance(s, _TEXTTYPE): sl = s.strip().lower() if sl not in self._BOOL_CVT: raise ValueError("Not a boolean: %r" % (s, )) return self._BOOL_CVT[sl] else: return s def getboolvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and convert text to a boolean """ s = self.getvar_s(varname, default=default) if isinstance(s, _TEXTTYPE): sl = s.strip().lower() if sl not in self._BOOL_CVT: raise ValueError("Not a boolean: %r" % (s, )) return self._BOOL_CVT[sl] else: return s def getfirstboolvar_s(self, *varnames, **kwds): """A variant of :meth:`~.getboolvar_s` that returns the first found variable in the list of given variables in `varnames`. """ s = self.getfirstvar_s(*varnames, **kwds) if isinstance(s, _TEXTTYPE): sl = s.strip().lower() if sl not in self._BOOL_CVT: raise ValueError("Not a boolean: %r" % (s, )) return self._BOOL_CVT[sl] else: return s # Conversion of booleans _BOOL_CVT = { u('1'): True, u('yes'): True, u('true'): True, u('on'): True, u('0'): False, u('no'): False, u('false'): False, u('off'): False } def getfloatvarl_s(self, *path, **kwds): """Get a (possibly substituted) variable and convert text to a float """ s = self.getvarl_s(*path, **kwds) if isinstance(s, _TEXTTYPE): return float(s) else: return s def getfirstfloatvarl_s(self, *path, **kwds): """Get a (possibly substituted) variable and convert text to a float """ s = self.getfirstvarl_s(*path, **kwds) if isinstance(s, _TEXTTYPE): return float(s) else: return s def getfloatvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and convert text to a float """ s = self.getvar_s(varname, default) if isinstance(s, _TEXTTYPE): return float(s) else: return s def getfirstfloatvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and convert text to a float """ s = self.getfirstvar_s(varname, default) if isinstance(s, _TEXTTYPE): return float(s) else: return s # Speed _EMPTY_STR = u("") _TEXTTYPE = type(_EMPTY_STR) _STARTTOK = u(b"{{") _ENDTOK = u(b"}}") _HIER_SEPARATOR = u(b'.') _NS_SEPARATOR = u(b':') _FILTER_SEPARATOR = u(b'|') _FILTER_SEPARATOR_2 = u(b',') _STARTTOK_REF = _STARTTOK + REF_NAMESPACE + _NS_SEPARATOR _ENDTOK_REF = _ENDTOK _ENDTOK_FILTER = _FILTER_SEPARATOR + _ENDTOK _DOT = u(b'.') _TILDE = u(b'~') _QUOTE = u(b'%') _QUOTE_x = u(b'x') _QUOTE_u = u(b'u') _QUOTE_U = u(b'U') _COMMENT = u(b'#') _QUOTE_MAP = { 0x25: u(b'%x25'), # _QUOTE 0x2e: u(b'%x2e'), # _DOT 0x3a: u(b'%x3a'), # _NS_SEPARATOR 0x23: u(b'%x23'), # _COMMENT / anchor 0x7c: u(b'%x7c'), # _FILTER_SEPARATOR 0x22: u(b'%x22'), 0x27: u(b'%x27'), 0x7b: u(b'%x7b'), 0x7d: u(b'%x7d'), 0x5b: u(b'%x5b'), 0x5d: u(b'%x5d'), 0x7e: u(b'%x7e'), # tilde `~` } _QUOTE_SAFE = u(b'abcdefghijklmnopqrstuvwxyz' b'ABCDEFGHIJKLMNOPQRSTUVWXYZ' b'0123456789' b'-_@!$&/\\()=?*+;,<>^') """Mostly used configuration key characters that do not need any quoting """ def py_quote(s): """Replace important special characters in string `s` by replacing them with ``%xNN`` where `NN` are the two hexadecimal digits of the character's unicode codepoint value. Handled are the important special chars: ``%``, ``.``, ``:``, ``#``; ``'``, ``"``, ``|``, ``{``, ``}``, ``[`` and ``]``. See also the :ref:`quoting` section. """ try: # Quick check whether all of the chars are in _QUOTE_SAFE if not s.lstrip(_QUOTE_SAFE): return s except AttributeError: # # Check whether s is an index (int) and return the special tag if # it is so # if isinstance(s, int): return "~%d~" % (s, ) else: raise # Slow path re_encode = False if PY2: # Use the Unicode translation variant in PY2 if isinstance(s, str): s = s.decode("latin1") re_encode = True s = s.translate(_QUOTE_MAP) if re_encode: return s.encode("latin1") else: return s if fast_quote: quote = fast_quote else: quote = py_quote def py_unquote(s): """Unquote the content of `s`: handle all patterns ``%xNN``, ``%uNNNN`` or ``%UNNNNNNNN``. This is the inverse of :func:`.quote`. """ s_len = len(s) if s_len > 2 and s[0] == _TILDE and s[-1] == _TILDE: try: v = int(s[1:-1], 10) if v // 10 > 3275: # be compatible to the fast C implementation raise OverflowError("index too large") return v except (ValueError, OverflowError): pass if _QUOTE not in s: return s parts = s.split(_QUOTE) res = [parts[0]] res_append = res.append for p in parts[1:]: try: qc = p[0] except IndexError: raise ValueError("unknown quote syntax string: {}".format(s)) if qc == _QUOTE_x: if len(p) < 3: raise ValueError("quote syntax: length too small") res_append(uchr(int(p[1:3], 16))) res_append(p[3:]) elif qc == _QUOTE_u: if len(p) < 5: raise ValueError("quote syntax: length too small") res_append(uchr(int(p[1:5], 16))) res_append(p[5:]) elif qc == _QUOTE_U: if len(p) < 9: raise ValueError("quote syntax: length too small") res_append(uchr(int(p[1:9], 16))) res_append(p[9:]) else: raise ValueError("unknown quote syntax string: {}".format(s)) return _EMPTY_STR.join(res) if fast_unquote: unquote = fast_unquote else: unquote = py_unquote def py_pathstr2path(varname): """Parse a dot-separated path string `varname` into a tuple of unquoted path items :param str varname: The quoted and dot-separated path string :return: The unquoted and parsed path items :rtype: tuple Used e.g. by :meth:`~.Configuration.getvar`, :meth:`~.Configuration.getvar_s` and :meth:`~.Configuration.jailed`. The returned value is suitable as input for :meth:`~.Configuration.getvarl`, :meth:`~.Configuration.getvarl_s` and friends. An empty `varname` returns an empty tuple. """ # # Because str.split yields a non-empty list for an empty string handle # the empty string separately. # if varname: return tuple([unquote(p) for p in varname.split(_HIER_SEPARATOR)]) else: return tuple() if fast_pathstr2path: pathstr2path = fast_pathstr2path else: pathstr2path = py_pathstr2path def _py_split_ns(varname): """Split the variable name string `varname` into the namespace and the namespace-specific name :type varname: str :return: A tuple containing the namespace (or `None`) and the namespace-specific (variable-)name :rtype: tuple(str or None, str) .. note:: The returned namespace may be an empty string if the namespace separator is found. """ ns, sep, rest = varname.partition(_NS_SEPARATOR) if sep: return (unquote(ns), rest) else: return (None, ns) if _fast_split_ns: _split_ns = _fast_split_ns else: _split_ns = _py_split_ns def _py_split_filters(varname, direction): """Split off the filter part from the `varname` string :param str varname: The string where to search for filters :param int direction: +1 means to do a forward search, -1 a backward search :return: The tuple of the variable name without the filters and a list of filters :rtype: tuple(str, list) """ if direction == 1: name, sep, filters = varname.partition(_FILTER_SEPARATOR) elif direction == -1: name, sep, filters = varname.rpartition(_FILTER_SEPARATOR) else: raise ValueError("`direction' must be -1 or +1") if sep: filters = filters.strip() if filters: if direction == 1: if _FILTER_SEPARATOR_2 in filters: return (name, filters.split(_FILTER_SEPARATOR_2)) else: return (name, filters.split(_FILTER_SEPARATOR)) else: return (name, filters.split(_FILTER_SEPARATOR_2)) else: return (name, []) else: return (name, []) if _fast_split_filters: _split_filters = _fast_split_filters else: _split_filters = _py_split_filters class Configuration(CoercingMethodsMixin, _AttributeDict): """The configuration dictionary with attribute support or variable substitution. .. note:: When retrieving by attribute names variables will *not* substituted. """ is_jail = False """Flag to show that this is not a jail for another configuration""" def __init__(self, *args, **kwds): # # PY2.7 compat: must be set before calling the superclass' __init__ # self.enable_cache() super(Configuration, self).__init__(*args, **kwds) def clear_cache(self): """Clear the internal lookup cache and the interpolation cache""" if self.__lookup_cache is not None: self.__lookup_cache.clear() if self.__interpolation_cache is not None: self.__interpolation_cache.clear() def disable_cache(self): self.__lookup_cache = None self.__interpolation_cache = None def enable_cache(self): self.__lookup_cache = {} self.__interpolation_cache = {} def __getitem__(self, key): """Mapping and list interface that forwards to :meth:`~.getvarl_s` """ if isinstance(key, (tuple, list)): return self.getvarl_s(*key) else: return self.getvarl_s(key) def get(self, key, default=None): """Mapping interface that forwards to :meth:`~.getvarl_s` """ if isinstance(key, (tuple, list)): return self.getvarl_s(*key, default=default) else: return self.getvarl_s(key, default=default) def __contains__(self, key): """Containment test""" if isinstance(key, (tuple, list)): # No namespace and quoting support here try: self._lookupvar(*key) except KeyError: return False else: return True else: return super(Configuration, self).__contains__(key) def getitem_ns(self, key): """Just forward to the *original* :meth:`dict.__getitem__`. No variable interpolation and key path access. """ return super(Configuration, self).__getitem__(key) def items(self): """Items without interpolation""" for k in self: yield (k, self.getitem_ns(k)) def values(self): """Values without interpolation""" for k in self: yield self.getitem_ns(k) def py_getvarl(self, *path, **kwds): """Get a variable where the hierarchy is given in `path` as sequence and the namespace is given in the `namespace` keyword argument. No variable interpolation is done and no filters are applied. Quoting of `path` and `namespace` is *not* needed and wrong. """ default = kwds.pop("default", _MARKER) namespace = kwds.pop("namespace", None) try: if not namespace: lookupfn = self._lookupvar else: if namespace == REF_NAMESPACE: lookupfn = self._lookupref else: lookupfn = lookup_varns(namespace) varvalue = lookupfn(*path) except KeyError: if default is _MARKER: raise KeyError("Variable %r not found" % (path,)) else: return default else: return varvalue if _fast_getvarl: def fast_getvarl(self, *args, **kwds): return _fast_getvarl(self, args, **kwds) getvarl = fast_getvarl else: getvarl = py_getvarl def getkeysl(self, *path, **kwds): """Yield the keys of a variable value. :rtype: A generator :raise KeyError: .. note:: Dictionary keys are not subject to interpolation. """ if "default" in kwds: raise TypeError("got unexpected keyword argument: default") for k in self.getvarl(*path, **kwds).keys(): yield k def getfirstvarl(self, *paths, **kwds): """A variant of :meth:`~.getvarl` that returns the first found variable in the `paths` list. Every item in `paths` is either a `tuple` or `list` or a `dict`. If the path item is a `dict` then it must have two keys "namespace" and "path". If the path item is a `list` or `tuple` then the namespace is assumed to be `None`. Note that a caller that wants to use variables from a non-default namespace must use a sequence of dicts. No variable interpolation is done and no filters are applied. Quoting of anything in `paths` is *not* needed and wrong. """ default = kwds.pop("default", _MARKER) for path in paths: if isinstance(path, (list, tuple)): try: varvalue = self.getvarl(*path) except KeyError: pass else: return varvalue elif isinstance(path, dict): try: namespace = path["namespace"] p = path["path"] except KeyError: raise TypeError("a paths dict item must have a `path'" " and a `namespace' key") else: try: varvalue = self.getvarl(*p, namespace=namespace) except KeyError: pass else: return varvalue else: raise TypeError("a paths item must be a dict, list or tuple") if default is _MARKER: raise KeyError( "none of the given variables found: %r" % (paths,)) else: return default def py_getvar(self, varname, default=_MARKER): """Get a variable of the form ``[ns:][[key1.]key2.]name`` - including variables from other namespaces. No variable interpolation is done and no filters are applied. Special characters (e.g. ``:`` and ``.``) must be quoted when using the default namespace. See also :func:`.quote`. """ varns, varname = _split_ns(varname) if not varns: return self.getvarl(*pathstr2path(varname), default=default) else: return self.getvarl(varname, namespace=varns, default=default) if _fast_getvar: def fast_getvar(self, varname, default=_MARKER): return _fast_getvar(self, varname, default) getvar = fast_getvar else: getvar = py_getvar def getkeys(self, varname): """Yield all the keys of a variable value. :rtype: A generator :raise KeyError: .. note:: Dictionary keys are not subject to interpolation. """ for k in self.getvar(varname).keys(): yield k def getfirstvar(self, *varnames, **kwds): """A variant of :meth:`~.getvar` that returns the first found variable in the list of given variables in `varnames`. """ default = kwds.pop("default", _MARKER) for varname in varnames: try: varvalue = self.getvar(varname) except KeyError: pass else: return varvalue if default is _MARKER: raise KeyError( "none of the given variables found: %r" % (varnames,)) else: return default def py_getvarl_s(self, *path, **kwds): """Get a variable - including variables from other namespaces. `path` and `namespace` are interpreted as in :meth:`.getvarl`. But variables will be interpolated recursively within the variable values and filters are applied. For more details see chapter :ref:`variable-interpolation`. """ default = kwds.pop("default", _MARKER) namespace = kwds.pop("namespace", None) try: obj = self.getvarl(*path, namespace=namespace) return self.substitute_variables_in_obj(obj) except KeyError: if default is _MARKER: raise else: return default if _fast_getvarl_s: def fast_getvarl_s(self, *path, **kwds): return _fast_getvarl_s(self, path, **kwds) getvarl_s = fast_getvarl_s else: getvarl_s = py_getvarl_s def getfirstvarl_s(self, *paths, **kwds): """A variant of :meth:`~.getfirstvarl` that does variable interpolation. `paths` and `kwds` are interpreted as in :meth:`.getfirstvarl`. But variables will be interpolated recursively within the variable values and filters are applied. For more details see chapter :ref:`variable-interpolation`. """ default = kwds.pop("default", _MARKER) for path in paths: if isinstance(path, (list, tuple)): try: obj = self.getvarl(*path) except KeyError: pass else: return self.substitute_variables_in_obj(obj) elif isinstance(path, dict): try: namespace = path["namespace"] p = path["path"] except KeyError: raise TypeError("a paths dict item must have a `path'" " and a `namespace' key") else: try: obj = self.getvarl(*p, namespace=namespace) except KeyError: pass else: return self.substitute_variables_in_obj(obj) else: raise TypeError("a paths item must be a dict, list or tuple") if default is _MARKER: raise KeyError( "none of the given variables found: %r" % (paths,)) else: return default def py_getvar_s(self, varname, default=_MARKER): """Get a variable - including variables from other namespaces. `varname` is interpreted as in :meth:`.getvar`. But variables will be interpolated recursively within the variable values and filters are applied. For more details see chapter :ref:`variable-interpolation`. """ varns, varname = _split_ns(varname) try: if not varns: return self.substitute_variables_in_obj( self.getvarl(*pathstr2path(varname))) else: return self.substitute_variables_in_obj( self.getvarl(varname, namespace=varns)) except KeyError: if default is _MARKER: raise else: return default if _fast_getvar_s: def fast_getvar_s(self, varname, default=_MARKER): return _fast_getvar_s(self, varname, default) getvar_s = fast_getvar_s else: getvar_s = py_getvar_s def _py_getvar_s_with_cache_info(self, varname): """Internal variant of :meth:`~.getvar_s` that returns some information whether caching of interpolated values is allowed Caching is currently not allowed when namespaces are used. Currently used by :meth:`~.interpolate_variables`. """ varns, varname = _split_ns(varname) if not varns: # no namespace -> cacheable return ( self.substitute_variables_in_obj( self.getvarl(*pathstr2path(varname))), True ) else: # results from namespaced lookups are currently not cacheable return ( self.substitute_variables_in_obj( self.getvarl(varname, namespace=varns)), False ) def getfirstvar_s(self, *varnames, **kwds): """A variant of :meth:`~.getvar_s` that returns the first found variable in the list of given variables in `varnames`. """ default = kwds.pop("default", _MARKER) for varname in varnames: try: obj = self.getvar(varname) except KeyError: pass else: return self.substitute_variables_in_obj(obj) if default is _MARKER: raise KeyError( "none of the given variables found: %r" % (varnames,)) else: return default def _lookupvar(self, *path): """Lookup a variable within a hierarchy. :raise KeyError: An unexisting `path` raises a `KeyError` """ if not path: return self use_cache = self.__lookup_cache is not None if use_cache: v = self.__lookup_cache.get(path, _MARKER) if v is not _MARKER: if v is _MISSING: raise KeyError( "Configuration variable %r not found" " (negative internal cache value)" % (path,)) else: return v eiref = self.expand_if_reference try: v = eiref(super(Configuration, self).__getitem__(path[0])) for p in path[1:]: v = eiref(v[p]) except TypeError: if use_cache: self.__lookup_cache[path] = _MISSING raise KeyError( "Configuration variable %r not found" "(missing intermediate keys?)" % (path,)) except KeyError: if use_cache: self.__lookup_cache[path] = _MISSING raise if use_cache: self.__lookup_cache[path] = v return v def _lookupref(self, key): """ `key` must be a configuration reference URI without any (namespace) prefixes and suffixes :raise KeyError: If the reference is not found """ return self.expand_ref_uri(key) def expand_if_reference(self, v): """Check whether `v` is a configuration reference and -- if true -- then expand it. `v` must match the pattern ``{{ref:<REFERENCE>}}`` All non-matching texttypes and all non-texttypes are returned unchanged. :raise KeyError: If the reference cannot found """ if not isinstance(v, _TEXTTYPE): return v if v.startswith(_STARTTOK_REF) and v.endswith(_ENDTOK_REF): return self.expand_ref_uri( v[len(_STARTTOK_REF):-len(_ENDTOK_REF)]) else: return v def expand_ref_uri(self, uri): """ :raises KeyError: If the reference URI is not found """ pu = urlsplit(uri) if pu.scheme or pu.netloc or pu.path or pu.query: raise ValueError("only fragment-only URIs are supported") if not pu.fragment: return self if pu.fragment.startswith(_DOT): raise ValueError("relative refs not supported") return self.getvar(pu.fragment) def try_get_reference_uri(self, v): """Check whether `v` is a configuration reference and -- if true -- return the configuration path where the reference points to. If `v` is not a text type or not a reference return `None`. Does not check whether the referenced configuration object exists. :rtype: None or str """ if not isinstance(v, _TEXTTYPE): return None if v.startswith(_STARTTOK_REF) and v.endswith(_ENDTOK_REF): uri = v[len(_STARTTOK_REF):-len(_ENDTOK_REF)] pu = urlsplit(uri) if pu.scheme or pu.netloc or pu.path or pu.query: raise ValueError("only fragment-only URIs are supported") if not pu.fragment: return _EMPTY_STR if pu.fragment.startswith(_DOT): raise ValueError("relative refs not supported") return pu.fragment return None def substitute_variables_in_obj(self, obj): """Recursively expand variables in the object tree `obj`.""" ty = type(obj) if issubclass(ty, _TEXTTYPE): # a string - really replace the value return self.interpolate_variables(obj) elif issubclass(ty, dict): newdict = ty() for k, v in obj.items(): newdict[k] = self.substitute_variables_in_obj(v) return newdict elif issubclass(ty, list): return [self.substitute_variables_in_obj(i) for i in obj] elif issubclass(ty, tuple): return ty([self.substitute_variables_in_obj(i) for i in obj]) elif issubclass(ty, set): newset = ty() for i in obj: newset.add(self.substitute_variables_in_obj(i)) else: return obj def py_interpolate_variables(self, s): """Expand all variables in the single string `s`""" len_s = len(s) if len_s < 4: return s if s == DEL_VALUE: return s start = s.find(_STARTTOK, 0) if start < 0: return s use_cache = self.__interpolation_cache is not None if use_cache: res = self.__interpolation_cache.get(s, _MARKER) if res is not _MARKER: if res is _MISSING: warnings.warn("Cannot interpolate variables in string " "%r (cached)" % (s, ), UserWarning, stacklevel=1) raise KeyError("Cannot interpolate variables in string " "%r (cached)" % (s, )) else: return res if ((len_s >= 6) and (s[2] == _FILTER_SEPARATOR) and (start == 0)): if s.find(_ENDTOK_FILTER, 3) != (len_s - 3): raise ValueError("`{{|' global filter interpolation must end with `|}}'") new_s, filters = _split_filters(s[3:-3], -1) try: varvalue = self.py_interpolate_variables(new_s) except KeyError: if NONE_FILTER in filters: varvalue = None elif EMPTY_FILTER in filters: varvalue = _EMPTY_STR else: raise varvalue = self._apply_filters(filters, varvalue) return varvalue res = [] res_append = res.append rest = 0 cacheable = True while start != -1: res_append(s[rest:start]) end = s.find(_ENDTOK, start) if end < 0: rest = start break varname, filters = _split_filters( s[start+2:end], 1) # noqa: E226 try: varvalue, cacheable = self._py_getvar_s_with_cache_info(varname) except KeyError: cacheable = True if NONE_FILTER in filters: varvalue = None elif EMPTY_FILTER in filters: varvalue = _EMPTY_STR else: if use_cache and cacheable: self.__interpolation_cache[s] = _MISSING warnings.warn("Cannot interpolate variable %r in string " "%r" % (varname, s, ), UserWarning, stacklevel=1) raise if not cacheable: cacheable = False varvalue = self._apply_filters(filters, varvalue) rest = end + 2 # # Dont apply and type conversions to the variable value if # the whole `s` is just one expansion # if (start == 0) and (rest == len_s): if use_cache and cacheable: self.__interpolation_cache[s] = varvalue return varvalue if varvalue is None: pass else: res_append(str_and_u(varvalue)) # don't re-evaluate because `self.getvar_s()` expands already start = s.find(_STARTTOK, rest) res_append(s[rest:]) res = _EMPTY_STR.join(res) if use_cache and cacheable: self.__interpolation_cache[s] = res return res if _fast_interpolate_variables: def fast_interpolate_variables(self, s): return _fast_interpolate_variables( self, s, self.__interpolation_cache) interpolate_variables = fast_interpolate_variables else: interpolate_variables = py_interpolate_variables def _apply_filters(self, filters, value): for name in filters: try: filterfn = lookup_filter(name) except KeyError: # # Convert to NameError because we find a missing filters # a very serious error. # raise NameError("Filter %r not found" % (name, )) else: value = filterfn(self, value) return value def jailed(self, rootpath=None, root=None, bind_root=True): """Return a "jailed" configuration of the current configuration. :param rootpath: a sequence of strings (or objects) that shall emcompass the chroot-like jail of the returned configuration :type rootpath: list or tuple :param str root: a string path expression that shall encompass the chroot-like jail of the returned configuration :param bool bind_root: if you do a :meth:`~.rebind` just after creation of a jailed config you can set `bind_root` to `False`; otherwise use the default :return: a jailed (aka restricted) configuration :rtype: _JailedConfiguration Exactly one of `rootpath` or `root` must be given. """ if rootpath is not None and root is not None: raise ValueError("only one of `rootpath' or `root' can be given") if rootpath is None and root is None: raise ValueError("one of `rootpath' or `root' must be given") if rootpath is not None and not isinstance(rootpath, (list, tuple)): raise TypeError("`rootpath' must be a list or a tuple") if root is not None: # convert to path varns, varname = _split_ns(root) if varns: raise ValueError( "jailed configurations do not support namespaces") rootpath = pathstr2path(root) # if resolve_ref: # while True: # target = self.getvarl(*rootpath) # target_uri = self.try_get_reference_uri(target) # if target_uri is None: # break # rootpath = pathstr2path(target_uri) jc = _JailedConfiguration(*rootpath) if bind_root: jc.rebind(self) return jc def iter_jailed(self, rootpath=None, root=None): """Iterator that yields properly jailed configurations. `rootpath` or `root` must refer to a `list` or `dict` container. """ return self.jailed(rootpath=rootpath, root=root).iter_jailed() class _JailedConfiguration(CoercingMethodsMixin): """A jailed and restricted variant of :class:`Configuration`. Restriction is two-fold: - The access to configuration variables in `config` is restricted to the configuration sub-tree that is configured in `path`. - Not all access-methods of :class:`Configuration` are implemented yet. .. seealso:: :ref:`jailed-configuration` .. note:: There is no namespace support. .. note:: Do not call the constructor directly. Instantiate a jailed configuration from the parent configuration's :meth:`~.Configuration.jailed` factory method. """ __slots__ = ("_base", "_path", "_path_string") is_jail = True """Flag to show that this is a jail for another configuration""" def __init__(self, *path): super(_JailedConfiguration, self).__init__() self._path = path self._path_string = None @property def _pathstr(self): v = self._path_string if v is None: if self._path: v = _HIER_SEPARATOR.join([quote(p) for p in self._path]) \ + _HIER_SEPARATOR else: v = _EMPTY_STR self._path_string = v return v @property def base(self): """Ask for the base (aka parent) configuration". This configuration is always unjailed. """ return self._base def rebind(self, new_base): """Bind the jail to a new unjailed configuration `new_base`. The new configuration base also must have an existing path to the root. :param Configuration new_base: the new base """ if new_base.is_jail: raise TypeError("can only bind to an unjailed configuration") self._base = new_base # # Early error out if the chroot does not exist but allow # degenerated case if `self._path` is empty. # if self._path and self._path not in new_base: raise KeyError( "base key path %r not available in the new base" % (self._path, )) def __getattr__(self, name): """Attribute-style access. Result values are interpolated (i.e. forwarded to :meth:`~.getvarl_s`) """ try: v = self._base.getvarl_s(*(self._path + (name, ))) except KeyError: raise AttributeError("%s has no attribute %r" % (type(self), name)) else: # Wrap a dict into another dict with attribute access support if isinstance(v, dict): return _AttributeDict(v) else: return v def __getitem__(self, key): """Mapping and list interface that forwards to :meth:`~.getvarl_s` """ if isinstance(key, tuple): return self._base.getvarl_s(*(self._path + key)) elif isinstance(key, list): return self._base.getvarl_s(*(self._path + tuple(key))) else: return self._base.getvarl_s(*self._path)[key] def get(self, key, default=None): if isinstance(key, tuple): return self._base.get(self._path + key, default=default) elif isinstance(key, list): return self._base.get(self._path + tuple(key), default=default) else: return self._base.get(self._path + (key, ), default=default) def __contains__(self, key): """Containment support for containers""" if isinstance(key, tuple): return (self._path + key) in self._base elif isinstance(key, list): return (self._path + tuple(key)) in self._base else: return (self._path + (key, )) in self._base def getvarl(self, *path, **kwds): return self._base.getvarl(*(self._path + path), **kwds) def getkeysl(self, *path, **kwds): for k in self._base.getkeysl(*(self._path + path), **kwds): yield k def getfirstvarl(self, *paths, **kwds): real_paths = [] for path in paths: if isinstance(path, (list, tuple)): real_paths.append(self._path + tuple(path)) elif isinstance(path, dict): raise TypeError( "a `dict' is not supported in a jailed configuration") else: raise TypeError("a paths item must be a list or tuple") return self._base.getfirstvarl(*real_paths, **kwds) def getvarl_s(self, *path, **kwds): return self._base.getvarl_s(*(self._path + path), **kwds) def getfirstvarl_s(self, *paths, **kwds): real_paths = [] for path in paths: if isinstance(path, (list, tuple)): real_paths.append(self._path + tuple(path)) elif isinstance(path, dict): raise TypeError( "a `dict' is not supported in a jailed configuration") else: raise TypeError("a paths item must be a list or tuple") return self._base.getfirstvarl_s(*real_paths, **kwds) def getvar(self, varname, **kwds): return self._base.getvarl(*(self._path + pathstr2path(varname)), **kwds) def getkeys(self, varname): for k in self._base.getkeysl(*(self._path + pathstr2path(varname))): yield k def getfirstvar(self, *varnames, **kwds): real_varnames = [self._pathstr + vn for vn in varnames] return self._base.getfirstvar(*real_varnames, **kwds) def getvar_s(self, varname, **kwds): return self._base.getvarl_s(*(self._path + pathstr2path(varname)), **kwds) def getfirstvar_s(self, *varnames, **kwds): real_varnames = [self._pathstr + vn for vn in varnames] return self._base.getfirstvar_s(*real_varnames, **kwds) def __iter__(self): """Iteration support for containers""" return iter(self._base.getvarl_s(*self._path)) def __len__(self): """Length support for containers""" return len(self._base.getvarl_s(*self._path)) def iter_jailed(self): """Iteration support for containers which yields properly jailed sub-jails. Only supported for type `list` or type `dict` jails. """ container = self._base.getvarl(*self._path) if isinstance(container, dict): for k in container: yield self.jailed(rootpath=(k, )) elif isinstance(container, list): len_container = len(container) idx = 0 while idx < len_container: yield self.jailed(rootpath=(idx, )) idx += 1 else: raise TypeError( "jailed iterators only supported for lists and dicts") if PY2: def __nonzero__(self): """Map- and list-style evaluation in boolean context""" return bool(self._base.getvarl_s(*self._path)) else: def __bool__(self): """Map- and list-style evaluation in boolean context""" return bool(self._base.getvarl_s(*self._path)) def jailed(self, rootpath=None, root=None, bind_root=True): """Return a "jailed" configuration that effectively is a subjail of the current jail For a more complete description see :meth:`.Configuration.jailed`. """ if rootpath is not None and root is not None: raise ValueError("only one of `rootpath' or `root' can be given") if rootpath is None and root is None: raise ValueError("one of `rootpath' or `root' must be given") if rootpath is not None and not isinstance(rootpath, (list, tuple)): raise TypeError("`rootpath' must be a list or a tuple") if root is not None: # convert to path varns, varname = _split_ns(root) if varns: raise ValueError( "sub-jails do not support namespaces") rootpath = pathstr2path(varname) if self._path: new_rootpath = self._path + tuple(rootpath) else: new_rootpath = tuple(rootpath) # if resolve_ref: # while True: # target = self._base.getvarl(*new_rootpath) # target_uri = self._base.try_get_reference_uri(target) # if target_uri is None: # break # new_rootpath = pathstr2path(target_uri) sjc = _JailedConfiguration(*new_rootpath) if bind_root: sjc.rebind(self._base) return sjc def __repr__(self): r = "_JailedConfiguration(rootpath=%s)" % n(repr(self._path)) return r if _sync_MISSING: _sync_MISSING(_MISSING) _sync_MARKER(_MARKER)
