Source code for dockerreg.auth

import six
import abc
import os
import os.path
import errno
import time
import pickle
import dateutil
import dateutil.parser
import dateutil.tz
import base64
import json
import dockerreg.exceptions as ex
from dockerreg.log import LOG
import dockerreg.util
from dockerreg.util import bytes2str,str2bytes

@six.add_metaclass(abc.ABCMeta)
class Auth(object):
    """
    Abstract source of credentials for talking to a registry.
    """

    @abc.abstractmethod
    def get_auth(self, host, username=None, repository=None):
        """
        Look up the HTTP Authorization header value to present to a
        registry.

        Anonymous-login contract: when this method returns `None` and
        `username` is `None`, the caller attempts an anonymous login
        (no Authorization header is sent to the token server).  An
        implementation that does not want anonymous attempts must
        therefore raise
        :py:class:`dockerreg.exceptions.MissingCredentialsError` when
        `username` is `None` instead of returning `None`.

        :param host (str): FQDN of the registry, optionally with a
            `:port` suffix.
        :param username (str): restrict the lookup to this username, if
            credentials are keyed by username.
        :param repository (str): restrict the lookup to this repository
            name, if credentials are keyed by repository.
        :return (:py:class:`AuthorizationValue`): the encoded HTTP
            Authorization header value matching `host`, `username` and
            `repository`, or `None` as described above.
        """
@six.add_metaclass(abc.ABCMeta)
class AuthorizationValue(object):
    """
    Abstract representation of the value carried by an HTTP
    Authorization header.
    """

    @abc.abstractmethod
    def scheme(self):
        """
        :returns (str): the scheme part of the Authorization header
            (i.e. Basic or Bearer).
        """

    @abc.abstractmethod
    def token(self):
        """
        :returns: the encoded token that follows the scheme.
        """

    @abc.abstractmethod
    def expired(self, slop=0):
        """
        :param slop (int): number of seconds added to the current time
            before comparing against the expiration.
        :returns (bool): True when the token has already expired or will
            expire before now + slop; False otherwise.
        """
class BearerToken(AuthorizationValue):
    """
    A Bearer token together with the metadata used to cache it (realm,
    username, request fields) and to decide when it has expired.
    """

    def __init__(self, token, realm, expires_in=0, issued_at="",
                 username=None, fields=None):
        """
        :param token (str): the encoded token value.
        :param realm (str): the realm the token belongs to.
        :param expires_in (int): lifetime in seconds counted from
            `issued_at`.  0 selects the default of 60 seconds; `None`
            means the token is never reported as expired.
        :param issued_at: an RFC3339 date string, or an integer timestamp
            in the same form as
            `int(time.time()) + dockerreg.util.utcoffset()`.  "" means
            "issued now"; `None` means the token is never reported as
            expired.
        :param username (str): optional username the token was issued to.
        :param fields (dict): optional string->string mapping; it is
            flattened into a sorted `k=v,k=v` string exposed as
            `field_string`.
        :raises dockerreg.exceptions.BearerTokenError: if `issued_at` is
            neither a string, an int, nor `None`.
        """
        self._token = token
        if expires_in == 0:
            expires_in = 60
        if issued_at == "":
            # NOTE(review): utcoffset() is presumably the local-vs-UTC
            # offset in seconds; it is subtracted again below -- confirm
            # against dockerreg.util.
            issued_at = int(time.time()) + dockerreg.util.utcoffset()
        self._issued_at = issued_at
        self._expires_in = expires_in
        self._username = username
        self._realm = realm
        self._fields = fields or {}
        # Sorted so that equal field sets always yield the same string.
        self._ord_field_str = ",".join(
            k + "=" + self._fields[k] for k in sorted(self._fields))

        # The None check must come first: otherwise a string issued_at
        # combined with expires_in=None fails with a TypeError on the
        # addition below.
        if self._issued_at is None or self._expires_in is None:
            self._expirestamp = None
        elif isinstance(self._issued_at, six.string_types):
            iat = dateutil.parser.parse(self._issued_at)
            iat = iat.astimezone(dateutil.tz.tzlocal())
            self._expirestamp = (time.mktime(iat.timetuple())
                                 + self._expires_in)
        elif isinstance(self._issued_at, int):
            # Was `self.expires_in`, an attribute that does not exist.
            self._expirestamp = (self._issued_at
                                 - dockerreg.util.utcoffset()
                                 + self._expires_in)
        else:
            raise ex.BearerTokenError(
                "issued_at (%s %s) must be an RFC3339 date string or a UTC UNIX"
                " timestamp" % (str(self._issued_at), type(self._issued_at)))

    @classmethod
    def from_json_http_response(kls, response_json,
                                realm, username=None, fields=None):
        """
        Build a token from the decoded JSON body of a token response.

        The token is read from the `token` field, falling back to
        `access_token`.  `expires_in` and `issued_at` are passed through
        when present; a missing one is passed as `None`, which makes the
        resulting token never report itself as expired.

        NOTE(review): the Docker token specification describes defaults
        for omitted `expires_in`/`issued_at`; confirm whether "never
        expires" is really the intended handling here.

        :raises dockerreg.exceptions.BearerTokenError: if the JSON has
            neither a `token` nor an `access_token` field.
        """
        if "token" in response_json:
            token = response_json["token"]
        elif "access_token" in response_json:
            token = response_json["access_token"]
        else:
            raise ex.BearerTokenError("no token nor access_token field in JSON")
        expires_in = None
        if "expires_in" in response_json:
            expires_in = response_json["expires_in"]
        issued_at = None
        if "issued_at" in response_json:
            issued_at = response_json["issued_at"]
        return BearerToken(token, realm, expires_in=expires_in,
                           issued_at=issued_at, username=username,
                           fields=fields)

    @property
    def username(self):
        """The username passed to the constructor (may be `None`)."""
        return self._username

    @property
    def realm(self):
        """The realm passed to the constructor."""
        return self._realm

    @property
    def field_string(self):
        """The fields flattened to a sorted, comma-separated `k=v` string."""
        return self._ord_field_str

    def scheme(self):
        """:returns (str): always "Bearer"."""
        return "Bearer"

    def token(self):
        """:returns: the encoded token value passed to the constructor."""
        return self._token

    def expired(self, slop=0):
        """
        :param slop (int): seconds added to the current time before the
            comparison.
        :returns (bool): True if now + slop is past the expiration
            timestamp; always False when no expiration is known.
        """
        if self._expirestamp is None:
            return False
        return (time.time() + slop) > self._expirestamp

    def __repr__(self):
        expstr = ""
        if self._expirestamp:
            # Seconds remaining until expiration (negative once expired).
            expstr = "," + str(self._expirestamp - int(time.time()))
        # %s formatting rather than "+" so a None realm or a non-string
        # username cannot make repr() itself raise.
        metastr = ",realm=%s" % (self._realm,)
        if self._username:
            metastr += ",username=%s" % (self._username,)
        if self._ord_field_str != "":
            metastr += "," + self._ord_field_str
        return "<%s%s%s>" % (self.__class__.__name__, expstr, metastr)
class BearerTokenCache(object):
    """
    Caches Bearer tokens according to the fields in the original
    WWW-Authenticate response header (response being the HTTP response
    to an unauthenticated request).  Ages out tokens when they are
    within a slop factor of expiration.
    """

    def __init__(self, slop=0, filename="~/.dockerreg.token-cache"):
        """
        :param slop (int): seconds of safety margin; a token that expires
            within `slop` seconds from now is treated as expired.
        :param filename (str): pickle file backing the cache (`~` is
            expanded).  Pass `None` or "" for a purely in-memory cache.
        """
        self.slop = slop
        # A multi-level dict; first key is "[username@]realm"; second
        # level is an ordered string of the fields from the
        # WWW-Authenticate header that generated this token.
        self.cache = {}
        self.filename = os.path.expanduser(filename) if filename else None
        if self.filename is None:
            return

        try:
            with open(self.filename, 'rb') as f:
                contents = f.read()
        except IOError as e:
            # Any other errno (e.g. the file not existing yet) is
            # deliberately silent: we simply start with an empty cache.
            if e.errno == errno.EACCES:
                LOG.warn("could not read %s: %s" % (self.filename, str(e)))
            return
        if len(contents) == 0:
            return

        # SECURITY NOTE: pickle.loads() can execute arbitrary code; the
        # cache file must only ever be writable by a trusted user.
        try:
            loaded = pickle.loads(contents)
        except Exception as e:
            # Unpickling a damaged file can raise many exception types;
            # a bad cache must not prevent start-up.
            LOG.warn("cache: ignoring unreadable %s (%s)"
                     % (self.filename, str(e)))
            return
        if not isinstance(loaded, dict):
            LOG.warn("cache: ignoring unexpected contents of %s"
                     % (self.filename,))
            return
        self.cache = loaded
        LOG.debug("cache: \n%s" % (self.format_dump(sep="\n", expired=True)))
        self.purge_expired()

    @staticmethod
    def _token_key(realm, username):
        """First-level cache key: `realm`, or `username@realm`."""
        if username is not None:
            return str(username) + "@" + realm
        return realm

    @staticmethod
    def _field_string(fields):
        """Second-level cache key: sorted, comma-separated `k=v` pairs."""
        if not fields:
            return ""
        return ",".join(k + "=" + fields[k] for k in sorted(fields.keys()))

    def save(self):
        """
        Pickle the cache to `self.filename`, if one is configured.
        Failures are logged and otherwise ignored (best effort).
        """
        if not self.filename:
            return
        try:
            with open(self.filename, 'wb') as f:
                pickle.dump(self.cache, f)
        except Exception as e:
            LOG.warn("cache: cannot write to %s (%s), skipping!"
                     % (self.filename, str(e)))

    def add(self, token):
        """
        Store `token`, replacing any token cached under the same key.

        :param token (BearerToken): the token to cache.
        :returns (bool): True if the token was stored; False if it has no
            realm or is already expired (taking `slop` into account).
        :raises dockerreg.exceptions.IllegalArgumentError: if `token` is
            not a `dockerreg.auth.BearerToken`.
        """
        if not isinstance(token, dockerreg.auth.BearerToken):
            raise ex.IllegalArgumentError("must be a dockerreg.auth.BearerToken")
        if not token.realm:
            return False
        if token.expired(slop=self.slop):
            LOG.debug("would have added expired token %s" % (str(token)))
            return False
        token_key = self._token_key(token.realm, token.username)
        entries = self.cache.setdefault(token_key, {})
        previous = entries.get(token.field_string)
        if previous is not None and not previous.expired(slop=self.slop):
            LOG.debug("overwriting previous token %r" % (repr(previous)))
        entries[token.field_string] = token
        LOG.debug("added %r" % (repr(token)))
        return True

    def get(self, realm, username=None, fields=None):
        """
        Look up a cached, unexpired token.

        :param realm (str): realm the token was issued for.
        :param username (str): username the token was issued to, if any.
        :param fields (dict): string->string fields the token was
            requested with.
        :returns: the cached token, or `None` if there is none or it has
            expired (an expired entry is dropped from the cache).
        """
        token_key = self._token_key(realm, username)
        if token_key not in self.cache:
            return None
        if fields is None:
            fields = {}
        entries = self.cache[token_key]
        field_key = self._field_string(fields)
        if field_key not in entries:
            return None
        t = entries[field_key]
        if t.expired(slop=self.slop):
            del entries[field_key]
            return None
        LOG.debug("found %r matching realm=%s,username=%s,fields %s"
                  % (repr(t), realm, username, str(fields)))
        return t

    def purge_expired(self):
        """Drop every cached token that is expired (honouring `slop`)."""
        for entries in self.cache.values():
            # Collect first: deleting while iterating the dict is unsafe.
            stale = [fk for (fk, t) in list(entries.items())
                     if t.expired(slop=self.slop)]
            for fk in stale:
                del entries[fk]

    def reset(self):
        """Forget all cached tokens (the backing file is not touched)."""
        self.cache = {}

    def format_dump(self, sep="\n", expired=True):
        """
        Render the cached tokens as text.

        :param sep (str): separator placed between tokens.
        :param expired (bool): if True, expired tokens are included,
            prefixed with `EXP:`; if False they are omitted.
        :returns (str): the `repr()` of each token joined by `sep`.
        """
        parts = []
        for entries in self.cache.values():
            for t in entries.values():
                if not t.expired(slop=self.slop):
                    parts.append(repr(t))
                elif expired:
                    parts.append("EXP:" + repr(t))
        return sep.join(parts)
class BasicAuthToken(AuthorizationValue):
    """
    HTTP Basic credentials: a username/password pair and its base64
    `username:password` encoding.
    """

    def __init__(self, username, password):
        """
        :param username (str): the user name.
        :param password (str): the password.
        """
        self.username = username
        self.password = password
        t = "%s:%s" % (username, password)
        self._token = bytes2str(base64.b64encode(str2bytes(t)))

    @classmethod
    def from_b64(kls, b64str):
        """
        Build a token from a base64-encoded `username:password` string.

        Only the first colon separates the two parts, so a password that
        itself contains colons is preserved intact.  If the decoded
        string contains no colon at all, the whole string becomes the
        username and the password is empty.

        :param b64str (str): base64 encoding of `username:password`.
        :returns (BasicAuthToken): the decoded credentials.
        """
        decoded = bytes2str(base64.b64decode(str2bytes(b64str)))
        (username, _sep, password) = decoded.partition(":")
        return BasicAuthToken(username, password)

    def scheme(self):
        """:returns (str): always "Basic"."""
        return "Basic"

    def token(self):
        """:returns (str): the base64 encoding of `username:password`."""
        return self._token

    def expired(self, slop=0):
        """:returns (bool): always False."""
        return False

    def __repr__(self):
        # Only the username is shown; the password is left out.
        return "<%s %s>" % (self.__class__.__name__, self.username)
class DockerConfigAuth(Auth):
    """
    Credentials read from the `auths` section of a Docker client
    `config.json`; every entry that has an `auth` value is decoded into
    a `BasicAuthToken` keyed by its registry name.
    """

    @staticmethod
    def _default_configfile():
        """Return the path `~/.docker/config.json` for the current user."""
        # expanduser() still honours $HOME when it is set, but does not
        # blow up with a TypeError when it is missing.
        return os.path.join(os.path.expanduser("~"), ".docker", "config.json")

    @classmethod
    def configfile_exists(kls, configfile=None):
        """
        :param configfile (str): path to check; defaults to
            `~/.docker/config.json`.
        :returns (bool): True if the file exists.
        """
        if not configfile:
            configfile = kls._default_configfile()
        return os.path.exists(configfile)

    def __init__(self, configfile=None):
        """
        :param configfile (str): path of the Docker config file; defaults
            to `~/.docker/config.json`.
        :raises dockerreg.exceptions.AuthStorageError: if the file does
            not exist.
        """
        if not configfile:
            configfile = self._default_configfile()
        if not os.path.exists(configfile):
            raise ex.AuthStorageError("%s does not exist!" % (configfile,))
        self.configfile = configfile
        with open(self.configfile, 'r') as f:
            contents = f.read()
        # The raw contents hold base64-encoded passwords, so only the
        # path is logged, never the file body.
        LOG.debug("read docker config %s" % (self.configfile,))
        blob = json.loads(contents)
        if "auths" in blob:
            auths = blob["auths"]
        else:
            auths = {}
        # registry name -> BasicAuthToken
        self.db = {}
        for (registry, entry) in six.iteritems(auths):
            if "auth" in entry:
                bai = BasicAuthToken.from_b64(entry["auth"])
                self.db[registry] = bai
                LOG.debug("docker auth entry %s: %s" % (registry, bai))

    def get_auth(self, host, username=None, repository=None):
        """
        :param host (str): registry name exactly as it appears as a key
            under `auths` in the config file.
        :param username (str): if given, the stored entry must belong to
            this user.
        :param repository (str): unused by this implementation (it is
            only logged).
        :returns (BasicAuthToken): the stored credentials for `host`, or
            `None` if there is no entry or the username does not match.
        """
        LOG.debug("%r %r,%r" % (host, username, repository))
        if host not in self.db:
            LOG.debug("no such host %s in db" % (host))
            return None
        if username and self.db[host].username != username:
            LOG.debug("no such host %s and username %s in db"
                      % (host, username))
            return None
        return self.db[host]
class IndexedBasicAuth(Auth):
    """
    Basic-auth credentials held in memory as a two-level mapping:
    host -> username -> `BasicAuthToken`.
    """

    def __init__(self, db):
        """
        :param db (dict): mapping of host to a dict of username to
            `BasicAuthToken`.
        """
        self.db = db

    @classmethod
    def from_dict(kls, d):
        """
        :param d (dict): mapping of host to a dict of username to
            plain-text password.
        :returns (IndexedBasicAuth): an instance holding a
            `BasicAuthToken` for every host/username pair in `d`.
        """
        indexed = {
            host: {
                user: BasicAuthToken(user, passwd)
                for (user, passwd) in six.iteritems(users)
            }
            for (host, users) in six.iteritems(d)
        }
        return IndexedBasicAuth(indexed)

    @classmethod
    def from_user_pass(kls, host, username, password=None,
                       skip_docker_config=False):
        """
        Build single-host credentials from an explicit password or, when
        none is given, from the default Docker config file.

        :param host (str): the registry the credentials are for.
        :param username (str): the user name.
        :param password (str): the password; if `None` the Docker config
            is consulted instead.
        :param skip_docker_config (bool): if True, never fall back to the
            Docker config.
        :raises dockerreg.exceptions.MissingCredentialsError: if no
            password was given and none could be obtained.
        """
        if password is not None:
            token = BasicAuthToken(username, password)
            return IndexedBasicAuth({host: {username: token}})
        if skip_docker_config:
            raise ex.MissingCredentialsError(
                "no password for username %s" % (username))
        docker_auth = DockerConfigAuth()
        found = docker_auth.get_auth(host, username=username)
        if found and isinstance(found, BasicAuthToken):
            return IndexedBasicAuth({host: {found.username: found}})
        raise ex.MissingCredentialsError(
            "no password for user %s on registry %s in %s"
            % (username, host, docker_auth.configfile))

    def get_auth(self, host, username=None, repository=None):
        """
        :param host (str): the registry to look up.
        :param username (str): if given, only that user's credentials
            are returned; otherwise the first stored entry for `host`.
        :param repository (str): unused by this implementation (it is
            only logged).
        :returns (BasicAuthToken): the matching credentials, or `None`
            when nothing matches.
        """
        LOG.debug("%r %r,%r" % (host, username, repository))
        if host not in self.db:
            return None
        if username:
            if username in self.db[host]:
                return self.db[host][username]
            return None
        candidates = list(self.db[host].values())
        if candidates:
            return candidates[0]
        return None