Source code for dockerreg.api.v1

import json
from dockerreg.api import BaseApiClient, _requests_formatter
from dockerreg.models import v1 as modelsv1
#from dockerreg.util.applicable \
#  import ApplicableClass, ApplicableMethod

class RepositoryV1Mixin(object):
    """Image, layer, tag and repository calls for a v1 API client.

    This is a mixin: it relies on the host class to provide ``get``, ``put``
    and ``delete`` methods that accept a request path plus optional keyword
    arguments.  Unless noted otherwise, each method returns whatever the
    underlying ``get``/``put``/``delete`` call returns.
    """

    def get_image(self, image_id, raw=False):
        """GET ``/images/<image_id>/json``.

        :param image_id: identifier interpolated into the request path.
        :param raw: when true, return the response object itself instead of
            its decoded JSON body.
        :returns: ``resp.json()`` by default, or the response when ``raw``
            is true.
        """
        resp = self.get("/images/%s/json" % image_id)
        if raw:
            return resp
        return resp.json()

    def put_image(self, image_id, metadata):
        """PUT ``metadata`` as compact JSON to ``/images/<image_id>/json``.

        :param image_id: identifier interpolated into the request path.
        :param metadata: JSON-serialisable value sent as the request body.
        """
        return self.put(
            "/images/%s/json" % image_id,
            # Compact separators: no whitespace in the serialised body.
            data=json.dumps(metadata, indent=None, separators=(',', ':')),
            headers={"content-type": "application/json"})

    def put_layer(self, image_id, data):
        """PUT ``data`` to ``/images/<image_id>/layer``.

        The request is sent with a ``transfer-encoding: chunked`` header.

        :param image_id: identifier interpolated into the request path.
        :param data: request body, handed to ``put`` unchanged.
        """
        return self.put(
            "/images/%s/layer" % image_id,
            data=data,
            headers={"transfer-encoding": "chunked"})

    def get_layer(self, image_id):
        """GET ``/images/<image_id>/layer``."""
        return self.get("/images/%s/layer" % image_id)

    def get_ancestry(self, image_id):
        """GET ``/images/<image_id>/ancestry``."""
        return self.get("/images/%s/ancestry" % image_id)

    def get_tags(self, repository):
        """GET ``/repositories/<repository>/tags``."""
        return self.get("/repositories/%s/tags" % repository)

    def get_image_id(self, repository, tag):
        """GET ``/repositories/<repository>/tags/<tag>``."""
        return self.get("/repositories/%s/tags/%s" % (repository, tag))

    def delete_tag(self, repository, tag):
        """DELETE ``/repositories/<repository>/tags/<tag>``."""
        return self.delete("/repositories/%s/tags/%s" % (repository, tag))

    def set_tag(self, repository, tag, image_id):
        """PUT ``image_id`` as JSON to ``/repositories/<repository>/tags/<tag>``.

        :param repository: repository name interpolated into the path.
        :param tag: tag name interpolated into the path.
        :param image_id: JSON-serialisable value sent as the request body.
        """
        return self.put(
            # The tag resource lives under ``/tags/`` -- the same path that
            # get_image_id() and delete_tag() address.
            "/repositories/%s/tags/%s" % (repository, tag),
            data=json.dumps(image_id, indent=None, separators=(',', ':')),
            headers={"content-type": "application/json"})

    def delete_repository(self, repository):
        """DELETE ``/repositories/<repository>``."""
        return self.delete("/repositories/%s" % repository)
class RegistryV1Mixin(object):
    """Registry-wide calls (search and ping) for a v1 API client.

    This is a mixin: the host class must supply a ``get`` method that takes
    a request path plus optional keyword arguments.
    """

    #@ApplicableMethod(alias="v1.search")
    def search(self, q, n=25, page=1):
        """GET ``/search`` with ``q``, ``n`` and ``page`` as query params.

        :param q: value sent as the ``q`` parameter.
        :param n: value sent as the ``n`` parameter (defaults to 25).
        :param page: value sent as the ``page`` parameter (defaults to 1).
        :returns: whatever the underlying ``get`` call returns.
        """
        query = {"q": q, "n": n, "page": page}
        return self.get("/search", params=query)

    #@ApplicableMethod(alias="v1.ping")
    def ping(self):
        """GET ``/_ping`` and report whether the status code was 200.

        :returns: ``True`` for a 200 response, ``False`` for any other
            status code.
        """
        return self.get("/_ping").status_code == 200
#@ApplicableClass()
class ApiClientV1(BaseApiClient, RegistryV1Mixin, RepositoryV1Mixin):
    """API client that combines ``BaseApiClient`` with the two v1 mixins.

    The base class is always initialised with ``version=1``.
    """

    def __init__(
            self,
            host,
            url_prefix=None,
            username=None,
            auth=None,
            auth_url=None,
            verify=True,
            cert=None,
            cache=None,
            **kwargs):
        """Forward every argument to ``BaseApiClient.__init__`` with ``version=1``.

        ``host`` is passed positionally; ``url_prefix``, ``username``,
        ``auth``, ``auth_url``, ``verify``, ``cert`` and ``cache`` are passed
        by keyword, followed by any extra ``kwargs``.  Their meaning is
        defined by ``BaseApiClient``, which is not shown here.
        """
        super(ApiClientV1, self).__init__(
            host,
            url_prefix=url_prefix,
            username=username,
            auth=auth,
            auth_url=auth_url,
            verify=verify,
            cert=cert,
            cache=cache,
            version=1,
            **kwargs)

    def registry(self):
        """Return a ``modelsv1.RegistryV1`` built from this client.

        The model receives this client as its first argument and
        ``self.host`` as its ``name``.
        """
        registry_name = self.host
        return modelsv1.RegistryV1(self, name=registry_name)