blob: 887f47c1cde4ccd437967499cf9d0a781f63e8ce [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.TopicConstants;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* This class manages the coordination process with the consumer coordinator.
*/
public final class ConsumerCoordinator extends AbstractCoordinator {
private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
private final List<PartitionAssignor> assignors;
private final org.apache.kafka.clients.Metadata metadata;
private final ConsumerCoordinatorMetrics sensors;
private final SubscriptionState subscriptions;
private final OffsetCommitCallback defaultOffsetCommitCallback;
private final boolean autoCommitEnabled;
private final AutoCommitTask autoCommitTask;
private final ConsumerInterceptors<?, ?> interceptors;
private final boolean excludeInternalTopics;
private MetadataSnapshot metadataSnapshot;
private MetadataSnapshot assignmentSnapshot;
/**
* Initialize the coordination manager.
*/
public ConsumerCoordinator(ConsumerNetworkClient client,
String groupId,
int sessionTimeoutMs,
int heartbeatIntervalMs,
List<PartitionAssignor> assignors,
Metadata metadata,
SubscriptionState subscriptions,
Metrics metrics,
String metricGrpPrefix,
Time time,
long retryBackoffMs,
OffsetCommitCallback defaultOffsetCommitCallback,
boolean autoCommitEnabled,
long autoCommitIntervalMs,
ConsumerInterceptors<?, ?> interceptors,
boolean excludeInternalTopics) {
super(client,
groupId,
sessionTimeoutMs,
heartbeatIntervalMs,
metrics,
metricGrpPrefix,
time,
retryBackoffMs);
this.metadata = metadata;
this.metadata.requestUpdate();
this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
this.autoCommitEnabled = autoCommitEnabled;
this.assignors = assignors;
addMetadataListener();
if (autoCommitEnabled) {
this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
this.autoCommitTask.reschedule();
} else {
this.autoCommitTask = null;
}
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
this.interceptors = interceptors;
this.excludeInternalTopics = excludeInternalTopics;
}
@Override
public String protocolType() {
return ConsumerProtocol.PROTOCOL_TYPE;
}
@Override
public List<ProtocolMetadata> metadata() {
List<ProtocolMetadata> metadataList = new ArrayList<>();
for (PartitionAssignor assignor : assignors) {
Subscription subscription = assignor.subscription(subscriptions.subscription());
ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
}
return metadataList;
}
private void addMetadataListener() {
this.metadata.addListener(new Metadata.Listener() {
@Override
public void onMetadataUpdate(Cluster cluster) {
// if we encounter any unauthorized topics, raise an exception to the user
if (!cluster.unauthorizedTopics().isEmpty())
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));
if (subscriptions.hasPatternSubscription()) {
final List<String> topicsToSubscribe = new ArrayList<>();
for (String topic : cluster.topics())
if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
!(excludeInternalTopics && TopicConstants.INTERNAL_TOPICS.contains(topic)))
topicsToSubscribe.add(topic);
subscriptions.changeSubscription(topicsToSubscribe);
metadata.setTopics(subscriptions.groupSubscription());
}
// check if there are any changes to the metadata which should trigger a rebalance
if (subscriptions.partitionsAutoAssigned()) {
MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
if (!snapshot.equals(metadataSnapshot)) {
metadataSnapshot = snapshot;
subscriptions.needReassignment();
}
}
}
});
}
private PartitionAssignor lookupAssignor(String name) {
for (PartitionAssignor assignor : this.assignors) {
if (assignor.name().equals(name))
return assignor;
}
return null;
}
@Override
protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
// if we were the assignor, then we need to make sure that there have been no metadata updates
// since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
subscriptions.needReassignment();
return;
}
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
// set the flag to refresh last committed offsets
subscriptions.needRefreshCommits();
// update partition assignment
subscriptions.assignFromSubscribed(assignment.partitions());
// give the assignor a chance to update internal state based on the received assignment
assignor.onAssignment(assignment);
// reschedule the auto commit starting from now
if (autoCommitEnabled)
autoCommitTask.reschedule();
// execute the user's callback after rebalance
ConsumerRebalanceListener listener = subscriptions.listener();
log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
try {
Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsAssigned(assigned);
} catch (WakeupException e) {
throw e;
} catch (Exception e) {
log.error("User provided listener {} for group {} failed on partition assignment",
listener.getClass().getName(), groupId, e);
}
}
@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions) {
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
Set<String> allSubscribedTopics = new HashSet<>();
Map<String, Subscription> subscriptions = new HashMap<>();
for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
subscriptions.put(subscriptionEntry.getKey(), subscription);
allSubscribedTopics.addAll(subscription.topics());
}
// the leader will begin watching for changes to any of the topics the group is interested in,
// which ensures that all metadata changes will eventually be seen
this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription());
// update metadata (if needed) and keep track of the metadata used for assignment so that
// we can check after rebalance completion whether anything has changed
client.ensureFreshMetadata();
assignmentSnapshot = metadataSnapshot;
log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
groupId, assignor.name(), subscriptions);
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
log.debug("Finished assignment for group {}: {}", groupId, assignment);
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
}
return groupAssignment;
}
@Override
protected void onJoinPrepare(int generation, String memberId) {
// commit offsets prior to rebalance if auto-commit enabled
maybeAutoCommitOffsetsSync();
// execute the user's callback before rebalance
ConsumerRebalanceListener listener = subscriptions.listener();
log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
try {
Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsRevoked(revoked);
} catch (WakeupException e) {
throw e;
} catch (Exception e) {
log.error("User provided listener {} for group {} failed on partition revocation",
listener.getClass().getName(), groupId, e);
}
assignmentSnapshot = null;
subscriptions.needReassignment();
}
@Override
public boolean needRejoin() {
return subscriptions.partitionsAutoAssigned() &&
(super.needRejoin() || subscriptions.partitionAssignmentNeeded());
}
/**
* Refresh the committed offsets for provided partitions.
*/
public void refreshCommittedOffsetsIfNeeded() {
if (subscriptions.refreshCommitsNeeded()) {
Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
TopicPartition tp = entry.getKey();
// verify assignment is still active
if (subscriptions.isAssigned(tp))
this.subscriptions.committed(tp, entry.getValue());
}
this.subscriptions.commitsRefreshed();
}
}
/**
* Fetch the current committed offsets from the coordinator for a set of partitions.
* @param partitions The partitions to fetch offsets for
* @return A map from partition to the committed offset
*/
public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
while (true) {
ensureCoordinatorKnown();
// contact coordinator to fetch committed offsets
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future = sendOffsetFetchRequest(partitions);
client.poll(future);
if (future.succeeded())
return future.value();
if (!future.isRetriable())
throw future.exception();
time.sleep(retryBackoffMs);
}
}
/**
* Ensure that we have a valid partition assignment from the coordinator.
*/
public void ensurePartitionAssignment() {
if (subscriptions.partitionsAutoAssigned())
ensureActiveGroup();
}
@Override
public void close() {
// we do not need to re-enable wakeups since we are closing already
client.disableWakeups();
try {
maybeAutoCommitOffsetsSync();
} finally {
super.close();
}
}
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
this.subscriptions.needRefreshCommits();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
if (interceptors != null)
interceptors.onCommit(offsets);
cb.onComplete(offsets, null);
}
@Override
public void onFailure(RuntimeException e) {
cb.onComplete(offsets, e);
}
});
// ensure commit has a chance to be transmitted (without blocking on its completion)
// note that we allow delayed tasks to be executed in case heartbeats need to be sent
client.quickPoll(true);
}
/**
* Commit offsets synchronously. This method will retry until the commit completes successfully
* or an unrecoverable error is encountered.
* @param offsets The offsets to be committed
* @throws org.apache.kafka.common.errors.AuthorizationException if the consumer is not authorized to the group
* or to any of the specified partitions
* @throws CommitFailedException if an unrecoverable error occurs before the commit can be completed
*/
public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
if (offsets.isEmpty())
return;
while (true) {
ensureCoordinatorKnown();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
client.poll(future);
if (future.succeeded()) {
if (interceptors != null)
interceptors.onCommit(offsets);
return;
}
if (!future.isRetriable())
throw future.exception();
time.sleep(retryBackoffMs);
}
}
private class AutoCommitTask implements DelayedTask {
private final long interval;
public AutoCommitTask(long interval) {
this.interval = interval;
}
private void reschedule() {
client.schedule(this, time.milliseconds() + interval);
}
private void reschedule(long at) {
client.schedule(this, at);
}
public void run(final long now) {
if (coordinatorUnknown()) {
log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId);
reschedule(now + retryBackoffMs);
return;
}
if (needRejoin()) {
// skip the commit when we're rejoining since we'll commit offsets synchronously
// before the revocation callback is invoked
reschedule(now + interval);
return;
}
commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null) {
reschedule(now + interval);
} else {
log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
reschedule(now + interval);
}
}
});
}
}
private void maybeAutoCommitOffsetsSync() {
if (autoCommitEnabled) {
try {
commitOffsetsSync(subscriptions.allConsumed());
} catch (WakeupException e) {
// rethrow wakeups since they are triggered by the user
throw e;
} catch (Exception e) {
// consistent with async auto-commit failures, we do not propagate the exception
log.warn("Auto offset commit failed for group {}: {}", groupId, e.getMessage());
}
}
}
/**
* Commit offsets for the specified list of topics and partitions. This is a non-blocking call
* which returns a request future that can be polled in the case of a synchronous commit or ignored in the
* asynchronous case.
*
* @param offsets The list of offsets per partition that should be committed.
* @return A request future whose value indicates whether the commit was successful or not
*/
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
if (offsets.isEmpty())
return RequestFuture.voidSuccess();
// create the offset commit request
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
OffsetAndMetadata offsetAndMetadata = entry.getValue();
offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
}
OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
this.generation,
this.memberId,
OffsetCommitRequest.DEFAULT_RETENTION_TIME,
offsetData);
log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);
return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
.compose(new OffsetCommitResponseHandler(offsets));
}
public static class DefaultOffsetCommitCallback implements OffsetCommitCallback {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null)
log.error("Offset commit failed.", exception);
}
}
private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
private final Map<TopicPartition, OffsetAndMetadata> offsets;
public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
this.offsets = offsets;
}
@Override
public OffsetCommitResponse parse(ClientResponse response) {
return new OffsetCommitResponse(response.responseBody());
}
@Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
sensors.commitLatency.record(response.requestLatencyMs());
Set<String> unauthorizedTopics = new HashSet<>();
for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
long offset = offsetAndMetadata.offset();
Errors error = Errors.forCode(entry.getValue());
if (error == Errors.NONE) {
log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
if (subscriptions.isAssigned(tp))
// update the local cache only if the partition is still assigned
subscriptions.committed(tp, offsetAndMetadata);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
log.error("Not authorized to commit offsets for group {}", groupId);
future.raise(new GroupAuthorizationException(groupId));
return;
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
unauthorizedTopics.add(tp.topic());
} else if (error == Errors.OFFSET_METADATA_TOO_LARGE
|| error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
// raise the error to the user
log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
future.raise(error);
return;
} else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
// just retry
log.debug("Offset commit for group {} failed: {}", groupId, error.message());
future.raise(error);
return;
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP
|| error == Errors.REQUEST_TIMED_OUT) {
log.debug("Offset commit for group {} failed: {}", groupId, error.message());
coordinatorDead();
future.raise(error);
return;
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION
|| error == Errors.REBALANCE_IN_PROGRESS) {
// need to re-join group
log.debug("Offset commit for group {} failed: {}", groupId, error.message());
subscriptions.needReassignment();
future.raise(new CommitFailedException("Commit cannot be completed since the group has already " +
"rebalanced and assigned the partitions to another member. This means that the time " +
"between subsequent calls to poll() was longer than the configured session.timeout.ms, " +
"which typically implies that the poll loop is spending too much time message processing. " +
"You can address this either by increasing the session timeout or by reducing the maximum " +
"size of batches returned in poll() with max.poll.records."));
return;
} else {
log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
return;
}
}
if (!unauthorizedTopics.isEmpty()) {
log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
future.raise(new TopicAuthorizationException(unauthorizedTopics));
} else {
future.complete(null);
}
}
}
/**
* Fetch the committed offsets for a set of partitions. This is a non-blocking call. The
* returned future can be polled to get the actual offsets returned from the broker.
*
* @param partitions The set of partitions to get offsets for.
* @return A request future containing the committed offsets.
*/
private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
// construct the request
OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions));
// send the request with a callback
return client.send(coordinator, ApiKeys.OFFSET_FETCH, request)
.compose(new OffsetFetchResponseHandler());
}
private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
@Override
public OffsetFetchResponse parse(ClientResponse response) {
return new OffsetFetchResponse(response.responseBody());
}
@Override
public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size());
for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
OffsetFetchResponse.PartitionData data = entry.getValue();
if (data.hasError()) {
Errors error = Errors.forCode(data.errorCode);
log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message());
if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
// just retry
future.raise(error);
} else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
// re-discover the coordinator and retry
coordinatorDead();
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION) {
// need to re-join group
subscriptions.needReassignment();
future.raise(error);
} else {
future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
}
return;
} else if (data.offset >= 0) {
// record the position with the offset (-1 indicates no committed offset to fetch)
offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
} else {
log.debug("Group {} has no committed offset for partition {}", groupId, tp);
}
}
future.complete(offsets);
}
}
private class ConsumerCoordinatorMetrics {
public final Metrics metrics;
public final String metricGrpName;
public final Sensor commitLatency;
public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
this.metrics = metrics;
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.commitLatency = metrics.sensor("commit-latency");
this.commitLatency.add(metrics.metricName("commit-latency-avg",
this.metricGrpName,
"The average time taken for a commit request"), new Avg());
this.commitLatency.add(metrics.metricName("commit-latency-max",
this.metricGrpName,
"The max time taken for a commit request"), new Max());
this.commitLatency.add(metrics.metricName("commit-rate",
this.metricGrpName,
"The number of commit calls per second"), new Rate(new Count()));
Measurable numParts =
new Measurable() {
public double measure(MetricConfig config, long now) {
return subscriptions.assignedPartitions().size();
}
};
metrics.addMetric(metrics.metricName("assigned-partitions",
this.metricGrpName,
"The number of partitions currently assigned to this consumer"), numParts);
}
}
private static class MetadataSnapshot {
private final Map<String, Integer> partitionsPerTopic;
public MetadataSnapshot(SubscriptionState subscription, Cluster cluster) {
Map<String, Integer> partitionsPerTopic = new HashMap<>();
for (String topic : subscription.groupSubscription())
partitionsPerTopic.put(topic, cluster.partitionCountForTopic(topic));
this.partitionsPerTopic = partitionsPerTopic;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MetadataSnapshot that = (MetadataSnapshot) o;
return partitionsPerTopic != null ? partitionsPerTopic.equals(that.partitionsPerTopic) : that.partitionsPerTopic == null;
}
@Override
public int hashCode() {
return partitionsPerTopic != null ? partitionsPerTopic.hashCode() : 0;
}
}
}