blob: 33ece3502f8d5ef4ab14cadd49507a3e161f02ed [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from config import KafkaConfig
from kafkatest.services.kafka import config_property
from kafkatest.services.kafka.version import TRUNK
from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.services.security.minikdc import MiniKdc
import json
import re
import signal
import subprocess
import time
import os.path
import collections
Port = collections.namedtuple('Port', ['name', 'number', 'open'])
class KafkaService(JmxMixin, Service):
PERSISTENT_ROOT = "/mnt"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "kafka.log")
STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "kafka.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "kafka-log4j.properties")
# Logs such as controller.log, server.log, etc all go here
OPERATIONAL_LOG_DIR = os.path.join(PERSISTENT_ROOT, "kafka-operational-logs")
OPERATIONAL_LOG_INFO_DIR = os.path.join(OPERATIONAL_LOG_DIR, "info")
OPERATIONAL_LOG_DEBUG_DIR = os.path.join(OPERATIONAL_LOG_DIR, "debug")
# Kafka log segments etc go here
DATA_LOG_DIR = os.path.join(PERSISTENT_ROOT, "kafka-data-logs")
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
# Kafka Authorizer
SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
logs = {
"kafka_operational_logs_info": {
"path": OPERATIONAL_LOG_INFO_DIR,
"collect_default": True},
"kafka_operational_logs_debug": {
"path": OPERATIONAL_LOG_DEBUG_DIR,
"collect_default": False},
"kafka_data": {
"path": DATA_LOG_DIR,
"collect_default": False}
}
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
jmx_attributes=[], zk_connect_timeout=5000):
"""
:type context
:type zk: ZookeeperService
:type topics: dict
"""
Service.__init__(self, context, num_nodes)
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
self.zk = zk
self.quota_config = quota_config
self.security_protocol = security_protocol
self.interbroker_security_protocol = interbroker_security_protocol
self.sasl_mechanism = sasl_mechanism
self.topics = topics
self.minikdc = None
self.authorizer_class_name = authorizer_class_name
#
# In a heavily loaded and not very fast machine, it is
# sometimes necessary to give more time for the zk client
# to have its session established, especially if the client
# is authenticating and waiting for the SaslAuthenticated
# in addition to the SyncConnected event.
#
# The defaut value for zookeeper.connect.timeout.ms is
# 2 seconds and here we increase it to 5 seconds, but
# it can be overriden by setting the corresponding parameter
# for this constructor.
self.zk_connect_timeout = zk_connect_timeout
self.port_mappings = {
'PLAINTEXT': Port('PLAINTEXT', 9092, False),
'SSL': Port('SSL', 9093, False),
'SASL_PLAINTEXT': Port('SASL_PLAINTEXT', 9094, False),
'SASL_SSL': Port('SASL_SSL', 9095, False)
}
for node in self.nodes:
node.version = version
node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)})
@property
def security_config(self):
return SecurityConfig(self.security_protocol, self.interbroker_security_protocol, zk_sasl = self.zk.zk_sasl , sasl_mechanism=self.sasl_mechanism)
def open_port(self, protocol):
self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True)
def close_port(self, protocol):
self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=False)
def start_minikdc(self, add_principals=""):
if self.security_config.has_sasl:
if self.minikdc is None:
self.minikdc = MiniKdc(self.context, self.nodes, extra_principals = add_principals)
self.minikdc.start()
else:
self.minikdc = None
def start(self, add_principals=""):
self.open_port(self.security_protocol)
self.open_port(self.interbroker_security_protocol)
self.start_minikdc(add_principals)
Service.start(self)
# Create topics if necessary
if self.topics is not None:
for topic, topic_cfg in self.topics.items():
if topic_cfg is None:
topic_cfg = {}
topic_cfg["topic"] = topic
self.create_topic(topic_cfg)
def set_protocol_and_port(self, node):
listeners = []
advertised_listeners = []
for protocol in self.port_mappings:
port = self.port_mappings[protocol]
if port.open:
listeners.append(port.name + "://:" + str(port.number))
advertised_listeners.append(port.name + "://" + node.account.hostname + ":" + str(port.number))
self.listeners = ','.join(listeners)
self.advertised_listeners = ','.join(advertised_listeners)
def prop_file(self, node):
cfg = KafkaConfig(**node.config)
cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
cfg[config_property.ZOOKEEPER_CONNECT] = self.zk.connect_setting()
self.set_protocol_and_port(node)
# TODO - clean up duplicate configuration logic
prop_file = cfg.render()
prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
security_config=self.security_config,
interbroker_security_protocol=self.interbroker_security_protocol,
sasl_mechanism=self.sasl_mechanism)
return prop_file
def start_cmd(self, node):
cmd = "export JMX_PORT=%d; " % self.jmx_port
cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG
cmd += "export KAFKA_OPTS=%s; " % self.security_config.kafka_opts
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh %s 1>> %s 2>> %s &" % (KafkaService.CONFIG_FILE, KafkaService.STDOUT_CAPTURE, KafkaService.STDERR_CAPTURE)
return cmd
def start_node(self, node):
prop_file = self.prop_file(node)
self.logger.info("kafka.properties:")
self.logger.info(prop_file)
node.account.create_file(KafkaService.CONFIG_FILE, prop_file)
node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))
self.security_config.setup_node(node)
cmd = self.start_cmd(node)
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
with node.account.monitor_log(KafkaService.STDOUT_CAPTURE) as monitor:
node.account.ssh(cmd)
monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
self.start_jmx_tool(self.idx(node), node)
if len(self.pids(node)) == 0:
raise Exception("No process ids recorded on node %s" % str(node))
def pids(self, node):
"""Return process ids associated with running processes on the given node."""
try:
cmd = "ps ax | grep -i kafka | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
return []
def signal_node(self, node, sig=signal.SIGTERM):
pids = self.pids(node)
for pid in pids:
node.account.signal(pid, sig)
def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
leader = self.leader(topic, partition)
self.signal_node(leader, sig)
def stop_node(self, node, clean_shutdown=True):
pids = self.pids(node)
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
for pid in pids:
node.account.signal(pid, sig, allow_fail=False)
wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=20, err_msg="Kafka node failed to stop")
def clean_node(self, node):
JmxMixin.clean_node(self, node)
self.security_config.clean_node(node)
node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/*", allow_fail=False)
def create_topic(self, topic_cfg, node=None):
"""Run the admin tool create topic command.
Specifying node is optional, and may be done if for different kafka nodes have different versions,
and we care where command gets run.
If the node is not specified, run the command from self.nodes[0]
"""
if node is None:
node = self.nodes[0]
self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
cmd = "/opt/%s/bin/kafka-topics.sh " % kafka_dir(node)
cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
'zk_connect': self.zk.connect_setting(),
'topic': topic_cfg.get("topic"),
'partitions': topic_cfg.get('partitions', 1),
'replication': topic_cfg.get('replication-factor', 1)
}
if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None:
for config_name, config_value in topic_cfg["configs"].items():
cmd += " --config %s=%s" % (config_name, str(config_value))
self.logger.info("Running topic creation command...\n%s" % cmd)
node.account.ssh(cmd)
time.sleep(1)
self.logger.info("Checking to see if topic was properly created...\n%s" % cmd)
for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
self.logger.info(line)
def describe_topic(self, topic, node=None):
if node is None:
node = self.nodes[0]
cmd = "/opt/%s/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
(kafka_dir(node), self.zk.connect_setting(), topic)
output = ""
for line in node.account.ssh_capture(cmd):
output += line
return output
def alter_message_format(self, topic, msg_format_version, node=None):
if node is None:
node = self.nodes[0]
self.logger.info("Altering message format version for topic %s with format %s", topic, msg_format_version)
cmd = "/opt/%s/bin/kafka-configs.sh --zookeeper %s --entity-name %s --entity-type topics --alter --add-config message.format.version=%s" % \
(kafka_dir(node), self.zk.connect_setting(), topic, msg_format_version)
self.logger.info("Running alter message format command...\n%s" % cmd)
node.account.ssh(cmd)
def parse_describe_topic(self, topic_description):
"""Parse output of kafka-topics.sh --describe (or describe_topic() method above), which is a string of form
PartitionCount:2\tReplicationFactor:2\tConfigs:
Topic: test_topic\ttPartition: 0\tLeader: 3\tReplicas: 3,1\tIsr: 3,1
Topic: test_topic\tPartition: 1\tLeader: 1\tReplicas: 1,2\tIsr: 1,2
into a dictionary structure appropriate for use with reassign-partitions tool:
{
"partitions": [
{"topic": "test_topic", "partition": 0, "replicas": [3, 1]},
{"topic": "test_topic", "partition": 1, "replicas": [1, 2]}
]
}
"""
lines = map(lambda x: x.strip(), topic_description.split("\n"))
partitions = []
for line in lines:
m = re.match(".*Leader:.*", line)
if m is None:
continue
fields = line.split("\t")
# ["Partition: 4", "Leader: 0"] -> ["4", "0"]
fields = map(lambda x: x.split(" ")[1], fields)
partitions.append(
{"topic": fields[0],
"partition": int(fields[1]),
"replicas": map(int, fields[3].split(','))})
return {"partitions": partitions}
def verify_reassign_partitions(self, reassignment, node=None):
"""Run the reassign partitions admin tool in "verify" mode
"""
if node is None:
node = self.nodes[0]
json_file = "/tmp/%s_reassign.json" % str(time.time())
# reassignment to json
json_str = json.dumps(reassignment)
json_str = json.dumps(json_str)
# create command
cmd = "echo %s > %s && " % (json_str, json_file)
cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node)
cmd += "--zookeeper %s " % self.zk.connect_setting()
cmd += "--reassignment-json-file %s " % json_file
cmd += "--verify "
cmd += "&& sleep 1 && rm -f %s" % json_file
# send command
self.logger.info("Verifying parition reassignment...")
self.logger.debug(cmd)
output = ""
for line in node.account.ssh_capture(cmd):
output += line
self.logger.debug(output)
if re.match(".*is in progress.*", output) is not None:
return False
return True
def execute_reassign_partitions(self, reassignment, node=None):
"""Run the reassign partitions admin tool in "verify" mode
"""
if node is None:
node = self.nodes[0]
json_file = "/tmp/%s_reassign.json" % str(time.time())
# reassignment to json
json_str = json.dumps(reassignment)
json_str = json.dumps(json_str)
# create command
cmd = "echo %s > %s && " % (json_str, json_file)
cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node)
cmd += "--zookeeper %s " % self.zk.connect_setting()
cmd += "--reassignment-json-file %s " % json_file
cmd += "--execute"
cmd += " && sleep 1 && rm -f %s" % json_file
# send command
self.logger.info("Executing parition reassignment...")
self.logger.debug(cmd)
output = ""
for line in node.account.ssh_capture(cmd):
output += line
self.logger.debug("Verify partition reassignment:")
self.logger.debug(output)
def search_data_files(self, topic, messages):
"""Check if a set of messages made it into the Kakfa data files. Note that
this method takes no account of replication. It simply looks for the
payload in all the partition files of the specified topic. 'messages' should be
an array of numbers. The list of missing messages is returned.
"""
payload_match = "payload: " + "$|payload: ".join(str(x) for x in messages) + "$"
found = set([])
for node in self.nodes:
# Grab all .log files in directories prefixed with this topic
files = node.account.ssh_capture("find %s -regex '.*/%s-.*/[^/]*.log'" % (KafkaService.DATA_LOG_DIR, topic))
# Check each data file to see if it contains the messages we want
for log in files:
cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files %s " \
"| grep -E \"%s\"" % (kafka_dir(node), log.strip(), payload_match)
for line in node.account.ssh_capture(cmd, allow_fail=True):
for val in messages:
if line.strip().endswith("payload: "+str(val)):
self.logger.debug("Found %s in data-file [%s] in line: [%s]" % (val, log.strip(), line.strip()))
found.add(val)
missing = list(set(messages) - found)
if len(missing) > 0:
self.logger.warn("The following values were not found in the data files: " + str(missing))
return missing
def restart_node(self, node, clean_shutdown=True):
"""Restart the given node."""
self.stop_node(node, clean_shutdown)
self.start_node(node)
def leader(self, topic, partition=0):
""" Get the leader replica for the given topic and partition.
"""
self.logger.debug("Querying zookeeper to find leader replica for topic: \n%s" % (topic))
zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
partition_state = self.zk.query(zk_path)
if partition_state is None:
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
partition_state = json.loads(partition_state)
self.logger.info(partition_state)
leader_idx = int(partition_state["leader"])
self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
return self.get_node(leader_idx)
def list_consumer_groups(self, node=None, new_consumer=False, command_config=None):
""" Get list of consumer groups.
"""
if node is None:
node = self.nodes[0]
if command_config is None:
command_config = ""
else:
command_config = "--command-config " + command_config
if new_consumer:
cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --list" % \
(kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config)
else:
cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --list" % \
(kafka_dir(node), self.zk.connect_setting(), command_config)
output = ""
self.logger.debug(cmd)
for line in node.account.ssh_capture(cmd):
if not line.startswith("SLF4J"):
output += line
self.logger.debug(output)
return output
def describe_consumer_group(self, group, node=None, new_consumer=False, command_config=None):
""" Describe a consumer group.
"""
if node is None:
node = self.nodes[0]
if command_config is None:
command_config = ""
else:
command_config = "--command-config " + command_config
if new_consumer:
cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --group %s --describe" % \
(kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config, group)
else:
cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --group %s --describe" % \
(kafka_dir(node), self.zk.connect_setting(), command_config, group)
output = ""
self.logger.debug(cmd)
for line in node.account.ssh_capture(cmd):
if not (line.startswith("SLF4J") or line.startswith("GROUP") or line.startswith("Could not fetch offset")):
output += line
self.logger.debug(output)
return output
def bootstrap_servers(self, protocol='PLAINTEXT'):
"""Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
This is the format expected by many config files.
"""
port_mapping = self.port_mappings[protocol]
self.logger.info("Bootstrap client port is: " + str(port_mapping.number))
if not port_mapping.open:
raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping))
return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes])
def controller(self):
""" Get the controller node
"""
self.logger.debug("Querying zookeeper to find controller broker")
controller_info = self.zk.query("/controller")
if controller_info is None:
raise Exception("Error finding controller info")
controller_info = json.loads(controller_info)
self.logger.debug(controller_info)
controller_idx = int(controller_info["brokerid"])
self.logger.info("Controller's ID: %d" % (controller_idx))
return self.get_node(controller_idx)
def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time):
node = self.nodes[0]
cmd = "/opt/%s/bin/" % kafka_dir(node)
cmd += "kafka-run-class.sh kafka.tools.GetOffsetShell"
cmd += " --topic %s --broker-list %s --max-wait-ms %s --offsets %s --time %s" % (topic, self.bootstrap_servers(self.security_protocol), max_wait_ms, offsets, time)
if partitions:
cmd += ' --partitions %s' % partitions
cmd += " 2>> /mnt/get_offset_shell.log | tee -a /mnt/get_offset_shell.log &"
output = ""
self.logger.debug(cmd)
for line in node.account.ssh_capture(cmd):
output += line
self.logger.debug(output)
return output