import json
from dockerreg.api import BaseApiClient, _requests_formatter
from dockerreg.models import v1 as modelsv1
#from dockerreg.util.applicable \
# import ApplicableClass, ApplicableMethod
[docs]class RepositoryV1Mixin(object):
[docs] def get_image(self,image_id,raw=False):
resp = self.get("/images/%s/json" % (image_id))
if raw:
return resp
return resp.json()
[docs] def put_image(self,image_id,metadata):
return self.put(
"/images/%s/json" % (image_id),
data=json.dumps(metadata,indent=None,separators=(',',':')),
headers={"content-type":"application/json"})
[docs] def put_layer(self,image_id,data):
return self.put(
"/images/%s/layer" % (image_id),data=data,
headers={"transfer-encoding":"chunked"})
[docs] def get_layer(self,image_id):
return self.get("/images/%s/layer" % (image_id))
[docs] def get_ancestry(self,image_id):
return self.get("/images/%s/ancestry" % (image_id))
[docs] def get_image_id(self,repository,tag):
return self.get("/repositories/%s/tags/%s" % (repository,tag))
[docs] def delete_tag(self,repository,tag):
return self.delete("/repositories/%s/tags/%s" % (repository,tag))
[docs] def set_tag(self,repository,tag,image_id):
return self.put(
"/repositories/%s/tag/%s" % (repository,tag),
data=json.dumps(image_id,indent=None,separators=(',',':')),
headers={"content-type":"application/json"})
[docs] def delete_repository(self,repository):
return self.delete("/repositories/%s" % (repository))
[docs]class RegistryV1Mixin(object):
#@ApplicableMethod(alias="v1.search")
[docs] def search(self,q,n=25,page=1):
return self.get("/search",params=dict(q=q,n=n,page=page))
#@ApplicableMethod(alias="v1.ping")
[docs] def ping(self):
resp = self.get("/_ping")
if resp.status_code == 200:
return True
return False
#@ApplicableClass()
[docs]class ApiClientV1(BaseApiClient,RegistryV1Mixin,RepositoryV1Mixin):
[docs] def __init__(self,host,url_prefix=None,username=None,
auth=None,auth_url=None,verify=True,cert=None,
cache=None,**kwargs):
super(ApiClientV1,self).__init__(
host,url_prefix=url_prefix,username=username,
auth=auth,auth_url=auth_url,verify=verify,cert=cert,
cache=cache,version=1,**kwargs)
[docs] def registry(self):
return modelsv1.RegistryV1(self,name=self.host)