/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.DirectoryData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.PartitionData;
import org.apache.kafka.common.message.AssignReplicasToDirsRequestData.TopicData;
import org.apache.kafka.common.message.AssignReplicasToDirsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
import org.apache.kafka.common.utils.ExponentialBackoff;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;

import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.kafka.common.requests.AssignReplicasToDirsRequest.MAX_ASSIGNMENTS_PER_REQUEST;
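
/**
* Queues assignments of partition replicas to log directories and sends them to the
* controller in AssignReplicasToDirsRequests. Requests carry at most
* MAX_ASSIGNMENTS_PER_REQUEST assignments, at most one request is in flight at a time,
* and failed or unanswered assignments are re-queued and retried with exponential
* backoff until they succeed or become invalid against the latest metadata image. All
* state changes run on a single-threaded event queue.
*
* A sketch of typical usage; channelManager, latestImage, partition, and dirId stand in
* for values supplied by the broker:
*
* <pre>
* AssignmentsManager manager = new AssignmentsManager(
*     Time.SYSTEM, channelManager, nodeId, () -&gt; latestImage, Uuid::toString);
* manager.onAssignment(partition, dirId, "new replica", () -&gt; { });
* // ...
* manager.close();
* </pre>
*/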
public final class AssignmentsManager {
static final ExponentialBackoff STANDARD_BACKOFF = new ExponentialBackoff(
TimeUnit.MILLISECONDS.toNanos(100),
2,
TimeUnit.SECONDS.toNanos(10),
0.02);
/**
* The minimum amount of time we will wait before logging individual assignment failures.
*/
static final long MIN_NOISY_FAILURE_INTERVAL_NS = TimeUnit.MINUTES.toNanos(2);
/**
* The metric reflecting the number of pending assignments.
*/
static final MetricName QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC =
metricName("QueuedReplicaToDirAssignments");
/**
* The name of the deferred event that we use to send assignments, if appropriate.
*/
static final String MAYBE_SEND_ASSIGNMENTS_EVENT = "MaybeSendAssignmentsEvent";
/**
* The SLF4J logger object.
*/
private final Logger log;
/**
* The exponential backoff strategy to use.
*/
private final ExponentialBackoff backoff;
/**
* The clock object to use.
*/
private final Time time;
/**
* Used to send messages to the controller.
*/
private final NodeToControllerChannelManager channelManager;
/**
* The node ID.
*/
private final int nodeId;
/**
* Supplies the latest MetadataImage.
*/
private final Supplier<MetadataImage> metadataImageSupplier;
/**
* Maps directory IDs to descriptions for logging purposes.
*/
private final Function<Uuid, String> directoryIdToDescription;
/**
* Maps partitions to assignments that are ready to send.
*/
private final ConcurrentHashMap<TopicIdPartition, Assignment> ready;
/**
* Maps partitions to assignments that are in-flight. The map is replaced wholesale when
* a request is sent, and is volatile so that other threads can read the pending count.
*/
private volatile Map<TopicIdPartition, Assignment> inflight;
/**
* The registry to register our metrics with.
*/
private final MetricsRegistry metricsRegistry;
/**
* The number of global failures we had previously (cleared after any success).
*/
private int previousGlobalFailures;
/**
* The event queue.
*/
private final KafkaEventQueue eventQueue;
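/**
* Builds a Yammer metric name in this class's group and type.
*/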
static MetricName metricName(String name) {
return KafkaYammerMetrics.getMetricName("org.apache.kafka.server", "AssignmentsManager", name);
}
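/**
* Creates a manager that uses the standard backoff policy and registers its metric
* with the default Yammer metrics registry.
*/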
public AssignmentsManager(
Time time,
NodeToControllerChannelManager channelManager,
int nodeId,
Supplier<MetadataImage> metadataImageSupplier,
Function<Uuid, String> directoryIdToDescription
) {
this(STANDARD_BACKOFF,
time,
channelManager,
nodeId,
metadataImageSupplier,
directoryIdToDescription,
KafkaYammerMetrics.defaultRegistry());
}
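/**
* Package-private variant that allows the backoff policy and metrics registry to be
* injected, e.g. for testing.
*/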
AssignmentsManager(
ExponentialBackoff backoff,
Time time,
NodeToControllerChannelManager channelManager,
int nodeId,
Supplier<MetadataImage> metadataImageSupplier,
Function<Uuid, String> directoryIdToDescription,
MetricsRegistry metricsRegistry
) {
this.log = new LogContext("[AssignmentsManager id=" + nodeId + "] ").
logger(AssignmentsManager.class);
this.backoff = backoff;
this.time = time;
this.channelManager = channelManager;
this.nodeId = nodeId;
this.directoryIdToDescription = directoryIdToDescription;
this.metadataImageSupplier = metadataImageSupplier;
this.ready = new ConcurrentHashMap<>();
this.inflight = Collections.emptyMap();
this.metricsRegistry = metricsRegistry;
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
@Override
public Integer value() {
return numPending();
}
});
this.previousGlobalFailures = 0;
this.eventQueue = new KafkaEventQueue(time,
new LogContext("[AssignmentsManager id=" + nodeId + "]"),
"broker-" + nodeId + "-directory-assignments-manager-",
new ShutdownEvent());
channelManager.start();
}
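/**
* Returns the number of pending assignments: those ready to be sent plus those
* in-flight. May be called from any thread.
*/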
public int numPending() {
return ready.size() + inflight.size();
}
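/**
* Shuts down the event queue. As part of shutdown, the channel manager is stopped and
* the pending-assignments metric is removed.
*/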
public void close() throws InterruptedException {
eventQueue.close();
}
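/**
* Registers an assignment of a partition replica to a log directory, to be sent to the
* controller. May be called from any thread.
*
* @param topicIdPartition The partition being assigned.
* @param directoryId The ID of the directory the replica is being placed in.
* @param reason The reason for the assignment, used only for logging.
* @param successCallback Run on the event queue thread once the controller accepts
*                        the assignment.
*/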
public void onAssignment(
TopicIdPartition topicIdPartition,
Uuid directoryId,
String reason,
Runnable successCallback
) {
long nowNs = time.nanoseconds();
Assignment assignment = new Assignment(
topicIdPartition, directoryId, nowNs, successCallback);
ready.put(topicIdPartition, assignment);
if (log.isTraceEnabled()) {
String topicDescription = Optional.ofNullable(metadataImageSupplier.get().topics().
getTopic(assignment.topicIdPartition().topicId())).
map(TopicImage::name).orElse(assignment.topicIdPartition().topicId().toString());
log.trace("Registered assignment {}: {}, moving {}-{} into {}",
assignment,
reason,
topicDescription,
topicIdPartition.partitionId(),
directoryIdToDescription.apply(assignment.directoryId()));
}
rescheduleMaybeSendAssignmentsEvent(nowNs);
}
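/**
* Schedules (or reschedules) the deferred event that sends ready assignments. The
* deadline is computed from the backoff policy, the recent global failure count, and
* the state of the ready and in-flight maps.
*
* @param nowNs The current monotonic time in nanoseconds.
*/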
void rescheduleMaybeSendAssignmentsEvent(long nowNs) {
eventQueue.scheduleDeferred(MAYBE_SEND_ASSIGNMENTS_EVENT,
new AssignmentsManagerDeadlineFunction(backoff,
nowNs, previousGlobalFailures, !inflight.isEmpty(), ready.size()),
new MaybeSendAssignmentsEvent());
}
/**
* Handles shutdown.
*/
private class ShutdownEvent implements EventQueue.Event {
@Override
public void run() {
log.info("shutting down.");
try {
channelManager.shutdown();
} catch (Exception e) {
log.error("Unexpected exception shutting down NodeToControllerChannelManager", e);
}
try {
metricsRegistry.removeMetric(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC);
} catch (Exception e) {
log.error("Unexpected exception removing metrics.", e);
}
}
}
/**
* An event that processes the assignments in the ready map.
*/
private class MaybeSendAssignmentsEvent implements EventQueue.Event {
@Override
public void run() {
try {
maybeSendAssignments();
} catch (Exception e) {
log.error("Unexpected exception in MaybeSendAssignmentsEvent", e);
}
}
}
/**
* An event that handles the controller's response to our request.
*/
private class HandleResponseEvent implements EventQueue.Event {
private final Map<TopicIdPartition, Assignment> sent;
private final Optional<ClientResponse> response;
HandleResponseEvent(
Map<TopicIdPartition, Assignment> sent,
Optional<ClientResponse> response
) {
this.sent = sent;
this.response = response;
}
@Override
public void run() {
try {
handleResponse(sent, response);
} catch (Exception e) {
log.error("Unexpected exception in HandleResponseEvent", e);
} finally {
if (!ready.isEmpty()) {
rescheduleMaybeSendAssignmentsEvent(time.nanoseconds());
}
}
}
}
/**
* A callback object that handles the controller's response to our request.
*/
private class CompletionHandler implements ControllerRequestCompletionHandler {
private final Map<TopicIdPartition, Assignment> sent;
CompletionHandler(Map<TopicIdPartition, Assignment> sent) {
this.sent = sent;
}
@Override
public void onTimeout() {
eventQueue.append(new HandleResponseEvent(sent, Optional.empty()));
}
@Override
public void onComplete(ClientResponse response) {
eventQueue.append(new HandleResponseEvent(sent, Optional.of(response)));
}
}
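/**
* Sends a new AssignReplicasToDirsRequest, unless one is still in flight. Drains at
* most MAX_ASSIGNMENTS_PER_REQUEST assignments from the ready map, discarding any that
* are no longer valid against the latest metadata image.
*/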
void maybeSendAssignments() {
int inflightSize = inflight.size();
if (log.isTraceEnabled()) {
log.trace("maybeSendAssignments: inflightSize = {}.", inflightSize);
}
if (inflightSize > 0) {
log.trace("maybeSendAssignments: cannot send new assignments because there are " +
"{} still in flight.", inflightSize);
return;
}
MetadataImage image = metadataImageSupplier.get();
Map<TopicIdPartition, Assignment> newInFlight = new HashMap<>();
int numInvalid = 0;
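// Move assignments out of the ready map and into the new in-flight batch, counting
// any that are no longer valid.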
for (Iterator<Assignment> iterator = ready.values().iterator();
iterator.hasNext() && newInFlight.size() < MAX_ASSIGNMENTS_PER_REQUEST;
) {
Assignment assignment = iterator.next();
iterator.remove();
if (assignment.valid(nodeId, image)) {
newInFlight.put(assignment.topicIdPartition(), assignment);
} else {
numInvalid++;
}
}
log.info("maybeSendAssignments: sending {} assignments; invalidated {} assignments " +
"prior to sending.", newInFlight.size(), numInvalid);
if (!newInFlight.isEmpty()) {
sendAssignments(image.cluster().brokerEpoch(nodeId), newInFlight);
}
}
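/**
* Sends the given assignments to the controller and marks them as in-flight.
*
* @param brokerEpoch The broker epoch to include in the request.
* @param newInflight The assignments to send.
*/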
void sendAssignments(long brokerEpoch, Map<TopicIdPartition, Assignment> newInflight) {
CompletionHandler completionHandler = new CompletionHandler(newInflight);
channelManager.sendRequest(new AssignReplicasToDirsRequest.Builder(
buildRequestData(nodeId, brokerEpoch, newInflight)),
completionHandler);
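// Volatile write: publishes the in-flight map so that numPending() can read it
// from other threads.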
inflight = newInflight;
}
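/**
* Handles the controller's response (or a timeout) for an in-flight request. A global
* error re-queues every sent assignment; otherwise, each per-partition result is
* handled individually, and any partition missing from the response is re-queued.
*
* @param sent The assignments that were sent in the request.
* @param assignmentResponse The response, if one was received before the timeout.
*/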
void handleResponse(
Map<TopicIdPartition, Assignment> sent,
Optional<ClientResponse> assignmentResponse
) {
inflight = Collections.emptyMap();
Optional<String> globalResponseError = globalResponseError(assignmentResponse);
if (globalResponseError.isPresent()) {
previousGlobalFailures++;
log.error("handleResponse: {} assignments failed; global error: {}. Retrying.",
sent.size(), globalResponseError.get());
sent.forEach(ready::putIfAbsent);
return;
}
previousGlobalFailures = 0;
AssignReplicasToDirsResponseData responseData =
((AssignReplicasToDirsResponse) assignmentResponse.get().responseBody()).data();
long nowNs = time.nanoseconds();
for (AssignReplicasToDirsResponseData.DirectoryData directoryData : responseData.directories()) {
for (AssignReplicasToDirsResponseData.TopicData topicData : directoryData.topics()) {
for (AssignReplicasToDirsResponseData.PartitionData partitionData : topicData.partitions()) {
TopicIdPartition topicIdPartition =
new TopicIdPartition(topicData.topicId(), partitionData.partitionIndex());
handleAssignmentResponse(topicIdPartition, sent,
Errors.forCode(partitionData.errorCode()), nowNs);
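// Remove each partition we got a result for; anything left in the sent map
// afterwards was missing from the response.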
sent.remove(topicIdPartition);
}
}
}
for (Assignment assignment : sent.values()) {
ready.putIfAbsent(assignment.topicIdPartition(), assignment);
log.error("handleResponse: no result in response for partition {}.",
assignment.topicIdPartition());
}
}
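/**
* Handles the per-partition result for one sent assignment. On success, the
* assignment's callback is run; on failure, the assignment is re-queued, and an error
* is logged only if debug logging is enabled or the assignment has been pending longer
* than MIN_NOISY_FAILURE_INTERVAL_NS.
*
* @param topicIdPartition The partition the result refers to.
* @param sent The assignments that were sent in the request.
* @param error The partition-level error code.
* @param nowNs The current monotonic time in nanoseconds.
*/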
void handleAssignmentResponse(
TopicIdPartition topicIdPartition,
Map<TopicIdPartition, Assignment> sent,
Errors error,
long nowNs
) {
Assignment assignment = sent.get(topicIdPartition);
if (assignment == null) {
log.error("handleResponse: response contained topicIdPartition {}, but this was not " +
"in the request.", topicIdPartition);
} else if (error.equals(Errors.NONE)) {
try {
assignment.successCallback().run();
} catch (Exception e) {
log.error("handleResponse: unexpected callback exception", e);
}
} else {
ready.putIfAbsent(topicIdPartition, assignment);
if (log.isDebugEnabled() || nowNs > assignment.submissionTimeNs() + MIN_NOISY_FAILURE_INTERVAL_NS) {
log.error("handleResponse: error assigning {}: {}.", assignment.topicIdPartition(), error);
}
}
}
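/**
* Reads the global failure count on the event queue thread, blocking until the read
* completes. Package-private, e.g. for testing.
*/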
int previousGlobalFailures() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = new CompletableFuture<>();
eventQueue.append(() -> future.complete(previousGlobalFailures));
return future.get();
}
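/**
* Returns the number of assignments in the current in-flight request, if any.
*/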
int numInFlight() {
return inflight.size();
}
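/**
* Checks a controller response for errors that affect the request as a whole.
*
* @param response The response, if one was received before the timeout.
* @return A human-readable description of the global error, or empty if the response
*         can be processed partition by partition.
*/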
static Optional<String> globalResponseError(Optional<ClientResponse> response) {
if (response.isEmpty()) {
return Optional.of("Timeout");
}
if (response.get().authenticationException() != null) {
return Optional.of("AuthenticationException");
}
if (response.get().wasTimedOut()) {
return Optional.of("Disonnected[Timeout]");
}
if (response.get().wasDisconnected()) {
return Optional.of("Disconnected");
}
if (response.get().versionMismatch() != null) {
return Optional.of("UnsupportedVersionException");
}
if (response.get().responseBody() == null) {
return Optional.of("EmptyResponse");
}
if (!(response.get().responseBody() instanceof AssignReplicasToDirsResponse)) {
return Optional.of("ClassCastException");
}
AssignReplicasToDirsResponseData data = ((AssignReplicasToDirsResponse)
response.get().responseBody()).data();
Errors error = Errors.forCode(data.errorCode());
if (error != Errors.NONE) {
return Optional.of("Response-level error: " + error.name());
}
return Optional.empty();
}
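/**
* Builds the request payload, grouping the given assignments by directory and then by
* topic.
*
* @param nodeId The broker ID to set in the request.
* @param brokerEpoch The current broker epoch.
* @param assignments The assignments to include.
* @return The request data.
*/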
static AssignReplicasToDirsRequestData buildRequestData(
int nodeId,
long brokerEpoch,
Map<TopicIdPartition, Assignment> assignments
) {
Map<Uuid, DirectoryData> directoryMap = new HashMap<>();
Map<Uuid, Map<Uuid, TopicData>> topicMap = new HashMap<>();
for (Map.Entry<TopicIdPartition, Assignment> entry : assignments.entrySet()) {
TopicIdPartition topicPartition = entry.getKey();
Uuid directoryId = entry.getValue().directoryId();
DirectoryData directory = directoryMap.computeIfAbsent(directoryId, d -> new DirectoryData().setId(directoryId));
TopicData topic = topicMap.computeIfAbsent(directoryId, d -> new HashMap<>())
.computeIfAbsent(topicPartition.topicId(), topicId -> {
TopicData data = new TopicData().setTopicId(topicId);
directory.topics().add(data);
return data;
});
PartitionData partition = new PartitionData().setPartitionIndex(topicPartition.partitionId());
topic.partitions().add(partition);
}
return new AssignReplicasToDirsRequestData()
.setBrokerId(nodeId)
.setBrokerEpoch(brokerEpoch)
.setDirectories(new ArrayList<>(directoryMap.values()));
}
}