import six
import abc
import sys
import re
import hashlib
import os
import json
import jose
from jose.jwk import ECKey
import ecdsa
import base64
from dockerreg.log import LOG
import dockerreg.exceptions as ex
from dockerreg.util import (
getgoarch, getgoos,
joseb64urldecode, joseb64urlencode, num_joseb64urlencode)
from dockerreg.models import (
Registry, Repository, Image, Model, Manifest, SignableMixin )
from dockerreg.models.v1 import ManifestV1
[docs]class RegistryV2(Registry):
[docs] def __init__(self,api,name=None):
if api.version != 2:
raise ex.ModelApiVersionMismatch("v2 registry, not v2 api")
super(RegistryV2,self).__init__(api,name=name)
self._db = None
def _get_repositories(self,refresh=False):
if self._db is not None and not refresh:
return self._db
resp = self.api.get_repositories()
if not "repositories" in resp:
raise DockerRegistryError(
"API error: /v2/get_repositories: expected 'repositories' list")
self._db = {}
repos = resp["repositories"]
for repo in repos:
idx = repo.rfind("/")
if idx < 0:
namespace = ""
repo = repo
else:
namespace = repo[:idx]
repo = repo[(idx+1):]
if not namespace in self._db:
self._db[namespace] = []
self._db[namespace].append(repo)
return self._db
[docs] def get_namespaces(self,regexp=None):
"""
:returns: a :py:class:`list` of namespaces in this registry.
:rtype: :py:class:`list`
"""
if not regexp:
return list(self._get_repositories().keys())
retval = []
sre = re.compile(regexp)
for namespace in list(self._get_repositories().keys()):
if sre.search(namespace) is not None:
retval.append(namespace)
return retval
[docs] def get_repositories(self,regexp=None,namespace_regexp=None):
"""
:returns: a :py:class:`dict` of namespace-to-repository-lists in this registry.
:rtype: :py:class:`dict`
"""
if not regexp and not namespace_regexp:
return self._get_repositories()
retval = {}
(sre,nsre) = (None,None)
if regexp:
sre = re.compile(regexp)
if namespace_regexp:
nsre = re.compile(namespace_regexp)
db = self._get_repositories()
for namespace in list(db.keys()):
if nsre and nsre.search(namespace) is None:
continue
if not sre:
retval[namespace] = db[namespace]
continue
retval[namespace] = []
for repo in db[namespace]:
if sre.search(repo) is not None:
retval[namespace].append(repo)
return retval
[docs] def check_image(self,repository,tag,arch=getgoarch,oss=getgoos):
r = RepositoryV2(self.api,repository,self)
i = r.get_image(tag,arch=arch,oss=oss)
if not i:
return -1
return i.validate(refresh=True)
[docs] def pull_image(self,repository,tag,arch=getgoarch,oss=getgoos,
filebasename=None,refresh=False):
"""
Pulls the image descriptor and all layers, returning an :py:class:`Image`representing the downloaded image. If the `filebasename` parameter is set, the raw data will be cached in files instead of in memory.
:param str repository: the full name of the image, including the namespace and repository.
:param str tag: the tag indicating the version of the image.
:param str arch: a GOARCH architecture string (i.e. amd64); defaults to the platform this library is running on; see https://golang.org/doc/install/source#environment for valid values.
:param str oss: a GOOS operating system string (i.e. linux); defaults to the platform this library is running on; see https://golang.org/doc/install/source#environment for valid values.
:param str filebasename: if set, the image manifest and layers will be cached in files named `filebasename`.manifest and `filebasename`.<layer_id>. If the image already exists on disk via `filebasename`, it will be loaded from disk.
:param bool refresh: if set `False` (the default), and if the image exists on disk via `filebasename`, the image will *not* be refreshed (i.e., will not be re-pulled from the registry; only layers that are not cached in files will be pulled; the manifest will *not* be refreshed).
:returns: a :py:class:`Image`.
"""
r = RepositoryV2(self.api,repository,self)
i = r.get_image(tag,arch=arch,oss=oss,filebasename=filebasename)
if not i:
return None
i.pull(refresh=refresh)
LOG.debug("pulled image %r" % (repr(i)))
return i
[docs] def get_image_size(self,repository,tag,arch=getgoarch,oss=getgoos,
filebasename=None):
r = RepositoryV2(self.api,repository,self)
i = r.get_image(tag,arch=arch,oss=oss,filebasename=filebasename)
if not i:
return None
return i.size()
[docs] def get_image_created(self,repository,tag,arch=getgoarch,oss=getgoos,
filebasename=None):
r = RepositoryV2(self.api,repository,self)
i = r.get_image(tag,arch=arch,oss=oss,filebasename=filebasename)
if not i:
return None
return i.manifest.created
[docs] def pull_and_push_image(self,repository,tag,arch=getgoarch,oss=getgoos,
filebasename=None,refresh=False,
dst_registry=None,dst_repository=None,dst_tag=None,
dst_username=None,dst_password=None,
dst_no_verify=None,dst_cert=None,dst_ca_bundle=None,
dst_skip_docker_config=False,dst_no_cache=False,
dst_cache_file=None):
"""
Pulls the repository:tag image from this registry, and pushes it to dstregistry/dstrepository:dsttag. If all dst* parameters are None, an IllegalArgumentError will be raised. If any of the dst* parameters are unspecified, their values will be taken from the current values of the specified image. This allows you to simply re-tag an image; copy it to another repository; or copy it to another repository.
"""
if not dst_repository:
dst_repository = repository
if not dst_tag:
dst_tag = tag
if dst_registry and dstregistry != self.name:
verify_val = True
if dst_no_verify:
verify_val = False
elif dst_ca_bundle is not None:
verify_val = dst_ca_bundle
if not dst_username and not dst_skip_docker_config:
auth = DockerConfigAuth()
else:
auth = IndexedBasicAuth.from_user_pass(
dst_registry,dst_username,password=dst_password,
skip_docker_config=dst_skip_docker_config)
cache = None
if not dst_no_cache:
if dst_cache_file:
cache = BearerTokenCache(filename=dst_cache_file)
else:
cache = BearerTokenCache()
napi = ApiClient(
dstregistry,2,username=dst_username,auth=auth,
verify=verify_val,cert=dst_cert,cache=cache)
reg = napi.registry()
dst_repo = RepositoryV2(napi,dst_repository,nreg)
else:
dst_repo = RepositoryV2(self.api,repository,self)
# Ok, now that we've processed our input args, let's do the real work.
i = self.pull_image(repository,tag,arch=arch,oss=oss,
filebasename=filebasename,refresh=refresh)
ni = i.copy_to(dst_repo,dst_tag,lazy=False)
ni.push()
return
[docs] def tag_image(self,repository,tag,newtag,arch=getgoarch,oss=getgoos):
r = RepositoryV2(self.api,repository,self)
i = ImageV2(self.api,tag,r)
ni = i.copy_to(tag=newtag)
ni.push()
return
[docs] def delete_tag(self,repository,tag,arch=getgoarch,oss=getgoos):
r = RepositoryV2(self.api,repository,self)
# We cannot directly delete a tag, so instead, we push a new
# empty manifest to that tag, then delete that manifest.
empty = ManifestV21(self.api,repository=r,tag=tag)
empty.push()
empty.delete()
return None
[docs] def push_image(self,repository,tag,filebasename=None):
r = RepositoryV2(self.api,repository,self)
i = ImageV2(self.api,tag,r,filebasename=filebasename)
return i.push()
[docs] def delete_image(self,repository,tag,layers=False,
arch=getgoarch,oss=getgoos):
r = RepositoryV2(self.api,repository,self)
i = r.get_image(tag,arch=arch,oss=oss)
i.delete(layers=layers)
return None
[docs]class RepositoryV2(Repository):
[docs] def __init__(self,api,repository,registry):
super(RepositoryV2,self).__init__(api,repository,registry)
self._tags = None
self._tag_metadata = None
[docs] def refresh(self):
self._tags = None
self._tag_metadata = None
self.get_tags()
[docs] def get_image(self,tag,arch=getgoarch,oss=getgoos,filebasename=None):
"""
Returns an :py:class:`Image`representing the requested image. If the `filebasename` parameter is set, any downloaded raw data will be cached in files instead of in memory.
:param str tag: the tag indicating the version of the image.
:param str filebasename: if set, the image manifest and layers will be cached in files named `filebasename`.manifest and `filebasename`.<layer_id>.
:param str arch: a GOARCH architecture string (i.e. amd64); defaults to the platform this library is running on; see https://golang.org/doc/install/source#environment for valid values.
:param str oss: a GOOS operating system string (i.e. linux); defaults to the platform this library is running on; see https://golang.org/doc/install/source#environment for valid values.
:returns: a :py:class:`Image`.
"""
return ImageV2(self.api,tag,self,arch=arch,oss=oss,
filebasename=filebasename)
[docs]class ImageV2(Image):
[docs] def __init__(self,api,tag,repository,id=None,filebasename=None,
arch=getgoarch,oss=getgoos):
super(ImageV2,self).__init__(api,tag,repository,id=id)
self._filebasename = filebasename
self._arch = arch
self._os = oss
self._manifest = None
self._manifest_file = None
if self._filebasename is not None:
self._manifest_file = "%s.%s" % (self._filebasename,"manifest")
if os.path.exists(self._manifest_file):
f = open(self._manifest_file,'r')
raw = f.read()
self._manifest = create_v2_manifest(
self.api,json.loads(raw),raw,repository=self.repository,
tag=self.tag)
self._layer_blobs = {}
self._layer_files = {}
@property
def id(self):
return self.manifest.digest
@property
def filebasename(self):
return self._filebasename
@property
def manifest(self):
if self._manifest is not None:
return self._manifest
self.__update()
return self._manifest
def __delete_stale_layers(self):
ll = self.manifest.layers
for lid in list(self._layer_blobs.keys()):
if not lid in ll:
del self._layer_blobs[lid]
for lid in list(self._layer_files.keys()):
if not lid in ll:
del self._layer_files[lid]
# This returns True or False according to refresh(). However, we
# also use it to implement the manifest property (which ignores the
# return value).
def __update(self):
resp = self.api.get_manifest(self.repository.name,self.tag,raw=True)
id = resp.headers.get("Docker-Content-Digest",None)
ctype = resp.headers.get("Content-type",None)
m = create_v2_manifest(
self.api,resp.json(),resp.content,repository=self.repository,
tag=self.tag,id=id,content_type=ctype)
m.verify()
# If we have one, we're calling this from refresh(); so return False
if self._manifest is not None and m == self._manifest:
return False
self._manifest = m
if self._manifest_file is not None:
LOG.debug("writing %s" % (self._manifest_file))
mf = open(self._manifest_file,'w')
mf.write(self.manifest.raw)
mf.close()
self.__delete_stale_layers()
return True
[docs] def refresh(self):
return self.__update()
[docs] def size(self):
ret = 0
m = self.manifest
for layer_id in m.layers:
ret += self.layer_size(layer_id)
return ret
[docs] def layer_size(self,layer_id):
return self.api.get_blob_size(self.repository.name,layer_id)
[docs] def validate(self,refresh=False):
if refresh:
self.refresh()
m = self.manifest
retval = 0
for layer_id in m.layers:
retval += self.check_layer(layer_id)
return retval
[docs] def pull(self,refresh=False):
m = self.manifest
for layer_id in m.layers:
self.get_layer_content(layer_id)
return
[docs] def push(self):
m = self.manifest
for layer_id in m.layers:
# See if the layer is already uploaded
try:
self.api.get_blob_size(self.repository.name,layer_id)
LOG.debug("layer %s already exists in %s"
% (layer_id,self.repository))
except:
LOG.debug("layer %s not in %s; uploading"
% (layer_id,self.repository))
self.api.post_blob(self.repository.name,layer_id,
self.get_layer_content(layer_id))
return self.api.put_manifest(
self.repository.name,self.tag,m.raw,media_type=m.media_type)
[docs] def delete(self,layers=False):
if layers is True:
m = self.manifest
for layer_id in m.layers:
# See if the layer is already uploaded
try:
self.api.get_blob_size(self.repository.name,layer_id)
LOG.debug("layer %s already exists in %s"
% (layer_id,self.repository))
self.api.delete_blob(self.repository.name,layer_id)
except:
LOG.debug("layer %s not in %s; ignoring"
% (layer_id,self.repository))
#self.api.delete_manifest(self.repository.name,self.manifest.digest,
# media_type=self.manifest.media_type)
self.api.delete_manifest_by_tag(self.repository.name,self.tag)
self._deleted = True
return None
[docs] def set_manifest(self,manifest):
self._manifest = manifest
if self.filebasename:
self._manifest_file = "%s.%s" % (self.filebasename,"manifest")
else:
self._manifest_file = None
if self._manifest_file:
f = open(self._manifest_file,'w')
f.write(self.manifest.raw)
f.close()
self.__delete_stale_layers()
[docs] def check_layer(self,layer_id):
try:
self.api.get_blob_size(self.repository.name,layer_id)
LOG.debug("layer %s exists in %s" % (layer_id,self.repository))
return 0
except:
LOG.debug("layer %s not in %s; ignoring"
% (layer_id,self.repository))
return 1
[docs] def get_layer_content(self,layer_id):
[alg,digest] = layer_id.split(":")
f = "%s.%s" % (self.filebasename,layer_id)
if f and os.path.exists(f):
fd = open(f,'rb')
blob = fd.read()
fd.close()
fdig = hashlib.new(alg,blob).hexdigest()
if fdig != digest:
raise ex.LayerContentMismatch(
"layer %s digest does not match content in %s"
% (digest,f))
LOG.debug("%s already contains cached layer %s" % (f,layer_id))
return blob
elif layer_id in self._layer_blobs:
LOG.debug("already cached layer %s in memory" % (layer_id))
return self._layer_blobs[layer_id]
# Ok, we have to download the blob.
blob = self.api.get_blob(self.repository.name,layer_id)
# Then either save in memory or to file.
self.set_layer_content(layer_id,blob)
return blob
[docs] def set_layer_content(self,layer_id,blob):
[alg,digest] = layer_id.split(":")
if self.filebasename:
f = "%s.%s" % (self.filebasename,layer_id)
else:
f = None
if f:
f = open(f,'wb')
f.write(blob)
f.close()
self._layer_files[digest] = f
else:
self._layer_blobs[digest] = blob
[docs] def copy_to(self,repository=None,tag=None,filebasename=None,
lazy=True):
"""
Copies this image, possibly to a new repository (possibly registry) and/or tag. At least one of these values must be set; otherwise an IllegalArgumentError will be generated. Moreover, they must not be set to the same values as the image has; this will also result in an IllegalArgumentError.
:param :py:class:`dockerreg.models.Repository` repository: a :py:class:`dockerreg.models.Repository` object, specifying a new repository (and possibly a different :py:class:`Registry`).
:param str tag: a new tag to use for this image.
"""
if not repository and not tag:
raise ex.IllegalArgumentError("must specify one of repository and tag")
if not repository:
repository = self.repository
if not tag:
tag = self.tag
if tag == self.tag and repository == self.repository:
raise ex.IllegalArgumentError(
"repository and tag (%s,%s) are the same as image (%s,%s)"
% (repository,tag,self.repository,self.tag))
ni = ImageV2(self.api,tag,repository,filebasename=filebasename,
arch=self._arch,oss=self._os)
nm = self.manifest.copy_to(repository,tag)
ni.set_manifest(nm)
if not lazy:
for layer_id in self.manifest.layers:
ni.set_layer_content(layer_id,self.get_layer_content(layer_id))
return ni
def __repr__(self):
extra = " "
if self.id:
extra += "id=" + self.id
if self.filebasename:
if extra:
extra += ","
extra += "backingfile=" + self._manifest_file
return "<%s %s/%s:%s%s>" % (
self.__class__.__name__,self.repository.registry.name,
self.repository.name,self.name,extra)
pass
[docs]class ManifestV1Compat(ManifestV1):
pass
[docs]class ManifestV21(Manifest,SignableMixin):
_media_types = [
"application/vnd.docker.distribution.manifest.v1+json",
"application/vnd.docker.distribution.manifest.v1+prettyjws" ]
[docs] def __init__(self,api,blob={},raw=None,media_type=None,
repository=None,tag=None,id=None):
blob = dict(blob)
need_resign = False
if repository \
and (not blob.get("name") or repository.name != blob.get("name")):
blob["name"] = repository.name
need_resign = True
if tag and (not blob.get("tag") or tag != blob.get("tag")):
blob["tag"] = tag
need_resign = True
if not media_type:
media_type = ManifestV21._media_types[0]
if not blob.get("schemaVersion"):
blob["schemaVersion"] = 1
elif blob.get("schemaVersion") != 1:
raise ex.MalformedManifestError("invalid schemaVersion; should be 1")
if not blob.get("fsLayers"):
blob["fsLayers"] = []
need_resign = True
super(ManifestV21,self).__init__(
api,blob,raw,media_type=media_type,repository=repository,tag=tag,
id=id)
self._version = 2
self._history = []
for x in self._attrs.get("history",[]):
if not "v1Compatibility" in x:
raise ex.MalformedManifestError(
"unrecognized element in v1compat history list (%s)"
% (str(list(x.keys()))))
cb = x["v1Compatibility"]
m1c = ManifestV1Compat(
self.api,json.loads(cb),cb,
repository=self.repository,tag=self.tag,id=self.id)
self._history.append(m1c)
if need_resign:
self.sign()
self._id = None
LOG.debug("new manifest: %s" % (self.raw))
@property
def raw(self):
if not self._raw:
self._raw = json.dumps(
dict(self._attrs),indent=3,separators=(', ',': '))
return self._raw
def __digest(self):
attrsNoSig = dict(self._attrs)
if "signatures" in attrsNoSig:
del attrsNoSig["signatures"]
blob = json.dumps(
dict(attrsNoSig),indent=3,separators=(', ',': '))
return "sha256:" + hashlib.sha256(blob).hexdigest()
@property
def id(self):
if not self._id:
self._id = self.__digest()
return self._id
@property
def digest(self):
return self.id
@property
def schema_version(self):
return self._attrs.get("schemaVersion")
@property
def architecture(self):
return self._attrs.get("architecture")
@property
def layers(self):
return [ x["blobSum"] for x in self._attrs["fsLayers"] ]
@property
def history(self):
return self._history
@property
def created(self):
history = self.history
if history and len(history) > 0:
return history[0].created
return None
@property
def signatures(self):
return self._attrs.get("signatures")
[docs] def refresh(self):
resp = self.api.get_manifest(self.repository,self.tag,raw=True)
self._id = None
if "Docker-Content-Digest" in resp.headers:
self._id = resp.headers["Docker-Content-Digest"]
self._attrs = resp.json()
self._history = None
if "history" in self._attrs:
self._history = [ ManifestV1Compat(self.api,x,json.dumps(x)) for x in self._attrs["history"] ]
[docs] def verify(self):
manifest_json = self._attrs
raw = self.raw
if not "signatures" in manifest_json \
or len(manifest_json["signatures"]) == 0:
return None
for sig in manifest_json["signatures"]:
protected = joseb64urldecode(sig["protected"])
protected_json = json.loads(protected.decode('utf-8'))
signature = joseb64urldecode(sig["signature"])
orig_manifest = raw[:protected_json["formatLength"]] \
+ joseb64urldecode(protected_json["formatTail"])
if not "jwk" in sig["header"]:
raise Exception("only support jwk private key signatures")
key = ECKey(sig["header"]["jwk"],sig["header"]["alg"])
encm = joseb64urlencode(orig_manifest).decode('utf-8')
vsig = sig["protected"] + "." + encm
ret = key.verify(vsig.encode('utf-8'),signature)
if not ret:
LOG.debug("signature %s (%s) not verified!"
% (sig["signature"],protected_json.get("time",None)))
return False
else:
LOG.debug("signature %s (%s) verified"
% (sig["signature"],protected_json.get("time",None)))
return True
[docs] def sign(self):
newblob = dict(self._attrs)
if "signatures" in newblob:
del newblob["signatures"]
ekey = ecdsa.SigningKey.generate(curve=ecdsa.NIST256p)
#vkey = ekey.get_verifying_key()
#vkey_hash = hashlib.new('sha256',vkey.to_der()).hexdigest()
#vkey_b32 = base64.b32encode(vkey_hash[:30]).rstrip('=')
#kid = ""
#for i in range(0,len(vkey_b32)/4):
# kid += vkey_b32[i*4:i*4+4] + ":"
#kid = kid.rstrip(":")
keyd = dict(#kid=kid
kty="EC",crv="P-256",
x=num_joseb64urlencode(ekey.privkey.public_key.point.x()).decode('utf-8'),
y=num_joseb64urlencode(ekey.privkey.public_key.point.y()).decode('utf-8'))
header = dict(alg="ES256",jwk=keyd)
skey = ECKey(ekey,header["alg"])
newraw = json.dumps(newblob,indent=3,separators=(', ',': '))
newraw_enc = joseb64urlencode(newraw.encode('utf-8')).decode('utf-8')
format_length = newraw.rfind("}")
format_tail = newraw[format_length:]
format_tail_enc = joseb64urlencode(format_tail.encode('utf-8')).decode('utf-8')
protected_json = dict(formatLength=format_length,
formatTail=format_tail_enc)
protected = json.dumps(protected_json,indent=None,separators=(',',':'))
protected_enc = joseb64urlencode(protected.encode('utf-8')).decode('utf-8')
signed_thing = "%s.%s" % (protected_enc,newraw_enc)
signature = skey.sign(signed_thing.encode('utf-8'))
signature_enc = joseb64urlencode(signature).decode('utf-8')
LOG.info("signature = %s (%s" % (signature_enc,signed_thing))
signatures = [ dict(header=header,signature=signature_enc,
protected=protected_enc) ]
newsignedraw = "%s,\n \"signatures\": %s\n%s" % (
newraw[:format_length],
json.dumps(signatures,indent=3,separators=(', ',': ')),
format_tail)
# Move the new JSON blob and (signed) raw manifest into place:
self._attrs = newblob
self._raw = newsignedraw
return
[docs] def copy_to(self,repository,tag):
nm = ManifestV21(
self.api,self._attrs,self.raw,media_type=self.media_type,
repository=self.repository,tag=self.tag,id=self.id)
nm.modify(repository,tag)
return nm
[docs] def modify(self,repository=None,tag=None):
if repository:
self._repository = repository
if tag:
self._tag = tag
self._id = None
self.sign()
LOG.debug("modified manifest:\n%s" % (self.raw))
[docs] def push(self):
if not self.signatures:
self.sign()
self._id = None
LOG.debug("pushing (to %r) manifest = %r"
% (repr(self.tag or self.id),repr(self.raw)))
return self.api.put_manifest(
self.repository.name,self.tag or self.id,self.raw,
media_type=self.media_type)
[docs] def delete(self):
if not self.signatures:
self.sign()
self._id = None
LOG.debug("deleting manifest %r" % (repr(self.id)))
return self.api.delete_manifest(self.repository.name,self.id)
def __repr__(self):
return "<%s %s,%s,nlayers=%d>" % (
self.__class__.__name__,self.name,self.architecture,
len(self.layers))
@six.add_metaclass(abc.ABCMeta)
[docs]class ManifestList(Model):
[docs] def __init__(self,api,blob={},repository=None):
self.api = api
self._repository = repository
self._attrs = blob
self._manifest = None
@abc.abstractmethod
[docs]class ManifestV22(Manifest):
_pointed_media_types = [
"application/vnd.docker.distribution.manifest.v2+json",
"application/vnd.docker.distribution.manifest.v1+json" ]
_media_types = [
"application/vnd.docker.image.manifest.v2+json",
"application/vnd.docker.image.manifest.v1+json" ]
[docs] def __init__(self,api,blob={},raw=None,media_type=None,repository=None,tag=None,id=None,
manifest_list=None):
media_type = blob["media_type"]
id = blob["digest"]
super(ManifestV22,self).__init__(
api,blob,media_type=media_type,
repository=repository,tag=tag,id=did)
self._version = 2
self._manifest_list = manifest_list
self._manifest_attrs = None
self._config_attrs = None
@property
def schema_version(self):
return self._attrs["schemaVersion"]
@property
def manifest_list(self):
return self._manifest_list
@property
def media_type(self):
return self._attrs["mediaType"]
@property
def repository(self):
return self._repository
@property
def size(self):
return self._attrs["size"]
@property
def digest(self):
return self._attrs["digest"]
@property
def architecture(self):
return self._attrs["platform"]["architecture"]
@property
def os(self):
return self._attrs["platform"].get("os",None)
@property
def os_version(self):
return self._attrs["platform"].get("os.version",None)
@property
def os_features(self):
return self._attrs["platform"].get("os.features",None)
@property
def variant(self):
return self._attrs["platform"].get("variant",None)
@property
def features(self):
return self._attrs["platform"].get("features",[])
@property
def manifest(self):
if not self._manifest_attrs:
resp = self.api.get_manifest(self.repository,self.digest,raw=True)
self._id = None
if "Docker-Content-Digest" in resp.headers:
self._digest = self._id = resp.headers["Docker-Content-Digest"]
self._manifest_attrs = resp.json()
return self._manifest_attrs
@property
def config_media_type(self):
return self.manifest["config"]["mediaType"]
@property
def config_size(self):
return self.manifest["config"]["size"]
@property
def config_digest(self):
return self.manifest["config"]["digest"]
@property
def config(self):
if not self._config_attrs:
self._config_attrs = \
self.api.get_manifest(self.repository,self.config_digest)
return self._config_attrs
@property
def created(self):
config = self.config
if config and "created" in config:
return config["created"]
return None
@property
def layers(self):
return [ x["digest"] for x in self.manifest["layers"] ]
[docs] def refresh(self):
resp = self.api.get_manifest(self.repository,self.digest,raw=True)
id = resp.headers.get("Docker-Content-Digest",None)
ctype = response.headers.get("Content-type",None)
self._manifest = create_v2_manifest(
self.api,resp.json(),resp.content,repository=self.repository,
tag=self.tag,id=id,content_type=ctype)
def __repr__(self):
return "<%s %s:%s,%s,%s,size=%d>" % (
self.__class__.__name__,self.repository,self.digest,
self.architecture,self.os,self.size)
[docs]class ManifestListV22(ManifestList):
_media_types = [ "application/vnd.docker.distribution.manifest.list.v2+json" ]
[docs] def __init__(self,api,blob={},raw=None,media_type=None,repository=None,tag=None,id=None):
if media_type is None:
media_type = blob["mediaType"]
elif "media_type" not in blob:
blob["media_type"] = media_type
elif media_type != blob["mediaType"]:
raise MalformedManifestListError("media_type mismatch")
super(ManifestListV22,self).__init__(
api,blob,raw,media_type=media_type,
repository=repository,tag=tag,id=id)
self._manifests = \
[ ManifestV22(api,x,manifest_list=self) for x in blob.get("manifests",[]) ]
self._version = 2
@property
def schema_version(self):
return self._attrs["schemaVersion"]
@property
def manifests(self):
return self._manifests
[docs] def refresh(self):
resp = self.api.get_manifest(self.repository,self.digest,raw=True)
id = resp.headers.get("Docker-Content-Digest",None)
ctype = response.headers.get("Content-type",None)
self._manifest = create_v2_manifest(
self.api,resp.json(),resp.content,repository=self.repository,
tag=self.tag,id=id,content_type=ctype)
def __eq__(self,other):
if self.__class__ != other.__class__:
return False
if self.repository != other.repository or self.tag != other.tag \
or self.id != other.id:
return False
if self.raw != other.raw:
return False
return True
def __hash__(self):
return hash(
(self.__class__.__name__,self.repository,self.tag,self.id,
self.raw,self._attrs))
[docs]def create_v2_manifest(api,blob,raw,repository=None,tag=None,id=None,
content_type=None):
if content_type:
for kls in [ ManifestListV22,ManifestV21 ]:
if kls._media_types is not None and content_type in kls._media_types:
return kls(
api,blob,raw,media_type=content_type,
repository=repository,tag=tag,id=id)
elif "manifests" in blob:
return ManifestListV22(
api,blob,raw,media_type=content_type,
repository=repository,tag=tag,id=id)
else:
return ManifestV21(
api,blob,raw,media_type=content_type,
repository=repository,tag=tag,id=id)