import six
import abc
import sys
import traceback
from requests import Request,Session,Response
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import PoolManager
from dockerreg.log import LOG
import dockerreg.exceptions as ex
from dockerreg.auth import BearerToken,BearerTokenCache
from dockerreg.util.applicable \
import Applicable, ApplicableClass, ApplicableMethod, ApplicableFormatter, \
DEFAULT_FORMATTER
@ApplicableFormatter(
kwargs=[dict(name='text',action='store_true'),
dict(name='json',action='store_true')])
def _requests_formatter(result,text=None,json=None):
"""
Format a value as a string or JSON object string.
:param text: display value as plaintext
:param json: display value as a JSON object
"""
if not isinstance(result,Response):
return DEFAULT_FORMATTER(result,text=text,json=json)
if json is True:
return result.json()
else:
return result.content
[docs]class SourceAddressAdapter(HTTPAdapter):
[docs] def __init__(self, source_address,**kwargs):
self.source_address = source_address
super(SourceAddressAdapter,self).__init__(**kwargs)
[docs] def init_poolmanager(self,connections,maxsize,block=False):
self.poolmanager = PoolManager(
num_pools=connections,maxsize=maxsize,block=block,
source_address=self.source_address)
#@ApplicableClass()
[docs]class BaseApiClient(object):
"""
A class providing access to the low-level Docker Registry API.
"""
[docs] def __init__(self,host,url_prefix=None,username=None,
auth=None,auth_url=None,cache=None,
verify=True,cert=None,key=None,version=None,
source_address=None,**kwargs):
"""
:param host (str): the FQDN of the registry host, possibly postfixed by `:port`.
:param url_prefix (str):
"""
if not version:
raise Exception("Unspecified version!")
self._proto = "https"
self._host = host
self._source_address = source_address
self._url_prefix = url_prefix
self._auth = auth
self._auth_url = auth_url
self._verify = verify
self._cert = cert
self._key = key
self._api_version = version
self._username = username
self._cache = cache
self._realm = None
self._service = None
self._session = Session()
if self._source_address:
self._session.mount('http://',SourceAddressAdapter(self._source_address))
self._session.mount('https://',SourceAddressAdapter(self._source_address))
if self._url_prefix:
self._base_url = self._url_prefix
if not self._base_url.startswith("/"):
self._base_url = "/" + self._base_url
if not self._base_url.endswith("/"):
self._base_url += "/"
else:
self._base_url = "/"
self._api_url = self._base_url + "v" + str(self._api_version)
@property
def host(self):
return self._host
@property
def version(self):
return self._api_version
@property
def baseurl(self):
return self._proto + "://" + self._host + self._base_url
@property
def apiurl(self):
return self._proto + "://" + self._host + self._api_url
@abc.abstractmethod
[docs] def registry(self):
"""
:returns: a :class:`dockerreg.models.BaseRegistry` corresponding to this :class:`dockerreg.api.BaseApiClient`.
"""
def __handle_http_error(self,resp):
LOG.debug("resp %r" % (repr(resp)))
if resp.status_code in [ 200,201,202 ]:
return
error_list = []
try:
error_list = resp.json()["errors"]
except:
pass
if resp.status_code == 401:
raise ex.AuthorizationRequiredError(errors=error_list)
raise ex.DockerRegistryAPIError(resp.status_code,errors=error_list)
def _handle_www_auth_basic(self,hdr):
authvalue = self._auth.get_auth(self._host,username=self._username)
if not authvalue:
raise ex.MissingCredentialsError(self._host)
return authvalue
def _parse_www_auth_bearer(self,hdr,required=[]):
"""
:returns: a (realm,fields) tuple for a Bearer WWW-Authenticate header. Throws a :class:`dockerreg.exceptions.BearerRedirectError` if there is no 'realm' field.
:param hdr (str): the HTTP WWW-Authenticate header value.
:param required (list): a list of strings specifying fields that must be present in the hdr; if any are not present, a :class:`dockerreg.exceptions.BearerRedirectError` will be thrown.
"""
if not required:
required = []
realm = None
fields = {}
bkeys = hdr[len("Bearer"):].split(",")
nbkeys = []
curkey = ''
for i in range(0,len(bkeys)):
if bkeys[i][-1] == "'" or bkeys[i][-1] == '"':
nbkeys.append(curkey+bkeys[i])
curkey = ''
continue
if bkeys[i][-1] != "'" and bkeys[i][-1] != '"':
curkey += bkeys[i] + ","
for key in nbkeys:
key = key.lstrip().rstrip()
arr = key.split("=",1)
if len(arr) != 2:
continue
key = arr[0]
val = arr[1]
if val[0] == '"' and val[-1] == '"':
val = val[1:-1]
elif val[0] == "'" and val[-1] == "'":
val = val[1:-1]
if key in ["Realm","realm"]:
realm = val
else:
fields[key] = val
if realm is None:
raise ex.BearerRedirectError(
"missing Bearer 'realm' field in"
" Www-authenticate header '%s'" % (hdr))
for f in required:
if not f in fields:
raise ex.BearerRedirectError(
"missing Bearer '%s' field in"
" Www-authenticate header '%s'" % (f,hdr))
LOG.debug("hdr = %s :: (%s %s)" % (hdr,realm,fields))
if not self._realm:
self._realm = realm
if not self._service and 'service' in fields:
self._service = fields['service']
return (realm,fields)
def _handle_www_auth_bearer(self,hdr):
(realm,fields) = self._parse_www_auth_bearer(hdr)
if self._cache:
av = self._cache.get(realm,username=self._username,fields=fields)
if av:
LOG.debug("using cached %r" % (repr(av)))
return av
av = self._auth.get_auth(self._host,username=self._username)
if av:
headers = { 'authorization':"%s %s" % (av.scheme(),av.token()) }
else:
headers = {}
req = Request("get",realm,headers=headers,params=fields)
LOG.debug("%s %s: headers=%s params=%s" % ('GET',realm,str(headers),str(fields)))
resp = self._send(req)
#requests.get(realm,headers=headers,params=fields)
self.__handle_http_error(resp)
t = BearerToken.from_json_http_response(
resp.json(),realm,username=self._username,fields=fields)
if self._cache:
self._cache.add(t)
if not self._realm:
self._realm = realm
if not self._service and 'service' in fields:
self._service = fields['service']
return t
def _handle_www_auth(self,hdr):
"""
:returns: a :class:`dockerreg.auth.AuthorizationValue` to fill the HTTP Authorization header value.
:param hdr (str): The value of the WWW-Authenticate header.
"""
LOG.debug(hdr)
if hdr.startswith("Basic"):
return self._handle_www_auth_basic(hdr)
elif hdr.startswith("Bearer"):
return self._handle_www_auth_bearer(hdr)
else:
raise ex.UnsupportedAuthorizationTypeError(hdr)
def _send(self,req,stream=False,timeout=None):
req = self._session.prepare_request(req)
if self._key:
cert = (self._cert,self._key)
else:
cert = self._cert
return self._session.send(req,verify=self._verify,cert=cert,
stream=stream,timeout=timeout)
def _request(self,method,suburl,headers={},params={},data=None,
ignoreauth=False,url_raw=False,stream=False,timeout=None):
if url_raw:
url = suburl
else:
url = self.apiurl + suburl
LOG.debug("%s %s: headers=%s" % (method.upper(),url,str(headers)))
req = Request(method,url,data=data,headers=headers,params=params)
resp = self._send(req,stream=stream,timeout=timeout)
if not ignoreauth and resp.status_code == 401:
if "www-authenticate" in resp.headers \
and not "authorization" in headers:
authvalue = self._handle_www_auth(
resp.headers['www-authenticate'])
authstr = "%s %s" % (authvalue.scheme(),authvalue.token())
nh = dict(**headers)
if 'authorization' in nh:
del nh['authorization']
nh['authorization'] = authstr
kwargs = { 'headers':nh }
authreq = Request(method,url,data=data,params=params,**kwargs)
authresp = self._send(authreq,stream=stream,timeout=timeout)
self.__handle_http_error(authresp)
LOG.debug("authenticated response %s: headers=%s" % (url,str(authresp.headers)))
return authresp
else:
self.__handle_http_error(resp)
raise ex.AuthorizationRequiredError() #resp.status_code,resp.json()["errors"])
self.__handle_http_error(resp)
return resp
[docs] def get_cached_auth(self,fields):
if not self._realm or not self._username or not self._cache:
return None
local_fields = dict(fields)
if self._service:
local_fields['service'] = self._service
return self._cache.get(self._realm,self._username,local_fields)
[docs] def get(self,suburl,**kwargs):
return self._request("get",suburl,**kwargs)
[docs] def head(self,suburl,**kwargs):
return self._request("head",suburl,**kwargs)
[docs] def put(self,suburl,**kwargs):
return self._request("put",suburl,**kwargs)
[docs] def patch(self,suburl,**kwargs):
return self._request("patch",suburl,**kwargs)
[docs] def post(self,suburl,**kwargs):
return self._request("post",suburl,**kwargs)
[docs] def delete(self,suburl,**kwargs):
return self._request("delete",suburl,**kwargs)
@ApplicableMethod()
[docs] def ping(self):
resp = self.get("",ignoreauth=True)
LOG.debug("ping resp headers: %r" % (repr(resp.headers)))
body = resp.json()
LOG.debug("ping resp body: %r" % (repr(body)))
if resp.status_code in [200,401]:
# and resp.headers['Docker-Distribution-Api-Version'] == "registry/2.0":
return True
else:
return False
@ApplicableMethod()
[docs] def authping(self):
resp = self.get("")
LOG.debug("ping resp headers: %r" % (repr(resp.headers)))
body = resp.json()
LOG.debug("ping resp body: %r" % (repr(body)))
if resp.status_code in [200,401]:
# and resp.headers['Docker-Distribution-Api-Version'] == "registry/2.0":
return True
else:
return False