Mercurial > hgrepos > Python > libs > ConfigMix
comparison configmix/yaml.py @ 291:edf5cc1ffd26
Provide an optional "strict" keyword flag to all YAML load functions to detect and prevent duplicate keys within a single YAML document
| author | Franz Glasner <f.glasner@feldmann-mg.com> |
|---|---|
| date | Wed, 10 Feb 2021 14:47:41 +0100 |
| parents | ff964825a75a |
| children | eed16a1ec8f3 |
comparison
equal
deleted
inserted
replaced
| 290:aec97edf7945 | 291:edf5cc1ffd26 |
|---|---|
| 1 # -*- coding: utf-8 -*- | 1 # -*- coding: utf-8 -*- |
| 2 # :- | 2 # :- |
| 3 # :Copyright: (c) 2015-2020, Franz Glasner. All rights reserved. | 3 # :Copyright: (c) 2015-2021, Franz Glasner. All rights reserved. |
| 4 # :License: 3-clause BSD. See LICENSE.txt for details. | 4 # :License: 3-clause BSD. See LICENSE.txt for details. |
| 5 # :- | 5 # :- |
| 6 """Simple wrapper for :mod:`yaml` to support all-unicode strings when | 6 """Simple wrapper for :mod:`yaml` to support all-unicode strings when |
| 7 loading configuration files. | 7 loading configuration files. |
| 8 | 8 |
| 40 This is against YAML specs but within configuration files it seems | 40 This is against YAML specs but within configuration files it seems |
| 41 more natural. | 41 more natural. |
| 42 | 42 |
| 43 """ | 43 """ |
| 44 | 44 |
| 45 def __init__(self, *args, **kwds): | |
| 46 strict = kwds.pop("strict", False) | |
| 47 self.__allow_duplicate_keys = not strict | |
| 48 yaml.Loader.__init__(self, *args, **kwds) | |
| 49 | |
| 45 def construct_yaml_str(self, node): | 50 def construct_yaml_str(self, node): |
| 46 return self.construct_scalar(node) | 51 return self.construct_scalar(node) |
| 47 | 52 |
| 48 if OrderedDict: | 53 # |
| 49 | 54 # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1 |
| 50 # | 55 # (MIT License) |
| 51 # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1 | 56 # |
| 52 # (MIT License) | 57 |
| 53 # | 58 def construct_yaml_map(self, node): |
| 54 | 59 data = DictImpl() |
| 55 def construct_yaml_map(self, node): | 60 yield data |
| 56 data = OrderedDict() | 61 value = self.construct_mapping(node) |
| 57 yield data | 62 data.update(value) |
| 58 value = self.construct_mapping(node) | 63 |
| 59 data.update(value) | 64 def construct_mapping(self, node, deep=False): |
| 60 | 65 if isinstance(node, yaml.MappingNode): |
| 61 def construct_mapping(self, node, deep=False): | 66 self.flatten_mapping(node) |
| 62 if isinstance(node, yaml.MappingNode): | 67 else: |
| 63 self.flatten_mapping(node) | 68 raise yaml.constructor.ConstructorError( |
| 64 else: | 69 None, |
| 65 raise yaml.constructor.ConstructorError( | 70 None, |
| 66 None, | 71 'expected a mapping node, but found %s' % node.id, |
| 67 None, | 72 node.start_mark) |
| 68 'expected a mapping node, but found %s' % node.id, | 73 |
| 69 node.start_mark) | 74 mapping = DictImpl() |
| 70 | 75 for key_node, value_node in node.value: |
| 71 mapping = OrderedDict() | 76 key = self.construct_object(key_node, deep=deep) |
| 72 for key_node, value_node in node.value: | 77 try: |
| 73 key = self.construct_object(key_node, deep=deep) | 78 hash(key) |
| 74 try: | 79 except TypeError as err: |
| 75 hash(key) | 80 raise yaml.constructor.ConstructorError( |
| 76 except TypeError as err: | 81 'while constructing a mapping', node.start_mark, |
| 77 raise yaml.constructor.ConstructorError( | 82 'found unacceptable key (%s)' % (err, |
| 78 'while constructing a mapping', node.start_mark, | 83 key_node.start_mark) |
| 79 'found unacceptable key (%s)' % (err, | 84 ) |
| 80 key_node.start_mark) | 85 value = self.construct_object(value_node, deep=deep) |
| 81 ) | 86 if not self.__allow_duplicate_keys and key in mapping: |
| 82 value = self.construct_object(value_node, deep=deep) | 87 raise yaml.constructor.ConstructorError( |
| 83 mapping[key] = value | 88 'while constructing a mapping', node.start_mark, |
| 84 return mapping | 89 'found duplicate key %r (%s)' % (key, |
| 90 key_node.start_mark) | |
| 91 ) | |
| 92 mapping[key] = value | |
| 93 return mapping | |
| 85 | 94 |
| 86 | 95 |
| 87 ConfigLoader.add_constructor( | 96 ConfigLoader.add_constructor( |
| 88 u("tag:yaml.org,2002:str"), | 97 u("tag:yaml.org,2002:str"), |
| 89 ConfigLoader.construct_yaml_str) | 98 ConfigLoader.construct_yaml_str) |
| 90 if OrderedDict: | 99 ConfigLoader.add_constructor( |
| 91 ConfigLoader.add_constructor( | 100 u("tag:yaml.org,2002:map"), |
| 92 u("tag:yaml.org,2002:map"), | 101 ConfigLoader.construct_yaml_map) |
| 93 ConfigLoader.construct_yaml_map) | 102 ConfigLoader.add_constructor( |
| 94 ConfigLoader.add_constructor( | 103 u("tag:yaml.org,2002:omap"), |
| 95 u("tag:yaml.org,2002:omap"), | 104 ConfigLoader.construct_yaml_map) |
| 96 ConfigLoader.construct_yaml_map) | |
| 97 | 105 |
| 98 | 106 |
| 99 class ConfigSafeLoader(yaml.SafeLoader): | 107 class ConfigSafeLoader(yaml.SafeLoader): |
| 100 | 108 |
| 101 """A safe YAML loader that converts all ``!!str`` strings to Unicode. | 109 """A safe YAML loader that converts all ``!!str`` strings to Unicode. |
| 106 This is against YAML specs but within configuration files it seems | 114 This is against YAML specs but within configuration files it seems |
| 107 more natural. | 115 more natural. |
| 108 | 116 |
| 109 """ | 117 """ |
| 110 | 118 |
| 119 def __init__(self, *args, **kwds): | |
| 120 strict = kwds.pop("strict", False) | |
| 121 self.__allow_duplicate_keys = not strict | |
| 122 yaml.SafeLoader.__init__(self, *args, **kwds) | |
| 123 | |
| 111 def construct_yaml_str(self, node): | 124 def construct_yaml_str(self, node): |
| 112 return self.construct_scalar(node) | 125 return self.construct_scalar(node) |
| 113 | 126 |
| 114 if OrderedDict: | 127 # |
| 115 | 128 # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1 |
| 116 # | 129 # (MIT License) |
| 117 # From https://pypi.python.org/pypi/yamlordereddictloader/0.1.1 | 130 # |
| 118 # (MIT License) | 131 |
| 119 # | 132 def construct_yaml_map(self, node): |
| 120 | 133 data = DictImpl() |
| 121 def construct_yaml_map(self, node): | 134 yield data |
| 122 data = OrderedDict() | 135 value = self.construct_mapping(node) |
| 123 yield data | 136 data.update(value) |
| 124 value = self.construct_mapping(node) | 137 |
| 125 data.update(value) | 138 def construct_mapping(self, node, deep=False): |
| 126 | 139 if isinstance(node, yaml.MappingNode): |
| 127 def construct_mapping(self, node, deep=False): | 140 self.flatten_mapping(node) |
| 128 if isinstance(node, yaml.MappingNode): | 141 else: |
| 129 self.flatten_mapping(node) | 142 raise yaml.constructor.ConstructorError( |
| 130 else: | 143 None, |
| 131 raise yaml.constructor.ConstructorError( | 144 None, |
| 132 None, | 145 'expected a mapping node, but found %s' % node.id, |
| 133 None, | 146 node.start_mark) |
| 134 'expected a mapping node, but found %s' % node.id, | 147 |
| 135 node.start_mark) | 148 mapping = DictImpl() |
| 136 | 149 for key_node, value_node in node.value: |
| 137 mapping = OrderedDict() | 150 key = self.construct_object(key_node, deep=deep) |
| 138 for key_node, value_node in node.value: | 151 try: |
| 139 key = self.construct_object(key_node, deep=deep) | 152 hash(key) |
| 140 try: | 153 except TypeError as err: |
| 141 hash(key) | 154 raise yaml.constructor.ConstructorError( |
| 142 except TypeError as err: | 155 'while constructing a mapping', node.start_mark, |
| 143 raise yaml.constructor.ConstructorError( | 156 'found unacceptable key (%s)' % (err, |
| 144 'while constructing a mapping', node.start_mark, | 157 key_node.start_mark) |
| 145 'found unacceptable key (%s)' % (err, | 158 ) |
| 146 key_node.start_mark) | 159 value = self.construct_object(value_node, deep=deep) |
| 147 ) | 160 if not self.__allow_duplicate_keys and key in mapping: |
| 148 value = self.construct_object(value_node, deep=deep) | 161 raise yaml.constructor.ConstructorError( |
| 149 mapping[key] = value | 162 'while constructing a mapping', node.start_mark, |
| 150 return mapping | 163 'found duplicate key %r (%s)' % (key, |
| 164 key_node.start_mark) | |
| 165 ) | |
| 166 mapping[key] = value | |
| 167 return mapping | |
| 151 | 168 |
| 152 | 169 |
| 153 ConfigSafeLoader.add_constructor( | 170 ConfigSafeLoader.add_constructor( |
| 154 u("tag:yaml.org,2002:str"), | 171 u("tag:yaml.org,2002:str"), |
| 155 ConfigSafeLoader.construct_yaml_str) | 172 ConfigSafeLoader.construct_yaml_str) |
| 156 if OrderedDict: | 173 ConfigSafeLoader.add_constructor( |
| 157 ConfigSafeLoader.add_constructor( | 174 u("tag:yaml.org,2002:map"), |
| 158 u("tag:yaml.org,2002:map"), | 175 ConfigSafeLoader.construct_yaml_map) |
| 159 ConfigSafeLoader.construct_yaml_map) | 176 ConfigSafeLoader.add_constructor( |
| 160 ConfigSafeLoader.add_constructor( | 177 u("tag:yaml.org,2002:omap"), |
| 161 u("tag:yaml.org,2002:omap"), | 178 ConfigSafeLoader.construct_yaml_map) |
| 162 ConfigSafeLoader.construct_yaml_map) | 179 |
| 163 | 180 |
| 164 | 181 def config_loader_factory(strict=False): |
| 165 def load(stream, Loader=ConfigLoader): | 182 def _real_factory(*args, **kwds): |
| 183 kwds["strict"] = strict | |
| 184 return ConfigLoader(*args, **kwds) | |
| 185 return _real_factory | |
| 186 | |
| 187 | |
| 188 def config_safe_loader_factory(strict=False): | |
| 189 def _real_factory(*args, **kwds): | |
| 190 kwds["strict"] = strict | |
| 191 return ConfigSafeLoader(*args, **kwds) | |
| 192 return _real_factory | |
| 193 | |
| 194 | |
| 195 def load(stream, Loader=None, strict=False): | |
| 166 """Parse the given `stream` and return a Python object constructed | 196 """Parse the given `stream` and return a Python object constructed |
| 167 from the first document in the stream. | 197 from the first document in the stream. |
| 168 | 198 |
| 169 """ | 199 If `strict` is `True` then duplicate mapping keys within a YAML |
| 200 document are detected and prevented. If a `Loader` is given then | |
| 201 `strict` does not apply. | |
| 202 | |
| 203 """ | |
| 204 if Loader is None: | |
| 205 Loader = config_loader_factory(strict=strict) | |
| 170 data = yaml.load(stream, Loader) | 206 data = yaml.load(stream, Loader) |
| 171 # Map an empty document to an empty dict | 207 # Map an empty document to an empty dict |
| 172 if data is None: | 208 if data is None: |
| 173 return DictImpl() | 209 return DictImpl() |
| 174 if not isinstance(data, DictImpl): | 210 if not isinstance(data, DictImpl): |
| 175 raise TypeError("YAML root object must be a mapping") | 211 raise TypeError("YAML root object must be a mapping") |
| 176 return data | 212 return data |
| 177 | 213 |
| 178 | 214 |
| 179 def load_all(stream, Loader=ConfigLoader): | 215 def load_all(stream, Loader=None, strict=False): |
| 180 """Parse the given `stream` and return a sequence of Python objects | 216 """Parse the given `stream` and return a sequence of Python objects |
| 181 corresponding to the documents in the `stream`. | 217 corresponding to the documents in the `stream`. |
| 182 | 218 |
| 183 """ | 219 If `strict` is `True` then duplicate mapping keys within a YAML |
| 220 document are detected and prevented. If a `Loader` is given then | |
| 221 `strict` does not apply. | |
| 222 | |
| 223 """ | |
| 224 if Loader is None: | |
| 225 Loader = config_loader_factory(strict=strict) | |
| 184 data_all = yaml.load_all(stream, Loader) | 226 data_all = yaml.load_all(stream, Loader) |
| 185 rdata = [] | 227 rdata = [] |
| 186 for data in data_all: | 228 for data in data_all: |
| 187 if data is None: | 229 if data is None: |
| 188 rdata.append(DictImpl()) | 230 rdata.append(DictImpl()) |
| 191 raise TypeError("YAML root object must be a mapping") | 233 raise TypeError("YAML root object must be a mapping") |
| 192 rdata.append(data) | 234 rdata.append(data) |
| 193 return rdata | 235 return rdata |
| 194 | 236 |
| 195 | 237 |
| 196 def safe_load(stream): | 238 def safe_load(stream, strict=False): |
| 197 """Parse the given `stream` and return a Python object constructed | 239 """Parse the given `stream` and return a Python object constructed |
| 198 from the first document in the stream. | 240 from the first document in the stream. |
| 199 | 241 |
| 200 Recognizes only standard YAML tags and cannot construct an | 242 Recognizes only standard YAML tags and cannot construct an |
| 201 arbitrary Python object. | 243 arbitrary Python object. |
| 202 | 244 |
| 203 """ | 245 If `strict` is `True` then duplicate mapping keys within a YAML document |
| 204 data = yaml.load(stream, Loader=ConfigSafeLoader) | 246 are detected and prevented. |
| 247 | |
| 248 """ | |
| 249 data = yaml.load(stream, | |
| 250 Loader=config_safe_loader_factory(strict=strict)) | |
| 205 # Map an empty document to an empty dict | 251 # Map an empty document to an empty dict |
| 206 if data is None: | 252 if data is None: |
| 207 return DictImpl() | 253 return DictImpl() |
| 208 if not isinstance(data, DictImpl): | 254 if not isinstance(data, DictImpl): |
| 209 raise TypeError("YAML root object must be a mapping") | 255 raise TypeError("YAML root object must be a mapping") |
| 210 return data | 256 return data |
| 211 | 257 |
| 212 | 258 |
| 213 def safe_load_all(stream): | 259 def safe_load_all(stream, strict=False): |
| 214 """Return the list of all decoded YAML documents in the file `stream`. | 260 """Return the list of all decoded YAML documents in the file `stream`. |
| 215 | 261 |
| 216 Recognizes only standard YAML tags and cannot construct an | 262 Recognizes only standard YAML tags and cannot construct an |
| 217 arbitrary Python object. | 263 arbitrary Python object. |
| 218 | 264 |
| 219 """ | 265 If `strict` is `True` then duplicate mapping keys within a YAML document |
| 220 data_all = yaml.load_all(stream, Loader=ConfigSafeLoader) | 266 are detected and prevented. |
| 267 | |
| 268 """ | |
| 269 data_all = yaml.load_all(stream, | |
| 270 Loader=config_safe_loader_factory(strict=strict)) | |
| 221 rdata = [] | 271 rdata = [] |
| 222 for data in data_all: | 272 for data in data_all: |
| 223 if data is None: | 273 if data is None: |
| 224 rdata.append(DictImpl()) | 274 rdata.append(DictImpl()) |
| 225 else: | 275 else: |
