Source code for dockerreg.models.v2

import six
import abc
import sys
import re
import hashlib
import os
import json
import jose
from jose.jwk import ECKey
import ecdsa
import base64

from dockerreg.log import LOG
import dockerreg.exceptions as ex
from dockerreg.util import (
    getgoarch, getgoos,
    joseb64urldecode, joseb64urlencode, num_joseb64urlencode)
from dockerreg.models import (
    Registry, Repository, Image, Model, Manifest, SignableMixin )
from dockerreg.models.v1 import ManifestV1

class RegistryV2(Registry):
    """
    A Docker registry accessed through a version 2 API client.

    Most methods are conveniences that build a :py:class:`RepositoryV2`
    (and usually an :py:class:`ImageV2`) for the requested repository
    and delegate to it.
    """

    def __init__(self, api, name=None):
        """
        :param api: the API client; its ``version`` attribute must be 2.
        :param str name: the registry name, passed to :py:class:`Registry`.
        :raises dockerreg.exceptions.ModelApiVersionMismatch: if `api`
            is not a version 2 client.
        """
        if api.version != 2:
            raise ex.ModelApiVersionMismatch("v2 registry, not v2 api")
        super(RegistryV2, self).__init__(api, name=name)
        # Lazily built cache: namespace -> list of repository names.
        self._db = None

    def _get_repositories(self, refresh=False):
        """
        Build (or return the cached) namespace-to-repositories mapping.

        A repository name is split at its last "/"; names without a "/"
        are filed under the empty namespace "".

        :param bool refresh: if true, ignore the cache and ask the API again.
        :returns: a :py:class:`dict` of namespace to list of repository names.
        """
        if self._db is not None and not refresh:
            return self._db
        resp = self.api.get_repositories()
        if "repositories" not in resp:
            # NOTE(review): the original raised a bare DockerRegistryError,
            # which this module never imports (NameError).  It is assumed
            # to live in dockerreg.exceptions -- confirm.
            raise ex.DockerRegistryError(
                "API error: /v2/get_repositories: expected 'repositories' list")
        db = {}
        for fullname in resp["repositories"]:
            namespace, _, repo = fullname.rpartition("/")
            db.setdefault(namespace, []).append(repo)
        # Only publish the cache once it has been built completely.
        self._db = db
        return self._db

    def get_namespaces(self, regexp=None):
        """
        :param str regexp: if set, only namespaces that this regular
            expression matches (via ``search``) are returned.
        :returns: a :py:class:`list` of namespaces in this registry.
        :rtype: :py:class:`list`
        """
        namespaces = list(self._get_repositories().keys())
        if not regexp:
            return namespaces
        sre = re.compile(regexp)
        return [ns for ns in namespaces if sre.search(ns) is not None]

    def get_repositories(self, regexp=None, namespace_regexp=None):
        """
        :param str regexp: if set, only repository names that this regular
            expression matches are kept.
        :param str namespace_regexp: if set, only namespaces that this
            regular expression matches are kept.
        :returns: a :py:class:`dict` of namespace-to-repository-lists in
            this registry.  Without filters this is the internal cache
            itself, not a copy.  A namespace that passes
            `namespace_regexp` is kept even if `regexp` leaves it with
            an empty repository list.
        :rtype: :py:class:`dict`
        """
        db = self._get_repositories()
        if not regexp and not namespace_regexp:
            return db
        sre = re.compile(regexp) if regexp else None
        nsre = re.compile(namespace_regexp) if namespace_regexp else None
        retval = {}
        for namespace in list(db.keys()):
            if nsre and nsre.search(namespace) is None:
                continue
            if not sre:
                retval[namespace] = db[namespace]
                continue
            retval[namespace] = [
                repo for repo in db[namespace] if sre.search(repo) is not None]
        return retval

    def get_tags(self, repository, metadata=False):
        """
        Return the tags of `repository`; see :py:meth:`RepositoryV2.get_tags`.
        """
        r = RepositoryV2(self.api, repository, self)
        return r.get_tags(metadata=metadata)

    def get_reference_metadata(self, repository, reference):
        """
        Return the metadata of `reference` (a tag or digest) in
        `repository`; see :py:meth:`RepositoryV2.get_reference_metadata`.
        """
        r = RepositoryV2(self.api, repository, self)
        return r.get_reference_metadata(reference)

    def get_image_metadata(self, repository, tag, arch=getgoarch, oss=getgoos):
        """
        Return a dict that starts as a copy of the reference metadata for
        `tag` and adds the image's total ``size``, its ``arch``, and one
        ``layer[N]`` entry per layer formatted as ``"<layer id>, <size>"``.
        """
        r = RepositoryV2(self.api, repository, self)
        retval = dict(r.get_reference_metadata(tag))
        i = r.get_image(tag, arch=arch, oss=oss)
        retval["size"] = i.size()
        m = i.manifest
        retval["arch"] = m.architecture
        for cnt, lid in enumerate(m.layers):
            retval["layer[%d]" % (cnt,)] = \
                "%s, %s" % (lid, str(i.layer_size(lid)))
        return retval

    def check_image(self, repository, tag, arch=getgoarch, oss=getgoos):
        """
        Re-fetch the image manifest and check that each layer exists.

        :returns: -1 if no image object could be obtained; otherwise the
            result of :py:meth:`ImageV2.validate` (the number of layers
            that could not be found).
        """
        r = RepositoryV2(self.api, repository, self)
        i = r.get_image(tag, arch=arch, oss=oss)
        if not i:
            return -1
        return i.validate(refresh=True)

    def pull_image(self, repository, tag, arch=getgoarch, oss=getgoos,
                   filebasename=None, refresh=False):
        """
        Pulls the image descriptor and all layers, returning an
        :py:class:`Image` representing the downloaded image.  If the
        `filebasename` parameter is set, the raw data will be cached in
        files instead of in memory.

        :param str repository: the full name of the image, including the
            namespace and repository.
        :param str tag: the tag indicating the version of the image.
        :param str arch: a GOARCH architecture string (i.e. amd64);
            defaults to the platform this library is running on; see
            https://golang.org/doc/install/source#environment for valid
            values.
        :param str oss: a GOOS operating system string (i.e. linux);
            defaults to the platform this library is running on; see
            https://golang.org/doc/install/source#environment for valid
            values.
        :param str filebasename: if set, the image manifest and layers
            will be cached in files named `filebasename`.manifest and
            `filebasename`.<layer_id>.  If the image already exists on
            disk via `filebasename`, it will be loaded from disk.
        :param bool refresh: if set `False` (the default), and if the
            image exists on disk via `filebasename`, the image will *not*
            be refreshed (i.e., will not be re-pulled from the registry;
            only layers that are not cached in files will be pulled; the
            manifest will *not* be refreshed).
        :returns: a :py:class:`Image`, or `None` if no image object
            could be obtained.
        """
        r = RepositoryV2(self.api, repository, self)
        i = r.get_image(tag, arch=arch, oss=oss, filebasename=filebasename)
        if not i:
            return None
        i.pull(refresh=refresh)
        LOG.debug("pulled image %r" % (repr(i)))
        return i

    def get_image_size(self, repository, tag, arch=getgoarch, oss=getgoos,
                       filebasename=None):
        """
        :returns: the value of :py:meth:`ImageV2.size` for the image, or
            `None` if no image object could be obtained.
        """
        r = RepositoryV2(self.api, repository, self)
        i = r.get_image(tag, arch=arch, oss=oss, filebasename=filebasename)
        if not i:
            return None
        return i.size()

    def get_image_created(self, repository, tag, arch=getgoarch, oss=getgoos,
                          filebasename=None):
        """
        :returns: the ``created`` value of the image's manifest, or
            `None` if no image object could be obtained.
        """
        r = RepositoryV2(self.api, repository, self)
        i = r.get_image(tag, arch=arch, oss=oss, filebasename=filebasename)
        if not i:
            return None
        return i.manifest.created

    def pull_and_push_image(self, repository, tag, arch=getgoarch, oss=getgoos,
                            filebasename=None, refresh=False,
                            dst_registry=None, dst_repository=None,
                            dst_tag=None,
                            dst_username=None, dst_password=None,
                            dst_no_verify=None, dst_cert=None,
                            dst_ca_bundle=None,
                            dst_skip_docker_config=False, dst_no_cache=False,
                            dst_cache_file=None):
        """
        Pulls the repository:tag image from this registry, and pushes it
        to dst_registry/dst_repository:dst_tag.  If the destination is
        the same registry, repository and tag as the source (for
        instance because all dst_* parameters are None), an
        IllegalArgumentError is raised.  Any of `dst_registry`,
        `dst_repository` and `dst_tag` left unspecified takes its value
        from the source image.  This allows you to simply re-tag an
        image; copy it to another repository; or copy it to another
        registry.

        :raises dockerreg.exceptions.IllegalArgumentError: if the
            destination equals the source.
        """
        if not dst_repository:
            dst_repository = repository
        if not dst_tag:
            dst_tag = tag
        remote = bool(dst_registry) and dst_registry != self.name
        if not remote and dst_repository == repository and dst_tag == tag:
            raise ex.IllegalArgumentError(
                "destination (%s,%s) is the same as the source image"
                % (dst_repository, dst_tag))

        if remote:
            # TLS verification: off, a CA bundle path, or the default (on).
            verify_val = True
            if dst_no_verify:
                verify_val = False
            elif dst_ca_bundle is not None:
                verify_val = dst_ca_bundle
            # NOTE(review): DockerConfigAuth, IndexedBasicAuth,
            # BearerTokenCache and ApiClient are not imported anywhere in
            # this module, so this branch raises NameError until they
            # are.  Their module paths are not visible from here.
            if not dst_username and not dst_skip_docker_config:
                auth = DockerConfigAuth()
            else:
                auth = IndexedBasicAuth.from_user_pass(
                    dst_registry, dst_username, password=dst_password,
                    skip_docker_config=dst_skip_docker_config)
            cache = None
            if not dst_no_cache:
                if dst_cache_file:
                    cache = BearerTokenCache(filename=dst_cache_file)
                else:
                    cache = BearerTokenCache()
            napi = ApiClient(
                dst_registry, 2, username=dst_username, auth=auth,
                verify=verify_val, cert=dst_cert, cache=cache)
            nreg = napi.registry()
            dst_repo = RepositoryV2(napi, dst_repository, nreg)
        else:
            dst_repo = RepositoryV2(self.api, dst_repository, self)

        # Ok, now that we've processed our input args, let's do the real work.
        i = self.pull_image(repository, tag, arch=arch, oss=oss,
                            filebasename=filebasename, refresh=refresh)
        ni = i.copy_to(dst_repo, dst_tag, lazy=False)
        ni.push()
        return

    def tag_image(self, repository, tag, newtag, arch=getgoarch, oss=getgoos):
        """
        Copy the image `repository`:`tag` to `newtag` in the same
        repository and push the copy.
        """
        r = RepositoryV2(self.api, repository, self)
        i = ImageV2(self.api, tag, r)
        ni = i.copy_to(tag=newtag)
        ni.push()
        return

    def delete_tag(self, repository, tag, arch=getgoarch, oss=getgoos):
        """
        Remove `tag` from `repository`.
        """
        r = RepositoryV2(self.api, repository, self)
        # We cannot directly delete a tag, so instead, we push a new
        # empty manifest to that tag, then delete that manifest.
        empty = ManifestV21(self.api, repository=r, tag=tag)
        empty.push()
        empty.delete()
        return None

    def push_image(self, repository, tag, filebasename=None):
        """
        Push the image `repository`:`tag`; see :py:meth:`ImageV2.push`.

        :param str filebasename: passed to :py:class:`ImageV2`, which
            loads a cached manifest and layers from files with this
            prefix if they exist.
        """
        r = RepositoryV2(self.api, repository, self)
        i = ImageV2(self.api, tag, r, filebasename=filebasename)
        return i.push()

    def delete_image(self, repository, tag, layers=False,
                     arch=getgoarch, oss=getgoos):
        """
        Delete the image `repository`:`tag`; see :py:meth:`ImageV2.delete`.

        :param bool layers: if `True`, also delete the image's layers.
        """
        r = RepositoryV2(self.api, repository, self)
        i = r.get_image(tag, arch=arch, oss=oss)
        i.delete(layers=layers)
        return None
class RepositoryV2(Repository):
    """
    A repository in a v2 registry, with cached tags and tag metadata.
    """

    def __init__(self, api, repository, registry):
        """
        :param api: the API client used for all requests.
        :param repository: the repository name, passed to
            :py:class:`Repository`.
        :param registry: the owning registry object.
        """
        super(RepositoryV2, self).__init__(api, repository, registry)
        # Cached tag list as returned by the API.
        self._tags = None
        # Cached metadata, keyed by tag.
        self._tag_metadata = None

    def get_tags(self, metadata=False):
        """
        :param bool metadata: if true, return the reference metadata of
            each tag instead of the tag names.
        :returns: a :py:class:`list` of tags in this repository, or a
            list of their metadata (in the same order) if `metadata` is
            set.
        """
        if not self._tags:
            self._tags = self.api.get_tags(self.name)["tags"]
        if not metadata:
            return self._tags
        if self._tag_metadata is None:
            self._tag_metadata = {}
        # Tolerate a missing (None) tag list instead of failing to iterate.
        tags = self._tags or []
        for tag in tags:
            if tag not in self._tag_metadata:
                self._tag_metadata[tag] = \
                    self.api.get_reference_metadata(self.name, tag)
        return [self._tag_metadata[tag] for tag in tags]

    def get_reference_metadata(self, reference):
        """
        Return the metadata for `reference`, using the metadata cached by
        :py:meth:`get_tags` when `reference` is a tag found there and
        asking the API otherwise.
        """
        if self._tag_metadata and reference in self._tag_metadata:
            return self._tag_metadata[reference]
        return self.api.get_reference_metadata(self.name, reference)

    def refresh(self):
        """
        Drop the cached tags and tag metadata, then re-fetch the tags.
        """
        self._tags = None
        self._tag_metadata = None
        self.get_tags()

    def get_image(self, tag, arch=getgoarch, oss=getgoos, filebasename=None):
        """
        Returns an :py:class:`Image` representing the requested image.
        If the `filebasename` parameter is set, any downloaded raw data
        will be cached in files instead of in memory.

        :param str tag: the tag indicating the version of the image.
        :param str filebasename: if set, the image manifest and layers
            will be cached in files named `filebasename`.manifest and
            `filebasename`.<layer_id>.
        :param str arch: a GOARCH architecture string (i.e. amd64);
            defaults to the platform this library is running on; see
            https://golang.org/doc/install/source#environment for valid
            values.
        :param str oss: a GOOS operating system string (i.e. linux);
            defaults to the platform this library is running on; see
            https://golang.org/doc/install/source#environment for valid
            values.
        :returns: a :py:class:`Image`.
        """
        return ImageV2(self.api, tag, self, arch=arch, oss=oss,
                       filebasename=filebasename)
class ImageV2(Image):
    """
    An image (a manifest plus its layers) in a v2 repository.

    Layer content is cached either in memory or, when a `filebasename`
    is given, in files named `filebasename`.manifest and
    `filebasename`.<layer_id>.  Both caches are keyed by the full layer
    id (``<algorithm>:<hex digest>``).
    """

    def __init__(self, api, tag, repository, id=None, filebasename=None,
                 arch=getgoarch, oss=getgoos):
        """
        :param api: the API client used for all requests.
        :param str tag: the tag of this image.
        :param repository: the repository object that holds the image.
        :param id: passed through to :py:class:`Image`.
        :param str filebasename: file-name prefix for the on-disk cache;
            if `filebasename`.manifest exists, the manifest is loaded
            from it.
        :param arch: architecture string, or a callable returning one.
        :param oss: operating system string, or a callable returning one.
        """
        super(ImageV2, self).__init__(api, tag, repository, id=id)
        self._filebasename = filebasename
        self._arch = arch
        self._os = oss
        self._manifest = None
        self._manifest_file = None
        if self._filebasename is not None:
            self._manifest_file = "%s.%s" % (self._filebasename, "manifest")
            if os.path.exists(self._manifest_file):
                with open(self._manifest_file, 'r') as f:
                    raw = f.read()
                self._manifest = create_v2_manifest(
                    self.api, json.loads(raw), raw, repository=self.repository,
                    tag=self.tag)
        # layer id -> blob content (in-memory cache)
        self._layer_blobs = {}
        # layer id -> path of the file holding the blob (on-disk cache)
        self._layer_files = {}

    @property
    def id(self):
        """The digest of this image's manifest."""
        return self.manifest.digest

    @property
    def filebasename(self):
        """The on-disk cache prefix, or `None` for in-memory caching."""
        return self._filebasename

    @property
    def manifest(self):
        """The image manifest, fetched from the registry on first use."""
        if self._manifest is not None:
            return self._manifest
        self.__update()
        return self._manifest

    def __delete_stale_layers(self):
        """Forget cached layers that the current manifest no longer lists."""
        current = self.manifest.layers
        for cache in (self._layer_blobs, self._layer_files):
            for lid in list(cache.keys()):
                if lid not in current:
                    del cache[lid]

    def _layer_path(self, layer_id):
        """Return the cache file path for `layer_id`, or `None` if this
        image is not file-backed."""
        if not self.filebasename:
            return None
        return "%s.%s" % (self.filebasename, layer_id)

    def _write_manifest_file(self):
        """Write the raw manifest to the manifest cache file, if any."""
        if not self._manifest_file:
            return
        raw = self.manifest.raw
        # The raw manifest may be bytes (an HTTP response body) or text
        # (generated locally); pick the matching file mode.
        mode = 'wb' if isinstance(raw, bytes) else 'w'
        LOG.debug("writing %s" % (self._manifest_file))
        with open(self._manifest_file, mode) as mf:
            mf.write(raw)

    # This returns True or False according to refresh().  However, we
    # also use it to implement the manifest property (which ignores the
    # return value).
    def __update(self):
        resp = self.api.get_manifest(self.repository.name, self.tag, raw=True)
        digest = resp.headers.get("Docker-Content-Digest", None)
        ctype = resp.headers.get("Content-type", None)
        m = create_v2_manifest(
            self.api, resp.json(), resp.content, repository=self.repository,
            tag=self.tag, id=digest, content_type=ctype)
        m.verify()
        # If we have one, we're calling this from refresh(); so return False
        if self._manifest is not None and m == self._manifest:
            return False
        self._manifest = m
        self._write_manifest_file()
        self.__delete_stale_layers()
        return True

    def refresh(self):
        """
        Re-fetch the manifest from the registry.

        :returns: `True` if the manifest changed (or none was loaded
            before), `False` if it equals the one already held.
        """
        return self.__update()

    def size(self):
        """
        :returns: the sum of the sizes of all layers in the manifest, as
            reported by the API.
        """
        return sum(self.layer_size(layer_id)
                   for layer_id in self.manifest.layers)

    def layer_size(self, layer_id):
        """
        :returns: the size of the blob `layer_id` as reported by the API.
        """
        return self.api.get_blob_size(self.repository.name, layer_id)

    def validate(self, refresh=False):
        """
        Check that every layer of the manifest exists in the repository.

        :param bool refresh: if true, re-fetch the manifest first.
        :returns: the number of layers that could not be found.
        """
        if refresh:
            self.refresh()
        return sum(self.check_layer(layer_id)
                   for layer_id in self.manifest.layers)

    def pull(self, refresh=False):
        """
        Fetch the manifest and the content of every layer, caching both.

        :param bool refresh: if true, re-fetch the manifest from the
            registry first (previously this argument was ignored).
            Layers that are already cached are not downloaded again.
        """
        if refresh:
            self.refresh()
        for layer_id in self.manifest.layers:
            self.get_layer_content(layer_id)
        return

    def push(self):
        """
        Upload every layer the repository does not have yet, then put
        the manifest under this image's tag.

        :returns: the result of the API's ``put_manifest`` call.
        """
        m = self.manifest
        for layer_id in m.layers:
            # See if the layer is already uploaded.  Any failure of the
            # size query is taken to mean "not there".
            try:
                self.api.get_blob_size(self.repository.name, layer_id)
                LOG.debug("layer %s already exists in %s"
                          % (layer_id, self.repository))
            except Exception:
                LOG.debug("layer %s not in %s; uploading"
                          % (layer_id, self.repository))
                self.api.post_blob(self.repository.name, layer_id,
                                   self.get_layer_content(layer_id))
        return self.api.put_manifest(
            self.repository.name, self.tag, m.raw, media_type=m.media_type)

    def delete(self, layers=False):
        """
        Delete the manifest this image's tag points at.

        :param bool layers: if `True`, first delete every layer of the
            manifest that exists in the repository (best effort:
            failures are logged and ignored).
        """
        if layers is True:
            for layer_id in self.manifest.layers:
                # Only try to delete layers that are actually there.
                try:
                    self.api.get_blob_size(self.repository.name, layer_id)
                    LOG.debug("layer %s already exists in %s"
                              % (layer_id, self.repository))
                    self.api.delete_blob(self.repository.name, layer_id)
                except Exception:
                    LOG.debug("layer %s not in %s; ignoring"
                              % (layer_id, self.repository))
        self.api.delete_manifest_by_tag(self.repository.name, self.tag)
        self._deleted = True
        return None

    def set_manifest(self, manifest):
        """
        Replace this image's manifest, write it to the manifest cache
        file (if file-backed) and forget layers it does not list.
        """
        self._manifest = manifest
        if self.filebasename:
            self._manifest_file = "%s.%s" % (self.filebasename, "manifest")
        else:
            self._manifest_file = None
        self._write_manifest_file()
        self.__delete_stale_layers()

    def check_layer(self, layer_id):
        """
        :returns: 0 if the API can report a size for `layer_id`, 1 if
            that query fails for any reason.
        """
        try:
            self.api.get_blob_size(self.repository.name, layer_id)
            LOG.debug("layer %s exists in %s" % (layer_id, self.repository))
            return 0
        except Exception:
            LOG.debug("layer %s not in %s; ignoring"
                      % (layer_id, self.repository))
            return 1

    def get_layer_content(self, layer_id):
        """
        Return the content of layer `layer_id`, preferring the file
        cache, then the in-memory cache, and downloading (and caching)
        it otherwise.

        :param str layer_id: ``<algorithm>:<hex digest>``.
        :raises dockerreg.exceptions.LayerContentMismatch: if a cached
            file does not hash to the digest in `layer_id`.
        """
        [alg, digest] = layer_id.split(":")
        path = self._layer_path(layer_id)
        if path and os.path.exists(path):
            with open(path, 'rb') as fd:
                blob = fd.read()
            fdig = hashlib.new(alg, blob).hexdigest()
            if fdig != digest:
                raise ex.LayerContentMismatch(
                    "layer %s digest does not match content in %s"
                    % (digest, path))
            LOG.debug("%s already contains cached layer %s" % (path, layer_id))
            return blob
        if layer_id in self._layer_blobs:
            LOG.debug("already cached layer %s in memory" % (layer_id))
            return self._layer_blobs[layer_id]
        # Ok, we have to download the blob.
        blob = self.api.get_blob(self.repository.name, layer_id)
        # Then either save in memory or to file.
        self.set_layer_content(layer_id, blob)
        return blob

    def set_layer_content(self, layer_id, blob):
        """
        Cache `blob` as the content of `layer_id`: in a file if this
        image is file-backed, in memory otherwise.
        """
        path = self._layer_path(layer_id)
        if path:
            with open(path, 'wb') as fd:
                fd.write(blob)
            # Key by the full layer id, as the cache readers do, and
            # remember the path rather than a closed file object.
            self._layer_files[layer_id] = path
        else:
            self._layer_blobs[layer_id] = blob

    def copy_to(self, repository=None, tag=None, filebasename=None,
                lazy=True):
        """
        Copies this image, possibly to a new repository (possibly
        registry) and/or tag.  At least one of these values must be set;
        otherwise an IllegalArgumentError will be generated.  Moreover,
        they must not be set to the same values as the image has; this
        will also result in an IllegalArgumentError.

        :param repository: a :py:class:`dockerreg.models.Repository`
            object, specifying a new repository (and possibly a
            different :py:class:`Registry`).
        :param str tag: a new tag to use for this image.
        :param str filebasename: cache-file prefix for the copy.
        :param bool lazy: if false, all layer content is copied into the
            new image immediately.
        :returns: the new :py:class:`ImageV2`.
        """
        if not repository and not tag:
            raise ex.IllegalArgumentError(
                "must specify one of repository and tag")
        if not repository:
            repository = self.repository
        if not tag:
            tag = self.tag
        if tag == self.tag and repository == self.repository:
            raise ex.IllegalArgumentError(
                "repository and tag (%s,%s) are the same as image (%s,%s)"
                % (repository, tag, self.repository, self.tag))
        # The copy must talk to the registry that owns the target
        # repository, which may not be ours.
        api = getattr(repository, "api", self.api)
        ni = ImageV2(api, tag, repository, filebasename=filebasename,
                     arch=self._arch, oss=self._os)
        nm = self.manifest.copy_to(repository, tag)
        ni.set_manifest(nm)
        if not lazy:
            for layer_id in self.manifest.layers:
                ni.set_layer_content(layer_id,
                                     self.get_layer_content(layer_id))
        return ni

    def __repr__(self):
        # Note that reading self.id loads the manifest if necessary.
        parts = []
        if self.id:
            parts.append("id=" + self.id)
        if self.filebasename:
            parts.append("backingfile=" + self._manifest_file)
        extra = " " + ",".join(parts)
        return "<%s %s/%s:%s%s>" % (
            self.__class__.__name__, self.repository.registry.name,
            self.repository.name, self.name, extra)
class ManifestV1Compat(ManifestV1):
    """
    A :py:class:`ManifestV1` used for the ``v1Compatibility`` entries in
    the history of a :py:class:`ManifestV21`; it adds nothing to its
    base class.
    """
class ManifestV21(Manifest, SignableMixin):
    """
    A schema version 1 image manifest that can be signed.

    The manifest body is kept as a dict in ``_attrs`` and the serialized
    (possibly signed) form in ``_raw``.
    """

    _media_types = [
        "application/vnd.docker.distribution.manifest.v1+json",
        "application/vnd.docker.distribution.manifest.v1+prettyjws"
    ]

    def __init__(self, api, blob=None, raw=None, media_type=None,
                 repository=None, tag=None, id=None):
        """
        :param api: the API client used for all requests.
        :param dict blob: the parsed manifest; it is copied, never
            modified.  Missing ``name``, ``tag``, ``schemaVersion`` and
            ``fsLayers`` fields are filled in.
        :param raw: the serialized manifest, if available.
        :param str media_type: defaults to the first entry of
            ``_media_types``.
        :param repository: the repository object; its ``name`` replaces
            a missing or different ``name`` field.
        :param str tag: replaces a missing or different ``tag`` field.
        :param id: the manifest digest, if known.
        :raises dockerreg.exceptions.MalformedManifestError: if
            ``schemaVersion`` is present and not 1, or a history entry
            has no ``v1Compatibility`` key.
        """
        blob = dict(blob or {})
        # Any change to the body invalidates existing signatures.
        need_resign = False
        if repository \
           and (not blob.get("name") or repository.name != blob.get("name")):
            blob["name"] = repository.name
            need_resign = True
        if tag and (not blob.get("tag") or tag != blob.get("tag")):
            blob["tag"] = tag
            need_resign = True
        if not media_type:
            media_type = ManifestV21._media_types[0]
        if not blob.get("schemaVersion"):
            blob["schemaVersion"] = 1
        elif blob.get("schemaVersion") != 1:
            raise ex.MalformedManifestError(
                "invalid schemaVersion; should be 1")
        if not blob.get("fsLayers"):
            blob["fsLayers"] = []
            need_resign = True
        super(ManifestV21, self).__init__(
            api, blob, raw, media_type=media_type, repository=repository,
            tag=tag, id=id)
        self._version = 2
        self._history = self._load_history()
        if need_resign:
            self.sign()
            self._id = None
        LOG.debug("new manifest: %s" % (self.raw))

    def _load_history(self):
        """Parse the ``history`` entries of ``_attrs`` into a list of
        :py:class:`ManifestV1Compat` objects."""
        history = []
        for x in self._attrs.get("history", []):
            if "v1Compatibility" not in x:
                raise ex.MalformedManifestError(
                    "unrecognized element in v1compat history list (%s)"
                    % (str(list(x.keys()))))
            cb = x["v1Compatibility"]
            history.append(ManifestV1Compat(
                self.api, json.loads(cb), cb,
                repository=self.repository, tag=self.tag, id=self.id))
        return history

    @property
    def raw(self):
        """The serialized manifest; generated from the body if unset."""
        if not self._raw:
            self._raw = json.dumps(
                dict(self._attrs), indent=3, separators=(', ', ': '))
        return self._raw

    def __digest(self):
        """Return ``sha256:<hex>`` over the serialized body without its
        ``signatures`` field."""
        attrsNoSig = dict(self._attrs)
        if "signatures" in attrsNoSig:
            del attrsNoSig["signatures"]
        blob = json.dumps(
            dict(attrsNoSig), indent=3, separators=(', ', ': '))
        # hashlib needs bytes; hashing the text directly fails on Python 3.
        return "sha256:" + hashlib.sha256(blob.encode('utf-8')).hexdigest()

    @property
    def id(self):
        """The manifest digest; computed locally if not known."""
        if not self._id:
            self._id = self.__digest()
        return self._id

    @property
    def digest(self):
        return self.id

    @property
    def schema_version(self):
        return self._attrs.get("schemaVersion")

    @property
    def architecture(self):
        return self._attrs.get("architecture")

    @property
    def layers(self):
        """The ``blobSum`` of each ``fsLayers`` entry, in order."""
        return [x["blobSum"] for x in self._attrs["fsLayers"]]

    @property
    def history(self):
        return self._history

    @property
    def created(self):
        """The ``created`` value of the first history entry, or `None`."""
        history = self.history
        if history and len(history) > 0:
            return history[0].created
        return None

    @property
    def signatures(self):
        return self._attrs.get("signatures")

    def refresh(self):
        """
        Re-fetch this manifest by tag and replace the digest, body,
        serialized form and history with what the registry returned.
        """
        resp = self.api.get_manifest(self.repository.name, self.tag, raw=True)
        self._id = resp.headers.get("Docker-Content-Digest", None)
        self._attrs = resp.json()
        # Keep the serialized form in step with the new body.
        self._raw = resp.content
        self._history = self._load_history()

    def verify(self):
        """
        Check the signatures embedded in the manifest.

        For each signature the signed payload is rebuilt from the first
        ``formatLength`` characters of the raw manifest plus the decoded
        ``formatTail``, and checked against the key in the signature's
        ``jwk`` header.

        :returns: `None` if the manifest carries no signatures, `False`
            as soon as one signature fails, `True` if all verify.
        :raises Exception: if a signature header has no ``jwk`` entry.
        """
        manifest_json = self._attrs
        raw = self.raw
        if "signatures" not in manifest_json \
           or len(manifest_json["signatures"]) == 0:
            return None
        for sig in manifest_json["signatures"]:
            protected = joseb64urldecode(sig["protected"])
            protected_json = json.loads(protected.decode('utf-8'))
            signature = joseb64urldecode(sig["signature"])
            orig_manifest = raw[:protected_json["formatLength"]] \
                + joseb64urldecode(protected_json["formatTail"])
            if "jwk" not in sig["header"]:
                raise Exception("only support jwk private key signatures")
            key = ECKey(sig["header"]["jwk"], sig["header"]["alg"])
            encm = joseb64urlencode(orig_manifest).decode('utf-8')
            vsig = sig["protected"] + "." + encm
            ret = key.verify(vsig.encode('utf-8'), signature)
            if not ret:
                LOG.debug("signature %s (%s) not verified!"
                          % (sig["signature"],
                             protected_json.get("time", None)))
                return False
            else:
                LOG.debug("signature %s (%s) verified"
                          % (sig["signature"],
                             protected_json.get("time", None)))
        return True

    def sign(self):
        """
        Sign the manifest body with a freshly generated NIST P-256 key.

        Existing signatures are dropped from the body.  The serialized
        manifest gets a ``signatures`` list spliced in before its final
        ``}``; the body kept in ``_attrs`` stays without signatures.
        """
        newblob = dict(self._attrs)
        if "signatures" in newblob:
            del newblob["signatures"]
        ekey = ecdsa.SigningKey.generate(curve=ecdsa.NIST256p)
        # The public key travels in the signature header (no key id).
        keyd = dict(
            kty="EC", crv="P-256",
            x=num_joseb64urlencode(
                ekey.privkey.public_key.point.x()).decode('utf-8'),
            y=num_joseb64urlencode(
                ekey.privkey.public_key.point.y()).decode('utf-8'))
        header = dict(alg="ES256", jwk=keyd)
        skey = ECKey(ekey, header["alg"])
        newraw = json.dumps(newblob, indent=3, separators=(', ', ': '))
        newraw_enc = joseb64urlencode(newraw.encode('utf-8')).decode('utf-8')
        # Everything from the last "}" on is the tail; the signatures
        # are inserted in front of it.
        format_length = newraw.rfind("}")
        format_tail = newraw[format_length:]
        format_tail_enc = \
            joseb64urlencode(format_tail.encode('utf-8')).decode('utf-8')
        protected_json = dict(formatLength=format_length,
                              formatTail=format_tail_enc)
        protected = json.dumps(
            protected_json, indent=None, separators=(',', ':'))
        protected_enc = \
            joseb64urlencode(protected.encode('utf-8')).decode('utf-8')
        signed_thing = "%s.%s" % (protected_enc, newraw_enc)
        signature = skey.sign(signed_thing.encode('utf-8'))
        signature_enc = joseb64urlencode(signature).decode('utf-8')
        LOG.info("signature = %s (%s" % (signature_enc, signed_thing))
        signatures = [dict(header=header, signature=signature_enc,
                           protected=protected_enc)]
        newsignedraw = "%s,\n \"signatures\": %s\n%s" % (
            newraw[:format_length],
            json.dumps(signatures, indent=3, separators=(', ', ': ')),
            format_tail)
        # Move the new JSON blob and (signed) raw manifest into place:
        self._attrs = newblob
        self._raw = newsignedraw
        return

    def copy_to(self, repository, tag):
        """
        :returns: a new, re-signed :py:class:`ManifestV21` with the same
            content, retargeted at `repository` and `tag`.
        """
        nm = ManifestV21(
            self.api, self._attrs, self.raw, media_type=self.media_type,
            repository=self.repository, tag=self.tag, id=self.id)
        nm.modify(repository, tag)
        return nm

    def modify(self, repository=None, tag=None):
        """
        Retarget this manifest at `repository` and/or `tag`, then reset
        the digest and re-sign.

        The ``name`` and ``tag`` fields of the body are updated too, as
        the constructor does, so the signed content names the new target.
        """
        attrs = dict(self._attrs)
        if repository:
            self._repository = repository
            attrs["name"] = repository.name
        if tag:
            self._tag = tag
            attrs["tag"] = tag
        self._attrs = attrs
        self._id = None
        self.sign()
        LOG.debug("modified manifest:\n%s" % (self.raw))

    def push(self):
        """
        Put this manifest under its tag (or, without a tag, its digest),
        signing it first if it has no signatures.

        :returns: the result of the API's ``put_manifest`` call.
        """
        if not self.signatures:
            self.sign()
            self._id = None
        LOG.debug("pushing (to %r) manifest = %r"
                  % (repr(self.tag or self.id), repr(self.raw)))
        return self.api.put_manifest(
            self.repository.name, self.tag or self.id, self.raw,
            media_type=self.media_type)

    def delete(self):
        """
        Delete this manifest from its repository by digest.

        :returns: the result of the API's ``delete_manifest`` call.
        """
        if not self.signatures:
            self.sign()
            self._id = None
        LOG.debug("deleting manifest %r" % (repr(self.id)))
        return self.api.delete_manifest(self.repository.name, self.id)

    def __repr__(self):
        return "<%s %s,%s,nlayers=%d>" % (
            self.__class__.__name__, self.name, self.architecture,
            len(self.layers))
@six.add_metaclass(abc.ABCMeta)
class ManifestList(Model):
    """
    Abstract base for a manifest list: a document from which a manifest
    for a given architecture and operating system can be picked.
    """

    def __init__(self, api, blob=None, repository=None):
        """
        :param api: the API client used for all requests.
        :param dict blob: the parsed manifest list; a new empty dict is
            used if omitted, so instances never share a default.
        :param repository: the repository the list belongs to.
        """
        self.api = api
        self._repository = repository
        self._attrs = {} if blob is None else blob
        self._manifest = None

    @abc.abstractmethod
    def get_platform_manifest(self, arch=getgoarch, oss=getgoos):
        """
        Return the manifest for the given platform; subclasses must
        implement this.

        :param arch: architecture string, or a callable returning one.
        :param oss: operating system string, or a callable returning one.
        """
        pass
class ManifestV22(Manifest):
    """
    One entry of a :py:class:`ManifestListV22`.

    The entry itself (``_attrs``) carries ``mediaType``, ``size``,
    ``digest`` and ``platform``; the manifest it points to is fetched
    by digest on first use of :py:attr:`manifest`.
    """

    _pointed_media_types = [
        "application/vnd.docker.distribution.manifest.v2+json",
        "application/vnd.docker.distribution.manifest.v1+json"
    ]
    _media_types = [
        "application/vnd.docker.image.manifest.v2+json",
        "application/vnd.docker.image.manifest.v1+json"
    ]

    def __init__(self, api, blob=None, raw=None, media_type=None,
                 repository=None, tag=None, id=None, manifest_list=None):
        """
        :param api: the API client used for all requests.
        :param dict blob: the manifest-list entry.  Its ``mediaType``
            and ``digest`` values, when present, take precedence over
            the `media_type` and `id` arguments.
        :param raw: the serialized entry, if available.
        :param repository: the repository the manifest lives in.
        :param str tag: the tag the manifest list was fetched by.
        :param manifest_list: the list this entry belongs to.
        """
        if blob is None:
            blob = {}
        # The JSON key is "mediaType"; "media_type" is what the original
        # code looked for and is honored as a fallback only.
        media_type = blob.get("mediaType", blob.get("media_type", media_type))
        id = blob.get("digest", id)
        super(ManifestV22, self).__init__(
            api, blob, raw, media_type=media_type,
            repository=repository, tag=tag, id=id)
        self._version = 2
        self._manifest_list = manifest_list
        self._manifest_attrs = None
        self._config_attrs = None

    def _repository_name(self):
        """Return the repository's ``name`` if it has one, otherwise the
        repository value itself (e.g. a plain name string)."""
        repo = self.repository
        return getattr(repo, "name", repo)

    @property
    def schema_version(self):
        return self._attrs["schemaVersion"]

    @property
    def manifest_list(self):
        return self._manifest_list

    @property
    def media_type(self):
        return self._attrs["mediaType"]

    @property
    def repository(self):
        return self._repository

    @property
    def size(self):
        return self._attrs["size"]

    @property
    def digest(self):
        return self._attrs["digest"]

    @property
    def architecture(self):
        return self._attrs["platform"]["architecture"]

    @property
    def os(self):
        return self._attrs["platform"].get("os", None)

    @property
    def os_version(self):
        return self._attrs["platform"].get("os.version", None)

    @property
    def os_features(self):
        return self._attrs["platform"].get("os.features", None)

    @property
    def variant(self):
        return self._attrs["platform"].get("variant", None)

    @property
    def features(self):
        return self._attrs["platform"].get("features", [])

    @property
    def manifest(self):
        """The parsed manifest this entry points to, fetched by digest
        on first use and cached afterwards."""
        if not self._manifest_attrs:
            resp = self.api.get_manifest(
                self._repository_name(), self.digest, raw=True)
            self._id = None
            if "Docker-Content-Digest" in resp.headers:
                self._digest = self._id = \
                    resp.headers["Docker-Content-Digest"]
            self._manifest_attrs = resp.json()
        return self._manifest_attrs

    @property
    def config_media_type(self):
        return self.manifest["config"]["mediaType"]

    @property
    def config_size(self):
        return self.manifest["config"]["size"]

    @property
    def config_digest(self):
        return self.manifest["config"]["digest"]

    @property
    def config(self):
        """The image configuration named by the manifest's ``config``
        digest, fetched on first use and cached afterwards."""
        if not self._config_attrs:
            # NOTE(review): the config digest is fetched through
            # get_manifest; in the registry API the config object is
            # stored as a blob, so this may need the blob endpoint
            # instead -- confirm against the API client.
            self._config_attrs = self.api.get_manifest(
                self._repository_name(), self.config_digest)
        return self._config_attrs

    @property
    def created(self):
        """The ``created`` value of the configuration, or `None`."""
        config = self.config
        if config and "created" in config:
            return config["created"]
        return None

    @property
    def layers(self):
        """The ``digest`` of each ``layers`` entry of the manifest."""
        return [x["digest"] for x in self.manifest["layers"]]

    def refresh(self):
        """
        Re-fetch the pointed-to manifest by digest and store the object
        built from the response in ``_manifest``.
        """
        resp = self.api.get_manifest(
            self._repository_name(), self.digest, raw=True)
        digest = resp.headers.get("Docker-Content-Digest", None)
        ctype = resp.headers.get("Content-type", None)
        # NOTE(review): the properties above read _manifest_attrs, not
        # _manifest, so they do not see this result -- confirm which
        # cache refresh() is meant to update.
        self._manifest = create_v2_manifest(
            self.api, resp.json(), resp.content, repository=self.repository,
            tag=self.tag, id=digest, content_type=ctype)

    def __repr__(self):
        return "<%s %s:%s,%s,%s,size=%d>" % (
            self.__class__.__name__, self.repository, self.digest,
            self.architecture, self.os, self.size)
class ManifestListV22(ManifestList):
    """
    A manifest list whose ``manifests`` entries are wrapped as
    :py:class:`ManifestV22` objects, one per platform.
    """

    _media_types = [
        "application/vnd.docker.distribution.manifest.list.v2+json"
    ]

    def __init__(self, api, blob=None, raw=None, media_type=None,
                 repository=None, tag=None, id=None):
        """
        :param api: the API client used for all requests.
        :param dict blob: the parsed manifest list; it is copied, never
            modified.
        :param raw: the serialized manifest list, if available.
        :param str media_type: must agree with the blob's ``mediaType``
            if both are given; defaults to the blob's value, then to the
            first entry of ``_media_types``.
        :param repository: the repository the list belongs to; also
            handed to every entry so it can fetch its manifest.
        :param str tag: the tag the list was fetched by.
        :param id: the digest of the list, if known.
        :raises dockerreg.exceptions.MalformedManifestError: if
            `media_type` contradicts the blob's ``mediaType``.
        """
        blob = dict(blob or {})
        declared = blob.get("mediaType")
        if media_type is None:
            media_type = declared or self._media_types[0]
        elif declared is None:
            blob["mediaType"] = media_type
        elif media_type != declared:
            # NOTE(review): the original named an undefined
            # MalformedManifestListError here; MalformedManifestError is
            # the manifest error this module already raises elsewhere.
            raise ex.MalformedManifestError("media_type mismatch")
        # ManifestList.__init__ only takes (api, blob, repository); the
        # remaining state is kept here.
        super(ManifestListV22, self).__init__(
            api, blob, repository=repository)
        self._raw = raw
        self._media_type = media_type
        self._tag = tag
        self._id = id
        self._manifests = [
            ManifestV22(api, x, repository=repository, tag=tag,
                        manifest_list=self)
            for x in blob.get("manifests", [])]
        self._version = 2

    # ManifestList, as defined in this module, stores only api,
    # _repository, _attrs and _manifest, so the accessors that this
    # class's own methods rely on are provided here.
    @property
    def repository(self):
        return self._repository

    @property
    def tag(self):
        return self._tag

    @property
    def id(self):
        return self._id

    @property
    def digest(self):
        return self._id

    @property
    def raw(self):
        return self._raw

    @property
    def media_type(self):
        return self._media_type

    @property
    def schema_version(self):
        return self._attrs["schemaVersion"]

    @property
    def manifests(self):
        return self._manifests

    def get_platform_manifest(self, arch=getgoarch, oss=getgoos):
        """
        :param arch: architecture string, or a callable returning one.
        :param oss: operating system string, or a callable returning one.
        :returns: the first entry whose architecture and os both match,
            or `None` if there is none.
        :raises dockerreg.exceptions.IllegalArgumentError: if `arch` or
            `oss` is empty.
        """
        if not arch or not oss:
            raise ex.IllegalArgumentError("both arch and os must be set")
        if six.callable(arch):
            arch = arch()
        if six.callable(oss):
            oss = oss()
        for m in self.manifests:
            # architecture and os are properties, not methods.
            if arch == m.architecture and oss == m.os:
                return m
        return None

    def refresh(self):
        """
        Re-fetch this list (by digest if known, otherwise by tag) and
        store the object built from the response in ``_manifest``.
        """
        repo = self.repository
        resp = self.api.get_manifest(
            getattr(repo, "name", repo), self.digest or self.tag, raw=True)
        digest = resp.headers.get("Docker-Content-Digest", None)
        ctype = resp.headers.get("Content-type", None)
        self._manifest = create_v2_manifest(
            self.api, resp.json(), resp.content, repository=self.repository,
            tag=self.tag, id=digest, content_type=ctype)

    def __eq__(self, other):
        if self.__class__ != other.__class__:
            return False
        if self.repository != other.repository or self.tag != other.tag \
           or self.id != other.id:
            return False
        if self.raw != other.raw:
            return False
        return True

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__.
        return not self.__eq__(other)

    def __hash__(self):
        # Hash exactly what __eq__ compares; the _attrs dict that the
        # original included is unhashable.
        return hash(
            (self.__class__.__name__, self.repository, self.tag, self.id,
             self.raw))
def create_v2_manifest(api, blob, raw, repository=None, tag=None, id=None,
                       content_type=None):
    """
    Build the manifest object that matches a registry response.

    If `content_type` (ignoring any ``;``-separated parameters) is one
    of the media types of :py:class:`ManifestListV22` or
    :py:class:`ManifestV21`, that class is used.  Otherwise -- no
    content type, or one that neither class lists -- the class is chosen
    from the content: a blob with a ``manifests`` key becomes a
    :py:class:`ManifestListV22`, anything else a :py:class:`ManifestV21`.

    :param api: the API client handed to the new object.
    :param dict blob: the parsed manifest.
    :param raw: the serialized manifest.
    :param repository: the repository the manifest belongs to.
    :param str tag: the tag the manifest was fetched by.
    :param id: the manifest digest, if known.
    :param str content_type: the response's content type, if known.
    :returns: a :py:class:`ManifestListV22` or :py:class:`ManifestV21`;
        never `None`.
    """
    if content_type:
        media_type = content_type.split(";", 1)[0].strip()
        for kls in (ManifestListV22, ManifestV21):
            if kls._media_types is not None \
               and media_type in kls._media_types:
                return kls(
                    api, blob, raw, media_type=media_type,
                    repository=repository, tag=tag, id=id)
    # Unknown or missing content type: decide from the content, and let
    # the chosen class pick its own default media type.
    kls = ManifestListV22 if "manifests" in blob else ManifestV21
    return kls(
        api, blob, raw, media_type=None,
        repository=repository, tag=tag, id=id)