Mercurial > hgrepos > Python > libs > ConfigMix
view configmix/config.py @ 428:090a25f36a3d
FIX: Allow jailed configurations to correctly use base configurations that use a different "default" marker object.
Jailed configurations assumed that their "default" marker object is
identical to the "default" marker object in the unjailed base
configuration. This is not always true, especially if
"_JailedConfiguration.rebind()" is used.
Removed the explicit "default" keyword argument and passed the complete
keywords argument dictionary to the base instead. This triggers correct
default handling in the base.
| author | Franz Glasner <f.glasner@feldmann-mg.com> |
|---|---|
| date | Thu, 09 Dec 2021 13:02:17 +0100 |
| parents | 84d4f82ffe59 |
| children | b96f49c9c76b |
line wrap: on
line source
# -*- coding: utf-8 -*- # :- # :Copyright: (c) 2015-2021, Franz Glasner. All rights reserved. # :License: BSD-3-Clause. See LICENSE.txt for details. # :- """The unified configuration dictionary with attribute support or variable substitution. """ from __future__ import division, absolute_import, print_function __all__ = ["Configuration"] import warnings try: from collections import OrderedDict as ConfigurationBase except ImportError: try: from ordereddict import OrderedDict as ConfigurationBase except ImportError: ConfigurationBase = dict try: from urllib.parse import urlsplit except ImportError: from urlparse import urlsplit from .variables import lookup_varns, lookup_filter from .compat import u, uchr from .constants import REF_NAMESPACE, NONE_FILTER, EMPTY_FILTER _MARKER = object() class _AttributeDict(ConfigurationBase): def __getattr__(self, name): try: v = self[name] except KeyError: raise AttributeError("%s has no attribute %r" % (type(self), name)) else: # Wrap a dict into another dict with attribute access support if isinstance(v, dict): return _AttributeDict(v) else: return v class CoercingMethodsMixin(object): """Mixin to provide some common implementations for retrieval methods that convert return values to a fixed type (int, bool, float). Both :class:`~.Configuration` and :class:`~._JailedConfiguration` use this mixin. """ def getintvarl_s(self, *path, **kwds): """Get a (possibly substituted) variable and coerce text to a number. """ s = self.getvarl_s(*path, **kwds) if isinstance(s, Configuration._TEXTTYPE): return int(s, 0) else: return s def getfirstintvarl_s(self, *paths, **kwds): """Get a (possibly substituted) variable and coerce text to a number. """ s = self.getfirstvarl_s(*paths, **kwds) if isinstance(s, Configuration._TEXTTYPE): return int(s, 0) else: return s def getintvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and coerce text to a number. 
""" s = self.getvar_s(varname, default=default) if isinstance(s, Configuration._TEXTTYPE): return int(s, 0) else: return s def getfirstintvar_s(self, *varnames, **kwds): """A variant of :meth:`~.getintvar_s` that returns the first found variable in the list of given variables in `varnames`. """ s = self.getfirstvar_s(*varnames, **kwds) if isinstance(s, Configuration._TEXTTYPE): return int(s, 0) else: return s def getboolvarl_s(self, *path, **kwds): """Get a (possibly substituted) variable and convert text to a boolean """ s = self.getvarl_s(*path, **kwds) if isinstance(s, Configuration._TEXTTYPE): sl = s.strip().lower() if sl not in self._BOOL_CVT: raise ValueError("Not a boolean: %r" % s) return self._BOOL_CVT[sl] else: return s def getfirstboolvarl_s(self, *paths, **kwds): """Get a (possibly substituted) variable and convert text to a boolean """ s = self.getfirstvarl_s(*paths, **kwds) if isinstance(s, Configuration._TEXTTYPE): sl = s.strip().lower() if sl not in self._BOOL_CVT: raise ValueError("Not a boolean: %r" % s) return self._BOOL_CVT[sl] else: return s def getboolvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and convert text to a boolean """ s = self.getvar_s(varname, default=default) if isinstance(s, Configuration._TEXTTYPE): sl = s.strip().lower() if sl not in self._BOOL_CVT: raise ValueError("Not a boolean: %r" % s) return self._BOOL_CVT[sl] else: return s def getfirstboolvar_s(self, *varnames, **kwds): """A variant of :meth:`~.getboolvar_s` that returns the first found variable in the list of given variables in `varnames`. 
""" s = self.getfirstvar_s(*varnames, **kwds) if isinstance(s, Configuration._TEXTTYPE): sl = s.strip().lower() if sl not in self._BOOL_CVT: raise ValueError("Not a boolean: %r" % s) return self._BOOL_CVT[sl] else: return s # Conversion of booleans _BOOL_CVT = { u('1'): True, u('yes'): True, u('true'): True, u('on'): True, u('0'): False, u('no'): False, u('false'): False, u('off'): False } def getfloatvarl_s(self, *path, **kwds): """Get a (possibly substituted) variable and convert text to a float """ s = self.getvarl_s(*path, **kwds) if isinstance(s, Configuration._TEXTTYPE): return float(s) else: return s def getfirstfloatvarl_s(self, *path, **kwds): """Get a (possibly substituted) variable and convert text to a float """ s = self.getfirstvarl_s(*path, **kwds) if isinstance(s, Configuration._TEXTTYPE): return float(s) else: return s def getfloatvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and convert text to a float """ s = self.getvar_s(varname, default) if isinstance(s, Configuration._TEXTTYPE): return float(s) else: return s def getfirstfloatvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and convert text to a float """ s = self.getfirstvar_s(varname, default) if isinstance(s, Configuration._TEXTTYPE): return float(s) else: return s class Configuration(CoercingMethodsMixin, _AttributeDict): """The configuration dictionary with attribute support or variable substitution. .. note:: When retrieving by attribute names variables will *not* substituted. 
""" # Speed _TEXTTYPE = type(u("")) _STARTTOK = u(b"{{") _ENDTOK = u(b"}}") _HIER_SEPARATOR = u(b'.') _NS_SEPARATOR = u(b':') _FILTER_SEPARATOR = u(b'|') _STARTTOK_REF = _STARTTOK + REF_NAMESPACE + _NS_SEPARATOR _ENDTOK_REF = _ENDTOK _DOT = u(b'.') _QUOTE = u(b'%') _COMMENT = u(b'#') is_jail = False """Flag to show that this is not a jail for another configuration""" def getvarl(self, *path, **kwds): """Get a variable where the hierarchy is given in `path` as sequence and the namespace is given in the `namespace` keyword argument. No variable interpolation is done and no filters are applied. Quoting of `path` and `namespace` is *not* needed and wrong. """ default = kwds.pop("default", _MARKER) namespace = kwds.pop("namespace", None) try: if not namespace: lookupfn = self._lookupvar else: if namespace == REF_NAMESPACE: lookupfn = self._lookupref else: lookupfn = lookup_varns(namespace) varvalue = lookupfn(*path) except KeyError: if default is _MARKER: raise KeyError("Variable %r not found" % (path,)) else: return default else: return varvalue def getkeysl(self, *path, **kwds): """Yield the keys of a variable value. :rtype: A generator :raise KeyError: .. note:: Dictionary keys are not subject to interpolation. """ if "default" in kwds: raise TypeError("got unexpected keyword argument: default") for k in self.getvarl(*path, **kwds).keys(): yield k def getfirstvarl(self, *paths, **kwds): """A variant of :meth:`~.getvarl` that returns the first found variable in the `paths` list. Every item in `paths` is either a `tuple` or `list` or a `dict`. If the path item is a `dict` then it must have two keys "namespace" and "path". If the path item is a `list` or `tuple` then the namespace is assumed to be `None`. Note that a caller that wants to use variables from a non-default namespace must use a sequence of dicts. No variable interpolation is done and no filters are applied. Quoting of anything in `paths` is *not* needed and wrong. 
""" default = kwds.pop("default", _MARKER) for path in paths: if isinstance(path, (list, tuple)): try: varvalue = self.getvarl(*path) except KeyError: pass else: return varvalue elif isinstance(path, dict): try: namespace = path["namespace"] p = path["path"] except KeyError: raise TypeError("a paths dict item must have a `path'" " and a `namespace' key") else: try: varvalue = self.getvarl(*p, namespace=namespace) except KeyError: pass else: return varvalue else: raise TypeError("a paths item must be a dict, list or tuple") if default is _MARKER: raise KeyError( "none of the given variables found: %r" % (paths,)) else: return default def getvar(self, varname, default=_MARKER): """Get a variable of the form ``[ns:][[key1.]key2.]name`` - including variables from other namespaces. No variable interpolation is done and no filters are applied. Special characters (e.g. ``:`` and ``.``) must be quoted when using the default namespace. See also :meth:`~.quote`. """ varns, varname = self._split_ns(varname) if not varns: if varname: varnameparts = [ self.unquote(vp) for vp in varname.split(self._HIER_SEPARATOR) ] else: varnameparts = tuple() else: varnameparts = (varname,) return self.getvarl(*varnameparts, namespace=varns, default=default) def getkeys(self, varname): """Yield all the keys of a variable value. :rtype: A generator :raise KeyError: .. note:: Dictionary keys are not subject to interpolation. """ for k in self.getvar(varname).keys(): yield k def getfirstvar(self, *varnames, **kwds): """A variant of :meth:`~.getvar` that returns the first found variable in the list of given variables in `varnames`. """ default = kwds.pop("default", _MARKER) for varname in varnames: try: varvalue = self.getvar(varname) except KeyError: pass else: return varvalue if default is _MARKER: raise KeyError( "none of the given variables found: %r" % (varnames,)) else: return default def getvarl_s(self, *path, **kwds): """Get a variable - including variables from other namespaces. 
`path` and `namespace` are interpreted as in :meth:`.getvarl`. But variables will be interpolated recursively within the variable values and filters are applied. For more details see chapter :ref:`variable-interpolation`. """ default = kwds.pop("default", _MARKER) namespace = kwds.pop("namespace", None) try: obj = self.getvarl(*path, namespace=namespace) return self.substitute_variables_in_obj(obj) except KeyError: if default is _MARKER: raise else: return default def getfirstvarl_s(self, *paths, **kwds): """A variant of :meth:`~.getfirstvarl` that does variable interpolation. `paths` and `kwds` are interpreted as in :meth:`.getfirstvarl`. But variables will be interpolated recursively within the variable values and filters are applied. For more details see chapter :ref:`variable-interpolation`. """ default = kwds.pop("default", _MARKER) for path in paths: if isinstance(path, (list, tuple)): try: obj = self.getvarl(*path) except KeyError: pass else: return self.substitute_variables_in_obj(obj) elif isinstance(path, dict): try: namespace = path["namespace"] p = path["path"] except KeyError: raise TypeError("a paths dict item must have a `path'" " and a `namespace' key") else: try: obj = self.getvarl(*p, namespace=namespace) except KeyError: pass else: return self.substitute_variables_in_obj(obj) else: raise TypeError("a paths item must be a dict, list or tuple") if default is _MARKER: raise KeyError( "none of the given variables found: %r" % (paths,)) else: return default def getvar_s(self, varname, default=_MARKER): """Get a variable - including variables from other namespaces. `varname` is interpreted as in :meth:`.getvar`. But variables will be interpolated recursively within the variable values and filters are applied. For more details see chapter :ref:`variable-interpolation`. 
""" try: obj = self.getvar(varname) return self.substitute_variables_in_obj(obj) except KeyError: if default is _MARKER: raise else: return default def getfirstvar_s(self, *varnames, **kwds): """A variant of :meth:`~.getvar_s` that returns the first found variable in the list of given variables in `varnames`. """ default = kwds.pop("default", _MARKER) for varname in varnames: try: obj = self.getvar(varname) except KeyError: pass else: return self.substitute_variables_in_obj(obj) if default is _MARKER: raise KeyError( "none of the given variables found: %r" % (varnames,)) else: return default def _split_ns(self, s): nameparts = s.split(self._NS_SEPARATOR, 1) if len(nameparts) == 1: return (None, s, ) else: return (self.unquote(nameparts[0]), nameparts[1], ) def _split_filters(self, s): nameparts = s.split(self._FILTER_SEPARATOR) if len(nameparts) == 1: return (s, [], ) else: return (nameparts[0].rstrip(), nameparts[1:], ) def _lookupvar(self, *path, **kwds): """Lookup a variable within a hierarchy. If no default is given an unexisting `path` raises a `KeyError` else `default` is returned. """ default = kwds.pop("default", _MARKER) if not path: return self try: v = self.expand_if_reference(self[path[0]]) for p in path[1:]: v = self.expand_if_reference(v[p]) except TypeError: raise KeyError( "Configuration variable %r not found" "(missing intermediate keys?)" % (path,)) except KeyError: if default is _MARKER: raise KeyError( "Configuration variable %r not found" % (path,)) else: return default return v def _lookupref(self, key, default=_MARKER): """ `key` must be a configuration reference URI without any (namespace) prefixes and suffixes """ return self.expand_ref_uri(key, default=default) def expand_if_reference(self, v, default=_MARKER): """Check whether `v` is a configuration reference and -- if true -- then expand it. `v` must match the pattern ``{{ref:<REFERENCE>}}`` All non-matching texttypes and all non-texttypes are returned unchanged. 
""" if not isinstance(v, self._TEXTTYPE): return v if not v.startswith(self._STARTTOK_REF) \ or not v.endswith(self._ENDTOK_REF): return v return self.expand_ref_uri( v[len(self._STARTTOK_REF):-len(self._ENDTOK_REF)], default=default) def expand_ref_uri(self, uri, default=_MARKER): pu = urlsplit(uri) if pu.scheme or pu.netloc or pu.path or pu.query: raise ValueError("only fragment-only URIs are supported") if not pu.fragment: return self if pu.fragment.startswith(self._DOT): raise ValueError("relative refs not supported") return self.getvar(pu.fragment, default=default) def substitute_variables_in_obj(self, obj): """Recursively expand variables in the object tree `obj`.""" if isinstance(obj, self._TEXTTYPE): # a string - really replace the value return self.expand_variable(obj) elif isinstance(obj, list): return [self.substitute_variables_in_obj(i) for i in obj] elif isinstance(obj, tuple): tmp = [self.substitute_variables_in_obj(i) for i in obj] return type(obj)(tmp) elif isinstance(obj, dict): newdict = type(obj)() for k in obj: newdict[k] = self.substitute_variables_in_obj(obj[k]) return newdict elif isinstance(obj, set): newset = type(obj)() for i in obj: newset.add(self.substitute_variables_in_obj(i)) else: return obj def expand_variable(self, s): """Expand variables in the single string `s`""" start = s.find(self._STARTTOK, 0) while start != -1: end = s.find(self._ENDTOK, start) if end < 0: return s varname, filters = self._split_filters(s[start+2:end]) try: if NONE_FILTER in filters: varvalue = self._apply_filters( filters, self.getvar_s(varname, default=None)) elif EMPTY_FILTER in filters: varvalue = self._apply_filters( filters, self.getvar_s(varname, default=u(""))) else: varvalue = self._apply_filters( filters, self.getvar_s(varname)) except KeyError: warnings.warn("Cannot expand variable %r in string " "%r" % (varname, s, ), UserWarning, stacklevel=1) raise # # Dont apply and type conversions to the variable value if # the whole `s` is just one 
expansion # if (start == 0) and (end + 2 == len(s)): return varvalue if varvalue is None: varvalue = u("") replaced = s[:start] + u(str(varvalue)) s = replaced + s[end+2:] # don't re-evaluate because `self.getvar_s()` expands already start = s.find(self._STARTTOK, len(replaced)) return s def _apply_filters(self, filters, value): for name in filters: try: filterfn = lookup_filter(name) except KeyError: # # Convert to NameError because we find a missing filters # a very serious error. # raise NameError("Filter %r not found" % name) else: value = filterfn(self, value) return value @classmethod def quote(klass, s): """Replace important special characters in string `s` by replacing them with ``%xNN`` where `NN` are the two hexadecimal digits of the characters unicode codepoint value. Handled are the important special chars: ``%``, ``.``, ``:``, ``#``; ``'``, ``"``, ``|``, ``{``, ``}``, ``[`` and ``]``. See also the :ref:`quoting` section. """ qc = klass._QUOTE s = s.replace(qc, qc + "x25") s = s.replace(klass._DOT, qc + "x2e") s = s.replace(klass._NS_SEPARATOR, qc + "x3a") s = s.replace(klass._COMMENT, qc + "x23") s = s.replace(klass._FILTER_SEPARATOR, qc + "x7c") s = s.replace('"', qc + "x22") s = s.replace("'", qc + "x27") s = s.replace('{', qc + "x7b") s = s.replace('}', qc + "x7d") s = s.replace('[', qc + "x5b") return s.replace(']', qc + "x5d") @classmethod def unquote(klass, s): """Unquote the content of `s`: handle all patterns ``%xNN``, ``%uNNNN`` or ``%UNNNNNNNN``. This is the inverse of :meth:`~.quote`. 
""" if klass._QUOTE not in s: return s res = [] parts = s.split(klass._QUOTE) res.append(parts[0]) for p in parts[1:]: if p.startswith(u(b'x')): if len(p) < 3: raise ValueError("quote syntax: length too small") res.append(uchr(int(p[1:3], 16))) res.append(p[3:]) elif p.startswith(u(b'u')): if len(p) < 5: raise ValueError("quote syntax: length too small") res.append(uchr(int(p[1:5], 16))) res.append(p[5:]) elif p.startswith(u(b'U')): if len(p) < 9: raise ValueError("quote syntax: length too small") res.append(uchr(int(p[1:9], 16))) res.append(p[9:]) else: raise ValueError("unknown quote syntax string: {}".format(s)) return ''.join(res) def jailed(self, rootpath=None, root=None, bind_root=True): """Return a "jailed" configuration of the current configuration. :param rootpath: a sequence of strings that shall emcompass the chroot-like jail of the returned configuration :type rootpath: list or tuple :param str root: a string path expression that shall encompass the chroot-like jail of the returned configuration :param bool bind_root: if you do a :meth:`~.rebind` just after creation of a jailed config you can set `bind_root` to `False`; otherwise use the default :return: a jailed (aka restricted) configuration :rtype: _JailedConfiguration Exactly one of `rootpath` or `root` must be given. 
""" if rootpath is not None and root is not None: raise ValueError("only one of `rootpath' or `root' can be given") if rootpath is None and root is None: raise ValueError("one of `rootpath' or `root' must be given") if rootpath is not None and not isinstance(rootpath, (list, tuple)): raise TypeError("`rootpath' must be a list or a tuple") if root is not None: # convert to path varns, varname = self._split_ns(root) if varns: raise ValueError( "jailed configurations do not support namespaces") if varname: rootpath = [ self.unquote(p) for p in root.split( self._HIER_SEPARATOR) ] else: rootpath = tuple() jc = _JailedConfiguration(*rootpath) if bind_root: jc.rebind(self) return jc class _JailedConfiguration(CoercingMethodsMixin): """A jailed and restricted variant of :class:`Configuration`. Restriction is two-fold: - The access to configuration variables in `config` is restricted to the configuration sub-tree that is configured in `path`. - Not all access-methods of :class:`Configuration` are implemented yet. .. seealso:: :ref:`jailed-configuration` .. note:: There is no namespace support. .. note:: Do not call the constructor directly. Instantiate a jailed configuration from the parent configuration's :meth:`~.Configuration.jailed` factory method. """ __slots__ = ("_base", "_path", "_pathstr") is_jail = True """Flag to show that this is a jail for another configuration""" def __init__(self, *path): super(_JailedConfiguration, self).__init__() self._path = path if path: self._pathstr = \ Configuration._HIER_SEPARATOR.join( [Configuration.quote(p) for p in path]) \ + Configuration._HIER_SEPARATOR else: self._pathstr = '' @property def base(self): """Ask for the base (aka parent) configuration". This configuration is always unjailed. """ return self._base def rebind(self, new_base): """Bind the jail to a new unjailed configuration `new_base`. The new configuration base also must have an existing path to the root. 
:param Configuration new_base: the new base """ if new_base.is_jail: raise TypeError("can only bind to an unjailed configuration") self._base = new_base # # Early error out if the chroot does not exist but allow # degenerated case if `self._path` is empty. # if self._path: new_base.getvarl(*self._path) def getvarl(self, *path, **kwds): return self._base.getvarl(*(self._path + path), **kwds) def getkeysl(self, *path, **kwds): for k in self._base.getkeysl(*(self._path + path), **kwds): yield k def getfirstvarl(self, *paths, **kwds): real_paths = [] for path in paths: if isinstance(path, (list, tuple)): real_paths.append(self._path + path) elif isinstance(path, dict): raise TypeError( "a `dict' is not supported in a jailed configuration") else: raise TypeError("a paths item must be a list or tuple") return self._base.getfirstvarl(*real_paths, **kwds) def getvarl_s(self, *path, **kwds): return self._base.getvarl_s(*(self._path + path), **kwds) def getfirstvarl_s(self, *paths, **kwds): real_paths = [] for path in paths: if isinstance(path, (list, tuple)): real_paths.append(self._path + path) elif isinstance(path, dict): raise TypeError( "a `dict' is not supported in a jailed configuration") else: raise TypeError("a paths item must be a list or tuple") return self._base.getfirstvarl_s(*real_paths, **kwds) def getvar(self, varname, **kwds): return self._base.getvar(self._pathstr + varname, **kwds) def getkeys(self, varname): for k in self._base.getkeys(self._pathstr + varname): yield k def getfirstvar(self, *varnames, **kwds): real_varnames = [self._pathstr + vn for vn in varnames] return self._base.getfirstvar(*real_varnames, **kwds) def getvar_s(self, varname, **kwds): return self._base.getvar_s(self._pathstr + varname, **kwds) def getfirstvar_s(self, *varnames, **kwds): real_varnames = [self._pathstr + vn for vn in varnames] return self._base.getfirstvar_s(*real_varnames, **kwds) def jailed(self, rootpath=None, root=None, bind_root=True): """Return a "jailed" 
configuration that effectively is a subjail of the current jail For a more complete description see :meth:`.Configuration.jailed`. """ if rootpath is not None and root is not None: raise ValueError("only one of `rootpath' or `root' can be given") if rootpath is None and root is None: raise ValueError("one of `rootpath' or `root' must be given") if rootpath is not None and not isinstance(rootpath, (list, tuple)): raise TypeError("`rootpath' must be a list or a tuple") if root is not None: # convert to path varns, varname = self._base._split_ns(root) if varns: raise ValueError( "sub-jails do not support namespaces") if varname: rootpath = [ self._base.unquote(p) for p in varname.split( self._base._HIER_SEPARATOR) ] else: rootpath = tuple() if self._path: new_rootpath = self._path + tuple(rootpath) else: new_rootpath = rootpath sjc = _JailedConfiguration(*new_rootpath) if bind_root: sjc.rebind(self._base) return sjc
