blob: dbc5208807de7b9a7df48180fa765ad5c43421d2 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
This is the OpenAPI validator library.
Validates input using the OpenAPI specification version 3 from (a simplified version, ahem)
import yaml
import json
import functools
import operator
import re
class OpenAPIException(Exception):
def __init__(self, message):
self.message = message
# Python type names to JSON type names
py2JSON = {
'int': 'integer',
'float': 'float',
'str': 'string',
'list': 'array',
'dict': 'object',
'bool': 'boolean'
mcolors = {
'PUT': '#fca130',
'DELETE': '#f93e3e',
'GET': '#61affe',
'POST': '#49cc5c',
'PATCH': '#d5a37e'
class OpenAPI():
def __init__(self, APIFile):
""" Instantiates an OpenAPI validator given a YAML specification"""
if APIFile.endswith(".json") or APIFile.endswith(".js"):
self.API = json.load(open(APIFile))
self.API = yaml.load(open(APIFile))
def validateType(self, field, value, ftype):
""" Validate a single field value against an expected type """
# Get type of value, convert to JSON name of type.
pyType = type(value).__name__
jsonType = py2JSON[pyType] if pyType in py2JSON else pyType
# Check if type matches
if ftype != jsonType:
raise OpenAPIException("OpenAPI mismatch: Field '%s' was expected to be %s, but was really %s!" % (field, ftype, jsonType))
def validateSchema(self, pdef, formdata, schema = None):
""" Validate (sub)parameters against OpenAPI specs """
# allOf: list of schemas to validate against
if 'allOf' in pdef:
for subdef in pdef['allOf']:
self.validateSchema(subdef, formdata)
where = "JSON body"
# Symbolic link??
if 'schema' in pdef:
schema = pdef['schema']['$ref']
if '$ref' in pdef:
schema = pdef['$ref']
if schema:
# #/foo/bar/baz --> dict['foo']['bar']['baz']
pdef = functools.reduce(operator.getitem, schema.split('/')[1:], self.API)
where = "item matching schema %s" % schema
# Check that all required fields are present
if 'required' in pdef:
for field in pdef['required']:
if not field in formdata:
raise OpenAPIException("OpenAPI mismatch: Missing input field '%s' in %s!" % (field, where))
# Now check for valid format of input data
for field in formdata:
if 'properties' not in pdef or field not in pdef['properties'] :
raise OpenAPIException("Unknown input field '%s' in %s!" % (field, where))
if 'type' not in pdef['properties'][field]:
raise OpenAPIException("OpenAPI mismatch: Field '%s' was found in api.yaml, but no format was specified in specs!" % field)
ftype = pdef['properties'][field]['type']
self.validateType(field, formdata[field], ftype)
# Validate sub-arrays
if ftype == 'array' and 'items' in pdef['properties'][field]:
for item in formdata[field]:
if '$ref' in pdef['properties'][field]['items']:
self.validateSchema(pdef['properties'][field]['items'], item)
self.validateType(field, formdata[field], pdef['properties'][field]['items']['type'])
# Validate sub-hashes
if ftype == 'hash' and 'schema' in pdef['properties'][field]:
self.validateSchema(pdef['properties'][field], formdata[field])
def validateParameters(self, defs, formdata):
def validate(self, method = "GET", path = "/foo", formdata = None):
""" Validate the request method and input data against the OpenAPI specification """
# Make sure we're not dealing with a dynamic URL.
# If we find /foo/{key}, we fold that into the form data
# and process as if it's a json input field for now.
if not self.API['paths'].get(path):
for xpath in self.API['paths']:
pathRE = re.sub(r"\{(.+?)\}", r"(?P<\1>[^/]+)", xpath)
m = re.match(pathRE, path)
if m:
for k,v in m.groupdict().items():
formdata[k] = v
path = xpath
if self.API['paths'].get(path):
defs = self.API['paths'].get(path)
method = method.lower()
if method in defs:
mdefs = defs[method]
if formdata and 'parameters' in mdefs:
self.validateParameters(mdefs['parameters'], formdata)
elif formdata and 'requestBody' not in mdefs:
raise OpenAPIException("OpenAPI mismatch: JSON data is now allowed for this request type")
elif 'requestBody' in mdefs and 'content' in mdefs['requestBody']:
# SHORTCUT: We only care about JSON input for Warble! Disregard other types
if not 'application/json' in mdefs['requestBody']['content']:
raise OpenAPIException ("OpenAPI mismatch: API endpoint accepts input, but no application/json definitions found in api.yaml!")
jdefs = mdefs['requestBody']['content']['application/json']
# Check that required params are here
self.validateSchema(jdefs, formdata)
raise OpenAPIException ("OpenAPI mismatch: Method %s is not registered for this API" % method)
raise OpenAPIException("OpenAPI mismatch: Unknown API path '%s'!" % path)
def dumpExamples(self, pdef, array = False):
schema = None
if 'schema' in pdef:
if 'type' in pdef['schema'] and pdef['schema']['type'] == 'array':
array = True
schema = pdef['schema']['items']['$ref']
schema = pdef['schema']['$ref']
if '$ref' in pdef:
schema = pdef['$ref']
if schema:
# #/foo/bar/baz --> dict['foo']['bar']['baz']
pdef = functools.reduce(operator.getitem, schema.split('/')[1:], self.API)
js = {}
desc = {}
if 'properties' in pdef:
for k, v in pdef['properties'].items():
if 'description' in v:
desc[k] = [v['type'], v['description']]
if 'example' in v:
js[k] = v['example']
elif 'items' in v:
if v['type'] == 'array':
js[k], foo = self.dumpExamples(v['items'], True)
js[k], foo = self.dumpExamples(v['items'])
return [js if not array else [js], desc]
def toHTML(self):
""" Blurps out the specs in a pretty HTML blob """
<!DOCTYPE html>
<html lang="en">
li = "<h3>Overview:</h3><ul style='font-size: 12px; font-family: Open Sans, sans-serif;'>"
for path, spec in sorted(self.API['paths'].items()):
for method, mspec in sorted(spec.items()):
method = method.upper()
summary = mspec.get('summary', 'No summary available')
linkname = "%s%s" % (method.lower(), path.replace('/', '-'))
li += "<li><a href='#%s'>%s %s</a>: %s</li>\n" % (linkname, method, path, summary)
li += "</ul>"
for path, spec in sorted(self.API['paths'].items()):
for method, mspec in sorted(spec.items()):
method = method.upper()
summary = mspec.get('summary', 'No summary available')
resp = ""
inp = ""
inpvars = ""
linkname = "%s%s" % (method.lower(), path.replace('/', '-'))
if 'responses' in mspec:
for code, cresp in sorted(mspec['responses'].items()):
for ctype, pdef in cresp['content'].items():
xjs, desc = self.dumpExamples(pdef)
js = json.dumps(xjs, indent = 4)
resp += "<div style='float: left; width: 90%%;'><pre style='width: 600px;'><b>%s</b>:\n%s</pre>\n</div>\n" % (code, js)
if 'requestBody' in mspec:
for ctype, pdef in mspec['requestBody']['content'].items():
xjs, desc = self.dumpExamples(pdef)
if desc:
for k, v in desc.items():
inpvars += "<kbd><b>%s:</b></kbd> (%s) <span style='font-size: 12px; font-family: Open Sans, sans-serif;'>%s</span><br/>\n" % (k, v[0], v[1])
js = json.dumps(xjs, indent = 4)
inp += "<div style='float: left; width: 90%%;'><h4>Input examples:</h4><blockquote><pre style='width: 600px;'><b>%s</b>:\n%s</pre></blockquote>\n</div>" % (ctype, js)
if inpvars:
inpvars = "<div style='float: left; width: 90%%;'><blockquote><pre style='width: 600px;'>%s</pre>\n</blockquote></div>" % inpvars
<div id="%s" style="margin: 20px; display: flex; box-sizing: border-box; width: 900px; border-radius: 6px; border: 1px solid %s; font-family: sans-serif; background: %s30;">
<div style="min-height: 32px;">
<!-- method -->
<div style="float: left; align-items: center; margin: 4px; border-radius: 5px; text-align: center; padding-top: 4px; height: 20px; width: 100px; color: #FFF; font-weight: bold; background: %s;">%s</div>
<!-- path and summary -->
<span style="display: flex; padding-top: 6px;"><kbd><strong>%s</strong></kbd></span>
<div style="box-sizing: border-box; flex: 1; font-size: 13px; font-family: Open Sans, sans-serif; float: left; padding-top: 6px; margin-left: 20px;">
<div style="float: left; width: 90%%;display: %s; ">
<h4>JSON parameters:</h4>
<div style="float: left; width: 90%%; ">
<h4>Response examples:</h4>
""" % (linkname, mcolors[method], mcolors[method], mcolors[method], method, path, summary, "block" if inp else "none", inpvars, inp, resp))
#print("%s %s: %s" % (method.upper(), path, mspec['summary']))