refactor: rpc request processing flow (#9)
diff --git a/.github/workflows/unit-test-ci.yml b/.github/workflows/unit-test-ci.yml
new file mode 100644
index 0000000..00b05d6
--- /dev/null
+++ b/.github/workflows/unit-test-ci.yml
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: Unit Test CI
+
+on:
+ push:
+ branches:
+ - master
+ pull_request:
+ branches:
+ - master
+jobs:
+ run:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version: [ 3.6, 3.7, 3.8, 3.9 ]
+ fail-fast: false
+ steps:
+ - name: Checkout source codes
+ uses: actions/checkout@v2
+ with:
+ submodules: true
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v2
+ with:
+ python-version: ${{ matrix.python-version }}
+ - name: Set up dependencies
+ run: make setup
+ - name: Run unit tests
+ run: make test
diff --git a/.gitignore b/.gitignore
index d4d9525..92150d1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
.idea
.vscode
.DS_Store
+.pytest_cache/
+**/__pycache__/
diff --git a/Makefile b/Makefile
index 8ad5cf4..baadfff 100644
--- a/Makefile
+++ b/Makefile
@@ -24,4 +24,4 @@
.PHONY: test
test:
- pytest .
+ python3 -m pytest -v tests
diff --git a/src/__init__.py b/apisix/__init__.py
similarity index 100%
rename from src/__init__.py
rename to apisix/__init__.py
diff --git a/src/main.py b/apisix/main.py
similarity index 80%
rename from src/main.py
rename to apisix/main.py
index bf0a1cb..ad9a7d5 100644
--- a/src/main.py
+++ b/apisix/main.py
@@ -14,11 +14,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import os
+import sys
import click
-from runner.socket.server import Server as A6SocketServer
+
+sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+
+from apisix.runner.server.server import Server as NewServer
RUNNER_VERSION = "0.1.0"
-RUNNER_SOCKET = "/tmp/runner.sock"
+RUNNER_SOCKET = os.getenv("APISIX_LISTEN_ADDRESS", "/tmp/runner.sock")
@click.group()
@@ -31,7 +36,8 @@
@click.option('--debug/--no-debug', help='enable or disable debug, default disable.', default=False)
def start(debug) -> None:
click.echo(debug)
- server = A6SocketServer(RUNNER_SOCKET)
+ click.echo(RUNNER_SOCKET)
+ server = NewServer(RUNNER_SOCKET)
server.receive()
diff --git a/src/plugins/__init__.py b/apisix/plugins/__init__.py
similarity index 100%
rename from src/plugins/__init__.py
rename to apisix/plugins/__init__.py
diff --git a/src/plugins/say.py b/apisix/plugins/say.py
similarity index 73%
rename from src/plugins/say.py
rename to apisix/plugins/say.py
index d64f952..5dd1608 100644
--- a/src/plugins/say.py
+++ b/apisix/plugins/say.py
@@ -14,15 +14,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import runner.plugin.base
-from runner.http.request import Request
-from runner.http.response import Response
+from apisix.runner.plugin.base import Base
+from apisix.runner.http.request import Request
+from apisix.runner.http.response import Response
-class Say(runner.plugin.base.Base):
+class Say(Base):
def __init__(self):
super(Say, self).__init__(self.__class__.__name__)
def filter(self, request: Request, response: Response):
- response.setHeader("X-Resp-A6-Runner", "Python")
- response.setBody("Hello, Python Runner of APISIX")
+ headers = request.headers
+ headers["X-Resp-A6-Runner"] = "Python"
+ response.body = "Hello, Python Runner of APISIX"
+ response.headers = headers
diff --git a/src/runner/__init__.py b/apisix/runner/__init__.py
similarity index 100%
rename from src/runner/__init__.py
rename to apisix/runner/__init__.py
diff --git a/src/__init__.py b/apisix/runner/http/__init__.py
similarity index 100%
copy from src/__init__.py
copy to apisix/runner/http/__init__.py
diff --git a/src/runner/http/method.py b/apisix/runner/http/method.py
similarity index 96%
rename from src/runner/http/method.py
rename to apisix/runner/http/method.py
index 32f0af3..df90f8d 100644
--- a/src/runner/http/method.py
+++ b/apisix/runner/http/method.py
@@ -69,9 +69,9 @@
}
-def getNameByCode(code: int) -> str:
+def get_name_by_code(code: int) -> str:
return methodName.get(code)
-def getCodeByName(name: str) -> int:
+def get_code_by_name(name: str) -> int:
return methodCode.get(name)
diff --git a/src/runner/http/protocol.py b/apisix/runner/http/protocol.py
similarity index 96%
rename from src/runner/http/protocol.py
rename to apisix/runner/http/protocol.py
index 53a356f..57ae400 100644
--- a/src/runner/http/protocol.py
+++ b/apisix/runner/http/protocol.py
@@ -18,9 +18,8 @@
RPC_PREPARE_CONF = 1
RPC_HTTP_REQ_CALL = 2
-RPC_TEST = 127
RPC_UNKNOWN = 0
-def newBuilder():
+def new_builder():
return flatbuffers.Builder(256)
diff --git a/apisix/runner/http/request.py b/apisix/runner/http/request.py
new file mode 100644
index 0000000..844b647
--- /dev/null
+++ b/apisix/runner/http/request.py
@@ -0,0 +1,317 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+from ipaddress import IPv4Address
+from ipaddress import IPv6Address
+import apisix.runner.plugin.core as RunnerPlugin
+import apisix.runner.http.method as RunnerMethod
+from apisix.runner.http.protocol import RPC_HTTP_REQ_CALL
+from apisix.runner.http.protocol import RPC_PREPARE_CONF
+from a6pluginproto.HTTPReqCall import Req as A6HTTPReqCallReq
+from a6pluginproto.PrepareConf import Req as A6PrepareConfReq
+
+
+class Request:
+
+ def __init__(self, ty: int = 0, buf: bytes = b''):
+ """
+ Init and parse request
+ :param ty:
+ rpc request protocol type
+ :param buf:
+ rpc request buffer data
+ """
+ self._rpc_type = ty
+ self._rpc_buf = buf
+ self._req_id = 0
+ self._req_conf_token = 0
+ self._req_method = ""
+ self._req_path = ""
+ self._req_headers = {}
+ self._req_configs = {}
+ self._req_args = {}
+ self._req_src_ip = ""
+ self._init()
+
+ @property
+ def rpc_type(self) -> int:
+ """
+ get request protocol type for request handler
+ :return:
+ """
+ return self._rpc_type
+
+ @rpc_type.setter
+ def rpc_type(self, rpc_type: int) -> None:
+ """
+ set request protocol type for request handler
+ :param rpc_type:
+ :return:
+ """
+ self._rpc_type = rpc_type
+
+ @property
+ def rpc_buf(self) -> bytes:
+ """
+ get request buffer data for request handler
+ :return:
+ """
+ return self._rpc_buf
+
+ @rpc_buf.setter
+ def rpc_buf(self, rpc_buf: bytes) -> None:
+ """
+ set request buffer data for request handler
+ :return:
+ """
+ self._rpc_buf = rpc_buf
+
+ @property
+ def conf_token(self) -> int:
+ """
+ get request token for request handler
+ :return:
+ """
+ return self._req_conf_token
+
+ @conf_token.setter
+ def conf_token(self, req_conf_token: int) -> None:
+ """
+ set request token for request handler
+ :return:
+ """
+ self._req_conf_token = req_conf_token
+
+ @property
+ def id(self) -> int:
+ """
+ get request id for request handler
+ :return:
+ """
+ return self._req_id
+
+ @id.setter
+ def id(self, req_id: int) -> None:
+ """
+ set request id for request handler
+ :return:
+ """
+ self._req_id = req_id
+
+ @property
+ def method(self) -> str:
+ """
+ get request method for request handler
+ :return:
+ """
+ return self._req_method
+
+ @method.setter
+ def method(self, req_method: str) -> None:
+ """
+ set request method for request handler
+ :return:
+ """
+ self._req_method = req_method
+
+ @property
+ def path(self) -> str:
+ """
+ get request path for request handler
+ :return:
+ """
+ return self._req_path
+
+ @path.setter
+ def path(self, req_path: str) -> None:
+ """
+ set request path for request handler
+ :return:
+ """
+ self._req_path = req_path
+
+ @property
+ def headers(self) -> dict:
+ """
+ get request headers for request handler
+ :return:
+ """
+ return self._req_headers
+
+ @headers.setter
+ def headers(self, req_headers: dict) -> None:
+ """
+ set request headers for request handler
+ :return:
+ """
+ self._req_headers = req_headers
+
+ @property
+ def configs(self) -> dict:
+ """
+ get plugin instance and configs for request handler
+ :return:
+ """
+ return self._req_configs
+
+ @configs.setter
+ def configs(self, req_configs: dict) -> None:
+ """
+ set plugin instance and configs for request handler
+ :return:
+ """
+ self._req_configs = req_configs
+
+ @property
+ def args(self) -> dict:
+ """
+ get request args for request handler
+ :return:
+ """
+ return self._req_args
+
+ @args.setter
+ def args(self, req_args: dict) -> None:
+ """
+ set request args for request handler
+ :return:
+ """
+ self._req_args = req_args
+
+ @property
+ def src_ip(self) -> str:
+ """
+ get request source ip address for request handler
+ :return:
+ """
+ return self._req_src_ip
+
+ @src_ip.setter
+ def src_ip(self, req_src_ip: str) -> None:
+ """
+ set request source ip address for request handler
+ :return:
+ """
+ self._req_src_ip = req_src_ip
+
+ def reset(self) -> None:
+ """
+ reset request handler
+ :return:
+ """
+ """
+ reset request class
+ :return:
+ """
+ self._rpc_type = 0
+ self._rpc_buf = b''
+ self._req_id = 0
+ self._req_conf_token = 0
+ self._req_method = ""
+ self._req_path = ""
+ self._req_headers = {}
+ self._req_configs = {}
+ self._req_args = {}
+ self._req_src_ip = ""
+
+ def _parse_src_ip(self, req: A6HTTPReqCallReq) -> None:
+ """
+ parse request source ip address
+ :param req:
+ :return:
+ """
+ if req.SrcIpIsNone():
+ return
+ ip_len = req.SrcIpLength()
+ if ip_len == 0:
+ return
+ ip_arr = bytearray()
+ for i in range(ip_len):
+ ip_arr.append(req.SrcIp(i))
+ ip_byte = bytes(ip_arr)
+
+ if ip_len == 4:
+ self.src_ip = IPv4Address(ip_byte).exploded
+ if ip_len == 16:
+ self.src_ip = IPv6Address(ip_byte).exploded
+
+ def _parse_headers(self, req: A6HTTPReqCallReq) -> None:
+ """
+ parse request headers
+ :param req:
+ :return:
+ """
+ if not req.HeadersIsNone():
+ headers = {}
+ for i in range(req.HeadersLength()):
+ key = str(req.Headers(i).Name(), encoding="UTF-8")
+ val = str(req.Headers(i).Value(), encoding="UTF-8")
+ headers[key] = val
+ self.headers = headers
+
+ def _parse_args(self, req: A6HTTPReqCallReq) -> None:
+ """
+ parse request args
+ :param req:
+ :return:
+ """
+ if not req.ArgsIsNone():
+ args = {}
+ for i in range(req.ArgsLength()):
+ key = str(req.Args(i).Name(), encoding="UTF-8")
+ val = str(req.Args(i).Value(), encoding="UTF-8")
+ args[key] = val
+ self.args = args
+
+ def _parse_configs(self, req: A6PrepareConfReq) -> None:
+ """
+ parse request plugin configs
+ :param req:
+ :return:
+ """
+ if not req.ConfIsNone():
+ plugins = RunnerPlugin.loading()
+ configs = {}
+ for i in range(req.ConfLength()):
+ name = str(req.Conf(i).Name(), encoding="UTF-8").lower()
+ plugin = plugins.get(name)
+ if not plugin:
+ continue
+ value = str(req.Conf(i).Value(), encoding="UTF-8")
+ plugin = plugin()
+ plugin.config = json.loads(value)
+ configs[name] = plugin
+ self.configs = configs
+
+ def _init(self) -> None:
+ """
+ init request handler
+ :return:
+ """
+ if self.rpc_type == RPC_HTTP_REQ_CALL:
+ req = A6HTTPReqCallReq.Req.GetRootAsReq(self.rpc_buf)
+ self.id = req.Id()
+ self.method = RunnerMethod.get_name_by_code(req.Method())
+ self.path = str(req.Path(), encoding="UTF-8")
+ self.conf_token = req.ConfToken()
+ self._parse_src_ip(req)
+ self._parse_headers(req)
+ self._parse_args(req)
+
+ if self.rpc_type == RPC_PREPARE_CONF:
+ req = A6PrepareConfReq.Req.GetRootAsReq(self.rpc_buf)
+ self._parse_configs(req)
diff --git a/apisix/runner/http/response.py b/apisix/runner/http/response.py
new file mode 100644
index 0000000..599ead5
--- /dev/null
+++ b/apisix/runner/http/response.py
@@ -0,0 +1,246 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import annotations
+import flatbuffers
+from a6pluginproto import TextEntry as A6TextEntry
+from a6pluginproto.Err import Resp as A6ErrResp
+from a6pluginproto.HTTPReqCall import Stop as A6HTTPReqCallStop
+from a6pluginproto.HTTPReqCall import Resp as A6HTTPReqCallResp
+from a6pluginproto.HTTPReqCall import Action as A6HTTPReqCallAction
+from a6pluginproto.PrepareConf import Resp as A6PrepareConfResp
+from apisix.runner.http.protocol import new_builder
+from apisix.runner.http.protocol import RPC_PREPARE_CONF
+from apisix.runner.http.protocol import RPC_HTTP_REQ_CALL
+
+RESP_MAX_DATA_SIZE = 2 << 24 - 1
+
+
+class Response:
+
+ def __init__(self, ty: int):
+ """
+ Init and parse request
+ :param ty:
+ rpc request protocol type
+ """
+ self.rpc_type = ty
+ self._resp_id = 0
+ self._resp_token = 0
+ self._resp_body = ""
+ self._resp_headers = {}
+ self._resp_status_code = 0
+ self._resp_error_code = 0
+
+ @property
+ def rpc_type(self) -> int:
+ """
+ get protocol type for response handler
+ :return:
+ """
+ return self._rpc_type
+
+ @rpc_type.setter
+ def rpc_type(self, rpc_type: int) -> None:
+ """
+ set protocol type for response handler
+ :return:
+ """
+ self._rpc_type = rpc_type
+
+ @property
+ def id(self) -> int:
+ """
+ get request id for response handler
+ :return:
+ """
+ return self._resp_id
+
+ @id.setter
+ def id(self, resp_id: int) -> None:
+ """
+ set request id for response handler
+ :return:
+ """
+ self._resp_id = resp_id
+
+ @property
+ def token(self) -> int:
+ """
+ get token for response handler
+ :return:
+ """
+ return self._resp_token
+
+ @token.setter
+ def token(self, resp_token: int) -> None:
+ """
+ set token for response handler
+ :return:
+ """
+ self._resp_token = resp_token
+
+ @property
+ def body(self) -> str:
+ """
+ get body for response handler
+ :return:
+ """
+ return self._resp_body
+
+ @body.setter
+ def body(self, resp_body: str) -> None:
+ """
+ set body for response handler
+ :return:
+ """
+ self._resp_body = resp_body
+
+ @property
+ def headers(self) -> dict:
+ """
+ get headers for response handler
+ :return:
+ """
+ return self._resp_headers
+
+ @headers.setter
+ def headers(self, resp_headers: dict) -> None:
+ """
+ set headers for response handler
+ :return:
+ """
+ self._resp_headers = resp_headers
+
+ @property
+ def status_code(self) -> int:
+ """
+ get status code for response handler
+ :return:
+ """
+ return self._resp_status_code
+
+ @status_code.setter
+ def status_code(self, resp_status_code: int) -> None:
+ """
+ set status code for response handler
+ :return:
+ """
+ self._resp_status_code = resp_status_code
+
+ @property
+ def error_code(self) -> int:
+ """
+ get error code for response handler
+ :return:
+ """
+ return self._resp_error_code
+
+ @error_code.setter
+ def error_code(self, resp_error_code: int = 0) -> None:
+ """
+ set error code for response handler
+ :return:
+ """
+ self._resp_error_code = resp_error_code
+
+ def reset(self) -> None:
+ """
+ reset response handler
+ :return:
+ """
+ self._rpc_type = 0
+ self._resp_id = 0
+ self._resp_token = 0
+ self._resp_body = ""
+ self._resp_headers = {}
+ self._resp_status_code = 0
+ self._resp_error_code = 0
+
+ def changed(self) -> bool:
+ """
+ check response handler is change
+ :return:
+ """
+ if self.body or self.headers or self.status_code or self.token or self.error_code:
+ return True
+ else:
+ return False
+
+ def flatbuffers(self) -> flatbuffers.Builder:
+ """
+ response to flat buffer object
+ :return:
+ """
+ builder = new_builder()
+ rpc_type = self.rpc_type
+
+ if rpc_type == RPC_PREPARE_CONF:
+ A6PrepareConfResp.Start(builder)
+ A6PrepareConfResp.AddConfToken(builder, self.token)
+ res = A6PrepareConfResp.End(builder)
+ builder.Finish(res)
+ elif rpc_type == RPC_HTTP_REQ_CALL:
+ header_vector = None
+ headers = self.headers
+ if headers:
+ headerEntries = []
+ for hk in headers:
+ hv = headers[hk]
+ hkb = builder.CreateString(hk)
+ hvb = builder.CreateString(hv)
+ A6TextEntry.Start(builder)
+ A6TextEntry.AddName(builder, hkb)
+ A6TextEntry.AddValue(builder, hvb)
+ headerEntry = A6TextEntry.End(builder)
+ headerEntries.append(headerEntry)
+
+ headerSize = len(headerEntries)
+ A6HTTPReqCallStop.StartHeadersVector(builder, headerSize)
+ for i in range(headerSize - 1, -1, -1):
+ builder.PrependUOffsetTRelative(headerEntries[i])
+ header_vector = builder.EndVector()
+
+ body_vector = None
+ body = self.body
+ if body:
+ body = body.encode(encoding="UTF-8")
+ body_vector = builder.CreateByteVector(body)
+
+ status_code = self.status_code
+ A6HTTPReqCallStop.Start(builder)
+ if status_code == 0:
+ A6HTTPReqCallStop.AddStatus(builder, 200)
+ else:
+ A6HTTPReqCallStop.AddStatus(builder, status_code)
+ if header_vector:
+ A6HTTPReqCallStop.AddHeaders(builder, header_vector)
+ if body_vector:
+ A6HTTPReqCallStop.AddBody(builder, body_vector)
+ stop = A6HTTPReqCallStop.End(builder)
+
+ A6HTTPReqCallResp.Start(builder)
+ A6HTTPReqCallResp.AddId(builder, self.id)
+ A6HTTPReqCallResp.AddActionType(builder, A6HTTPReqCallAction.Action.Stop)
+ A6HTTPReqCallResp.AddAction(builder, stop)
+ res = A6HTTPReqCallResp.End(builder)
+ builder.Finish(res)
+ else:
+ A6ErrResp.Start(builder)
+ A6ErrResp.AddCode(builder, self.error_code)
+ res = A6ErrResp.End(builder)
+ builder.Finish(res)
+ return builder
diff --git a/src/__init__.py b/apisix/runner/plugin/__init__.py
similarity index 100%
copy from src/__init__.py
copy to apisix/runner/plugin/__init__.py
diff --git a/src/runner/plugin/base.py b/apisix/runner/plugin/base.py
similarity index 62%
rename from src/runner/plugin/base.py
rename to apisix/runner/plugin/base.py
index 7a8d334..fe6e134 100644
--- a/src/runner/plugin/base.py
+++ b/apisix/runner/plugin/base.py
@@ -14,26 +14,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import json
-
-class Base(object):
+class Base:
def __init__(self, name: str):
- self.pluginName = name
- self.pluginConfig = {}
+ """
+ plugin base class
+ :param name:
+ instance plugin name
+ """
+ self._name = name
+ self._config = {}
+ @property
def name(self) -> str:
- return self.pluginName
+ return self._name
+ @name.setter
+ def name(self, name: str) -> None:
+ self._name = name
+
+ @property
def config(self) -> dict:
- return self.pluginConfig
+ return self._config
- def setConfig(self, config: dict):
- if config and isinstance(config, str):
- try:
- conf = json.loads(config)
- except ValueError:
- self.pluginConfig = {}
- else:
- self.pluginConfig = conf
- return self
+ @config.setter
+ def config(self, config: dict) -> None:
+ if config and isinstance(config, dict):
+ self._config = config
+ else:
+ self._config = {}
diff --git a/src/runner/plugin/cache.py b/apisix/runner/plugin/cache.py
similarity index 90%
rename from src/runner/plugin/cache.py
rename to apisix/runner/plugin/cache.py
index d21665c..30d5c60 100644
--- a/src/runner/plugin/cache.py
+++ b/apisix/runner/plugin/cache.py
@@ -20,19 +20,19 @@
RUNNER_CACHE_ENTRY = "RUNNER:CACHE:ENTRY"
-def generateToken() -> int:
+def generate_token() -> int:
token = cache.get(RUNNER_CACHE_TOKEN, 0)
token = token + 1
cache.update(RUNNER_CACHE_TOKEN, token)
return token
-def setConfigByToken(token: int, configs: dict) -> bool:
+def set_config_by_token(token: int, configs: dict) -> bool:
cache_key = "%s:%s" % (RUNNER_CACHE_ENTRY, token)
cache.update(cache_key, configs)
return cache.has(cache_key)
-def getConfigByToken(token: int):
+def get_config_by_token(token: int):
cache_key = "%s:%s" % (RUNNER_CACHE_ENTRY, token)
return cache.get(cache_key, {})
diff --git a/src/runner/plugin/loading.py b/apisix/runner/plugin/core.py
similarity index 63%
rename from src/runner/plugin/loading.py
rename to apisix/runner/plugin/core.py
index 2b9702f..4ebe848 100644
--- a/src/runner/plugin/loading.py
+++ b/apisix/runner/plugin/core.py
@@ -14,18 +14,32 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import os
import importlib
from pkgutil import iter_modules
-def instances() -> dict:
- modules = iter_modules(__import__("plugins").__path__)
+def filter(configs: dict, request, response) -> None:
+ for name in configs:
+ plugin = configs.get(name)
+ if not plugin:
+ print("ERR: plugin `%s` undefined." % name)
+ continue
+ try:
+ plugin.filter(request, response)
+ finally:
+ print("ERR: plugin `%s` filter execute failure" % name)
+
+
+def loading() -> dict:
+ path = "%s/plugins" % os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+ modules = iter_modules(path=[path])
plugins = {}
for loader, moduleName, _ in modules:
classNameConversion = list(map(lambda name: name.capitalize(), moduleName.split("_")))
className = "".join(classNameConversion)
- classInstance = getattr(importlib.import_module("plugins.%s" % moduleName), className)
+ classInstance = getattr(importlib.import_module("apisix.plugins.%s" % moduleName), className)
plugins[str(moduleName).lower()] = classInstance
return plugins
diff --git a/src/runner/__init__.py b/apisix/runner/server/__init__.py
similarity index 100%
copy from src/runner/__init__.py
copy to apisix/runner/server/__init__.py
diff --git a/apisix/runner/server/handle.py b/apisix/runner/server/handle.py
new file mode 100644
index 0000000..7c05fb5
--- /dev/null
+++ b/apisix/runner/server/handle.py
@@ -0,0 +1,137 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import annotations
+from a6pluginproto.Err import Code as A6ErrCode
+import apisix.runner.plugin.core as RunnerPlugin
+import apisix.runner.plugin.cache as RunnerCache
+from apisix.runner.http.response import Response as NewHttpResponse
+from apisix.runner.http.response import RESP_MAX_DATA_SIZE
+from apisix.runner.http.request import Request as NewHttpRequest
+from apisix.runner.server.response import Response as NewServerResponse
+from apisix.runner.server.response import RUNNER_SUCCESS_CODE
+from apisix.runner.server.response import RUNNER_SUCCESS_MESSAGE
+from apisix.runner.http.protocol import RPC_PREPARE_CONF
+from apisix.runner.http.protocol import RPC_HTTP_REQ_CALL
+from apisix.runner.http.protocol import RPC_UNKNOWN
+
+
+class Handle:
+
+ def __init__(self, ty: int = 0, buf: bytes = b'', debug: bool = False):
+ """
+ Init Python runner server
+ :param ty:
+ rpc request protocol type
+ :param buf:
+ rpc request buffer data
+ :param debug:
+ enable debug mode
+ """
+ self.type = ty
+ self.buffer = buf
+ self.debug = debug
+
+ @property
+ def type(self) -> int:
+ return self._type
+
+ @type.setter
+ def type(self, ty: int = 0) -> None:
+ self._type = ty
+
+ @property
+ def buffer(self) -> bytes:
+ return self._buffer
+
+ @buffer.setter
+ def buffer(self, buf: bytes = b'') -> None:
+ self._buffer = buf
+
+ @property
+ def debug(self) -> bool:
+ return self._debug
+
+ @debug.setter
+ def debug(self, debug: bool = False) -> None:
+ self._debug = debug
+
+ def _rpc_config(self) -> NewServerResponse:
+ # init request
+ req = NewHttpRequest(RPC_PREPARE_CONF, self.buffer)
+ # generate token
+ token = RunnerCache.generate_token()
+ # get plugins config
+ configs = req.configs
+ # cache plugins config
+ ok = RunnerCache.set_config_by_token(token, configs)
+ if not ok:
+ return NewServerResponse(code=A6ErrCode.Code.SERVICE_UNAVAILABLE, message="cache token failure")
+ # init response
+ resp = NewHttpResponse(RPC_PREPARE_CONF)
+ resp.token = token
+ response = resp.flatbuffers()
+
+ return NewServerResponse(code=RUNNER_SUCCESS_CODE, message=RUNNER_SUCCESS_MESSAGE, data=response.Output(),
+ ty=self.type)
+
+ def _rpc_call(self) -> NewServerResponse:
+ # init request
+ req = NewHttpRequest(RPC_HTTP_REQ_CALL, self.buffer)
+ # get request token
+ token = req.conf_token
+ # get plugins
+ configs = RunnerCache.get_config_by_token(token)
+ if len(configs) == 0:
+ return NewServerResponse(code=A6ErrCode.Code.CONF_TOKEN_NOT_FOUND, message="cache token not found")
+ # init response
+ resp = NewHttpResponse(RPC_HTTP_REQ_CALL)
+ # execute plugins
+ RunnerPlugin.filter(configs, req, resp)
+
+ response = resp.flatbuffers()
+ return NewServerResponse(code=RUNNER_SUCCESS_CODE, message=RUNNER_SUCCESS_MESSAGE, data=response.Output(),
+ ty=self.type)
+
+ @staticmethod
+ def _rpc_unknown(err_code: int = 0) -> NewServerResponse:
+ resp = NewHttpResponse(RPC_UNKNOWN)
+ resp.error_code = err_code
+ response = resp.flatbuffers()
+ return NewServerResponse(code=RUNNER_SUCCESS_CODE, message="OK", data=response.Output(),
+ ty=RPC_UNKNOWN)
+
+ def dispatch(self) -> NewServerResponse:
+ resp = None
+
+ if self.type == RPC_PREPARE_CONF:
+ resp = self._rpc_config()
+
+ if self.type == RPC_HTTP_REQ_CALL:
+ resp = self._rpc_call()
+
+ if not resp:
+ return self._rpc_unknown()
+
+ size = len(resp.data)
+ if (size > RESP_MAX_DATA_SIZE or size <= 0) and resp.code == 200:
+ resp = NewServerResponse(A6ErrCode.Code.SERVICE_UNAVAILABLE,
+ "the max length of data is %d but got %d" % (
+ RESP_MAX_DATA_SIZE, size))
+ if resp.code != 200:
+ print("ERR: %s" % resp.message)
+ resp = self._rpc_unknown(resp.code)
+ return resp
diff --git a/src/runner/socket/protocol.py b/apisix/runner/server/protocol.py
similarity index 90%
rename from src/runner/socket/protocol.py
rename to apisix/runner/server/protocol.py
index e412981..b2e3fc8 100644
--- a/src/runner/socket/protocol.py
+++ b/apisix/runner/server/protocol.py
@@ -14,10 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-from src.runner.socket.response import Response as NewServerResponse
-from src.runner.socket.response import RUNNER_ERROR_CODE
-from src.runner.socket.response import RUNNER_SUCCESS_CODE
-from src.runner.socket.response import RUNNER_SUCCESS_MESSAGE
+from apisix.runner.server.response import Response as NewServerResponse
+from apisix.runner.server.response import RUNNER_ERROR_CODE
+from apisix.runner.server.response import RUNNER_SUCCESS_CODE
+from apisix.runner.server.response import RUNNER_SUCCESS_MESSAGE
class Protocol:
diff --git a/src/runner/socket/response.py b/apisix/runner/server/response.py
similarity index 100%
rename from src/runner/socket/response.py
rename to apisix/runner/server/response.py
diff --git a/apisix/runner/server/server.py b/apisix/runner/server/server.py
new file mode 100644
index 0000000..2ad741e
--- /dev/null
+++ b/apisix/runner/server/server.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os
+import socket
+from threading import Thread as NewThread
+from apisix.runner.server.handle import Handle as NewServerHandle
+from apisix.runner.server.protocol import Protocol as NewServerProtocol
+
+
+def _threaded(conn: socket, debug: bool):
+ while True:
+ buffer = conn.recv(4)
+ protocol = NewServerProtocol(buffer, 0)
+ err = protocol.decode()
+ if err.code != 200:
+ print(err.message)
+ break
+
+ buffer = conn.recv(protocol.length)
+ handler = NewServerHandle(protocol.type, buffer)
+ response = handler.dispatch()
+
+ protocol = NewServerProtocol(response.data, response.type)
+ protocol.encode()
+ response = protocol.buffer
+
+ err = conn.sendall(response)
+ if err:
+ print(err)
+ break
+
+ conn.close()
+
+
+class Server:
+ def __init__(self, fd: str, debug: bool = False):
+ self.fd = fd.replace("unix:", "")
+ self.debug = debug
+ if os.path.exists(self.fd):
+ os.remove(self.fd)
+ self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.sock.bind(self.fd)
+ self.sock.listen(1024)
+ print("listening on unix:%s" % self.fd)
+
+ def receive(self):
+ while True:
+ conn, address = self.sock.accept()
+
+ NewThread(target=_threaded, args=(conn, self.debug)).start()
+
+ def __del__(self):
+ self.sock.close()
+ os.remove(self.fd)
+ print("Bye")
diff --git a/pytest.ini b/pytest.ini
index d207ff6..607c293 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -15,6 +15,6 @@
; limitations under the License.
;
[pytest]
-addopts = -vs -p no:warnings
+addopts = -vs -p no:warnings --disable-socket
testpaths = tests
python_files = test_*
diff --git a/src/runner/http/__init__.py b/src/runner/http/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/src/runner/http/__init__.py
+++ /dev/null
diff --git a/src/runner/http/request.py b/src/runner/http/request.py
deleted file mode 100644
index f202657..0000000
--- a/src/runner/http/request.py
+++ /dev/null
@@ -1,133 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import json
-import runner.http.method as RunnerHttpMethod
-import runner.http.protocol as RunnerHttpProtocol
-import runner.plugin.loading as RunnerPluginLoading
-from a6pluginproto.HTTPReqCall import Req as A6HTTPReqCallReq
-from a6pluginproto.PrepareConf import Req as A6PrepareConfReq
-
-
-class Request(object):
-
- def __init__(self, rpc_type: int, buf: bytes):
- self.request = {}
- self.setRpcType(rpc_type)
- self.initRequest(buf)
-
- def setRpcType(self, rpc_type: int) -> None:
- self.request["rpc_type"] = rpc_type
-
- def getRpcType(self) -> int:
- return self.request.get("rpc_type", 0)
-
- def setConfToken(self, conf_token: int) -> None:
- self.request["conf_token"] = conf_token
-
- def getConfToken(self) -> int:
- return self.request.get("conf_token", 0)
-
- def getId(self) -> int:
- return self.request.get("id", 0)
-
- def setId(self, id: int) -> None:
- self.request["id"] = id
-
- def getMethod(self) -> str:
- return self.request.get("method", "")
-
- def setMethod(self, method: str) -> None:
- self.request["method"] = method
-
- def getPath(self) -> str:
- return self.request.get("path", "")
-
- def setPath(self, path: str) -> None:
- self.request["path"] = path
-
- def setHeaders(self, headers: dict) -> None:
- self.request["headers"] = headers
-
- def getHeaders(self) -> dict:
- return self.request.get("headers", {})
-
- def setConfigs(self, headers: dict) -> None:
- self.request["configs"] = headers
-
- def getConfigs(self) -> dict:
- return self.request.get("configs", {})
-
- def setArgs(self, args: dict) -> None:
- self.request["args"] = args
-
- def getArgs(self) -> dict:
- return self.request.get("args", {})
-
- def getSourceIP(self) -> str:
- return self.request.get("src_ip", "")
-
- def setSourceIP(self, ip: str) -> None:
- self.request["src_ip"] = ip
-
- def initRequest(self, buf: bytes) -> None:
- if self.getRpcType() == RunnerHttpProtocol.RPC_HTTP_REQ_CALL:
- req = A6HTTPReqCallReq.Req.GetRootAsReq(buf)
- self.setId(req.Id())
- self.setMethod(RunnerHttpMethod.getNameByCode(req.Method()))
- self.setPath(str(req.Path(), encoding="UTF-8"))
- self.setConfToken(req.ConfToken())
-
- if not req.SrcIpIsNone():
- delimiter = "."
- if req.SrcIpLength() > 4:
- delimiter = ":"
- ipAddress = []
- for i in range(req.SrcIpLength()):
- ipAddress.append(str(req.SrcIp(i)))
- self.setSourceIP(delimiter.join(ipAddress))
-
- if not req.HeadersIsNone():
- headers = {}
- for i in range(req.HeadersLength()):
- key = str(req.Headers(i).Name(), encoding="UTF-8")
- val = str(req.Headers(i).Value(), encoding="UTF-8")
- headers[key] = val
- self.setHeaders(headers)
-
- if not req.ArgsIsNone():
- args = {}
- for i in range(req.ArgsLength()):
- key = str(req.Args(i).Name(), encoding="UTF-8")
- val = str(req.Args(i).Value(), encoding="UTF-8")
- args[key] = val
- self.setArgs(args)
-
- if self.getRpcType() == RunnerHttpProtocol.RPC_PREPARE_CONF:
- req = A6PrepareConfReq.Req.GetRootAsReq(buf)
- plugins = RunnerPluginLoading.instances()
- configs = {}
- if not req.ConfIsNone():
- for i in range(req.ConfLength()):
- name = str(req.Conf(i).Name(), encoding="UTF-8").lower()
- plugin = plugins.get(name)
- if not plugin:
- continue
- value = str(req.Conf(i).Value(), encoding="UTF-8")
- plugin = plugin()
- plugin.setConfig(json.loads(value))
- configs[name] = plugin
- self.setConfigs(configs)
diff --git a/src/runner/http/response.py b/src/runner/http/response.py
deleted file mode 100644
index b3d1968..0000000
--- a/src/runner/http/response.py
+++ /dev/null
@@ -1,161 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import annotations
-import flatbuffers
-import runner.http.protocol as RunnerHttpProtocol
-from a6pluginproto import TextEntry as A6TextEntry
-from a6pluginproto.HTTPReqCall import Stop as A6HTTPReqCallStop
-from a6pluginproto.HTTPReqCall import Resp as A6HTTPReqCallResp
-from a6pluginproto.HTTPReqCall import Action as A6HTTPReqCallAction
-from a6pluginproto.PrepareConf import Resp as A6PrepareConfResp
-from a6pluginproto.Err import Resp as A6ErrResp
-from a6pluginproto.Err import Code as A6ErrCode
-
-RESP_MAX_DATA_SIZE = 2 << 24 - 1
-
-
-class Response(object):
-
- def __init__(self, rpc_type: int):
- self.response = {}
- self.initResponse()
- self.setRpcType(rpc_type)
-
- def setRpcType(self, rpc_type: int) -> Response:
- self.response["rpc_type"] = rpc_type
- return self
-
- def getRpcType(self) -> int:
- return self.response.get("rpc_type", 0)
-
- def setId(self, id: int) -> Response:
- self.response["id"] = id
- return self
-
- def getId(self) -> int:
- return self.response.get("id", 0)
-
- def setToken(self, token: int) -> Response:
- self.response["token"] = token
- return self
-
- def getToken(self) -> int:
- return self.response.get("token", 0)
-
- def setBody(self, value: str) -> Response:
- self.response["body"] = value
- return self
-
- def getBody(self) -> str:
- return self.response.get("body", "")
-
- def getHeaders(self) -> dict:
- return self.response.get("headers", {})
-
- def setHeader(self, key: str, value: str) -> Response:
- if not self.response.get("headers", None):
- self.response["headers"] = {}
- self.response["headers"][key] = value
- return self
-
- def getHeader(self, key: str) -> str:
- if not self.response.get("headers", None):
- return ""
- return self.response.get("headers").get(key, "")
-
- def getStatusCode(self) -> int:
- return self.response.get("statusCode", 0)
-
- def setStatusCode(self, code: int) -> Response:
- self.response["statusCode"] = code
- return self
-
- def resetResponse(self) -> None:
- self.initResponse()
-
- def initResponse(self) -> None:
- self.response = {
- "body": "",
- "headers": {},
- "statusCode": 0,
- }
-
- def responseHasChange(self) -> bool:
- return self.response.get("body") or \
- self.response.get("headers") or \
- self.response.get("statusCode")
-
- def responseToFlatBuffers(self) -> flatbuffers.Builder:
- rpcType = self.getRpcType()
- builder = RunnerHttpProtocol.newBuilder()
-
- if rpcType == RunnerHttpProtocol.RPC_PREPARE_CONF:
- A6PrepareConfResp.Start(builder)
- A6PrepareConfResp.AddConfToken(builder, self.getToken())
- res = A6PrepareConfResp.End(builder)
- builder.Finish(res)
- elif rpcType == RunnerHttpProtocol.RPC_HTTP_REQ_CALL:
- headerVector = None
- headers = self.getHeaders()
- if headers:
- headerEntries = []
- for hk in headers:
- hv = headers[hk]
- hkb = builder.CreateString(hk)
- hvb = builder.CreateString(hv)
- A6TextEntry.Start(builder)
- A6TextEntry.AddName(builder, hkb)
- A6TextEntry.AddValue(builder, hvb)
- headerEntry = A6TextEntry.End(builder)
- headerEntries.append(headerEntry)
-
- headerSize = len(headerEntries)
- A6HTTPReqCallStop.StartHeadersVector(builder, headerSize)
- for i in range(headerSize - 1, -1, -1):
- builder.PrependUOffsetTRelative(headerEntries[i])
- headerVector = builder.EndVector()
-
- bodyVector = None
- body = self.getBody()
- if body:
- body = body.encode(encoding="UTF-8")
- bodyVector = builder.CreateByteVector(body)
-
- statusCode = self.getStatusCode()
- A6HTTPReqCallStop.Start(builder)
- if statusCode == 0:
- A6HTTPReqCallStop.AddStatus(builder, 200)
- else:
- A6HTTPReqCallStop.AddStatus(builder, statusCode)
- if headerVector:
- A6HTTPReqCallStop.AddHeaders(builder, headerVector)
- if bodyVector:
- A6HTTPReqCallStop.AddBody(builder, bodyVector)
- stop = A6HTTPReqCallStop.End(builder)
-
- A6HTTPReqCallResp.Start(builder)
- A6HTTPReqCallResp.AddId(builder, self.getId())
- A6HTTPReqCallResp.AddActionType(builder, A6HTTPReqCallAction.Action.Stop)
- A6HTTPReqCallResp.AddAction(builder, stop)
- res = A6HTTPReqCallResp.End(builder)
- builder.Finish(res)
- else:
- A6ErrResp.Start(builder)
- A6ErrResp.AddCode(builder, A6ErrCode.Code.BAD_REQUEST)
- res = A6ErrResp.End(builder)
- builder.Finish(res)
- return builder
diff --git a/src/runner/plugin/__init__.py b/src/runner/plugin/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/src/runner/plugin/__init__.py
+++ /dev/null
diff --git a/src/runner/plugin/execute.py b/src/runner/plugin/execute.py
deleted file mode 100644
index a31f511..0000000
--- a/src/runner/plugin/execute.py
+++ /dev/null
@@ -1,30 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from runner.http.request import Request
-from runner.http.response import Response
-
-
-def executeFilter(configs: dict, request: Request, response: Response):
- for name in configs:
- plugin = configs.get(name)
- if not plugin:
- print("plugin undefined.")
- continue
- try:
- plugin.filter(request, response)
- except Exception as e:
- print(e)
diff --git a/src/runner/socket/__init__.py b/src/runner/socket/__init__.py
deleted file mode 100644
index b1312a0..0000000
--- a/src/runner/socket/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
diff --git a/src/runner/socket/handle.py b/src/runner/socket/handle.py
deleted file mode 100644
index 14942ce..0000000
--- a/src/runner/socket/handle.py
+++ /dev/null
@@ -1,78 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from a6pluginproto.Err import Resp as A6ErrResp
-from a6pluginproto.Err import Code as A6ErrCode
-import runner.plugin.cache as RunnerPluginCache
-import runner.plugin.execute as RunnerPluginExecute
-import runner.http.request as RunnerHttpRequest
-import runner.http.response as RunnerHttpResponse
-import runner.http.protocol as RunnerHttpProtocol
-
-
-class Handle:
-
- def __init__(self, req_type, req_data):
- self.req_type = req_type
- self.req_data = req_data
-
- def RpcPrepareConf(self):
- # init request
- req = RunnerHttpRequest.Request(RunnerHttpProtocol.RPC_PREPARE_CONF, self.req_data)
- # generate token
- token = RunnerPluginCache.generateToken()
- # get plugins config
- configs = req.getConfigs()
- # cache plugins config
- RunnerPluginCache.setConfigByToken(token, configs)
- # init response
- reps = RunnerHttpResponse.Response(RunnerHttpProtocol.RPC_PREPARE_CONF)
-
- return reps.setToken(token).responseToFlatBuffers()
-
- def RpcHttpReqCall(self):
- # init request
- req = RunnerHttpRequest.Request(RunnerHttpProtocol.RPC_HTTP_REQ_CALL, self.req_data)
- # get request token
- token = req.getConfToken()
- # get plugins
- configs = RunnerPluginCache.getConfigByToken(token)
- # init response
- reps = RunnerHttpResponse.Response(RunnerHttpProtocol.RPC_HTTP_REQ_CALL)
- # execute plugins
- RunnerPluginExecute.executeFilter(configs, req, reps)
-
- return reps.responseToFlatBuffers()
-
- def RpcTest(self):
- pass
-
- def RpcUnknown(self):
- builder = RunnerHttpProtocol.newBuilder()
- A6ErrResp.Start(builder)
- A6ErrResp.AddCode(builder, A6ErrCode.Code.BAD_REQUEST)
- res = A6ErrResp.End(builder)
- builder.Finish(res)
- return builder
-
- def dispatch(self):
- handler = {
- RunnerHttpProtocol.RPC_UNKNOWN: self.RpcUnknown,
- RunnerHttpProtocol.RPC_TEST: self.RpcTest,
- RunnerHttpProtocol.RPC_PREPARE_CONF: self.RpcPrepareConf,
- RunnerHttpProtocol.RPC_HTTP_REQ_CALL: self.RpcHttpReqCall,
- }
- return {"type": self.req_type, "data": handler.get(self.req_type, self.RpcUnknown)().Output()}
diff --git a/src/runner/socket/server.py b/src/runner/socket/server.py
deleted file mode 100644
index 37e66d4..0000000
--- a/src/runner/socket/server.py
+++ /dev/null
@@ -1,105 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import os
-import socket
-from _thread import start_new_thread
-from runner.socket.handle import Handle as A6ServerHandle
-
-
-def runner_protocol_decode(buf):
- """
- decode for runner protocol
- :param buf:
- :return:
- """
- if not buf:
- return None, "runner protocol undefined."
- if len(buf) != 4:
- return None, "runner protocol invalid."
-
- buf = bytearray(buf)
- # request buf type
- buf_type = buf[0]
- buf[0] = 0
- # request buf length
- buf_len = int.from_bytes(buf, byteorder="big")
- return {"type": buf_type, "len": buf_len}, None
-
-
-def runner_protocol_encode(reps_type, reps_data):
- """
- encode for runner protocol
- :param reps_type:
- :param reps_data:
- :return:
- """
- reps_len = len(reps_data)
- reps_header = reps_len.to_bytes(4, byteorder="big")
- reps_header = bytearray(reps_header)
- reps_header[0] = reps_type
- reps_header = bytes(reps_header)
- return reps_header + reps_data
-
-
-def threaded(conn):
- while True:
- header_buf = conn.recv(4)
- protocol, err = runner_protocol_decode(header_buf)
- if err:
- print(err)
- break
-
- # rpc request type
- req_type = protocol.get("type")
- # rpc request length
- req_len = protocol.get("len")
-
- req_data = conn.recv(req_len)
-
- rpc_handler = A6ServerHandle(req_type, req_data)
- response = rpc_handler.dispatch()
-
- reps_type = response.get("type")
- reps_data = response.get("data")
- reps = runner_protocol_encode(reps_type, reps_data)
-
- err = conn.sendall(reps)
- if err:
- print(err)
- break
- conn.close()
-
-
-class Server:
- def __init__(self, socket_address):
- if os.path.exists(socket_address):
- os.remove(socket_address)
- self.socket_address = socket_address
- self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.sock.bind(socket_address)
- self.sock.listen(1024)
- print("listening on unix:%s" % socket_address)
-
- def receive(self):
- while True:
- conn, address = self.sock.accept()
-
- start_new_thread(threaded, (conn,))
-
- def __del__(self):
- self.sock.close()
- print("Bye")
diff --git a/src/__init__.py b/tests/runner/http/__init__.py
similarity index 100%
copy from src/__init__.py
copy to tests/runner/http/__init__.py
diff --git a/tests/runner/http/test_method.py b/tests/runner/http/test_method.py
new file mode 100644
index 0000000..388efb0
--- /dev/null
+++ b/tests/runner/http/test_method.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import apisix.runner.http.method as RunnerMethod
+from a6pluginproto import Method as A6Method
+
+
+def test_get_name_by_code():
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodGET) == A6Method.Method.GET
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodHEAD) == A6Method.Method.HEAD
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodPOST) == A6Method.Method.POST
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodPUT) == A6Method.Method.PUT
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodDELETE) == A6Method.Method.DELETE
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodMKCOL) == A6Method.Method.MKCOL
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodCOPY) == A6Method.Method.COPY
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodMOVE) == A6Method.Method.MOVE
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodOPTIONS) == A6Method.Method.OPTIONS
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodPROPFIND) == A6Method.Method.PROPFIND
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodPROPPATCH) == A6Method.Method.PROPPATCH
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodLOCK) == A6Method.Method.LOCK
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodUNLOCK) == A6Method.Method.UNLOCK
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodPATCH) == A6Method.Method.PATCH
+ assert RunnerMethod.get_code_by_name(RunnerMethod.A6MethodTRACE) == A6Method.Method.TRACE
+
+
+def test_get_code_by_name():
+ assert RunnerMethod.get_name_by_code(A6Method.Method.GET) == RunnerMethod.A6MethodGET
+ assert RunnerMethod.get_name_by_code(A6Method.Method.HEAD) == RunnerMethod.A6MethodHEAD
+ assert RunnerMethod.get_name_by_code(A6Method.Method.POST) == RunnerMethod.A6MethodPOST
+ assert RunnerMethod.get_name_by_code(A6Method.Method.PUT) == RunnerMethod.A6MethodPUT
+ assert RunnerMethod.get_name_by_code(A6Method.Method.DELETE) == RunnerMethod.A6MethodDELETE
+ assert RunnerMethod.get_name_by_code(A6Method.Method.MKCOL) == RunnerMethod.A6MethodMKCOL
+ assert RunnerMethod.get_name_by_code(A6Method.Method.COPY) == RunnerMethod.A6MethodCOPY
+ assert RunnerMethod.get_name_by_code(A6Method.Method.MOVE) == RunnerMethod.A6MethodMOVE
+ assert RunnerMethod.get_name_by_code(A6Method.Method.OPTIONS) == RunnerMethod.A6MethodOPTIONS
+ assert RunnerMethod.get_name_by_code(A6Method.Method.PROPFIND) == RunnerMethod.A6MethodPROPFIND
+ assert RunnerMethod.get_name_by_code(A6Method.Method.PROPPATCH) == RunnerMethod.A6MethodPROPPATCH
+ assert RunnerMethod.get_name_by_code(A6Method.Method.LOCK) == RunnerMethod.A6MethodLOCK
+ assert RunnerMethod.get_name_by_code(A6Method.Method.UNLOCK) == RunnerMethod.A6MethodUNLOCK
+ assert RunnerMethod.get_name_by_code(A6Method.Method.PATCH) == RunnerMethod.A6MethodPATCH
+ assert RunnerMethod.get_name_by_code(A6Method.Method.TRACE) == RunnerMethod.A6MethodTRACE
diff --git a/src/runner/http/protocol.py b/tests/runner/http/test_protocol.py
similarity index 74%
copy from src/runner/http/protocol.py
copy to tests/runner/http/test_protocol.py
index 53a356f..be4764e 100644
--- a/src/runner/http/protocol.py
+++ b/tests/runner/http/test_protocol.py
@@ -14,13 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
import flatbuffers
-
-RPC_PREPARE_CONF = 1
-RPC_HTTP_REQ_CALL = 2
-RPC_TEST = 127
-RPC_UNKNOWN = 0
+from apisix.runner.http.protocol import new_builder
-def newBuilder():
- return flatbuffers.Builder(256)
+def test_new_builder():
+ builder = new_builder()
+ assert isinstance(builder, flatbuffers.Builder)
+ assert builder.Bytes == flatbuffers.Builder(256).Bytes
+ assert builder.Bytes != flatbuffers.Builder(512).Bytes
diff --git a/tests/runner/http/test_request.py b/tests/runner/http/test_request.py
new file mode 100644
index 0000000..506e965
--- /dev/null
+++ b/tests/runner/http/test_request.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from apisix.runner.http.request import Request as NewHttpRequest
+from apisix.runner.http.protocol import RPC_PREPARE_CONF
+from apisix.runner.http.protocol import RPC_HTTP_REQ_CALL
+from apisix.runner.http.protocol import new_builder
+from apisix.runner.http.method import get_name_by_code
+from a6pluginproto.HTTPReqCall import Req as A6HTTPReqCallReq
+from a6pluginproto.PrepareConf import Req as A6PrepareConfReq
+from a6pluginproto import TextEntry as A6TextEntry
+from a6pluginproto import Method as A6Method
+
+
+def test_request_config():
+ builder = new_builder()
+ name = builder.CreateString("say")
+ value = builder.CreateString('{"body":"Hello Python Runner"}')
+ A6TextEntry.Start(builder)
+ A6TextEntry.AddName(builder, name)
+ A6TextEntry.AddValue(builder, value)
+ conf_data = A6TextEntry.End(builder)
+
+ A6PrepareConfReq.ReqStartConfVector(builder, 1)
+ builder.PrependUOffsetTRelative(conf_data)
+ conf = builder.EndVector()
+
+ A6PrepareConfReq.Start(builder)
+ A6PrepareConfReq.AddConf(builder, conf)
+ req = A6PrepareConfReq.End(builder)
+ builder.Finish(req)
+ buf = builder.Output()
+ req = NewHttpRequest(ty=RPC_PREPARE_CONF, buf=buf)
+ assert req.configs
+ assert len(req.configs) >= 1
+
+
+def test_request_call():
+ req_path = "/hello/python/runner"
+ req_src_ip = [127, 0, 0, 1]
+ req_args = {"a": "args"}
+ req_headers = {"h": "headers"}
+
+ builder = new_builder()
+ path = builder.CreateString(req_path)
+ src_ip = bytes(bytearray(req_src_ip))
+ src_ip = builder.CreateByteVector(src_ip)
+
+ arg_k = builder.CreateString("a")
+ arg_v = builder.CreateString(req_args.get("a"))
+ A6TextEntry.Start(builder)
+ A6TextEntry.AddName(builder, arg_k)
+ A6TextEntry.AddValue(builder, arg_v)
+ args = A6TextEntry.End(builder)
+ A6HTTPReqCallReq.StartArgsVector(builder, 1)
+ builder.PrependUOffsetTRelative(args)
+ args_vec = builder.EndVector()
+
+ head_k = builder.CreateString("h")
+ head_v = builder.CreateString(req_headers.get("h"))
+ A6TextEntry.Start(builder)
+ A6TextEntry.AddName(builder, head_k)
+ A6TextEntry.AddValue(builder, head_v)
+ headers = A6TextEntry.End(builder)
+ A6HTTPReqCallReq.StartHeadersVector(builder, 1)
+ builder.PrependUOffsetTRelative(headers)
+ headers_vec = builder.EndVector()
+
+ A6HTTPReqCallReq.Start(builder)
+ A6HTTPReqCallReq.AddId(builder, 1)
+ A6HTTPReqCallReq.AddMethod(builder, A6Method.Method.GET)
+ A6HTTPReqCallReq.AddPath(builder, path)
+ A6HTTPReqCallReq.AddSrcIp(builder, src_ip)
+ A6HTTPReqCallReq.AddArgs(builder, args_vec)
+ A6HTTPReqCallReq.AddHeaders(builder, headers_vec)
+ req = A6HTTPReqCallReq.End(builder)
+ builder.Finish(req)
+ buf = builder.Output()
+ req = NewHttpRequest(ty=RPC_HTTP_REQ_CALL, buf=buf)
+
+ assert req.src_ip == ".".join('%s' % ip for ip in req_src_ip)
+ assert req.path == req_path
+ assert req.args.get("a") == req_args.get("a")
+ assert req.headers.get("h") == req_headers.get("h")
+ assert req.method == get_name_by_code(A6Method.Method.GET)
diff --git a/tests/runner/http/test_response.py b/tests/runner/http/test_response.py
new file mode 100644
index 0000000..d076add
--- /dev/null
+++ b/tests/runner/http/test_response.py
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from apisix.runner.http.response import Response as NewHttpResponse
+from apisix.runner.http.protocol import RPC_HTTP_REQ_CALL
+from apisix.runner.http.protocol import RPC_PREPARE_CONF
+from apisix.runner.http.protocol import RPC_UNKNOWN
+from a6pluginproto.PrepareConf.Resp import Resp as PrepareConfResp
+from a6pluginproto.HTTPReqCall.Resp import Resp as HTTPReqCallResp
+from a6pluginproto.HTTPReqCall.Action import Action as HTTPReqCallAction
+from a6pluginproto.HTTPReqCall.Stop import Stop as HTTPReqCallStop
+from a6pluginproto.HTTPReqCall.Rewrite import Rewrite as HTTPReqCallRewrite
+from a6pluginproto.Err.Code import Code as A6ErrCode
+from a6pluginproto.Err.Resp import Resp as A6ErrResp
+
+
+def test_response_config():
+ token = 1
+ resp = NewHttpResponse(ty=RPC_PREPARE_CONF)
+ resp.token = token
+ response = resp.flatbuffers()
+ flat_resp = PrepareConfResp.GetRootAs(response.Output())
+ assert resp.changed()
+ assert flat_resp.ConfToken() == token
+
+
+def test_response_call():
+ headers = {
+ "X-TEST-HELLO": "hello",
+ "X-TEST-WORLD": "world"
+ }
+ args = {
+ "A-TEST-HELLO": "hello",
+ "A-TEST-WORLD": "world",
+ }
+ body = "hello world"
+ resp = NewHttpResponse(ty=RPC_HTTP_REQ_CALL)
+ resp.headers = headers
+ resp.body = body
+ response = resp.flatbuffers()
+ flat_resp = HTTPReqCallResp.GetRootAs(response.Output())
+ assert resp.changed()
+ if flat_resp.ActionType() == HTTPReqCallAction.Stop:
+ action = flat_resp.Action()
+ stop = HTTPReqCallStop()
+ stop.Init(action.Bytes, action.Pos)
+ body_list = []
+ body_len = stop.BodyLength()
+ for i in range(body_len):
+ body_list.append(chr(stop.Body(i)))
+ assert "".join(body_list) == body
+ header_dict = {}
+ header_len = stop.HeadersLength()
+ for j in range(header_len):
+ entry = stop.Headers(j)
+ hk = str(entry.Name(), encoding="utf-8")
+ hv = str(entry.Value(), encoding="utf-8")
+ header_dict[hk] = hv
+ assert header_dict.get("X-TEST-HELLO") == headers.get("X-TEST-HELLO")
+ assert header_dict.get("X-TEST-WORLD") == headers.get("X-TEST-WORLD")
+ if flat_resp.ActionType() == HTTPReqCallAction.Rewrite:
+ action = flat_resp.Action()
+ rewrite = HTTPReqCallRewrite()
+ rewrite.Init(action.Bytes, action.Pos)
+ args_dict = {}
+ args_len = rewrite.ArgsLength()
+ for k in range(args_len):
+ entry = rewrite.Args(k)
+ ak = str(entry.Name(), encoding="utf-8")
+ av = str(entry.Value(), encoding="utf-8")
+ args_dict[ak] = av
+ assert args_dict.get("A-TEST-HELLO") == args.get("A-TEST-HELLO")
+ assert args_dict.get("A-TEST-WORLD") == args.get("A-TEST-WORLD")
+ header_dict = {}
+ header_len = rewrite.HeadersLength()
+ for j in range(header_len):
+ entry = rewrite.Headers(j)
+ hk = str(entry.Name(), encoding="utf-8")
+ hv = str(entry.Value(), encoding="utf-8")
+ header_dict[hk] = hv
+ assert header_dict.get("X-TEST-HELLO") == headers.get("X-TEST-HELLO")
+ assert header_dict.get("X-TEST-WORLD") == headers.get("X-TEST-WORLD")
+
+
+def test_response_unknown():
+ resp = NewHttpResponse(ty=RPC_UNKNOWN)
+ resp.error_code = A6ErrCode.BAD_REQUEST
+ response = resp.flatbuffers()
+ flat_resp = A6ErrResp.GetRootAs(response.Output())
+ assert flat_resp.Code() == A6ErrCode.BAD_REQUEST
diff --git a/src/__init__.py b/tests/runner/plugin/__init__.py
similarity index 100%
copy from src/__init__.py
copy to tests/runner/plugin/__init__.py
diff --git a/src/plugins/say.py b/tests/runner/plugin/test_base.py
similarity index 63%
copy from src/plugins/say.py
copy to tests/runner/plugin/test_base.py
index d64f952..a5f814d 100644
--- a/src/plugins/say.py
+++ b/tests/runner/plugin/test_base.py
@@ -14,15 +14,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import runner.plugin.base
-from runner.http.request import Request
-from runner.http.response import Response
+
+from apisix.runner.plugin.base import Base
-class Say(runner.plugin.base.Base):
- def __init__(self):
- super(Say, self).__init__(self.__class__.__name__)
+def test_base():
+ hello_name = "hello"
+ hello_config = {"body": "apisix"}
+ hello = Base(hello_name)
+ hello.config = hello_config
+ assert hello.name == hello_name
+ assert hello.config == hello_config
- def filter(self, request: Request, response: Response):
- response.setHeader("X-Resp-A6-Runner", "Python")
- response.setBody("Hello, Python Runner of APISIX")
+ world_name = "world"
+ world_config = "apisxi"
+ world = Base(world_name)
+ world.config = world_config
+ assert world.name == world_name
+ assert world.config != world_config
diff --git a/src/plugins/say.py b/tests/runner/plugin/test_cache.py
similarity index 62%
copy from src/plugins/say.py
copy to tests/runner/plugin/test_cache.py
index d64f952..91d4435 100644
--- a/src/plugins/say.py
+++ b/tests/runner/plugin/test_cache.py
@@ -14,15 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import runner.plugin.base
-from runner.http.request import Request
-from runner.http.response import Response
+
+from apisix.runner.plugin.cache import generate_token
+from apisix.runner.plugin.cache import get_config_by_token
+from apisix.runner.plugin.cache import set_config_by_token
-class Say(runner.plugin.base.Base):
- def __init__(self):
- super(Say, self).__init__(self.__class__.__name__)
-
- def filter(self, request: Request, response: Response):
- response.setHeader("X-Resp-A6-Runner", "Python")
- response.setBody("Hello, Python Runner of APISIX")
+def test_cache():
+ cache_config = {"hello": "world"}
+ token = generate_token()
+ config = get_config_by_token(token)
+ assert not config
+ ok = set_config_by_token(token, cache_config)
+ assert ok
+ config = get_config_by_token(token)
+ assert config == cache_config
diff --git a/src/__init__.py b/tests/runner/plugin/test_core.py
similarity index 100%
copy from src/__init__.py
copy to tests/runner/plugin/test_core.py
diff --git a/src/runner/__init__.py b/tests/runner/server/__init__.py
similarity index 100%
copy from src/runner/__init__.py
copy to tests/runner/server/__init__.py
diff --git a/tests/runner/server/test_handle.py b/tests/runner/server/test_handle.py
new file mode 100644
index 0000000..56ede96
--- /dev/null
+++ b/tests/runner/server/test_handle.py
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from apisix.runner.server.handle import Handle as NewServerHandle
+from apisix.runner.http.protocol import RPC_PREPARE_CONF
+from apisix.runner.http.protocol import RPC_HTTP_REQ_CALL
+from apisix.runner.http.protocol import RPC_UNKNOWN
+from apisix.runner.http.protocol import new_builder
+from apisix.runner.server.response import RUNNER_SUCCESS_CODE
+from apisix.runner.server.response import RUNNER_SUCCESS_MESSAGE
+from a6pluginproto.HTTPReqCall import Req as A6HTTPReqCallReq
+from a6pluginproto.PrepareConf import Req as A6PrepareConfReq
+from a6pluginproto.PrepareConf import Resp as A6PrepareConfResp
+from a6pluginproto import TextEntry as A6TextEntry
+from a6pluginproto import Method as A6Method
+
+
+def test_type():
+ handle = NewServerHandle(ty=RPC_UNKNOWN)
+ assert handle.type == RPC_UNKNOWN
+ handle = NewServerHandle(ty=RPC_PREPARE_CONF)
+ assert handle.type == RPC_PREPARE_CONF
+ handle = NewServerHandle(ty=RPC_HTTP_REQ_CALL)
+ assert handle.type == RPC_HTTP_REQ_CALL
+
+
+def test_buffer():
+ handle = NewServerHandle(buf="Hello Python Runner".encode())
+ assert handle.buffer == b"Hello Python Runner"
+
+
+def test_debug():
+ handle = NewServerHandle(debug=False)
+ assert not handle.debug
+ handle = NewServerHandle(debug=True)
+ assert handle.debug
+
+
+def test_dispatch_config():
+ builder = new_builder()
+ name = builder.CreateString("say")
+ value = builder.CreateString('{"body":"Hello Python Runner"}')
+ A6TextEntry.Start(builder)
+ A6TextEntry.AddName(builder, name)
+ A6TextEntry.AddValue(builder, value)
+ conf_data = A6TextEntry.End(builder)
+
+ A6PrepareConfReq.ReqStartConfVector(builder, 1)
+ builder.PrependUOffsetTRelative(conf_data)
+ conf = builder.EndVector()
+
+ A6PrepareConfReq.Start(builder)
+ A6PrepareConfReq.AddConf(builder, conf)
+ req = A6PrepareConfReq.End(builder)
+ builder.Finish(req)
+ buf = builder.Output()
+ handle = NewServerHandle(ty=RPC_PREPARE_CONF, buf=buf)
+ response = handle.dispatch()
+ resp = A6PrepareConfResp.Resp.GetRootAs(response.data)
+ assert response.code == RUNNER_SUCCESS_CODE
+ assert response.message == RUNNER_SUCCESS_MESSAGE
+ assert response.type == RPC_PREPARE_CONF
+ assert resp.ConfToken() != 0
+
+
+def test_dispatch_call():
+ builder = new_builder()
+ # request path
+ path = builder.CreateString("/hello/python/runner")
+ # request ip
+ src_ip = bytes(bytearray([127, 0, 0, 1]))
+ src_ip = builder.CreateByteVector(src_ip)
+ # request args
+ arg_k = builder.CreateString("hello")
+ arg_v = builder.CreateString("world")
+ A6TextEntry.Start(builder)
+ A6TextEntry.AddName(builder, arg_k)
+ A6TextEntry.AddValue(builder, arg_v)
+ args = A6TextEntry.End(builder)
+ A6HTTPReqCallReq.StartArgsVector(builder, 1)
+ builder.PrependUOffsetTRelative(args)
+ args_vec = builder.EndVector()
+ # request headers
+ head_k = builder.CreateString("hello")
+ head_v = builder.CreateString("world")
+ A6TextEntry.Start(builder)
+ A6TextEntry.AddName(builder, head_k)
+ A6TextEntry.AddValue(builder, head_v)
+ headers = A6TextEntry.End(builder)
+ A6HTTPReqCallReq.StartHeadersVector(builder, 1)
+ builder.PrependUOffsetTRelative(headers)
+ headers_vec = builder.EndVector()
+
+ A6HTTPReqCallReq.Start(builder)
+ A6HTTPReqCallReq.AddId(builder, 1)
+ A6HTTPReqCallReq.AddMethod(builder, A6Method.Method.GET)
+ A6HTTPReqCallReq.AddPath(builder, path)
+ A6HTTPReqCallReq.AddSrcIp(builder, src_ip)
+ A6HTTPReqCallReq.AddArgs(builder, args_vec)
+ A6HTTPReqCallReq.AddHeaders(builder, headers_vec)
+ req = A6HTTPReqCallReq.End(builder)
+ builder.Finish(req)
+ buf = builder.Output()
+
+ handle = NewServerHandle(ty=RPC_HTTP_REQ_CALL, buf=buf)
+ response = handle.dispatch()
+ assert response.code == RUNNER_SUCCESS_CODE
+ assert response.message == RUNNER_SUCCESS_MESSAGE
+ assert response.type == RPC_UNKNOWN
+
+
+def test_dispatch_unknown():
+ handle = NewServerHandle(ty=RPC_UNKNOWN)
+ response = handle.dispatch()
+ assert response.code == RUNNER_SUCCESS_CODE
+ assert response.message == RUNNER_SUCCESS_MESSAGE
+ assert response.type == RPC_UNKNOWN
diff --git a/tests/runner/server/test_protocol.py b/tests/runner/server/test_protocol.py
new file mode 100644
index 0000000..019f653
--- /dev/null
+++ b/tests/runner/server/test_protocol.py
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from apisix.runner.server.protocol import Protocol as NewServerProtocol
+from apisix.runner.http.protocol import RPC_PREPARE_CONF
+from apisix.runner.server.response import RUNNER_SUCCESS_CODE
+from apisix.runner.server.response import RUNNER_SUCCESS_MESSAGE
+
+
+def test_protocol_encode():
+ buf_str = "Hello Python Runner".encode()
+ protocol = NewServerProtocol(buffer=buf_str, ty=RPC_PREPARE_CONF)
+ err = protocol.encode()
+ buf_len = len(buf_str)
+ buf_arr = bytearray(buf_len.to_bytes(4, byteorder="big"))
+ buf_arr[0] = RPC_PREPARE_CONF
+ buf_data = bytes(buf_arr) + buf_str
+ buf_len = len(buf_data)
+ assert err.code == RUNNER_SUCCESS_CODE
+ assert err.message == RUNNER_SUCCESS_MESSAGE
+ assert protocol.type == RPC_PREPARE_CONF
+ assert protocol.buffer == buf_data
+ assert protocol.length == buf_len
+
+
+def test_protocol_decode():
+ buf_str = "Hello Python Runner".encode()
+ buf_len = len(buf_str)
+ buf_arr = bytearray(buf_len.to_bytes(4, byteorder="big"))
+ buf_arr[0] = RPC_PREPARE_CONF
+ buf_data = bytes(buf_arr)
+ protocol = NewServerProtocol(buffer=buf_data)
+ err = protocol.decode()
+ assert err.code == RUNNER_SUCCESS_CODE
+ assert err.message == RUNNER_SUCCESS_MESSAGE
+ assert protocol.type == RPC_PREPARE_CONF
+ assert protocol.length == buf_len
diff --git a/tests/runner/server/test_response.py b/tests/runner/server/test_response.py
new file mode 100644
index 0000000..1dd8ee1
--- /dev/null
+++ b/tests/runner/server/test_response.py
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from apisix.runner.server.response import Response as NewServerResponse
+from apisix.runner.server.response import RUNNER_ERROR_CODE
+from apisix.runner.server.response import RUNNER_SUCCESS_CODE
+from apisix.runner.http.protocol import RPC_PREPARE_CONF
+from apisix.runner.http.protocol import RPC_HTTP_REQ_CALL
+from apisix.runner.http.protocol import RPC_UNKNOWN
+
+
+def test_response_code():
+ response = NewServerResponse(code=RUNNER_SUCCESS_CODE)
+ assert response.code == RUNNER_SUCCESS_CODE
+ error = NewServerResponse(code=RUNNER_ERROR_CODE)
+ assert error.code == RUNNER_ERROR_CODE
+
+
+def test_response_message():
+ response = NewServerResponse(message="Hello Python Runner")
+ assert response.message == "Hello Python Runner"
+
+
+def test_response_data():
+ response = NewServerResponse(data="Hello Python Runner".encode())
+ assert response.data == b'Hello Python Runner'
+
+
+def test_response_type():
+ response = NewServerResponse(ty=RPC_UNKNOWN)
+ assert response.type == RPC_UNKNOWN
+ response = NewServerResponse(ty=RPC_PREPARE_CONF)
+ assert response.type == RPC_PREPARE_CONF
+ response = NewServerResponse(ty=RPC_HTTP_REQ_CALL)
+ assert response.type == RPC_HTTP_REQ_CALL
+
+
+def test_response_eq():
+ resp1 = NewServerResponse(code=RUNNER_SUCCESS_CODE, message="Hello Python Runner",
+ data="Hello Python Runner".encode(), ty=RPC_PREPARE_CONF)
+ resp2 = NewServerResponse(code=RUNNER_ERROR_CODE, message="Hello Python Runner",
+ data="Hello Python Runner".encode(), ty=RPC_PREPARE_CONF)
+ resp3 = NewServerResponse(code=RUNNER_SUCCESS_CODE, message="Hello Python Runner",
+ data="Hello Python Runner".encode(), ty=RPC_PREPARE_CONF)
+ assert resp1 != resp2
+ assert resp1 == resp3
diff --git a/tests/runner/socket/test_protocol.py b/tests/runner/socket/test_protocol.py
deleted file mode 100644
index 7c1a3c6..0000000
--- a/tests/runner/socket/test_protocol.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from src.runner.socket.protocol import Protocol as NewServerProtocol
-from src.runner.http.protocol import RPC_PREPARE_CONF
-from src.runner.socket.response import RUNNER_SUCCESS_CODE
-from src.runner.socket.response import RUNNER_SUCCESS_MESSAGE
-
-
-def test_protocol_encode():
- buf_str = "Hello Python Runner".encode()
- protocol = NewServerProtocol(buffer=buf_str, ty=RPC_PREPARE_CONF)
- err = protocol.encode()
- buf_len = len(buf_str)
- buf_arr = bytearray(buf_len.to_bytes(4, byteorder="big"))
- buf_arr[0] = RPC_PREPARE_CONF
- buf_data = bytes(buf_arr) + buf_str
- buf_len = len(buf_data)
- assert err.code == RUNNER_SUCCESS_CODE
- assert err.message == RUNNER_SUCCESS_MESSAGE
- assert protocol.type == RPC_PREPARE_CONF
- assert protocol.buffer == buf_data
- assert protocol.length == buf_len
-
-
-def test_protocol_decode():
- buf_str = "Hello Python Runner".encode()
- buf_len = len(buf_str)
- buf_arr = bytearray(buf_len.to_bytes(4, byteorder="big"))
- buf_arr[0] = RPC_PREPARE_CONF
- buf_data = bytes(buf_arr)
- protocol = NewServerProtocol(buffer=buf_data)
- err = protocol.decode()
- assert err.code == RUNNER_SUCCESS_CODE
- assert err.message == RUNNER_SUCCESS_MESSAGE
- assert protocol.type == RPC_PREPARE_CONF
- assert protocol.length == buf_len
diff --git a/tests/runner/socket/test_response.py b/tests/runner/socket/test_response.py
deleted file mode 100644
index dc9e88f..0000000
--- a/tests/runner/socket/test_response.py
+++ /dev/null
@@ -1,43 +0,0 @@
-from src.runner.socket.response import Response as NewServerResponse
-from src.runner.socket.response import RUNNER_ERROR_CODE
-from src.runner.socket.response import RUNNER_SUCCESS_CODE
-from src.runner.http.protocol import RPC_PREPARE_CONF
-from src.runner.http.protocol import RPC_HTTP_REQ_CALL
-from src.runner.http.protocol import RPC_UNKNOWN
-
-
-def test_response_code():
- response = NewServerResponse(code=RUNNER_SUCCESS_CODE)
- assert response.code == RUNNER_SUCCESS_CODE
- error = NewServerResponse(code=RUNNER_ERROR_CODE)
- assert error.code == RUNNER_ERROR_CODE
-
-
-def test_response_message():
- response = NewServerResponse(message="Hello Python Runner")
- assert response.message == "Hello Python Runner"
-
-
-def test_response_data():
- response = NewServerResponse(data="Hello Python Runner".encode())
- assert response.data == b'Hello Python Runner'
-
-
-def test_response_type():
- response = NewServerResponse(ty=RPC_UNKNOWN)
- assert response.type == RPC_UNKNOWN
- response = NewServerResponse(ty=RPC_PREPARE_CONF)
- assert response.type == RPC_PREPARE_CONF
- response = NewServerResponse(ty=RPC_HTTP_REQ_CALL)
- assert response.type == RPC_HTTP_REQ_CALL
-
-
-def test_response_eq():
- resp1 = NewServerResponse(code=RUNNER_SUCCESS_CODE, message="Hello Python Runner",
- data="Hello Python Runner".encode(), ty=RPC_PREPARE_CONF)
- resp2 = NewServerResponse(code=RUNNER_ERROR_CODE, message="Hello Python Runner",
- data="Hello Python Runner".encode(), ty=RPC_PREPARE_CONF)
- resp3 = NewServerResponse(code=RUNNER_SUCCESS_CODE, message="Hello Python Runner",
- data="Hello Python Runner".encode(), ty=RPC_PREPARE_CONF)
- assert resp1 != resp2
- assert resp1 == resp3