Mercurial > hgrepos > Python > libs > ConfigMix
changeset 314:043a6412be3c
Implemented new access methods .getvarl() and .getvarl_s
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Wed, 05 May 2021 01:32:07 +0200 |
| parents | 15a1d5fd0aa1 |
| children | 21fa4b4bfdaa |
| files | CHANGES.txt configmix/config.py tests/test.py |
| diffstat | 3 files changed, 224 insertions(+), 21 deletions(-) [+] |
line wrap: on
line diff
--- a/CHANGES.txt Thu Apr 29 08:43:15 2021 +0200 +++ b/CHANGES.txt Wed May 05 01:32:07 2021 +0200 @@ -28,6 +28,13 @@ Configuration tree references are implemented in the ``ref`` namespace + .. change:: + :tags: feature + + Implemented new access methods + :py:meth:`configmix.config.Configuration.getvarl` and + :py:meth:`configmix.config.Configuration.getvarl_s` + .. changelog:: :version: 0.13 :released: 2021-04-21
--- a/configmix/config.py Thu Apr 29 08:43:15 2021 +0200 +++ b/configmix/config.py Wed May 05 01:32:07 2021 +0200 @@ -64,12 +64,37 @@ _TEXTTYPE = type(u("")) _STARTTOK = u(b"{{") _ENDTOK = u(b"}}") + _HIER_SEPARATOR = u(b'.') _NS_SEPARATOR = u(b':') _FILTER_SEPARATOR = u(b'|') _STARTTOK_REF = _STARTTOK + REF_NAMESPACE + _NS_SEPARATOR _ENDTOK_REF = _ENDTOK _DOT = u(b'.') + def getvarl(self, *names, default=_MARKER, namespace=None): + """Get a variable where the hierarchy is given in `names` as sequence + and the namespace is given in `namespace`. + + No variable interpolation is done and no filters are applied. + + """ + try: + if not namespace: + lookupfn = self._lookupvar + else: + if namespace == REF_NAMESPACE: + lookupfn = self._lookupref + else: + lookupfn = lookup_varns(namespace) + varvalue = lookupfn(*names) + except KeyError: + if default is _MARKER: + raise KeyError("Variable %r not found" % (names,)) + else: + return default + else: + return varvalue + def getvar(self, varname, default=_MARKER): """Get a variable of the form ``[ns:][[key1.]key2.]name`` - including variables from other namespaces. @@ -78,22 +103,31 @@ """ varns, varname = self._split_ns(varname) + if not varns: + varnameparts = varname.split(self._HIER_SEPARATOR) + else: + varnameparts = (varname,) + return self.getvarl(*varnameparts, namespace=varns) + + def getvarl_s(self, *names, default=_MARKER, namespace=None): + """Get a variable - including variables from other namespaces. + + `names` and `namespace` are interpreted as in + :meth:`.getvarl`. But variables will be interpolated + recursively within the variable values and filters are + applied. + + For more details see chapter :ref:`variable-interpolation`. + + """ try: - if not varns: - lookupfn = self._lookupvar - else: - if varns == REF_NAMESPACE: - lookupfn = self._lookupref - else: - lookupfn = lookup_varns(varns) - varvalue = lookupfn(varname) + obj = self.getvarl(*names, namespace=namespace) + return self.substitute_variables_in_obj(obj) except KeyError: if default is _MARKER: - raise KeyError("Variable %r not found" % varname) + raise else: return default - else: - return varvalue def getvar_s(self, varname, default=_MARKER): """Get a variable - including variables from other namespaces. @@ -114,23 +148,48 @@ else: return default + def getintvarl_s(self, *names, default=_MARKER, namespace=None): + """Get a (possibly substituted) variable and coerce text to a + number. + + """ + s = self.getvarl_s(*names, default=default, namespace=namespace) + if isinstance(s, self._TEXTTYPE): + return int(s, 0) + else: + return s + def getintvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and coerce text to a number. """ - s = self.getvar_s(varname, default) + s = self.getvar_s(varname, default=default) if isinstance(s, self._TEXTTYPE): return int(s, 0) else: return s + def getboolvarl_s(self, *names, default=_MARKER, namespace=None): + """Get a (possibly substituted) variable and convert text to a + boolean + + """ + s = self.getvarl_s(*names, default=default, namespace=namespace) + if isinstance(s, self._TEXTTYPE): + sl = s.strip().lower() + if sl not in self._BOOL_CVT: + raise ValueError("Not a boolean: %r" % s) + return self._BOOL_CVT[sl] + else: + return s + def getboolvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and convert text to a boolean """ - s = self.getvar_s(varname, default) + s = self.getvar_s(varname, default=default) if isinstance(s, self._TEXTTYPE): sl = s.strip().lower() if sl not in self._BOOL_CVT: @@ -145,6 +204,17 @@ u('0'): False, u('no'): False, u('false'): False, u('off'): False } + def getfloatvarl_s(self, *names, default=_MARKER, namespace=None): + """Get a (possibly substituted) variable and convert text to a + float + + """ + s = self.getvarl_s(*names, default=default, namespace=namespace) + if isinstance(s, self._TEXTTYPE): + return float(s) + else: + return s + def getfloatvar_s(self, varname, default=_MARKER): """Get a (possibly substituted) variable and convert text to a float @@ -170,24 +240,24 @@ else: return (nameparts[0].rstrip(), nameparts[1:], ) - def _lookupvar(self, key, default=_MARKER): - """Lookup a variable. + def _lookupvar(self, *names, default=_MARKER): + """Lookup a variable within a hierarchy. - If no default is given an unexisting `key` raises a `KeyError` + If no default is given an unexisting `name` raises a `KeyError` else `default` is returned. """ - parts = key.split('.') try: - v = self.expand_if_reference(self[parts[0]]) - for p in parts[1:]: + v = self.expand_if_reference(self[names[0]]) + for p in names[1:]: v = self.expand_if_reference(v[p]) except TypeError: raise KeyError( "Configuration variable %r not found" - "(missing intermediate keys?)" % key) + "(missing intermediate keys?)" % (names,)) except KeyError: if default is _MARKER: - raise KeyError("Configuration variable %r not found" % key) + raise KeyError( + "Configuration variable %r not found" % (names,)) else: return default return v
--- a/tests/test.py Thu Apr 29 08:43:15 2021 +0200 +++ b/tests/test.py Wed May 05 01:32:07 2021 +0200 @@ -288,6 +288,15 @@ self.assertEqual(u(platform.python_version()), cfg.getvar_s("PY:version")) + def test03_namespace_l(self): + cfg = self._load( + os.path.join(TESTDATADIR, "conf20.yml"), + os.path.join(TESTDATADIR, "conf21.yml"), + os.path.join(TESTDATADIR, "conf22.ini")) + self.assertEqual(u(os.getcwd()), cfg.getvarl("cwd", namespace="OS")) + self.assertEqual(u(platform.python_version()), + cfg.getvarl_s("version", namespace="PY")) + def test04_no_filter(self): cfg = self._load( os.path.join(TESTDATADIR, "conf20.yml"), @@ -299,6 +308,17 @@ self.assertRaises(KeyError, _look) + def test04_no_filter_l(self): + cfg = self._load( + os.path.join(TESTDATADIR, "conf20.yml"), + os.path.join(TESTDATADIR, "conf21.yml"), + os.path.join(TESTDATADIR, "conf22.ini")) + + def _look(): + return cfg.getvarl("cwd|upper", namespace="OS") + + self.assertRaises(KeyError, _look) + def test05_comments(self): cfg = self._load(os.path.join(TESTDATADIR, "conf20.yml"), os.path.join(TESTDATADIR, "conf21.yml"), @@ -317,6 +337,24 @@ self.assertRaises(KeyError, _c("db.__comment1")) self.assertRaises(KeyError, _c("db.user.__doc2")) + def test05_comments_l(self): + cfg = self._load(os.path.join(TESTDATADIR, "conf20.yml"), + os.path.join(TESTDATADIR, "conf21.yml"), + os.path.join(TESTDATADIR, "conf22.ini"), + os.path.join(TESTDATADIR, "conf23.json"), + os.path.join(TESTDATADIR, "conf24.toml")) + + def _c(*names): + def _f(): + cfg.getvarl_s(*names) + return _f + + # Variables with leading underscores are *not* imported by default + self.assertEqual(0o0027, int(cfg.getvarl_s("process", "umask"), 0)) + self.assertRaises(KeyError, _c("process", "__doc1")) + self.assertRaises(KeyError, _c("db", "__comment1")) + self.assertRaises(KeyError, _c("db", "user", "__doc2")) + def test06_check_all_comments(self): cfg = self._load(os.path.join(TESTDATADIR, "conf20.yml"), os.path.join(TESTDATADIR, "conf21.yml"), @@ -350,6 +388,25 @@ self.assertEqual("the last value", cfg.getvar_s("to-be-deleted-but-reassigned")) + def test07_deletions_l(self): + cfg = self._load(os.path.join(TESTDATADIR, "conf20.yml"), + os.path.join(TESTDATADIR, "conf21.yml"), + os.path.join(TESTDATADIR, "conf22.ini"), + os.path.join(TESTDATADIR, "conf23.json"), + os.path.join(TESTDATADIR, "conf24.toml"), + os.path.join(TESTDATADIR, "delete-in-dict.yml")) + # automatic clean-up + self.assertRaises(KeyError, cfg.getvarl_s, "not-deleted") + # explicit deletion + self.assertRaises(KeyError, cfg.getvarl_s, "to-be-deleted") + self.assertRaises(KeyError, cfg.getvarl_s, "db" "user.name") + self.assertEqual("the-database-password-2", + cfg.getvarl_s("db", "user", "pwd")) + self.assertRaises(KeyError, cfg.getvarl_s, "test", "Str") + self.assertEqual("not a list any more", cfg.getvarl_s("test", "List")) + self.assertEqual("the last value", + cfg.getvarl_s("to-be-deleted-but-reassigned")) + class T02LoadAndMerge(_T02MixinLoadAndMerge, unittest.TestCase): @@ -490,18 +547,34 @@ cfg = configmix.load(os.path.join(TESTDATADIR, "conf1.ini")) self.assertEqual(2, cfg.getvar_s("key100")) + def test01_expand_int_ini_l(self): + cfg = configmix.load(os.path.join(TESTDATADIR, "conf1.ini")) + self.assertEqual(2, cfg.getvarl_s("key100")) + def test02_expand_int_indirect_ini(self): cfg = configmix.load(os.path.join(TESTDATADIR, "conf1.ini")) self.assertEqual(2, cfg.getvar_s("key102")) + def test02_expand_int_indirect_ini_l(self): + cfg = configmix.load(os.path.join(TESTDATADIR, "conf1.ini")) + self.assertEqual(2, cfg.getvarl_s("key102")) + def test03_expand_int2str_ini(self): cfg = configmix.load(os.path.join(TESTDATADIR, "conf1.ini")) self.assertEqual("the 2 value", cfg.getvar_s("key101")) + def test03_expand_int2str_ini_l(self): + cfg = configmix.load(os.path.join(TESTDATADIR, "conf1.ini")) + self.assertEqual("the 2 value", cfg.getvarl_s("key101")) + def test04_expand_intint2str_ini(self): cfg = configmix.load(os.path.join(TESTDATADIR, "conf1.ini")) self.assertEqual("22", cfg.getvar_s("key103")) + def test04_expand_intint2str_ini_l(self): + cfg = configmix.load(os.path.join(TESTDATADIR, "conf1.ini")) + self.assertEqual("22", cfg.getvarl_s("key103")) + class T06References(unittest.TestCase): @@ -533,6 +606,18 @@ self.assertEqual(self._cfg.getvar("wsgi.debugger"), self._cfg.getvar("wsgi.profiler.params")) + def test01_reference_without_expansions_l(self): + self.assertTrue(isinstance(self._cfg.getvarl("wsgi", "profiler"), dict)) + self.assertTrue(isinstance( + self._cfg.getvarl("wsgi", "profiler", "params"), dict)) + self.assertEqual( + "werkzeug", + self._cfg.getvarl("wsgi", "profiler", "params", "type")) + self.assertTrue(self._cfg.getvarl( + "wsgi", "profiler", "params", "params", "evalex")) + self.assertEqual(self._cfg.getvarl("wsgi", "debugger"), + self._cfg.getvarl("wsgi", "profiler", "params")) + def test02_reference__with_expansions(self): self.assertTrue(isinstance(self._cfg.getvar_s("wsgi.profiler"), dict)) self.assertTrue(isinstance( @@ -542,6 +627,17 @@ self.assertEqual("werkzeug", self._cfg.getvar_s("wsgi.profiler.params.type")) + def test02_reference__with_expansions_l(self): + self.assertTrue(isinstance( + self._cfg.getvarl_s("wsgi", "profiler"), dict)) + self.assertTrue(isinstance( + self._cfg.getvarl_s("wsgi", "profiler", "params"), dict)) + self.assertTrue( + self._cfg.getvarl_s("wsgi", "profiler", "params", "params", "evalex")) + self.assertEqual( + "werkzeug", + self._cfg.getvarl_s("wsgi", "profiler", "params", "type")) + def test03_no_direct_attribute_access_to_expanded_references(self): self.assertEqual( "{{ref:#wsgi.debugger}}", @@ -560,21 +656,43 @@ self.assertTrue( self._cfg.getvar_s("testref.here.params.params.evalex")) + def test04_indirect_recursive_references_l(self): + self.assertEqual( + "werkzeug", + self._cfg.getvarl_s("testref", "here", "params", "type")) + self.assertTrue( + self._cfg.getvarl_s("testref", "here", "params", "params", "evalex")) + def test05_recursive_expansion(self): c = self._cfg.getvar_s("testref") self.assertTrue(c["here"]["params"]["params"]["evalex"]) + def test05_recursive_expansion_l(self): + c = self._cfg.getvarl_s("testref") + self.assertTrue(c["here"]["params"]["params"]["evalex"]) + def test06_no_recursive_expansion_in_getvar_parents(self): v = self._cfg.getvar("wsgi.profiler") self.assertEqual( "{{ref:#wsgi.debugger}}", v["params"]) + def test06_no_recursive_expansion_in_getvar_parents_l(self): + v = self._cfg.getvarl("wsgi", "profiler") + self.assertEqual( + "{{ref:#wsgi.debugger}}", + v["params"]) + def test07_explicit_reference_expansion(self): v = self._cfg.getvar("wsgi.profiler") self.assertTrue(isinstance(self._cfg.expand_if_reference(v["params"]), dict)) + def test07_explicit_reference_expansion_l(self): + v = self._cfg.getvarl("wsgi", "profiler") + self.assertTrue(isinstance(self._cfg.expand_if_reference(v["params"]), + dict)) + def test08_explicit_indirect_expansion_through_value(self): v = self._cfg.getvar_s("expand-ref-value.key0") self.assertTrue(isinstance(v, bool)) @@ -583,6 +701,14 @@ v2 = self._cfg.getvar("expand-ref-value.key0") self.assertEqual("{{testref.here.params.params.evalex}}", v2) + def test08_explicit_indirect_expansion_through_value_l(self): + v = self._cfg.getvarl_s("expand-ref-value", "key0") + self.assertTrue(isinstance(v, bool)) + self.assertTrue(v) + # but not that .getvar does not **not** + v2 = self._cfg.getvarl("expand-ref-value", "key0") + self.assertEqual("{{testref.here.params.params.evalex}}", v2) + if __name__ == "__main__": unittest.main()
