Mercurial > hgrepos > Python > libs > ConfigMix
view configmix/extras/aws.py @ 654:0d6673d06c2c
Add support for using "tomllib" (in Python's stdlib since 3.11) and "tomli" TOML packages.
They are preferred if they are found to be installed.
But note that the declared dependency for the "toml" extra is nevertheless
the "toml" package, because it is available for all supported Python
versions.
So use Python 3.11+ or install "tomli" manually if you want to use the
alternate packages.
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Thu, 19 May 2022 22:10:59 +0200 |
| parents | f454889e41fa |
| children | a90273abc8a4 |
line wrap: on
line source
# -*- coding: utf-8 -*-
# :-
# :Copyright: (c) 2015-2022, Franz Glasner. All rights reserved.
# :License:   BSD-3-Clause. See LICENSE.txt for details.
# :-
"""AWS namespace implementation.

.. see::
   - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
   - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html
   - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html

"""

from __future__ import division, absolute_import, print_function


__all__ = []


import requests
import requests.exceptions
import requests.adapters
import urllib3


# Sentinel for "no value": returned by the request helpers on failure and
# used as the "no default given" marker of `_awslookup`.
_MARKER = object()

# NOTE(review): all requests below are plain IMDSv1 GETs without a session
# token.  Instances configured to require IMDSv2 reject such requests, in
# which case every lookup falls back to its default -- confirm whether
# token support is needed.
URL_META_INSTANCEID = "http://169.254.169.254/latest/meta-data/instance-id"
URL_META_REGION = "http://169.254.169.254/latest/meta-data/placement/region"
# The availability zone lives below "placement/" (like the region); the
# metadata service has no top-level "availability-zone" category.
URL_META_AVAILABILITY_ZONE = (
    "http://169.254.169.254/latest/meta-data/placement/availability-zone")
URL_DYN_INSTANCE_IDENTITY_DOC = (
    "http://169.254.169.254/latest/dynamic/instance-identity/document")

# Timeout in seconds that is passed to every single HTTP request
TIMEOUT = 2
# See https://gist.github.com/doublenns/7e3e4b72df4aaeccbeabf87ba767f44e
RETRIES = urllib3.Retry(total=3, backoff_factor=0.3)

# Prefix of all keys that are served from the instance identity document
_DYN_INSTANCE_IDENTITY_PREFIX = "dynamic.instance-identity."

# Module-level caches: `None` means "not yet requested".  A failed request
# stores `_MARKER`, so a failure is cached too and is not retried.
_meta_instanceid = None
_meta_region = None
_meta_availability_zone = None
_dyn_instance_identity_doc = None


def _request(url, convert):
    """GET `url` with retries and return ``convert(response)``.

    :param str url: the URL to request
    :param convert: a callable that gets the successful response object
                    and returns the value to be handed to the caller
    :returns: the converted response or `_MARKER` if the request failed
              or yielded an HTTP error status

    Exceptions raised by `convert` that are no
    :class:`requests.exceptions.RequestException` are propagated.

    """
    adapter = requests.adapters.HTTPAdapter(max_retries=RETRIES)
    with requests.Session() as sess:
        sess.mount("http://", adapter)
        try:
            resp = sess.get(url, timeout=TIMEOUT)
            resp.raise_for_status()
            return convert(resp)
        except requests.exceptions.RequestException:
            return _MARKER


def _get_text_req(url):
    """Return the body of `url` as text or `_MARKER` on any request error.

    """
    return _request(url, lambda resp: resp.text)


def _get_json_req(url):
    """Return the JSON-decoded body of `url` or `_MARKER` on any request
    or JSON decoding error.

    """
    try:
        return _request(url, lambda resp: resp.json())
    except ValueError:
        # JSON error
        return _MARKER


def _get_meta_instanceid():
    """Return the (cached) instance id or `_MARKER` if not available."""
    global _meta_instanceid
    if _meta_instanceid is None:
        _meta_instanceid = _get_text_req(URL_META_INSTANCEID)
    return _meta_instanceid


def _get_meta_region():
    """Return the (cached) region or `_MARKER` if not available."""
    global _meta_region
    if _meta_region is None:
        _meta_region = _get_text_req(URL_META_REGION)
    return _meta_region


def _get_meta_avzone():
    """Return the (cached) availability zone or `_MARKER` if not available.

    """
    global _meta_availability_zone
    if _meta_availability_zone is None:
        _meta_availability_zone = _get_text_req(URL_META_AVAILABILITY_ZONE)
    return _meta_availability_zone


def _get_dyn_instance_identity_doc():
    """Return the (cached) decoded instance identity document or `_MARKER`
    if not available.

    """
    global _dyn_instance_identity_doc
    if _dyn_instance_identity_doc is None:
        _dyn_instance_identity_doc = _get_json_req(
            URL_DYN_INSTANCE_IDENTITY_DOC)
    return _dyn_instance_identity_doc


def _awslookup(name, default=_MARKER):
    """Look up the key `name` in the AWS namespace.

    Supported keys are ``metadata.instance-id``,
    ``metadata.placement.region``,
    ``metadata.placement.availability-zone`` and
    ``dynamic.instance-identity.<KEY>`` where ``<KEY>`` is a top-level key
    of the instance identity document.

    :param str name: the key to look up
    :param default: the value to return if the key is unknown or its
                    value could not be retrieved
    :returns: the value found or `default`
    :raises KeyError: if no value is available and no `default` is given

    """
    if name == "metadata.instance-id":
        v = _get_meta_instanceid()
    elif name == "metadata.placement.region":
        v = _get_meta_region()
    elif name == "metadata.placement.availability-zone":
        v = _get_meta_avzone()
    elif name.startswith(_DYN_INSTANCE_IDENTITY_PREFIX):
        idoc = _get_dyn_instance_identity_doc()
        if idoc is _MARKER:
            v = _MARKER
        else:
            # The rest of the name is the key within the document
            v = idoc.get(name[len(_DYN_INSTANCE_IDENTITY_PREFIX):], _MARKER)
    else:
        v = _MARKER
    if v is _MARKER:
        if default is _MARKER:
            raise KeyError("key %r not found in the AWS namespace" % name)
        else:
            return default
    return v
