Mercurial > hgrepos > Python > libs > ConfigMix
view configmix/extras/aws.py @ 284:4aaf74858d07
Some links to AWS docu into the aws moduleSome links to AWS docu into the aws moduleSome links to AWS docu into the aws moduleSome links to AWS docu into the aws moduleSome links to AWS docu into the aws moduleSome links to AWS docu into the aws moduleSome links to AWS docu into the aws moduleSome links to AWS docu into the aws module
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Mon, 07 Dec 2020 01:59:11 +0100 |
| parents | 503768f91a05 |
| children | eed16a1ec8f3 |
line wrap: on
line source
# -*- coding: utf-8 -*- # :- # :Copyright: (c) 2015-2020, Franz Glasner. All rights reserved. # :License: 3-clause BSD. See LICENSE.txt for details. # :- """AWS namespace implementation. .. see:: - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html """ from __future__ import division, absolute_import, print_function __all__ = [] import requests import requests.exceptions import requests.adapters import urllib3 _MARKER = object() URL_META_INSTANCEID = "http://169.254.169.254/latest/meta-data/instance-id" URL_META_REGION = "http://169.254.169.254/latest/meta-data/placement/region" URL_META_AVAILABILITY_ZONE = "http://169.254.169.254/latest/meta-data/availability-zone" URL_DYN_INSTANCE_IDENTITY_DOC = "http://169.254.169.254/latest/dynamic/instance-identity/document" TIMEOUT = 2 # See https://gist.github.com/doublenns/7e3e4b72df4aaeccbeabf87ba767f44e RETRIES = urllib3.Retry(total=3, backoff_factor=0.3) _meta_instanceid = None _meta_region = None _meta_availability_zone = None _dyn_instance_identity_doc = None def _get_text_req(url): with requests.Session() as sess: try: a = requests.adapters.HTTPAdapter(max_retries=RETRIES) sess.mount("http://", a) resp = sess.get(url, timeout=TIMEOUT) resp.raise_for_status() return resp.text except requests.exceptions.RequestException: return _MARKER def _get_json_req(url): with requests.Session() as sess: try: a = requests.adapters.HTTPAdapter(max_retries=RETRIES) sess.mount("http://", a) resp = sess.get(url, timeout=TIMEOUT) resp.raise_for_status() return resp.json() except requests.exceptions.RequestException: return _MARKER except ValueError: # JSON error return _MARKER def _get_meta_instanceid(): global _meta_instanceid if _meta_instanceid is None: _meta_instanceid = _get_text_req(URL_META_INSTANCEID) return _meta_instanceid def _get_meta_region(): global _meta_region if _meta_region is None: _meta_region = _get_text_req(URL_META_REGION) return _meta_region def _get_meta_avzone(): global _meta_availability_zone if _meta_availability_zone is None: _meta_availability_zone = _get_text_req(URL_META_AVAILABILITY_ZONE) return _meta_availability_zone def _get_dyn_instance_identity_doc(): global _dyn_instance_identity_doc if _dyn_instance_identity_doc is None: _dyn_instance_identity_doc = _get_json_req( URL_DYN_INSTANCE_IDENTITY_DOC) return _dyn_instance_identity_doc def _awslookup(name, default=_MARKER): if name == "metadata.instance-id": v = _get_meta_instanceid() elif name == "metadata.placement.region": v = _get_meta_region() elif name == "metadata.placement.availability-zone": v = _get_meta_avzone() elif name.startswith("dynamic.instance-identity."): idoc = _get_dyn_instance_identity_doc() if idoc is _MARKER: v = _MARKER else: v = idoc.get(name[26:], _MARKER) else: v = _MARKER if v is _MARKER: if default is _MARKER: raise KeyError("key %r not found in the AWS namespace" % name) else: return default return v
