/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.jmh.fetcher;

import kafka.cluster.BrokerEndPoint;
import kafka.cluster.DelayedOperations;
import kafka.cluster.AlterPartitionListener;
import kafka.cluster.Partition;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.LogAppendInfo;
import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.BrokerFeatures;
import kafka.server.BrokerTopicStats;
import kafka.server.FailedPartitions;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.OffsetTruncationState;
import kafka.server.QuotaFactory;
import kafka.server.RemoteLeaderEndPoint;
import kafka.server.BrokerBlockingSender;
import kafka.server.ReplicaFetcherThread;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
import kafka.server.checkpoints.OffsetCheckpoints;
import kafka.server.metadata.MockConfigRepository;
import kafka.server.metadata.ZkMetadataCache;
import kafka.utils.Pool;
import kafka.utils.TestUtils;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.RecordsSend;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.UpdateMetadataRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.server.util.KafkaScheduler;
import org.mockito.Mockito;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import scala.Option;
import scala.collection.Iterator;
import scala.collection.Map;
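
/**
 * Benchmarks the steady-state cost of a single {@code doWork()} pass of a
 * {@link ReplicaFetcherThread} across {@code partitionCount} follower partitions. Network and
 * log I/O are stubbed out (see {@link ReplicaFetcherBenchThread} below), so the score reflects
 * the fetcher's per-partition bookkeeping. Typically run via the jmh-benchmarks wrapper
 * script, e.g. {@code ./jmh-benchmarks/jmh.sh ReplicaFetcherThreadBenchmark}.
 */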
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 15)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class ReplicaFetcherThreadBenchmark {
@Param({"100", "500", "1000", "5000"})
private int partitionCount;
private ReplicaFetcherBenchThread fetcher;
private LogManager logManager;
private File logDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
private KafkaScheduler scheduler = new KafkaScheduler(1, true, "scheduler");
private Pool<TopicPartition, Partition> pool = new Pool<TopicPartition, Partition>(Option.empty());
private Metrics metrics = new Metrics();
private ReplicaManager replicaManager;
private ReplicaQuota replicaQuota;
private Option<Uuid> topicId = Option.apply(Uuid.randomUuid());

@Setup(Level.Trial)
public void setup() throws IOException {
if (!logDir.mkdir())
throw new IOException("error creating test directory");
scheduler.startup();
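// Only zookeeper.connect is needed to build a minimal broker config here; the benchmark runs
// against a ZooKeeper-backed metadata cache (KRaft is not supported yet, see the TODO below).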
Properties props = new Properties();
props.put("zookeeper.connect", "127.0.0.1:9999");
KafkaConfig config = new KafkaConfig(props);
LogConfig logConfig = createLogConfig();
BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
LogDirFailureChannel logDirFailureChannel = Mockito.mock(LogDirFailureChannel.class);
List<File> logDirs = Collections.singletonList(logDir);
logManager = new LogManagerBuilder().
setLogDirs(logDirs).
setInitialOfflineDirs(Collections.emptyList()).
setConfigRepository(new MockConfigRepository()).
setInitialDefaultConfig(logConfig).
setCleanerConfig(new CleanerConfig(0, 0, 0, 0, 0, 0.0, 0, false)).
setRecoveryThreadsPerDataDir(1).
setFlushCheckMs(1000L).
setFlushRecoveryOffsetCheckpointMs(10000L).
setFlushStartOffsetCheckpointMs(10000L).
setRetentionCheckMs(1000L).
setProducerStateManagerConfig(60000, false).
setInterBrokerProtocolVersion(MetadataVersion.latest()).
setScheduler(scheduler).
setBrokerTopicStats(brokerTopicStats).
setLogDirFailureChannel(logDirFailureChannel).
setTime(Time.SYSTEM).
setKeepPartitionMetadataFile(true).
build();
LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> initialFetched = new LinkedHashMap<>();
HashMap<String, Uuid> topicIds = new HashMap<>();
scala.collection.mutable.Map<TopicPartition, InitialFetchState> initialFetchStates = new scala.collection.mutable.HashMap<>();
List<UpdateMetadataRequestData.UpdateMetadataPartitionState> updatePartitionState = new ArrayList<>();
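// Register each partition as a follower of leader broker 3, and seed an empty fetch response
// for it so the fetch session handler has per-partition data to process.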
for (int i = 0; i < partitionCount; i++) {
TopicPartition tp = new TopicPartition("topic", i);
List<Integer> replicas = Arrays.asList(0, 1, 2);
LeaderAndIsrRequestData.LeaderAndIsrPartitionState partitionState = new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(replicas)
.setPartitionEpoch(1)
.setReplicas(replicas)
.setIsNew(true);
AlterPartitionListener alterPartitionListener = Mockito.mock(AlterPartitionListener.class);
OffsetCheckpoints offsetCheckpoints = Mockito.mock(OffsetCheckpoints.class);
Mockito.when(offsetCheckpoints.fetch(logDir.getAbsolutePath(), tp)).thenReturn(Option.apply(0L));
AlterPartitionManager isrChannelManager = Mockito.mock(AlterPartitionManager.class);
Partition partition = new Partition(tp, 100, MetadataVersion.latest(),
0, () -> -1, Time.SYSTEM, alterPartitionListener, new DelayedOperationsMock(tp),
Mockito.mock(MetadataCache.class), logManager, isrChannelManager);
partition.makeFollower(partitionState, offsetCheckpoints, topicId);
pool.put(tp, partition);
initialFetchStates.put(tp, new InitialFetchState(topicId, new BrokerEndPoint(3, "host", 3000), 0, 0));
BaseRecords fetched = new BaseRecords() {
@Override
public int sizeInBytes() {
return 0;
}
@Override
public RecordsSend<? extends BaseRecords> toSend() {
return null;
}
};
initialFetched.put(new TopicIdPartition(topicId.get(), tp), new FetchResponseData.PartitionData()
.setPartitionIndex(tp.partition())
.setLastStableOffset(0)
.setLogStartOffset(0)
.setRecords(fetched));
updatePartitionState.add(
new UpdateMetadataRequestData.UpdateMetadataPartitionState()
.setTopicName("topic")
.setPartitionIndex(i)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(replicas)
.setZkVersion(1)
.setReplicas(replicas));
}
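// Prime the ZK metadata cache with leadership state for every partition, mirroring what the
// controller's UPDATE_METADATA request would deliver on a real broker.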
UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion(),
0, 0, 0, updatePartitionState, Collections.emptyList(), topicIds).build();
// TODO: fix to support raft
ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), null);
metadataCache.updateMetadata(0, updateMetadataRequest);
replicaManager = new ReplicaManagerBuilder().
setConfig(config).
setMetrics(metrics).
setTime(new MockTime()).
setZkClient(Mockito.mock(KafkaZkClient.class)).
setScheduler(scheduler).
setLogManager(logManager).
setQuotaManagers(Mockito.mock(QuotaFactory.QuotaManagers.class)).
setBrokerTopicStats(brokerTopicStats).
setMetadataCache(metadataCache).
setLogDirFailureChannel(new LogDirFailureChannel(logDirs.size())).
setAlterPartitionManager(TestUtils.createAlterIsrManager()).
build();
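// No-op quota: nothing is throttled and the quota is never exceeded, so quota checks do not
// influence the measured fetch path.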
replicaQuota = new ReplicaQuota() {
@Override
public boolean isQuotaExceeded() {
return false;
}
@Override
public void record(long value) {
}
@Override
public boolean isThrottled(TopicPartition topicPartition) {
return false;
}
};
fetcher = new ReplicaFetcherBenchThread(config, replicaManager, replicaQuota, pool);
fetcher.addPartitions(initialFetchStates);
// Force one pass so that partitions move to the fetching state. Doing this in the setup phase
// keeps the transition cost out of the measured steady-state work.
fetcher.doWork();
// Handle the response to engage the incremental fetch session handler.
((RemoteLeaderEndPoint) fetcher.leader()).fetchSessionHandler()
        .handleResponse(FetchResponse.of(Errors.NONE, 0, 999, initialFetched), ApiKeys.FETCH.latestVersion());
}

@TearDown(Level.Trial)
public void tearDown() throws IOException, InterruptedException {
metrics.close();
replicaManager.shutdown(false);
logManager.shutdown();
scheduler.shutdown();
Utils.delete(logDir);
}

@Benchmark
public long testFetcher() {
fetcher.doWork();
return fetcher.fetcherStats().requestRate().count();
}

// Use a concrete DelayedOperations subclass instead of a Mockito mock so that mocking
// overhead does not distort the benchmark results.
private static class DelayedOperationsMock extends DelayedOperations {
DelayedOperationsMock(TopicPartition topicPartition) {
super(topicPartition, null, null, null);
}
@Override
public int numDelayedDelete() {
return 0;
}
}

private static LogConfig createLogConfig() {
return new LogConfig(new Properties());
}
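
// A ReplicaFetcherThread whose leader endpoint is stubbed: epoch and offset lookups return
// canned values and fetch() returns no data, so doWork() exercises only the fetcher's own
// bookkeeping rather than network or log I/O.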
static class ReplicaFetcherBenchThread extends ReplicaFetcherThread {
private final Pool<TopicPartition, Partition> pool;
ReplicaFetcherBenchThread(KafkaConfig config,
                          ReplicaManager replicaManager,
                          ReplicaQuota replicaQuota,
                          Pool<TopicPartition, Partition> partitions) {
super("name",
new RemoteLeaderEndPoint(
String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3),
new BrokerBlockingSender(
new BrokerEndPoint(3, "host", 3000),
config,
new Metrics(),
Time.SYSTEM,
3,
String.format("broker-%d-fetcher-%d", 3, 3),
new LogContext(String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d] ", config.brokerId(), 3, 3))
),
new FetchSessionHandler(
new LogContext(String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d] ", config.brokerId(), 3, 3)), 3),
config,
replicaManager,
replicaQuota,
config::interBrokerProtocolVersion,
() -> -1
) {
@Override
public OffsetAndEpoch fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
return new OffsetAndEpoch(0L, 0);
}
@Override
public Map<TopicPartition, EpochEndOffset> fetchEpochEndOffsets(Map<TopicPartition, OffsetForLeaderPartition> partitions) {
scala.collection.mutable.Map<TopicPartition, EpochEndOffset> endOffsets = new scala.collection.mutable.HashMap<>();
Iterator<TopicPartition> iterator = partitions.keys().iterator();
while (iterator.hasNext()) {
TopicPartition tp = iterator.next();
endOffsets.put(tp, new EpochEndOffset()
.setPartition(tp.partition())
.setErrorCode(Errors.NONE.code())
.setLeaderEpoch(0)
.setEndOffset(100));
}
return endOffsets;
}
@Override
public Map<TopicPartition, FetchResponseData.PartitionData> fetch(FetchRequest.Builder fetchRequest) {
return new scala.collection.mutable.HashMap<>();
}
},
config,
new FailedPartitions(),
replicaManager,
replicaQuota,
String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3),
config::interBrokerProtocolVersion
);
pool = partitions;
}
@Override
public Option<Object> latestEpoch(TopicPartition topicPartition) {
return Option.apply(0);
}
@Override
public long logStartOffset(TopicPartition topicPartition) {
return pool.get(topicPartition).localLogOrException().logStartOffset();
}
@Override
public long logEndOffset(TopicPartition topicPartition) {
return 0;
}
@Override
public void truncate(TopicPartition tp, OffsetTruncationState offsetTruncationState) {
// Pretend to truncate so the partition can move to the Fetching state.
}
@Override
public Option<OffsetAndEpoch> endOffsetForEpoch(TopicPartition topicPartition, int epoch) {
return Option.apply(new OffsetAndEpoch(0, 0));
}
@Override
public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long fetchOffset,
FetchResponseData.PartitionData partitionData) {
return Option.empty();
}
}
}