blob: e030c464de349fbcedde307ebd084b5d7c322b85 [file] [log] [blame]
#!/usr/local/bin/python3
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import struct
from message_types import message_types
from read_values import read_int_value, read_byte_value, call_reader_function
CHUNKED_MESSAGE_HEADER_LENGTH = 17
class ChunkedResponseDecoder:
def __init__(self):
self.reset()
def add_header(self, connection, header):
if len(self.chunked_message) > 2:
raise Exception(
"Previous chunked message is not completed, can't process another header"
)
if len(header) == 2 * CHUNKED_MESSAGE_HEADER_LENGTH:
offset = 0
message_type = ""
(message_type, offset) = call_reader_function(
header, offset, read_int_value
)
self.chunked_message["Type"] = message_types[message_type]
# TODO: pass connection value in as a parameter
self.chunked_message["Connection"] = connection
self.chunked_message["Direction"] = "<---"
(self.chunked_message["Parts"], offset) = call_reader_function(
header, offset, read_int_value
)
(self.chunked_message["TransactionId"], offset) = call_reader_function(
header, offset, read_int_value
)
chunk_size = 0
flags = 0
(chunk_size, offset) = call_reader_function(header, offset, read_int_value)
(flags, offset) = call_reader_function(header, offset, read_byte_value)
self.add_chunk_header(chunk_size, flags)
else:
raise IndexError(
"Chunked message header should be "
+ str(CHUNKED_MESSAGE_HEADER_LENGTH)
+ " bytes"
)
def add_chunk_header(self, chunk_size, flags):
if len(self.chunked_message) == 2:
raise Exception("Can't add chunk header before message header")
#
# Chunked messages can be *really* large, like several tens of
# thousands of chunks for a really huge response. We used to
# report out a list of all the chunk sizes based on all the
# chunk headers, but it presented performance issues and really
# wasn't conveying particularly interesting info. Now, we just
# tally up all the chunk sizes and report the total size of the
# response and the timestamp of the last incoming chunk, both
# of which are potentially much more meaningful than the details
# of each individual chunk.
#
self.chunk_flags = flags
self.chunked_message["ResponseSize"] += chunk_size
self.chunked_message["NumberOfChunks"] += 1
def is_complete_message(self):
return self.chunk_flags & 0x1
def get_decoded_message(self, time_stamp):
# Return a re-ordered dictionary, with Timestamp at the front. This
# makes output consistent with other messages
decoded_message = {"Timestamp": time_stamp}
decoded_message.update(self.chunked_message)
return decoded_message
def reset(self):
self.header = ""
# self.message_body = ""
self.chunked_message = {"ResponseSize": 0, "NumberOfChunks": 0}
self.complete = False
self.chunk_flags = 0xFF