from __future__ import absolute_import
from .resolve import Resolver
from .primitives import Primitive, MimeCodec
from .spec.v1_2.parser import ResourceListContext
from .spec.v2_0.parser import SwaggerContext
from .spec.v2_0.objects import Operation
from .spec.base import BaseObj
from .scan import Scanner
from .scanner import TypeReduce, CycleDetector
from .scanner.v1_2 import Upgrade
from .scanner.v2_0 import AssignParent, Merge, Resolve, PatchObject, YamlFixer, Aggregate, NormalizeRef
from pyswagger import utils, errs, consts
import base64
import six
import weakref
import logging
logger = logging.getLogger(__name__)
[docs]class App(object):
""" Major component of pyswagger
This object is tended to be used in read-only manner. Therefore,
all accessible attributes are almost read-only properties.
"""
sc_path = 1
_shortcut_ = {
sc_path: ('/', '#/paths')
}
def __init__(self, url=None, url_load_hook=None, sep=consts.private.SCOPE_SEPARATOR, prim=None, mime_codec=None, resolver=None):
""" constructor
:param url str: url of swagger.json
:param func url_load_hook: a way to redirect url to a accessible place. for self testing.
:param sep str: separator used by pyswager.utils.ScopeDict
:param prim pyswagger.primitives.Primitive: factory for primitives in Swagger.
:param resolver: pyswagger.resolve.Resolver: customized resolver used as default when none is provided when resolving
"""
logger.info('init with url: {0}'.format(url))
self.__root = None
self.__raw = None
self.__version = ''
self.__op = None
self.__m = None
self.__schemes = []
self.__url=url
# a map from json-reference to
# - spec.BaseObj
# - a map from json-pointer to spec.BaseObj
self.__objs = {}
if url_load_hook and resolver:
raise ValueError('when use customized Resolver, please pass url_load_hook to that one')
# the start-point when you want to traverse the code to laod new object
self.__resolver = resolver or Resolver(url_load_hook)
# allow init App-wised SCOPE_SEPARATOR
self.__sep = sep
# init a default Primitive as factory for primitives
self.__prim = prim if prim else Primitive()
# MIME codec
self.__mime_codec = mime_codec or MimeCodec()
@property
def root(self):
""" schema representation of Swagger API, its structure may
be different from different version of Swagger.
There is 'Schema' object in swagger 2.0, that's why I change this
property name from 'schema' to 'root'.
:type: pyswagger.spec.v2_0.objects.Swagger
"""
return self.__root
@property
def raw(self):
""" raw objects for original version of loaded resources.
When loaded json is the latest version we supported, this property is the same as App.root
:type: ex. when loading Swagger 1.2, the type is pyswagger.spec.v1_2.objects.ResourceList
"""
return self.__raw
@property
def op(self):
""" list of Operations, organized by utils.ScopeDict
In Swagger 2.0, Operation(s) can be organized with Tags and Operation.operationId.
ex. if there is an operation with tag:['user', 'security'] and operationId:get_one,
here is the combination of keys to access it:
- .op['user', 'get_one']
- .op['security', 'get_one']
- .op['get_one']
:type: pyswagger.utils.ScopeDict of pyswagger.spec.v2_0.objects.Operation
"""
return self.__op
@property
def m(self):
""" backward compatible to access Swagger.definitions in Swagger 2.0,
and Resource.Model in Swagger 1.2.
ex. a Model:user in Resource:Users, access it by .m['Users', 'user'].
For Schema object in Swagger 2.0, just access it by it key in json.
:type: pyswagger.utils.ScopeDict
"""
return self.__m
@property
def version(self):
""" original version of loaded json
:type: str
"""
return self.__version
@property
def schemes(self):
""" supported schemes, refer to Swagger.schemes in Swagger 2.0 for details
:type: list of str, ex. ['http', 'https']
"""
return self.__schemes
@property
def url(self):
"""
"""
return self.__url
@property
def prim_factory(self):
""" primitive factory used by this app
:type: pyswagger.primitives.Primitive
"""
return self.__prim
@property
def mime_codec(self):
""" mime codec used by this app
:type: pyswagger.primitives.MimeCodec
"""
return self.__mime_codec
[docs] def load_obj(self, jref, getter=None, parser=None):
""" load a object(those in spec._version_.objects) from a JSON reference.
"""
obj = self.__resolver.resolve(jref, getter)
# get root document to check its swagger version.
tmp = {'_tmp_': {}}
version = utils.get_swagger_version(obj)
if version == '1.2':
# swagger 1.2
with ResourceListContext(tmp, '_tmp_') as ctx:
ctx.parse(obj, jref, self.__resolver, getter)
elif version == '2.0':
# swagger 2.0
with SwaggerContext(tmp, '_tmp_') as ctx:
ctx.parse(obj)
elif version == None and parser:
with parser(tmp, '_tmp_') as ctx:
ctx.parse(obj)
version = tmp['_tmp_'].__swagger_version__ if hasattr(tmp['_tmp_'], '__swagger_version__') else version
else:
raise NotImplementedError('Unsupported Swagger Version: {0} from {1}'.format(version, jref))
if not tmp['_tmp_']:
raise Exception('Unable to parse object from {0}'.format(jref))
logger.info('version: {0}'.format(version))
return tmp['_tmp_'], version
[docs] def prepare_obj(self, obj, jref):
""" basic preparation of an object(those in sepc._version_.objects),
and cache the 'prepared' object.
"""
if not obj:
raise Exception('unexpected, passing {0}:{1} to prepare'.format(obj, jref))
s = Scanner(self)
if self.version == '1.2':
# upgrade from 1.2 to 2.0
converter = Upgrade(self.__sep)
s.scan(root=obj, route=[converter])
obj = converter.swagger
if not obj:
raise Exception('unable to upgrade from 1.2: {0}'.format(jref))
s.scan(root=obj, route=[AssignParent()])
# normalize $ref
url, jp = utils.jr_split(jref)
s.scan(root=obj, route=[NormalizeRef(url)])
# fix for yaml that treat response code as number
s.scan(root=obj, route=[YamlFixer()], leaves=[Operation])
# cache this object
if url not in self.__objs:
if jp == '#':
self.__objs[url] = obj
else:
self.__objs[url] = {jp: obj}
else:
if not isinstance(self.__objs[url], dict):
raise Exception('it should be able to resolve with BaseObj')
self.__objs[url].update({jp: obj})
# pre resolve Schema Object
# note: make sure this object is cached before using 'Resolve' scanner
s.scan(root=obj, route=[Resolve()])
return obj
[docs] def _validate(self):
""" check if this Swagger API valid or not.
:param bool strict: when in strict mode, exception would be raised if not valid.
:return: validation errors
:rtype: list of tuple(where, type, msg).
"""
v_mod = utils.import_string('.'.join([
'pyswagger',
'scanner',
'v' + self.version.replace('.', '_'),
'validate'
]))
if not v_mod:
# there is no validation module
# for this version of spec
return []
s = Scanner(self)
v = v_mod.Validate()
s.scan(route=[v], root=self.__raw)
return v.errs
@classmethod
[docs] def load(kls, url, getter=None, parser=None, url_load_hook=None, sep=consts.private.SCOPE_SEPARATOR, prim=None, mime_codec=None, resolver=None):
""" load json as a raw App
:param str url: url of path of Swagger API definition
:param getter: customized Getter
:type getter: sub class/instance of Getter
:param parser: the parser to parse the loaded json.
:type parser: pyswagger.base.Context
:param dict app_cache: the cache shared by related App
:param func url_load_hook: hook to patch the url to load json
:param str sep: scope-separater used in this App
:param prim pyswager.primitives.Primitive: factory for primitives in Swagger
:param mime_codec pyswagger.primitives.MimeCodec: MIME codec
:param resolver: pyswagger.resolve.Resolver: customized resolver used as default when none is provided when resolving
:return: the created App object
:rtype: App
:raises ValueError: if url is wrong
:raises NotImplementedError: the swagger version is not supported.
"""
logger.info('load with [{0}]'.format(url))
url = utils.normalize_url(url)
app = kls(url, url_load_hook=url_load_hook, sep=sep, prim=prim, mime_codec=mime_codec, resolver=resolver)
app.__raw, app.__version = app.load_obj(url, getter=getter, parser=parser)
if app.__version not in ['1.2', '2.0']:
raise NotImplementedError('Unsupported Version: {0}'.format(self.__version))
# update schem if any
p = six.moves.urllib.parse.urlparse(url)
if p.scheme:
app.schemes.append(p.scheme)
return app
[docs] def validate(self, strict=True):
""" check if this Swagger API valid or not.
:param bool strict: when in strict mode, exception would be raised if not valid.
:return: validation errors
:rtype: list of tuple(where, type, msg).
"""
result = self._validate()
if strict and len(result):
for r in result:
logger.error(r)
raise errs.ValidationError('this Swagger App contains error: {0}.'.format(len(result)))
return result
[docs] def prepare(self, strict=True):
""" preparation for loaded json
:param bool strict: when in strict mode, exception would be raised if not valid.
"""
self.validate(strict=strict)
self.__root = self.prepare_obj(self.raw, self.__url)
if hasattr(self.__root, 'schemes') and self.__root.schemes:
if len(self.__root.schemes) > 0:
self.__schemes = self.__root.schemes
else:
# extract schemes from the url to load spec
self.__schemes = [six.moves.urlparse(self.__url).schemes]
s = Scanner(self)
s.scan(root=self.__root, route=[Merge()])
s.scan(root=self.__root, route=[PatchObject()])
s.scan(root=self.__root, route=[Aggregate()])
# reducer for Operation
tr = TypeReduce(self.__sep)
cy = CycleDetector()
s.scan(root=self.__root, route=[tr, cy])
# 'op' -- shortcut for Operation with tag and operaionId
self.__op = utils.ScopeDict(tr.op)
# 'm' -- shortcut for model in Swagger 1.2
if hasattr(self.__root, 'definitions') and self.__root.definitions != None:
self.__m = utils.ScopeDict(self.__root.definitions)
else:
self.__m = utils.ScopeDict({})
# update scope-separater
self.__m.sep = self.__sep
self.__op.sep = self.__sep
# cycle detection
if len(cy.cycles['schema']) > 0 and strict:
raise errs.CycleDetectionError('Cycles detected in Schema Object: {0}'.format(cy.cycles['schema']))
@classmethod
[docs] def create(kls, url, strict=True):
""" factory of App
:param str url: url of path of Swagger API definition
:param bool strict: when in strict mode, exception would be raised if not valid.
:return: the created App object
:rtype: App
:raises ValueError: if url is wrong
:raises NotImplementedError: the swagger version is not supported.
"""
app = kls.load(url)
app.prepare(strict=strict)
return app
""" for backward compatible, for later version,
please call App.create instead.
"""
_create_ = create
[docs] def resolve(self, jref, parser=None):
""" JSON reference resolver
:param str jref: a JSON Reference, refer to http://tools.ietf.org/html/draft-pbryan-zyp-json-ref-03 for details.
:param parser: the parser corresponding to target object.
:type parser: pyswagger.base.Context
:return: the referenced object, wrapped by weakref.ProxyType
:rtype: weakref.ProxyType
:raises ValueError: if path is not valid
"""
logger.info('resolving: [{0}]'.format(jref))
if jref == None or len(jref) == 0:
raise ValueError('Empty Path is not allowed')
obj = None
url, jp = utils.jr_split(jref)
# check cacahed object against json reference by
# comparing url first, and find those object prefixed with
# the JSON pointer.
o = self.__objs.get(url, None)
if o:
if isinstance(o, BaseObj):
obj = o.resolve(utils.jp_split(jp)[1:])
elif isinstance(o, dict):
for k, v in six.iteritems(o):
if jp.startswith(k):
obj = v.resolve(utils.jp_split(jp[len(k):])[1:])
break
else:
raise Exception('Unknown Cached Object: {0}'.format(str(type(o))))
# this object is not found in cache
if obj == None:
if url:
obj, _ = self.load_obj(jref, parser=parser)
if obj:
obj = self.prepare_obj(obj, jref)
else:
# a local reference, 'jref' is just a json-pointer
if not jp.startswith('#'):
raise ValueError('Invalid Path, root element should be \'#\', but [{0}]'.format(jref))
obj = self.root.resolve(utils.jp_split(jp)[1:]) # heading element is #, mapping to self.root
if obj == None:
raise ValueError('Unable to resolve path, [{0}]'.format(jref))
if isinstance(obj, (six.string_types, six.integer_types, list, dict)):
return obj
return weakref.proxy(obj)
[docs] def s(self, p, b=_shortcut_[sc_path]):
""" shortcut of App.resolve.
We provide a default base for '#/paths'. ex. to access '#/paths/~1user/get',
just call App.s('user/get')
:param str p: path relative to base
:param tuple b: a tuple (expected_prefix, base) to represent a 'base'
"""
if b[0]:
return self.resolve(utils.jp_compose(b[0] + p if not p.startswith(b[0]) else p, base=b[1]))
else:
return self.resolve(utils.jp_compose(p, base=b[1]))
[docs] def dump(self):
""" dump into Swagger Document
:rtype: dict
:return: dict representation of Swagger
"""
return self.root.dump()
[docs]class Security(object):
""" security handler
"""
[docs] def __init__(self, app):
""" constructor
:param App app: App
"""
self.__app = app
# placeholder of Security Info
self.__info = {}
[docs] def update_with(self, name, security_info):
""" insert/clear authorizations
:param str name: name of the security info to be updated
:param security_info: the real security data, token, ...etc.
:type security_info: **(username, password)** for *basicAuth*, **token** in str for *oauth2*, *apiKey*.
:raises ValueError: unsupported types of authorizations
"""
s = self.__app.root.securityDefinitions.get(name, None)
if s == None:
raise ValueError('Unknown security name: [{0}]'.format(name))
cred = security_info
header = True
if s.type == 'basic':
cred = 'Basic ' + base64.standard_b64encode(six.b('{0}:{1}'.format(*security_info))).decode('utf-8')
key = 'Authorization'
elif s.type == 'apiKey':
key = s.name
header = getattr(s, 'in') == 'header'
elif s.type == 'oauth2':
key = 'access_token'
else:
raise ValueError('Unsupported Authorization type: [{0}, {1}]'.format(name, s.type))
self.__info.update({name: (header, {key: cred})})
[docs] def __call__(self, req):
""" apply security info for a request.
:param Request req: the request to be authorized.
:return: the updated request
:rtype: Request
"""
if not req._security:
return req
for s in req._security:
for k, v in six.iteritems(s):
if not k in self.__info:
logger.info('missing: [{0}]'.format(k))
continue
logger.info('applying: [{0}]'.format(k))
header, cred = self.__info[k]
if header:
req._p['header'].update(cred)
else:
utils.nv_tuple_list_replace(req._p['query'], utils.get_dict_as_tuple(cred))
return req
[docs]class BaseClient(object):
""" base implementation of SwaggerClient, below is an minimum example
to extend this class
.. code-block:: python
class MyClient(BaseClient):
# declare supported schemes here
__schemes__ = ['http', 'https']
def request(self, req_and_resp, opt):
# passing to parent for default patching behavior,
# applying authorizations, ...etc.
req, resp = super(MyClient, self).request(req_and_resp, opt)
# perform request by req
...
# apply result to resp
resp.apply(header=header, raw=data_received, status=code)
return resp
"""
# supported schemes, ex. ['http', 'https', 'ws', 'ftp']
__schemes__ = set()
[docs] def __init__(self, security=None):
""" constructor
:param Security security: the security holder
"""
# placeholder of Security
self.__security = security
[docs] def prepare_schemes(self, req):
""" make sure this client support schemes required by current request
:param pyswagger.io.Request req: current request object
"""
# fix test bug when in python3 scheme, more details in commint msg
ret = sorted(self.__schemes__ & set(req.schemes), reverse=True)
if len(ret) == 0:
raise ValueError('No schemes available: {0}'.format(req.schemes))
return ret
[docs] def request(self, req_and_resp, opt):
""" preprocess before performing a request, usually some patching.
authorization also applied here.
:param req_and_resp: tuple of Request and Response
:type req_and_resp: (Request, Response)
:return: patched request and response
:rtype: Request, Response
"""
req, resp = req_and_resp
# dump info for debugging
logger.info('request.url: {0}'.format(req.url))
logger.info('request.header: {0}'.format(req.header))
logger.info('request.query: {0}'.format(req.query))
logger.info('request.file: {0}'.format(req.files))
logger.info('request.schemes: {0}'.format(req.schemes))
# apply authorizations
if self.__security:
self.__security(req)
return req, resp
SwaggerApp = App
SwaggerSecurity = Security