blob: 9450bbb280a1f034ad7eec35b912bc6cc4335629 [file] [log] [blame]
#!/usr/local/bin/python3
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import re
from decoder_base import DecoderBase
from ds_codes import ds_codes
from modified_utf8 import utf8m_to_utf8s
from connection_types import ConnectionTypes, ConnectionTypeStrings
from read_values import (
read_number_from_hex_string,
read_byte_value,
read_number_from_hex_string,
read_short_value,
read_number_from_hex_string,
read_int_value,
read_long_value,
read_string_value,
read_jmutf8_string_value,
read_number_from_hex_string,
call_reader_function,
)
class HandshakeDecoder(DecoderBase):
def __init__(self, output_queue):
super(HandshakeDecoder, self).__init__(output_queue)
self.separator = ""
self.credentials_types = {
0: "SECURITY_CREDENTIALS_NONE",
1: "SECURITY_CREDENTIALS_NORMAL",
2: "SECURITY_CREDENTIALS_DHENCRYPT",
3: "SECURITY_MULTIUSER_NOTIFICATIONCHANNEL",
}
def is_handshake_trace(self, line):
expression = re.compile(r"Handshake bytes \(\d+\):\s*([0-9|a-f|A-F]+)")
match = expression.search(line)
if match:
return True
else:
return False
def get_handshake_bytes(self, line):
expression = re.compile(r"Handshake bytes \(\d+\):\s*([0-9|a-f|A-F]+)")
match = expression.search(line)
if match:
return match.group(1)
else:
exit(1)
# TODO: Find a handshake that uses a list and implement this function to parse it
def read_list_of_ports(string):
return (1001, 1002, 1003)
def get_client_proxy_address(self, string, offset):
(address_size, offset) = call_reader_function(string, offset, read_byte_value)
result = ""
if int(address_size) == 4:
(octet1, offset) = call_reader_function(string, offset, read_byte_value)
(octet2, offset) = call_reader_function(string, offset, read_byte_value)
(octet3, offset) = call_reader_function(string, offset, read_byte_value)
(octet4, offset) = call_reader_function(string, offset, read_byte_value)
result = (
str(octet1) + "." + str(octet2) + "." + str(octet3) + "." + str(octet4)
)
return (result, offset)
def convert_to_bytes(self, string, length):
result = bytearray()
for i in range(0, length, 2):
byte = int(string[i : i + 2], 16)
result.append(byte)
return result
def read_cacheable_string(self, string, offset):
(dscode, offset) = call_reader_function(string, offset, read_byte_value)
hostname = ""
string_type = ds_codes[dscode]
if string_type == "CacheableString":
(length_high_byte, offset) = call_reader_function(
string, offset, read_byte_value
)
(length_low_byte, offset) = call_reader_function(
string, offset, read_byte_value
)
string_length = (length_high_byte << 8) + length_low_byte
string_bytes = string[offset : offset + string_length * 2]
hostname = utf8m_to_utf8s(
self.convert_to_bytes(string_bytes, string_length * 2)
).decode("utf-8")
offset += string_length * 2
elif string_type == "CacheableStringHuge":
(length_byte_3, offset) = call_reader_function(
string, offset, read_byte_value
)
(length_byte_2, offset) = call_reader_function(
string, offset, read_byte_value
)
(length_byte_1, offset) = call_reader_function(
string, offset, read_byte_value
)
(length_byte_0, offset) = call_reader_function(
string, offset, read_byte_value
)
string_length = (
(length_byte_3 << 24)
+ (length_byte_2 << 16)
+ (length_byte_1 << 8)
+ length_byte_0
)
string_bytes = string[offset : offset + string_length * 2]
hostname = utf8m_to_utf8s(
self.convert_to_bytes(string_bytes, string_length * 2)
).decode("utf-8")
offset += string_length * 2
elif ds_codes[dscode] == "CacheableNullString":
# CacheableNullString is a signal that there's actually nothing here, so just return empty
hostname = ""
else:
hostname = ""
offset -= 2
return (hostname, offset)
def get_client_proxy_role_array_length(self, string, offset):
(maybe_length, bytes_read) = read_byte_value(string, offset)
offset += bytes_read
array_length = 0
if maybe_length == 255:
array_length = -1
elif maybe_length == 254:
array_length = read_short_value(string, offset)
offset += 4
elif maybe_length == 253:
array_length = read_int_value(string, offset)
offset += 8
else:
array_length = maybe_length
return (array_length, offset)
def get_client_proxy_version_ordinal(self, string, offset):
(maybe_ordinal, offset) = call_reader_function(string, offset, read_byte_value)
ordinal = 0
if maybe_ordinal == 255:
(ordinal, offset) = call_reader_function(string, offset, read_short_value)
else:
ordinal = maybe_ordinal
return (ordinal, offset)
def get_credentials_type(self, string, offset):
(credential_type, offset) = call_reader_function(
string, offset, read_byte_value
)
return (self.credentials_types[credential_type], offset)
def get_handshake_info(self, line, handshake_info):
handshake_bytes = self.get_handshake_bytes(line)
(connection_type, offset) = call_reader_function(
handshake_bytes, 0, read_byte_value
)
handshake_info["ConnectionType"] = ConnectionTypeStrings[connection_type]
(handshake_info["VersionOrdinal"], offset) = call_reader_function(
handshake_bytes, offset, read_byte_value
)
(handshake_info["ReplyOK"], offset) = call_reader_function(
handshake_bytes, offset, read_byte_value
)
ports = ()
if (connection_type == ConnectionTypes.PRIMARY_SERVER_TO_CLIENT) or (
connection_type == ConnectionTypes.SECONDARY_SERVER_TO_CLIENT
):
ports = self.read_list_of_ports(handshake_bytes)
handshake_info["Ports"] = ports
(handshake_info["ReadTimeout"], offset) = call_reader_function(
handshake_bytes, offset, read_int_value
)
(handshake_info["FixedIDByte"], offset) = call_reader_function(
handshake_bytes, offset, read_byte_value
)
client_proxy = {}
(client_proxy["MembershipIDByte"], offset) = call_reader_function(
handshake_bytes, offset, read_byte_value
)
# TODO: Solve this mystery!
(client_proxy["MysteryByte"], offset) = call_reader_function(
handshake_bytes, offset, read_byte_value
)
(client_proxy["FixedIDByte"], offset) = call_reader_function(
handshake_bytes, offset, read_byte_value
)
(client_proxy["InternalDistributedMemberID"], offset) = call_reader_function(
handshake_bytes, offset, read_byte_value
)
(client_proxy["Address"], offset) = self.get_client_proxy_address(
handshake_bytes, offset
)
(client_proxy["SyncCounter"], offset) = call_reader_function(
handshake_bytes, offset, read_int_value
)
(client_proxy["Hostname"], offset) = self.read_cacheable_string(
handshake_bytes, offset
)
(client_proxy["SplitBrainFlag"], offset) = call_reader_function(
handshake_bytes, offset, read_byte_value
)
(client_proxy["DCPort"], offset) = call_reader_function(
handshake_bytes, offset, read_int_value
)
(client_proxy["VPID"], offset) = call_reader_function(
handshake_bytes, offset, read_int_value
)
(client_proxy["VMKind"], offset) = call_reader_function(
handshake_bytes, offset, read_byte_value
)
(
client_proxy["RoleArrayLength"],
offset,
) = self.get_client_proxy_role_array_length(handshake_bytes, offset)
(client_proxy["DSName"], offset) = self.read_cacheable_string(
handshake_bytes, offset
)
(client_proxy["UniqueTag"], offset) = self.read_cacheable_string(
handshake_bytes, offset
)
(client_proxy["DurableClientID"], offset) = self.read_cacheable_string(
handshake_bytes, offset
)
client_proxy_durable_client_timeout = 0
if "DurableClientID" in client_proxy:
(client_proxy_durable_client_timeout, offset) = call_reader_function(
handshake_bytes, offset, read_int_value
)
(client_proxy["Version"], offset) = self.get_client_proxy_version_ordinal(
handshake_bytes, offset
)
handshake_info["ClientProxy"] = client_proxy
(handshake_info["ThisValueisAlways1"], offset) = call_reader_function(
handshake_bytes, offset, read_int_value
)
(handshake_info["Overrides"], offset) = call_reader_function(
handshake_bytes, offset, read_byte_value
)
(handshake_info["CredentialsTypeFlag"], offset) = self.get_credentials_type(
handshake_bytes, offset
)
def process_line(self, line):
handshake = {}
if self.is_handshake_trace(line):
self.get_handshake_info(line, handshake)
self.output_queue_.put({"handshake": handshake})