Mercurial > hgrepos > Python > libs > data-schema
diff tests/test_schema.py @ 5:84dfd1a94926
Add the existing implementation.
All tests work.
The documentation as text file is included also.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Thu, 06 Jul 2023 23:41:41 +0200 |
| parents | |
| children | 940676a0de84 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_schema.py Thu Jul 06 23:41:41 2023 +0200 @@ -0,0 +1,2322 @@ + +import copy +import datetime +import re +import unittest + +import _config + +import configmix.yaml + +import data_schema +import data_schema.util + + +TYPE_RE = type(re.compile(r"\A.+\Z")) + + +def _test_generic_validator_for_yaml(obj, schema, context): + """Callback for loading test1.schema.yml: Always successful""" + yield from () + + +class YAML(unittest.TestCase): + + """Tests to load Python objects from YAML with complex Python-specific + tags. + + .. seealso:: https://pyyaml.org/wiki/PyYAMLDocumentation + + """ + + def test_load_python_name(self): + y = configmix.yaml.load("key: !!python/name:data_schema.validate") + self.assertTrue(callable(y["key"])) + + def test_load_re(self): + y = configmix.yaml.load("key: !!python/object/apply:re.compile\n - '^[0-9]+$'") + self.assertTrue(isinstance(y["key"], TYPE_RE)) + + +class SchemaInYAML(unittest.TestCase): + def test_file(self): + with data_schema.util.get_data_stream( + _config.FILEURI_PREFIX + "test1.schema.yml", + basedir=_config.config.getvar_s("projectdir")) as f: + schema = data_schema._Schema( + None, True, configmix.yaml.load(f)) + + def test_data(self): + with data_schema.util.get_data_stream( + "data:testschematalib:test2.schema.yml") as f: + schema = data_schema._Schema( + None, True, configmix.yaml.load(f)) + + +class SchemaCheck(unittest.TestCase): + + def test_root_creation(self): + schema = data_schema._Schema(None, True) + self.assertIsInstance(schema, dict) + self.assertEqual(0, len(schema)) + self.assertFalse(schema) + + def test_root_creation_wrong(self): + self.assertRaises( + ValueError, + data_schema._Schema, + None, + False) + + def test_root_properties(self): + schema = data_schema._Schema(None, True) + self.assertIsNone(schema.parent) + self.assertIs(schema, schema.ROOT) + self.assertTrue(schema.is_sub_root) + self.assertIs(schema, schema.SELF) + + def test_dict_len_bool(self): + schema = data_schema._Schema(None, True, a=1, b=2) + self.assertTrue(schema) + self.assertEqual(2, len(schema)) + + def test_equality(self): + schema1 = data_schema._Schema(None, True, a=1, b=2) + schema2 = data_schema._Schema(None, True, b=2, a=1) + self.assertEqual(schema1, schema2) + self.assertIsNot(schema1, schema2) + + def test_copy(self): + schema = data_schema._Schema(None, True, type="str") + schema2 = schema.copy() + self.assertEqual(schema, schema2) + + def test_deepcopy(self): + d1 = {} + schema = data_schema._Schema(None, True, type="str", b=d1) + schema2 = copy.deepcopy(schema) + self.assertEqual(schema, schema2) + self.assertIs(schema["b"], d1) + self.assertIsNot(schema2["b"], d1) + self.assertIs(schema.parent, schema2.parent) + self.assertIs(schema.is_sub_root, schema2.is_sub_root) + + def test_nested_copy(self): + d1 = {} + d2 = {} + root_schema = data_schema._Schema(None, True, type="str", b=d1) + child_schema = data_schema._Schema(root_schema, True, type="bool", b=d2) + copied_child = child_schema.copy() + self.assertIs(copied_child.ROOT, root_schema) + self.assertIs(copied_child.SELF, copied_child) + self.assertIsNot(copied_child.SELF, root_schema) + self.assertEqual(child_schema, copied_child) + self.assertIs(copied_child["b"], d2) + + def test_nested_deepcopy(self): + d1 = {} + d2 = {} + root_schema = data_schema._Schema(None, True, type="str", b=d1) + child_schema = data_schema._Schema(root_schema, True, type="bool", b=d2) + copied_child = copy.deepcopy(child_schema) + self.assertIs(copied_child.ROOT, root_schema) + self.assertIs(copied_child.SELF, copied_child) + self.assertEqual(child_schema, copied_child) + self.assertIsNot(copied_child["b"], d2) + self.assertNotEqual(root_schema, child_schema) + + +class ContextCheck(unittest.TestCase): + + def test_root_without_settings(self): + self.assertRaises(TypeError, + data_schema.Context, + None, + root_object=object(), + schema=dict()) + + def test_root_context(self): + obj = object() + schema = object() + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + ctx = data_schema.Context( + None, root_object=obj, root_schema=schema, settings=settings) + self.assertEqual("<ROOT>", str(ctx)) + self.assertTrue(ctx.root_object is obj) + self.assertTrue(ctx.root_schema is schema) + self.assertTrue(ctx.settings is settings) + + def test_parent_of_root_context(self): + obj = object() + schema = object() + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + ctx = data_schema.Context( + None, root_object=obj, root_schema=schema, settings=settings) + self.assertTrue(ctx.is_root) + self.assertIsNone(ctx.parent) + try: + ctx.safe_parent + except TypeError: + pass + else: + self.fail( + "Context.safe_parent was expected to raise for a root context") + + def test_root_context_init_root_empty(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + self.assertRaises( + TypeError, + data_schema.Context, None, key="key", settings=settings) + self.assertRaises( + TypeError, + data_schema.Context, None, index="key", settings=settings) + + def test_root_context_init_only_one_of_key_index(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + self.assertRaises( + ValueError, + data_schema.Context, root, key="key", index="index") + + def test_root_context_init_exactly_one(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + self.assertRaises(TypeError, data_schema.Context, root) + + def test_nonroot_rootobj_schema(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + obj = object() + schema = object() + ctx = data_schema.Context( + None, root_object=obj, root_schema=schema, settings=settings) + self.assertEqual("<ROOT>", str(ctx)) + self.assertTrue(ctx.root_object is obj) + self.assertTrue(ctx.root_schema is schema) + self.assertTrue(ctx.settings is settings) + self.assertRaises(TypeError, data_schema.Context, ctx, index=0, + root_object=object()) + self.assertRaises(TypeError, data_schema.Context, ctx, index=0, + root_schema=object()) + + def test_str(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + ctx1 = data_schema.Context(root, key="key1") + ctx2 = data_schema.Context(ctx1, index=2) + ctx3 = data_schema.Context(ctx2, key="key3") + self.assertEqual("key1 / INDEX:2 / key3", str(ctx3)) + + def test_repr(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + ctx1 = data_schema.Context(root, key="key1") + ctx2 = data_schema.Context(ctx1, index=2) + ctx3 = data_schema.Context(ctx2, key="key3") + self.assertEqual("<Context path=`key1 / INDEX:2 / key3'>", repr(ctx3)) + + def test_root(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + self.assertTrue(root.is_root) + self.assertTrue(root is root.root) + self.assertTrue(root.settings is settings) + ctx1 = data_schema.Context(root, key="key1") + self.assertFalse(ctx1.is_root) + self.assertTrue(ctx1.root is root) + self.assertTrue(ctx1.settings is settings) + ctx2 = data_schema.Context(ctx1, index=2) + self.assertTrue(ctx2.settings is settings) + ctx3 = data_schema.Context(ctx2, key="key3") + self.assertEqual("key1 / INDEX:2 / key3", str(ctx3)) + self.assertFalse(ctx3.is_root) + self.assertTrue(ctx3.root is root) + self.assertTrue(ctx3.settings is settings) + + def test_extra_settings_in_between(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + settings2 = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + self.assertTrue(root.is_root) + self.assertTrue(root is root.root) + self.assertTrue(root.settings is settings) + ctx1 = data_schema.Context(root, key="key1") + self.assertFalse(ctx1.is_root) + self.assertTrue(ctx1.root is root) + self.assertTrue(ctx1.settings is settings) + ctx2 = data_schema.Context(ctx1, index=2, settings=settings2) + self.assertTrue(ctx2.settings is settings2) + ctx3 = data_schema.Context(ctx2, key="key3") + self.assertEqual("key1 / INDEX:2 / key3", str(ctx3)) + self.assertFalse(ctx3.is_root) + self.assertTrue(ctx3.root is root) + self.assertTrue(ctx3.settings is settings2) + + def test_key_xor_index(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + root = data_schema.Context(None, settings=settings) + self.assertRaises( + ValueError, + data_schema.Context, + root, + index=0, + key="huhu") + + def test_keyindex_requires_key(self): + settings = data_schema.ValidationSettings( + skip_keys=[], break_on_keynames_problems=True) + self.assertRaises( + ValueError, + data_schema.Context, + None, + key_index=0, + settings=settings) + + +class SchemaReferences(unittest.TestCase): + + def setUp(self): + self.empty_schema = data_schema._Schema(None, True) + + def test_no_fragment(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, settings=None) + self.assertRaises(data_schema.SchemaError, + data_schema.try_get_reference, + "object:", + ctx, + self.empty_schema) + + def test_empty_fragment(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, settings=None) + r = data_schema.try_get_reference( + "object:#", + ctx, + self.empty_schema) + self.assertIs(r, ctx.root_object) + self.assertIs(r, ctx.current_object) + + def test_point_fragment_with_root(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, settings=None) + r = data_schema.try_get_reference( + "object:#.", + ctx, + self.empty_schema) + self.assertIs(r, ctx.root_object) + self.assertIs(r, ctx.current_object) + + def test_point_fragment_with_current_object(self): + current_object = { + "current": { + "object": "value"}} + root_object = {"root": current_object} + ctx = data_schema.Context(None, current_object=current_object, + root_object=root_object, settings=None) + r = data_schema.try_get_reference( + "object:#.", + ctx, + self.empty_schema) + self.assertIs(r, ctx.current_object) + self.assertIs(r, current_object) + self.assertIsNot(r, ctx.root_object) + + r = data_schema.try_get_reference( + "object:#.current", + ctx, + self.empty_schema) + self.assertEqual({"object": "value"}, r) + + r = data_schema.try_get_reference( + "object:#.current.object", + ctx, + self.empty_schema) + self.assertEqual("value", r) + + def test_point_fragment_with_invalid_current_object_refs(self): + current_object = { + "current": { + "object": "value"}} + root_object = {"root": current_object} + ctx = data_schema.Context(None, current_object=current_object, + root_object=root_object, settings=None) + r = data_schema.try_get_reference( + "object:#.", + ctx, + self.empty_schema) + self.assertIs(r, ctx.current_object) + self.assertIs(r, current_object) + + r = data_schema.try_get_reference( + "object:#.non-current", + ctx, + self.empty_schema) + self.assertIsNone(r) + + r = data_schema.try_get_reference( + "object:#.non-current.object", ctx, self.empty_schema) + self.assertIsNone(r) + + r = data_schema.try_get_reference( + "object:#.current.non-object", ctx, self.empty_schema) + self.assertIsNone(r) + + self.assertRaises( + data_schema.SchemaError, + data_schema.try_get_reference, + "object:#.current..", + ctx, + self.empty_schema) + + self.assertRaises( + TypeError, + data_schema.try_get_reference, + "object:#..current.object", + ctx, + self.empty_schema) + + def test_fragment_with_current_object_and_root(self): + current_object = { + "current": { + "object": "value"}} + root_object = {"root": current_object} + ctx = data_schema.Context(None, current_object=current_object, + root_object=root_object, settings=None) + r = data_schema.try_get_reference( + "object:#", ctx, self.empty_schema) + self.assertIs(r, ctx.root_object) + self.assertIs(r, root_object) + + def test_default_schema_ref(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, + settings=None) + r = data_schema.try_get_reference("#foo", ctx, self.empty_schema) + self.assertEqual("bar", r) + r = data_schema.try_get_reference("#bar", ctx, self.empty_schema) + self.assertIsNone(r) + sentinel = object() + r = data_schema.try_get_reference("#bar", ctx, self.empty_schema, + default=sentinel) + self.assertIs(r, sentinel) + + def test_object_schema_ref(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, + settings=None) + r = data_schema.try_get_reference("object:#foo", ctx, + self.empty_schema) + self.assertEqual("bar", r) + r = data_schema.try_get_reference("object:#bar", ctx, + self.empty_schema) + self.assertIsNone(r) + sentinel = object() + r = data_schema.try_get_reference( + "object:#bar", + ctx, + self.empty_schema, + default=sentinel) + self.assertIs(r, sentinel) + + def test_nested_keys(self): + sentinel = object() + ctx = data_schema.Context( + None, + root_object={"foo": "bar", + "k1": {"k2": "v2", + "k3": None}, + "k4": None, + "k5": [1, 2, 3]}, + settings=None) + r = data_schema.try_get_reference( + "#k1.k2", ctx, self.empty_schema) + self.assertEqual("v2", r) + r = data_schema.try_get_reference( + "#k1.k3", ctx, self.empty_schema) + self.assertIsNone(r) + r = data_schema.try_get_reference( + "#k1.k3.fornone", ctx, self.empty_schema) + self.assertIsNone(r) + r = data_schema.try_get_reference( + "#k1.k3.fornone", ctx, self.empty_schema, default=sentinel) + self.assertIs(r, sentinel) + r = data_schema.try_get_reference( + "#k5.0", ctx, self.empty_schema, default=sentinel) + self.assertIs(r, sentinel) + r = data_schema.try_get_reference( + "#k6", ctx, self.empty_schema, default=sentinel) + self.assertIs(r, sentinel) + + def test_url_quoted_fragment(self): + ctx = data_schema.Context( + None, + root_object={"foo": "bar", + "k1": {"k2": "v2", + "k3": None}, + "k4": None, + "k5": [1, 2, 3]}, + settings=None) + r = data_schema.try_get_reference( + "#fo%6F", ctx, self.empty_schema) + self.assertEqual("bar", r) + + def test_no_duplicate_unquoting_in_fragment(self): + ctx = data_schema.Context( + None, + root_object={"fo%o": "bar"}, + settings=None) + r = data_schema.try_get_reference( + "#fo%25o", ctx, self.empty_schema) + self.assertEqual("bar", r) + + def test_schema_ref_must_have_fragment(self): + ctx = data_schema.Context(None, root_schema={"foo": "bar"}, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.try_get_reference, + "schema:", + ctx, + self.empty_schema) + + def test_schema_ref_must_have_absolute_fragment(self): + ctx = data_schema.Context(None, root_schema={"foo": "bar"}, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.try_get_reference, + "schema:#", + ctx, + self.empty_schema) + + def test_schema_ref_root_schema(self): + schema = data_schema._Schema( + None, True, {"foo": "bar"}) + ctx = data_schema.Context(None, root_schema=schema, settings=None) + r = data_schema.try_get_reference( + "schema:#/", ctx, schema) + self.assertIs(r, schema) + + def test_unknown_schema_ref_yet(self): + ctx = data_schema.Context(None, root_object={"foo": "bar"}, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.try_get_reference, + "data:#", + ctx, + self.empty_schema) + + def test_schema_not_found(self): + sentinel = object() + root_schema = data_schema._Schema(None, True, {"foo": "bar"}) + ctx = data_schema.Context(None, root_schema=root_schema, + settings=None) + r = data_schema.try_get_reference( + "schema:#/foo/bar", ctx, root_schema, default=sentinel) + self.assertIs(r, sentinel) + r = data_schema.try_get_reference( + "schema:#/foo2", ctx, root_schema, default=sentinel) + self.assertIs(r, sentinel) + r = data_schema.try_get_reference( + "schema:#/foo3", ctx, root_schema) + self.assertIsNone(r) + + def test_schema_is_found(self): + subsubschema = {"foo3": "bar3"} + subschema = {"foo2": subsubschema} + schema = data_schema._Schema(None, True, {"foo": subschema}) + ctx = data_schema.Context(None, root_schema=schema, settings=None) + r = data_schema.try_get_reference( + "schema:#/foo/foo2", ctx, schema) + self.assertEqual(subsubschema, r) + + def test_schema_with_trailing_slash_is_found(self): + subsubschema = {"foo3": "bar3"} + subschema = {"foo2": subsubschema} + schema = data_schema._Schema(None, True, {"foo": subschema}) + ctx = data_schema.Context(None, root_schema=schema, settings=None) + r = data_schema.try_get_reference( + "schema:#/foo/foo2/", ctx, schema) + self.assertIsNone(r) + + def test_schema_is_found_with_quoted_fragment(self): + subsubschema = {"foo3": "bar3"} + subschema = {"foo2": subsubschema} + schema = data_schema._Schema(None, True, {"foo": subschema}) + ctx = data_schema.Context(None, root_schema=schema, settings=None) + r = data_schema.try_get_reference( + "schema:#/f%6Fo/foo%32", ctx, schema) + self.assertEqual(subsubschema, r) + + +class SchemaConditionals(unittest.TestCase): + + def setUp(self): + self._ctx = data_schema.Context( + None, root_object={"foo": "bar", "foo2": None}, settings=None) + + def test_no_cond(self): + schema = data_schema._Schema(None, True, {"type": None}) + self.assertIs(data_schema.process_schema_conditionals( + schema, self._ctx), + schema) + + def test_cond_is_none(self): + schema = data_schema._Schema(None, True, {"type": None, + "cond": None}) + self.assertIs(data_schema.process_schema_conditionals( + schema, self._ctx), + schema) + + def test_ambiguous(self): + schema = data_schema._Schema(None, True, {"type": None, + "cond": None, + "match": None}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_cond_not_a_sequence(self): + schema = data_schema._Schema(None, True, {"type": None, + "cond": {"type": None}}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_match_not_a_sequence(self): + schema = data_schema._Schema( + None, True, {"type": None, + "match": {"type": None}}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_condline_not_a_dict(self): + schema = data_schema._Schema(None, True, {"type": None, + "cond": [None]}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_matchline_not_a_dict(self): + schema = {"type": None, + "match": [None]} + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_cond_unknown_predicate(self): + schema = data_schema._Schema( + None, True, {"type": None, + "cond": [ + {"unexisting-when-xxxx": None, + "then": {}} + ]}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_match_unknown_predicate(self): + schema = data_schema._Schema( + None, True, {"type": None, + "match": [ + {"unexisting-when-xxxx": None, + "then": {}} + ]}) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_simple_replace_when_true(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": True, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("cond" in r) + + def test_simple_replace_when_not_false(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": {"not": False}, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("cond" in r) + + def test_simple_merge_when_true(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "old-key": "here I am", + "cond": [ + {"when": True, + "then-merge": { + "require": ["r1", "r2", "r3"], + "new-key": None, + "old-key": "{{::DEL::}}" + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["huhu", "haha", "r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("old-key" in r) + self.assertFalse("cond" in r) + + def test_simple_replace_first_wins_1(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha", "hehe"], + "cond": [ + {"when": True, + "then": { + "new-key2": "v2"}}, + {"when": True, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["huhu", "haha", "hehe"], r["require"]) + self.assertTrue("new-key2" in r) + self.assertEqual("v2", r["new-key2"]) + self.assertFalse("cond" in r) + + def test_simple_replace_first_wins_2(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha", "hehe"], + "cond": [ + {"when": False, + "then": { + "new-key2": "v2"}}, + {"when": True, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("new-key2" in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_false(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": False, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["huhu", "haha"], r["require"]) + self.assertFalse("new-key" in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_true(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when-ref-true": '#foo', + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_true_2(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": {"ref-true": '#foo'}, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_is_not_true(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when-ref-true": '#not-a-foo', + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["huhu", "haha"], r["require"]) + self.assertTrue("new-key" not in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_is_not_true_2(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": {"ref-true": '#not-a-foo'}, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["huhu", "haha"], r["require"]) + self.assertTrue("new-key" not in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_exists(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when-ref-exists": '#foo2', + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }}, + {"when": True, + "then": { + "new-key3": "val"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("new-key3" in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_exists_2(self): + schema = data_schema._Schema( + None, True, {"type": None, + "require": ["huhu", "haha"], + "cond": [ + {"when": {"ref-exists": '#foo2'}, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }}, + {"when": True, + "then": { + "new-key3": "val"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual(["r1", "r2", "r3"], r["require"]) + self.assertTrue("new-key" in r) + self.assertIsNone(r["new-key"]) + self.assertFalse("new-key3" in r) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_exists_is_false(self): + schema = data_schema._Schema( + None, True, {"type": None, + "cond": [ + {"when-ref-exists": '#foo-not-existing', + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }}, + {"when": True, + "then": { + "new-key3": "val"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertFalse("require" in r) + self.assertFalse("new-key" in r) + self.assertEqual("val", r["new-key3"]) + self.assertFalse("cond" in r) + + def test_simple_replace_when_ref_exists_is_false_2(self): + schema = data_schema._Schema( + None, True, {"type": None, + "cond": [ + {"when": {"ref-exists": '#foo-not-existing'}, + "then": { + "require": ["r1", "r2", "r3"], + "new-key": None, + }}, + {"when": True, + "then": { + "new-key3": "val"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertFalse("require" in r) + self.assertFalse("new-key" in r) + self.assertEqual("val", r["new-key3"]) + self.assertFalse("cond" in r) + + def test_allOf_true(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"all-of": [ + True, + {"ref-exists": '#foo2'}, + {"ref-true": '#foo'}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual("string", r["type"]) + + def test_allOf_false(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"all-of": [ + True, + {"ref-exists": '#foo-non-existing'}, + {"ref-true": '#foo'}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertIsNone(r["type"]) + + def test_short_allOf_true(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": [ + True, + {"ref-exists": '#foo2'}, + {"ref-true": '#foo'}], + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual("string", r["type"]) + + def test_short_allOf_false(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": [ + True, + {"ref-exists": '#foo-non-existing'}, + {"ref-true": '#foo'}], + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertIsNone(r["type"]) + + def test_anyOf_true(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"any-of": [ + False, + {"ref-exists": '#foo2'}, + {"ref-true": '#foo'}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual("string", r["type"]) + + def test_anyOf_false(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"any-of": [ + False, + {"ref-exists": '#foo2-non'}, + {"ref-true": '#foo2'}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertIsNone(r["type"]) + + def test_oneOf_true(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"one-of": [ + False, + {"ref-exists": '#foo2'}, + {"not": {"ref-true": '#foo'}}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertEqual("string", r["type"]) + + def test_oneOf_false(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"one-of": [ + False, + {"ref-exists": '#foo2'}, + {"ref-true": '#foo'}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertIsNone(r["type"]) + + def test_oneOf_false_2(self): + schema = data_schema._Schema( + None, True, {"cond": [ + {"when": {"one-of": [ + False, + {"not": {"ref-exists": '#foo2'}}, + {"not": {"ref-true": '#foo'}}]}, + "then": {"type": "string"}}, + {"when": True, + "then": {"type": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertIsNot(r, schema) + self.assertIsNone(r["type"]) + + def test_match_nothing(self): + schema = data_schema._Schema( + None, + True, + { + "match": [ + {"when": False, + "then": {"new-key": None}}, + {"when": False, + "then": {"new-key2": None}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertFalse("new-key" in r) + self.assertFalse("new-key2" in r) + + def test_match_all(self): + schema = data_schema._Schema( + None, + True, + { + "match": [ + {"when": True, + "then": {"new-key": "value"}}, + {"when": True, + "then": {"new-key2": "value2"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual("value", r["new-key"]) + self.assertEqual("value2", r["new-key2"]) + + def test_match_some(self): + schema = data_schema._Schema( + None, + True, + { + "match": [ + {"when": True, + "then": {"new-key": "value"}}, + {"when": False, + "then": {"new-key2": "value2"}}, + {"when": True, + "then": {"new-key3": "value3"}}, + {"when": False, + "then": {"new-key4": "value4"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual("value", r["new-key"]) + self.assertFalse("new-key2" in r) + self.assertEqual("value3", r["new-key3"]) + self.assertFalse("new-key4" in r) + + def test_match_some_merge(self): + schema = data_schema._Schema( + None, + True, + {"match": [ + {"when": True, + "then": {"new-key": [1, 2]}}, + {"when": False, + "then": {"new-key2": "value2"}}, + {"when": True, + "then-merge": {"new-key": ["value3"]}}, + {"when": False, + "then": {"new-key3": "value3"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual([1, 2, "value3"], r["new-key"]) + self.assertFalse("new-key2" in r) + self.assertFalse("new-key3" in r) + + def test_match_some_replace(self): + schema = data_schema._Schema( + None, + True, + {"match": [ + {"when": True, + "then": {"new-key": [1, 2]}}, + {"when": False, + "then": {"new-key2": "value2"}}, + {"when": True, + "then-replace": {"new-key": ["value3"]}}, + {"when": True, + "then": {"new-key3": "value3"}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual(["value3"], r["new-key"]) + self.assertFalse("new-key2" in r) + self.assertEqual("value3", r["new-key3"]) + + def test_match_some_merge_existing(self): + # the typical case within vlobby: just extend "required" + schema = data_schema._Schema( + None, True, {"required": [1, 2], + "match": [ + {"when": True, + "then": {"required": [3, 4]}}, + {"when": False, + "then": {"required": [0]}}, + {"when": True, + "then-merge": {"required": [5, 6, 7]}}, + {"when": True, + "then-merge": {"required": [4, 8]}} + ]}) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual([3, 4, 5, 6, 7, 4, 8], r["required"]) + + def test_equal_ref_and_value(self): + schema = data_schema._Schema( + None, True, {"foos": "bar", + "match": [{ + "when": { + "equals": [ + {"ref": "object:#foo"}, + {"value": "bar"}]}, + "then-replace": { + "foos": "new-bar"}}] + }) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual("new-bar", r["foos"]) + + def test_equal_val_and_ref(self): + schema = data_schema._Schema( + None, True, {"foos": "bar", + "cond": [{ + "when": { + "equals": [ + {"val": "bar"}, + {"ref": "object:#foo"}]}, + "then-replace": { + "foos": "new-bar"}}] + }) + r = data_schema.process_schema_conditionals(schema, self._ctx) + self.assertEqual("new-bar", r["foos"]) + + def test_equal_no_list(self): + schema = data_schema._Schema( + None, True, {"foos": "bar", + "match": [{ + "when": { + "equals": {"type": None}, + "then-replace": { + "foos": "new-bar"}}}] + }) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_equal_list_length_mismatch_1(self): + schema = data_schema._Schema( + None, True, {"foo": "bar", + "match": [{ + "when": { + "equals": [ + {"ref": "object:#foo"}]}, + "then-replace": { + "foo": "new-bar"}}] + }) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_equal_list_length_mismatch_3(self): + schema = data_schema._Schema( + None, True, {"foo": "bar", + "match": [{ + "when": { + "equals": [ + {"ref": "object:#foo"}, + {"ref": "object:#foo"}, + {"ref": "object:#foo"}]}, + "then-replace": { + "foo": "new-bar"}}] + }) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_conditionals, + schema, + self._ctx) + + def test_raise_if_scheme_ref_is_not_the_single_key(self): + schema = data_schema._Schema( + None, True, {"$ref": "schema:#/", + "type": None + }) + ctx = data_schema.Context( + None, root_schema=schema, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_references, + schema, + ctx) + + def test_raise_if_scheme_ref_is_not_the_single_key_root(self): + schema = data_schema._Schema( + None, True, {"$ref": "schema:#/subschema", + "subschema": { + "type": None + } + }) + ctx = data_schema.Context( + None, root_schema=schema, settings=None) + r = data_schema.process_schema_references( + schema, + ctx, + check_single_ref_key=False) + self.assertEqual({"type": None}, r) + + def test_recursive_schema_scheme(self): + barschema = { + "type": "null" + } + fooschema = { + "$ref": "schema:#/bar" + } + schema = data_schema._Schema( + None, True, {"foo": fooschema, + "bar": barschema, + "$ref": "schema:#/foo" + }) + ctx = data_schema.Context( + None, root_schema=schema, settings=None) + r = data_schema.process_schema_references( + schema, + ctx, + check_single_ref_key=False) + self.assertEqual({"type": "null"}, r) + + def test_recursive_schema_scheme_raises_if_non_root_is_not_single_key(self): + barschema = { + "type": "null" + } + fooschema = { + "$ref": "schema:#/bar", + "type": "dict", + } + schema = data_schema._Schema( + None, True, { + "foo": fooschema, + "bar": barschema, + "$ref": "schema:#/foo" + }) + ctx = data_schema.Context( + None, root_schema=schema, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_references, + schema, + ctx, + check_single_ref_key=False) + + def test_recursive_schema_scheme_ref_to_non_existing_schema_raises(self): + barschema = { + "type": "null" + } + fooschema = { + "$ref": "schema:#/non-bar", + } + schema = data_schema._Schema( + None, True, { + "foo": fooschema, + "bar": barschema, + "$ref": "schema:#/foo" + }) + ctx = data_schema.Context( + None, root_schema=schema, settings=None) + self.assertRaises( + data_schema.SchemaError, + data_schema.process_schema_references, + schema, + ctx, + check_single_ref_key=False) + + +class BasicValidation(unittest.TestCase): + + def test_schema_must_be_a_dict_alike(self): + try: + pr = list(data_schema.validate(None, None)) + except data_schema.SchemaError: + pass + else: + self.assertFalse( + "no SchemaError raised when a non-dict given as schema") + + def test_problem_ctor_nonexisting_code(self): + self.assertRaises(ValueError, data_schema.ValidationProblem, code=2) + + def test_problem_ctor_no_code(self): + self.assertRaises(TypeError, data_schema.ValidationProblem, code=None) + + def test_error_ctor(self): + v = data_schema.ValidationProblem(code=10000) + self.assertEqual(data_schema.ERROR, v.severity) + + def test_warning_ctor(self): + v = data_schema.ValidationProblem(code=80000) + self.assertEqual(data_schema.WARNING, v.severity) + + def test_d1(self): + x = list(data_schema.validate({}, {"type": "dict"})) + self.assertEqual(0, len(x)) + + def test_d1_not_nullable(self): + x = list(data_schema.validate(None, {"type": "dict"})) + self.assertEqual(1, len(x)) + self.assertEqual(10000, x[0].code) + + def test_d1_nullable(self): + x = list(data_schema.validate(None, {"type": "dict", + "nullable": True})) + self.assertEqual(0, len(x)) + + def test_d2(self): + x = list(data_schema.validate([], {"type": "map"})) + self.assertEqual(1, len(x)) + self.assertEqual(data_schema.ERROR, x[0].severity) + self.assertEqual(10000, x[0].code) + + def test_d3(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "required": ["key2"]})) + self.assertEqual(2, len(x)) + self.assertEqual(data_schema.ERROR, x[0].severity) + self.assertEqual(10004, x[0].code) + self.assertEqual("key", x[0].hint) + self.assertEqual(data_schema.ERROR, x[1].severity) + self.assertEqual(10005, x[1].code) + self.assertEqual(["key2"], x[1].hint) + + def test_d4(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "keys": { + "key": {"type": "string"}, + }, + "required": ["key"]})) + self.assertEqual(0, len(x)) + + def test_d5(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "additionalKeys": False})) + self.assertEqual(1, len(x)) + self.assertEqual(10004, x[0].code) + self.assertEqual("key", x[0].hint) + + def test_d5_2(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "additionalKeys": False}, + skip_keys=["key"])) + self.assertEqual(0, len(x)) + + def test_d5_3(self): + x = list(data_schema.validate( + {"key": "value", + "key2": "value"}, + {"type": "dict", + "additionalKeys": False}, + skip_keys=[re.compile(r"\Akey\d*\Z")])) + self.assertEqual(0, len(x)) + + def test_d5_4(self): + x = list(data_schema.validate( + {"key": "value", + "key2": "value"}, + {"type": "dict", + "additionalKeys": False}, + skip_keys=[re.compile(r"\A__.+"), re.compile(r"\Akey\d+\Z")])) + self.assertEqual(1, len(x)) + self.assertEqual(10004, x[0].code) + self.assertEqual("key", x[0].hint) + + def test_d6(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "additionalKeys": True})) + self.assertEqual(0, len(x)) + + def test_d7(self): + x = list(data_schema.validate( + {"key": "value"}, + {"type": "dict", + "additionalKeys": { + "type": "string"}})) + self.assertEqual(0, len(x)) + + def test_d8(self): + x = list(data_schema.validate( + {"key": 1234}, + {"type": "dict", + "additionalKeys": { + "type": "string"}})) + self.assertEqual(1, len(x)) + self.assertEqual(10002, x[0].code) + self.assertEqual(1234, x[0].hint) + + def test_d8_2(self): + x = list(data_schema.validate( + {"key": 1234}, + {"type": "dict", + "additionalKeys": { + "type": "string"}}, + skip_keys=["key"])) + self.assertEqual(0, len(x)) + + def test_d9_non_string_keys(self): + pr = list(data_schema.validate( + {0: "value"}, + {"type": "dict", + "additionalKeys": True})) + self.assertEqual(1, len(pr)) + self.assertEqual(10003, pr[0].code) + + def test_d10_int_dict_keys(self): + pr = list(data_schema.validate( + {1: "value", 2: "value2"}, + {"type": "dict", + "keys": { + 1: {"type": "string"}}, + "additionalKeys": True, + "keyNames": {"type": "int"}})) + self.assertEqual(0, len(pr)) + + def test_error_message(self): + self.assertEqual("dict expected", + data_schema.problem_message(10000)) + pr = data_schema.ValidationProblem(code=10000) + self.assertEqual("dict expected", data_schema.problem_message(pr)) + + self.assertEqual("duplicate dict key", + data_schema.problem_message(80000)) + pr = data_schema.ValidationProblem(code=80000) + self.assertEqual("duplicate dict key", + data_schema.problem_message(pr)) + + self.assertRaises(KeyError, data_schema.problem_message, 1234) + + def test_str_enum(self): + pr = list(data_schema.validate( + "e1", + {"type": "string", + "enum": ["e1", "e2"]})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + "e2", + {"type": "string", + "enum": ["e1", "e2"]})) + self.assertEqual(0, len(pr)) + + def test_str_not_in_enum(self): + pr = list(data_schema.validate( + "e3", + {"type": "string", + "enum": ["e1", "e2"]})) + self.assertEqual(1, len(pr)) + self.assertEqual(10043, pr[0].code) + + def test_str_minlen(self): + pr = list(data_schema.validate( + "", + {"type": "string", + "minLength": 0})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + "", + {"type": "string", + "minLength": 1})) + self.assertEqual(1, len(pr)) + self.assertEqual(10006, pr[0].code) + + pr = list(data_schema.validate( + "x", + {"type": "string", + "minLength": 1})) + self.assertEqual(0, len(pr)) + + def test_str_maxlen(self): + pr = list(data_schema.validate( + "", + {"type": "string", + "maxLength": 0})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + "x", + {"type": "string", + "maxLength": 0})) + self.assertEqual(1, len(pr)) + self.assertEqual(10007, pr[0].code) + + pr = list(data_schema.validate( + "x", + {"type": "string", + "maxLength": 1})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + b"x", + {"type": "string", + "maxLength": 1})) + self.assertEqual(1, len(pr)) + self.assertEqual(10002, pr[0].code) + + @staticmethod + def _pattern_check_function(obj, schema, context=None): + if obj == " 5 ": + yield data_schema.ValidationProblem(code=10009) + + def test_str_re(self): + pr = list(data_schema.validate( + "abc", + {"type": "string", + "pattern": r'\A[0-9]+\Z'})) + self.assertEqual(1, len(pr)) + self.assertEqual(10008, pr[0].code) + + pr = list(data_schema.validate( + "123", + {"type": "string", + "pattern": re.compile(r'\A[a-z]+\Z')})) + self.assertEqual(1, len(pr)) + self.assertEqual(10008, pr[0].code) + + pr = list(data_schema.validate( + "123", + {"type": "string", + "pattern": self._pattern_check_function})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + " 5 ", + {"type": "string", + "pattern": self._pattern_check_function})) + self.assertEqual(1, len(pr)) + self.assertEqual(10009, pr[0].code) + + def test_binary_basic(self): + pr = list(data_schema.validate( + b"", + {"type": "binary"})) + self.assertEqual(0, len(pr)) + + def test_str_is_not_binary(self): + pr = list(data_schema.validate( + "", + {"type": "binary"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10035, pr[0].code) + + @staticmethod + def _binary_pattern_check_function(obj, schema, context=None): + if obj != b"\x00": + yield data_schema.ValidationProblem(code=10009) + + def test_binary_pattern_check(self): + pr = list(data_schema.validate( + b"\x00", + {"type": "binary", + "pattern": self._binary_pattern_check_function})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + b"\x01", + {"type": "binary", + "pattern": self._binary_pattern_check_function})) + self.assertEqual(1, len(pr)) + self.assertEqual(10009, pr[0].code) + + def test_binary_re_str_match(self): + pr = list(data_schema.validate( + b"\x00\x00\x00", + {"type": "binary", + "pattern": u"\\x00+"})) + self.assertEqual(0, len(pr)) + + def test_binary_re_bytes_match(self): + pr = list(data_schema.validate( + b"\x00\x00\x00", + {"type": "binary", + "pattern": b"\x00+"})) + self.assertEqual(0, len(pr)) + + def test_binary_re_str_mismatch(self): + pr = list(data_schema.validate( + b"\x00\x00\x00", + {"type": "binary", + "pattern": u"\\x01+"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10047, pr[0].code) + + def test_binary_re_bytes_mismatch(self): + pr = list(data_schema.validate( + b"\x00\x00\x00", + {"type": "binary", + "pattern": b"\x01+"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10047, pr[0].code) + + def test_binary_length(self): + pr = list(data_schema.validate( + b"", + {"type": "binary", + "minLength": 1})) + self.assertEqual(1, len(pr)) + self.assertEqual(10036, pr[0].code) + + pr = list(data_schema.validate( + b"1", + {"type": "binary", + "maxLength": 0})) + self.assertEqual(1, len(pr)) + self.assertEqual(10037, pr[0].code) + + def test_deny(self): + pr = list(data_schema.validate("abc", {"type": "deny"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10010, pr[0].code) + + def test_accept(self): + pr = list(data_schema.validate("abc", {"type": "accept"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(None, {"type": "accept"})) + self.assertEqual(0, len(pr)) + + def test_null(self): + pr = list(data_schema.validate(None, {"type": "none"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(None, {"type": "null"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(None, {"type": "nil"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(None, {"type": None})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate({}, {"type": None})) + self.assertEqual(1, len(pr)) + self.assertEqual(10011, pr[0].code) + + def test_l1(self): + pr = list(data_schema.validate([], {"type": "list"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(tuple(), {"type": "list"})) + self.assertEqual(0, len(pr)) + + def test_l1_not_nullable(self): + pr = list(data_schema.validate(None, {"type": "list"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10001, pr[0].code) + + def test_l1_nullable(self): + pr = list(data_schema.validate( + None, {"type": "list", "nullable": True})) + self.assertEqual(0, len(pr)) + + def test_l2_default_schema_for_items_is_deny(self): + pr = list(data_schema.validate(["a", "b", "c"], {"type": "list"})) + self.assertEqual(3, len(pr)) + for i in range(0, 3): + self.assertEqual(10010, pr[i].code) + + def test_l3_schema_for_items(self): + pr = list(data_schema.validate( + ["a", "b", "c"], + {"type": "array", + "items": {"type": "string"}})) + self.assertEqual(0, len(pr)) + + def test_t1(self): + pr = list(data_schema.validate([], {"type": "tuple"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(tuple(), {"type": "tuple"})) + self.assertEqual(0, len(pr)) + + def test_t1_not_nullable(self): + pr = list(data_schema.validate(None, {"type": "tuple"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10014, pr[0].code) + + def test_t1_nullable(self): + pr = list(data_schema.validate( + None, {"type": "tuple", "nullable": True})) + self.assertEqual(0, len(pr)) + + def test_t2(self): + pr = list(data_schema.validate( + ["a", None, {"key": "value"}], + {"type": "tuple", + "items": [ + {"type": "string"}, + {"type": None}, + {"type": "accept"}]})) + self.assertEqual(0, len(pr)) + + def test_t3(self): + pr = list(data_schema.validate( + ["a", None, {"key": "value"}], + {"type": "tuple", + "items": [ + {"type": "string"}, + {"type": None}, + {"type": "deny"}]})) + self.assertEqual(1, len(pr)) + self.assertEqual(10010, pr[0].code) + self.assertEqual(2, pr[0].context.index) + + def test_t4(self): + pr = list(data_schema.validate( + ["a", None, {"key": "value"}], + {"type": "tuple", + "items": [ + {"type": "string"}, + {"type": None}]})) + self.assertEqual(1, len(pr)) + self.assertEqual(10017, pr[0].code) + self.assertEqual(2, pr[0].context.index) + + def test_t5(self): + pr = list(data_schema.validate( + ["a", None, {"key": "value"}], + {"type": "tuple", + "items": [ + {"type": "string"}, + {"type": None}], + "additionalItems": True})) + self.assertEqual(0, len(pr)) + + def test_t6(self): + pr = list(data_schema.validate( + ["a", None, {"key": "value"}, {"key": "value"}], + {"type": "tuple", + "items": [ + {"type": "string"}, + {"type": None}], + "additionalItems": { + "type": "dict", + "keys": {"key": {"type": "string"}} + }})) + self.assertEqual(0, len(pr)) + + def test_t7(self): + # do not check anything that exceeds maxLength + pr = list(data_schema.validate( + ["a", None, {"key": "value"}, {"key": "value"}, {"key2": "value"}], + {"type": "tuple", + "maxLength": 4, + "items": [ + {"type": "string"}, + {"type": None}], + "additionalItems": { + "type": "dict", + "keys": {"key": {"type": "string"}} + }})) + self.assertEqual(1, len(pr)) + self.assertEqual(10016, pr[0].code) + + def test_t8(self): + # do not check anything that exceeds maxLength + pr = list(data_schema.validate( + ["a", None, {"key": "value"}, {"key": "value"}, {"key2": "value"}], + {"type": "tuple", + "minLength": 6, + "maxLength": 4, + "items": [ + {"type": "string"}, + {"type": None}], + "additionalItems": { + "type": "dict", + "keys": {"key": {"type": "string"}} + }})) + self.assertEqual(2, len(pr)) + self.assertEqual(10015, pr[0].code) + self.assertEqual(10016, pr[1].code) + + def test_set1(self): + # do not check anything that exceeds maxLength + pr = list(data_schema.validate( + set(["a", None, "b"]), + {"type": "set", + "minLength": 3, + "items": {"any-of": [ + {"type": "string"}, + {"type": None}]}} + )) + self.assertEqual(0, len(pr)) + + def test_set1_not_nullable(self): + # do not check anything that exceeds maxLength + pr = list(data_schema.validate( + None, + {"type": "set"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10038, pr[0].code) + + def test_set1_nullable(self): + # do not check anything that exceeds maxLength + pr = list(data_schema.validate( + None, + {"type": "set", "nullable": True})) + self.assertEqual(0, len(pr)) + + def test_set2(self): + # do not check anything that exceeds maxLength + pr = list(data_schema.validate( + set(["a", None, "b"]), + {"type": "set", + "minLength": 4, + "items": {"any-of": [ + {"type": "string"}, + {"type": None}]}} + )) + self.assertEqual(1, len(pr)) + self.assertEqual(10039, pr[0].code) + + def test_set3(self): + # do not check anything that exceeds maxLength + pr = list(data_schema.validate( + set(["a", None, "b"]), + {"type": "set", + "maxLength": 2, + "items": {"any-of": [ + {"type": "string"}, + {"type": None}]}} + )) + self.assertEqual(1, len(pr)) + self.assertEqual(10040, pr[0].code) + + def test_set4_itemschema(self): + pr = list(data_schema.validate( + set(["a", None, "b"]), + {"type": "set", + "items": {"any-of": [ + {"type": "string"}, + {"type": "int"}]}} + )) + codes = set([p.code for p in pr]) + + self.assertEqual(1, len(pr)) + self.assertEqual(10055, pr[0].code) + self.assertEqual(2, len(pr[0].cause)) + self.assertEqual(10056, pr[0].cause[0].code) + self.assertEqual(1, len(pr[0].cause[0].cause)) + self.assertEqual(10002, pr[0].cause[0].cause[0].code) + self.assertEqual(10056, pr[0].cause[1].code) + self.assertEqual(1, len(pr[0].cause[1].cause)) + self.assertEqual(10020, pr[0].cause[1].cause[0].code) + + def test_empty(self): + pr = list(data_schema.validate(None, {"type": "empty"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate([], {"type": "empty"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(["a"], {"type": "empty"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10018, pr[0].code) + + pr = list(data_schema.validate(tuple(), {"type": "empty"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(set(), {"type": "empty"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(frozenset(), {"type": "empty"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(tuple(["a"]), {"type": "empty"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10018, pr[0].code) + + pr = list(data_schema.validate({}, {"type": "empty"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate({"key": "value"}, {"type": "empty"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10018, pr[0].code) + + pr = list(data_schema.validate("", {"type": "empty"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10018, pr[0].code) + + def test_allOf(self): + pr = list(data_schema.validate( + None, + {"type": { + "all-of": [ + {"type": None}, + {"type": "accept"}, + ] + }})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + None, + {"type": { + "all-of": [ + {"type": None}, + {"type": "accept"}, + {"type": "deny"}, + ] + }})) + self.assertEqual(1, len(pr)) + self.assertEqual(10057, pr[0].code) + self.assertEqual(1, len(pr[0].cause)) + self.assertEqual(10058, pr[0].cause[0].code) + self.assertEqual(1, len(pr[0].cause[0].cause)) + self.assertEqual(10010, pr[0].cause[0].cause[0].code) + + def test_anyOf(self): + pr = list(data_schema.validate( + None, + {"type": { + "any-of": [ + {"type": "deny"}, + {"type": None}, + ] + }})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + None, + {"type": { + "any-of": [ + {"type": "string"}, + {"type": "deny"}, + ] + }})) + self.assertEqual(1, len(pr)) + self.assertEqual(10055, pr[0].code) + self.assertEqual(2, len(pr[0].cause)) + self.assertEqual(10056, pr[0].cause[0].code) + self.assertEqual(1, len(pr[0].cause[0].cause)) + self.assertEqual(10002, pr[0].cause[0].cause[0].code) + self.assertEqual(10056, pr[0].cause[1].code) + self.assertEqual(1, len(pr[0].cause[1].cause)) + self.assertEqual(10010, pr[0].cause[1].cause[0].code) + + def test_anyOf_with_list(self): + pr = list(data_schema.validate( + None, + {"type": [ + {"type": "deny"}, + {"type": None}, + ]})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + None, + {"type": [ + {"type": "string"}, + {"type": "deny"}, + ]})) + self.assertEqual(1, len(pr)) + self.assertEqual(10055, pr[0].code) + self.assertEqual(2, len(pr[0].cause)) + self.assertEqual(10056, pr[0].cause[0].code) + self.assertEqual(1, len(pr[0].cause[0].cause)) + self.assertEqual(10002, pr[0].cause[0].cause[0].code) + self.assertEqual(10056, pr[0].cause[1].code) + self.assertEqual(1, len(pr[0].cause[1].cause)) + self.assertEqual(10010, pr[0].cause[1].cause[0].code) + + def test_oneOf(self): + pr = list(data_schema.validate( + None, + {"type": { + "one-of": [ + {"type": "deny"}, + {"type": None}, + ] + }})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + None, + {"type": { + "one-of": [ + {"type": "string"}, + {"type": "deny"}, + ] + }})) + + self.assertEqual(1, len(pr)) + self.assertEqual(10053, pr[0].code) + self.assertEqual(2, len(pr[0].cause)) + self.assertEqual(10054, pr[0].cause[0].code) + self.assertEqual(1, len(pr[0].cause[0].cause)) + self.assertEqual(10002, pr[0].cause[0].cause[0].code) + self.assertEqual(10054, pr[0].cause[1].code) + self.assertEqual(1, len(pr[0].cause[1].cause)) + self.assertEqual(10010, pr[0].cause[1].cause[0].code) + + pr = list(data_schema.validate( + None, + {"type": { + "one-of": [ + {"type": "string"}, + {"type": "deny"}, + {"type": "empty"}, + {"type": None}, + ] + }})) + self.assertEqual(1, len(pr)) + self.assertEqual(10019, pr[0].code) + self.assertEqual("2,3", pr[0].hint) + + def test_not(self): + pr = list(data_schema.validate( + None, + {"type": { + "not": { + "type": "empty"}}})) + self.assertEqual(1, len(pr)) + self.assertEqual(10029, pr[0].code) + + pr = list(data_schema.validate( + None, + {"type": { + "not": { + "type": { + "not": { + "type": "empty"}}}}})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + 2, + {"type": { + "not": { + "type": "int"}}})) + self.assertEqual(1, len(pr)) + self.assertEqual(10029, pr[0].code) + + pr = list(data_schema.validate( + 1, + {"type": { + "not": { + "type": { + "not": { + "type": "int"}}}}})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + 2.0, + {"type": { + "not": { + "type": "int"}}})) + self.assertEqual(0, len(pr)) + + def test_not_shortcut(self): + pr = list(data_schema.validate( + None, + {"not": { + "type": "empty"}})) + self.assertEqual(1, len(pr)) + self.assertEqual(10029, pr[0].code) + + pr = list(data_schema.validate( + None, + {"not": { + "not": { + "type": "empty"}}})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + 2, + {"not": { + "type": "int"}})) + self.assertEqual(1, len(pr)) + self.assertEqual(10029, pr[0].code) + + pr = list(data_schema.validate( + 1, + {"not": { + "not": { + "type": "int"}}})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + 2.0, + {"not": { + "type": "int"}})) + self.assertEqual(0, len(pr)) + + def test_integer(self): + pr = list(data_schema.validate(1, {"type": "integer"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(1, {"type": "float"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10023, pr[0].code) + + pr = list(data_schema.validate( + 2, + {"type": "int", + "minValue": 3, + "maxValue": 1})) + + self.assertEqual(2, len(pr)) + self.assertEqual(10021, pr[0].code) + self.assertEqual(10022, pr[1].code) + + def test_float(self): + pr = list(data_schema.validate(1.8, {"type": "float"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(1, {"type": "float"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10023, pr[0].code) + + pr = list(data_schema.validate( + 2.0, + {"type": "real", + "minValue": 2.1, + "maxValue": 1.9})) + + self.assertEqual(2, len(pr)) + self.assertEqual(10024, pr[0].code) + self.assertEqual(10025, pr[1].code) + + def test_number(self): + pr = list(data_schema.validate(1.8, {"type": "number"})) + self.assertEqual(0, len(pr)) + pr = list(data_schema.validate(1, {"type": "num"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + 2.0, + {"type": "number", + "minValue": 3, + "maxValue": 1.3})) + + self.assertEqual(2, len(pr)) + self.assertEqual(10031, pr[0].code) + self.assertEqual(10032, pr[1].code) + + pr = list(data_schema.validate({}, {"type": "number"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10030, pr[0].code) + + def test_bool(self): + pr = list(data_schema.validate(True, {"type": "bool"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(True, {"type": "boolean", + "value": True})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(True, {"type": "boolean", + "value": False})) + self.assertEqual(1, len(pr)) + self.assertEqual(10028, pr[0].code) + + pr = list(data_schema.validate(False, {"type": "boolean"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(False, {"type": "boolean", + "value": False})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(False, {"type": "boolean", + "value": True})) + self.assertEqual(1, len(pr)) + self.assertEqual(10027, pr[0].code) + + def test_bool_real(self): + pr = list(data_schema.validate([1, 2], {"type": "bool"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10026, pr[0].code) + + pr = list(data_schema.validate([], {"type": "bool"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10026, pr[0].code) + + pr = list(data_schema.validate(None, {"type": "bool"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10026, pr[0].code) + + @staticmethod + def _check_value_ts(obj, schema, context): + if obj == datetime.datetime.fromtimestamp(0): + yield data_schema.ValidationProblem( + code=10042, hint=obj, context=context) + + def test_timestamp(self): + pr = list(data_schema.validate( + datetime.datetime.utcnow(), + {"type": "timestamp"})) + + pr = list(data_schema.validate( + datetime.datetime.fromtimestamp(180000), + {"type": "datetime", + "value": self._check_value_ts})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate( + datetime.datetime.fromtimestamp(0), + {"type": "datetime", + "value": self._check_value_ts})) + self.assertEqual(1, len(pr)) + self.assertEqual(10042, pr[0].code) + + def test_scalar(self): + pr = list(data_schema.validate(1, {"type": "scalar"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate("", {"type": "scalar"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(False, {"type": "scalar"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(datetime.datetime.utcnow(), + {"type": "scalar"})) + self.assertEqual(0, len(pr)) + + pr = list(data_schema.validate(None, {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate({}, {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate([], {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate(tuple(), {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate(set(), {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate(frozenset(), {"type": "scalar"})) + self.assertEqual(1, len(pr)) + self.assertEqual(10033, pr[0].code) + + pr = list(data_schema.validate( + None, + {"type": { + "one-of": [ + {"type": "scalar"}, + {"type": None}, + ]}})) + self.assertEqual(0, len(pr)) + + +if __name__ == "__main__": + unittest.main()
