Mercurial > hgrepos > Python > libs > ConfigMix
view configmix/extras/aws.py @ 282:da1596034954
Implemented an "AWS" namespace to retrieve some AWS-specific metadata
| author | Franz Glasner <fzglas.hg@dom66.de> |
|---|---|
| date | Mon, 07 Dec 2020 01:06:33 +0100 |
| parents | |
| children | 503768f91a05 |
line wrap: on
line source
# -*- coding: utf-8 -*- # :- # :Copyright: (c) 2015-2020, Franz Glasner. All rights reserved. # :License: 3-clause BSD. See LICENSE.txt for details. # :- """AWS namespace implementation """ from __future__ import division, absolute_import, print_function __all__ = [] import requests import requests.exceptions _MARKER = object() URL_META_INSTANCEID = "http://169.254.169.254/latest/meta-data/instance-id" URL_META_REGION = "http://169.254.169.254/latest/meta-data/placement/region" URL_META_AVAILABILITY_ZONE = "http://169.254.169.254/latest/meta-data/availability-zone" URL_DYN_INSTANCE_IDENTITY_DOC = "http://169.254.169.254/latest/dynamic/instance-identity/document" TIMEOUT = 10 _meta_instanceid = None _meta_region = None _meta_availability_zone = None _dyn_instance_identity_doc = None def _get_text_req(url): with requests.Session() as sess: try: resp = sess.get(url, timeout=TIMEOUT) resp.raise_for_status() return resp.text except requests.exceptions.RequestException: return _MARKER def _get_json_req(url): with requests.Session() as sess: try: resp = sess.get(url, timeout=TIMEOUT) resp.raise_for_status() return resp.json() except requests.exceptions.RequestException: return _MARKER except ValueError: # JSON error return _MARKER def _get_meta_instanceid(): global _meta_instanceid if _meta_instanceid is None: _meta_instanceid = _get_text_req(URL_META_INSTANCEID) return _meta_instanceid def _get_meta_region(): global _meta_region if _meta_region is None: _meta_region = _get_text_req(URL_META_REGION) return _meta_region def _get_meta_avzone(): global _meta_availability_zone if _meta_availability_zone is None: _meta_availability_zone = _get_text_req(URL_META_AVAILABILITY_ZONE) return _meta_availability_zone def _get_dyn_instance_identity_doc(): global _dyn_instance_identity_doc if _dyn_instance_identity_doc is None: _dyn_instance_identity_doc = _get_json_req( URL_DYN_INSTANCE_IDENTITY_DOC) return _dyn_instance_identity_doc def _awslookup(name, default=_MARKER): if name == "metadata.instance-id": v = _get_meta_instanceid() elif name == "metadata.placement.region": v = _get_meta_region() elif name == "metadata.placement.availability-zone": v = _get_meta_avzone() elif name.startswith("dynamic.instance-identity."): idoc = _get_dyn_instance_identity_doc() if idoc is _MARKER: v = _MARKER else: v = idoc.get(name[26:], _MARKER) else: v = _MARKER if v is _MARKER: if default is _MARKER: raise KeyError("key %r not found in the AWS namespace" % name) else: return default return v
