# blob: e41298174d6ebfc823d977d635094069191a5b8c [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from src.runner.socket.response import Response as NewServerResponse
from src.runner.socket.response import RUNNER_ERROR_CODE
from src.runner.socket.response import RUNNER_SUCCESS_CODE
from src.runner.socket.response import RUNNER_SUCCESS_MESSAGE
class Protocol:
    """
    Codec for a 4-byte frame header used on the runner socket.

    Header layout, as produced by ``encode`` and consumed by ``decode``:
    byte 0 holds the protocol type and bytes 1-3 hold the payload length
    as a big-endian unsigned integer.
    """

    # Total header size in bytes (1 type byte + 3 length bytes).
    _HEADER_SIZE = 4
    # Largest value that fits in the single type byte.
    _MAX_TYPE = 0xFF
    # Largest payload length representable in the 3 length bytes.
    _MAX_PAYLOAD = 0xFFFFFF

    def __init__(self, buffer: bytes = b'', ty: int = 0):
        """
        create a protocol object
        :param buffer: payload to be framed by ``encode``, or the 4-byte
            header to be parsed by ``decode``
        :param ty: protocol type written into the first header byte
        """
        self.__buffer = buffer
        self.__type = ty
        self.__length = 0

    # NOTE(review): the accessors below are kept as plain methods, as in
    # the visible source; confirm no call site reads them as attributes.

    def length(self) -> int:
        """
        get buffer length
        :return: 0 until ``encode``/``decode`` succeeds; afterwards the
            full frame length (``encode``) or the payload length read
            from the header (``decode``)
        """
        return self.__length

    def type(self) -> int:
        """
        get protocol type
        :return: the type given at construction, or the one read from the
            header by ``decode``
        """
        return self.__type

    def buffer(self) -> bytes:
        """
        get buffer data
        :return: the current buffer; after a successful ``encode`` this is
            header + payload
        """
        return self.__buffer

    def encode(self) -> NewServerResponse:
        """
        encode protocol buffer data

        Prepends the 4-byte header to the buffer and records the total
        frame length. On any error the buffer is left untouched.
        :return: success response, or an error response when the buffer
            is empty, the payload does not fit in 3 length bytes, or the
            type does not fit in 1 byte
        """
        if not self.__buffer:
            return NewServerResponse(RUNNER_ERROR_CODE, "ERR: send buffer is empty")

        payload_len = len(self.__buffer)
        # The type byte occupies the top byte of the header, so only 24 bits
        # are available for the length; a larger payload would be silently
        # truncated on the wire.
        if payload_len > self._MAX_PAYLOAD:
            return NewServerResponse(
                RUNNER_ERROR_CODE,
                f"ERR: send buffer length {payload_len} exceeds protocol limit {self._MAX_PAYLOAD}")
        if not 0 <= self.__type <= self._MAX_TYPE:
            return NewServerResponse(
                RUNNER_ERROR_CODE,
                f"ERR: protocol type {self.__type} is out of range 0-{self._MAX_TYPE}")

        header = bytes([self.__type]) + payload_len.to_bytes(3, byteorder="big")
        self.__buffer = header + self.__buffer
        self.__length = len(self.__buffer)
        return NewServerResponse(code=RUNNER_SUCCESS_CODE, message=RUNNER_SUCCESS_MESSAGE)

    def decode(self) -> NewServerResponse:
        """
        decode protocol buffer data

        Treats the buffer as a 4-byte header and extracts the protocol
        type (byte 0) and the payload length (bytes 1-3, big-endian).
        :return: success response, or an error response when the buffer
            is empty or is not exactly 4 bytes long
        """
        if not self.__buffer:
            return NewServerResponse(RUNNER_ERROR_CODE, "ERR: recv buffer is empty")

        header_len = len(self.__buffer)
        if header_len != self._HEADER_SIZE:
            return NewServerResponse(
                RUNNER_ERROR_CODE,
                f"ERR: recv protocol type length is 4, got {header_len}")

        header = bytes(self.__buffer)
        self.__type = header[0]
        # Skip the type byte so only the 3 length bytes are interpreted.
        self.__length = int.from_bytes(header[1:], byteorder="big")
        return NewServerResponse(code=RUNNER_SUCCESS_CODE, message=RUNNER_SUCCESS_MESSAGE)