MINOR: Add generation to consumer assignor logs
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9838e7d..b965b91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -68,6 +68,7 @@
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -83,6 +84,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
@@ -171,13 +173,23 @@
metricGrpPrefix,
time);
this.rebalanceConfig = rebalanceConfig;
- this.log = logContext.logger(ConsumerCoordinator.class);
+ final Supplier<String> dynamicPrefix =
+ () -> logContext.logPrefix() + "[generationId=" + generation().generationId + "] ";
+ this.log = new DynamicPrefixLogger(
+ dynamicPrefix,
+ LoggerFactory.getLogger(ConsumerCoordinator.class)
+ );
this.metadata = metadata;
this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
this.autoCommitEnabled = autoCommitEnabled;
this.autoCommitIntervalMs = autoCommitIntervalMs;
+ for (final ConsumerPartitionAssignor assignor : assignors) {
+ if (assignor instanceof ContextualLogging) {
+ ((ContextualLogging) assignor).setLoggingContext(dynamicPrefix);
+ }
+ }
this.assignors = assignors;
this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
@@ -677,8 +689,8 @@
isLeader = true;
if (skipAssignment) {
- log.info("Skipped assignment for returning static leader at generation {}. The static leader " +
- "will continue with its existing assignment.", generation().generationId);
+ log.info("Skipped assignment for returning static leader. The static leader " +
+ "will continue with its existing assignment.");
assignmentSnapshot = metadataSnapshot;
return Collections.emptyMap();
}
@@ -699,7 +711,7 @@
// we must take the assignment snapshot after.
assignmentSnapshot = metadataSnapshot;
- log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);
+ log.info("Finished assignment for group: {}", assignments);
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
@@ -1218,7 +1230,7 @@
private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
if (autoCommitEnabled)
return autoCommitOffsetsAsync();
- return null;
+ return null;
}
private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java
new file mode 100644
index 0000000..c076554
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.function.Supplier;
+
+public interface ContextualLogging { // ConsumerCoordinator calls this on each assignor that implements it
+ void setLoggingContext(Supplier<String> loggingContext); // supplies the log prefix text (the coordinator's includes the current generation id)
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java
new file mode 100644
index 0000000..e2d9252
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.slf4j.Logger;
+import org.slf4j.Marker;
+
+import java.util.function.Supplier;
+
+public final class DynamicPrefixLogger implements Logger {
+    // Decorator that prepends prefix.get() to every message/format; the supplier runs on each call, enabled or not.
+    private final Supplier<String> prefix;
+    private final Logger delegate;
+    // The prefix ends up inside the slf4j format string, so it must not contain "{}" placeholders.
+    public DynamicPrefixLogger(final Supplier<String> prefix, final Logger delegate) {
+        this.prefix = prefix;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public String getName() {
+        return delegate.getName();
+    }
+
+    @Override
+    public boolean isTraceEnabled() {
+        return delegate.isTraceEnabled();
+    }
+
+    @Override
+    public void trace(final String msg) {
+        delegate.trace(prefix.get() + msg);
+    }
+
+    @Override
+    public void trace(final String format, final Object arg) {
+        delegate.trace(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void trace(final String format, final Object arg1, final Object arg2) {
+        delegate.trace(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void trace(final String format, final Object... arguments) {
+        delegate.trace(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void trace(final String msg, final Throwable t) {
+        delegate.trace(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isTraceEnabled(final Marker marker) {
+        return delegate.isTraceEnabled(marker);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String msg) {
+        delegate.trace(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String format, final Object arg) {
+        delegate.trace(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void trace(final Marker marker,
+                      final String format,
+                      final Object arg1,
+                      final Object arg2) {
+        delegate.trace(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String format, final Object... argArray) {
+        delegate.trace(marker, prefix.get() + format, argArray);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String msg, final Throwable t) {
+        delegate.trace(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isDebugEnabled() {
+        return delegate.isDebugEnabled();
+    }
+
+    @Override
+    public void debug(final String msg) {
+        delegate.debug(prefix.get() + msg);
+    }
+
+    @Override
+    public void debug(final String format, final Object arg) {
+        delegate.debug(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void debug(final String format, final Object arg1, final Object arg2) {
+        delegate.debug(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void debug(final String format, final Object... arguments) {
+        delegate.debug(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void debug(final String msg, final Throwable t) {
+        delegate.debug(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isDebugEnabled(final Marker marker) {
+        return delegate.isDebugEnabled(marker);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String msg) {
+        delegate.debug(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String format, final Object arg) {
+        delegate.debug(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void debug(final Marker marker,
+                      final String format,
+                      final Object arg1,
+                      final Object arg2) {
+        delegate.debug(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String format, final Object... arguments) {
+        delegate.debug(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String msg, final Throwable t) {
+        delegate.debug(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isInfoEnabled() {
+        return delegate.isInfoEnabled();
+    }
+
+    @Override
+    public void info(final String msg) {
+        delegate.info(prefix.get() + msg);
+    }
+
+    @Override
+    public void info(final String format, final Object arg) {
+        delegate.info(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void info(final String format, final Object arg1, final Object arg2) {
+        delegate.info(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void info(final String format, final Object... arguments) {
+        delegate.info(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void info(final String msg, final Throwable t) {
+        delegate.info(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isInfoEnabled(final Marker marker) {
+        return delegate.isInfoEnabled(marker);
+    }
+
+    @Override
+    public void info(final Marker marker, final String msg) {
+        delegate.info(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void info(final Marker marker, final String format, final Object arg) {
+        delegate.info(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void info(final Marker marker,
+                     final String format,
+                     final Object arg1,
+                     final Object arg2) {
+        delegate.info(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void info(final Marker marker, final String format, final Object... arguments) {
+        delegate.info(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void info(final Marker marker, final String msg, final Throwable t) {
+        delegate.info(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isWarnEnabled() {
+        return delegate.isWarnEnabled();
+    }
+
+    @Override
+    public void warn(final String msg) {
+        delegate.warn(prefix.get() + msg);
+    }
+
+    @Override
+    public void warn(final String format, final Object arg) {
+        delegate.warn(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void warn(final String format, final Object... arguments) {
+        delegate.warn(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void warn(final String format, final Object arg1, final Object arg2) {
+        delegate.warn(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void warn(final String msg, final Throwable t) {
+        delegate.warn(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isWarnEnabled(final Marker marker) {
+        return delegate.isWarnEnabled(marker);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String msg) {
+        delegate.warn(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String format, final Object arg) {
+        delegate.warn(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void warn(final Marker marker,
+                     final String format,
+                     final Object arg1,
+                     final Object arg2) {
+        delegate.warn(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String format, final Object... arguments) {
+        delegate.warn(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String msg, final Throwable t) {
+        delegate.warn(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isErrorEnabled() {
+        return delegate.isErrorEnabled();
+    }
+
+    @Override
+    public void error(final String msg) {
+        delegate.error(prefix.get() + msg);
+    }
+
+    @Override
+    public void error(final String format, final Object arg) {
+        delegate.error(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void error(final String format, final Object arg1, final Object arg2) {
+        delegate.error(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void error(final String format, final Object... arguments) {
+        delegate.error(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void error(final String msg, final Throwable t) {
+        delegate.error(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isErrorEnabled(final Marker marker) {
+        return delegate.isErrorEnabled(marker);
+    }
+
+    @Override
+    public void error(final Marker marker, final String msg) {
+        delegate.error(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void error(final Marker marker, final String format, final Object arg) {
+        delegate.error(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void error(final Marker marker,
+                      final String format,
+                      final Object arg1,
+                      final Object arg2) {
+        delegate.error(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void error(final Marker marker, final String format, final Object... arguments) {
+        delegate.error(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void error(final Marker marker, final String msg, final Throwable t) {
+        delegate.error(marker, prefix.get() + msg, t);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 288d5d2..c68e5ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -21,6 +21,8 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ContextualLogging;
+import org.apache.kafka.clients.consumer.internals.DynamicPrefixLogger;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
@@ -29,7 +31,6 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
@@ -52,6 +53,7 @@
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.time.Instant;
@@ -78,7 +80,6 @@
import java.util.stream.Collectors;
import static java.util.UUID.randomUUID;
-
import static org.apache.kafka.common.utils.Utils.filterMap;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
@@ -87,10 +88,21 @@
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
-public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
+public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable,
+ ContextualLogging {
- private Logger log;
+ // set first via configure()
private String logPrefix;
+ // overwritten via setLoggingContext() if it is called.
+ private Logger log = LoggerFactory.getLogger(StreamsPartitionAssignor.class);
+
+    @Override
+    public void setLoggingContext(final Supplier<String> loggingContext) {
+        // logPrefix is read when the lambda runs (at log time), not when this method is called
+        final Supplier<String> combinedPrefix = () -> loggingContext.get() + logPrefix;
+        final Logger underlying = LoggerFactory.getLogger(StreamsPartitionAssignor.class);
+        this.log = new DynamicPrefixLogger(combinedPrefix, underlying);
+    }
private static class AssignedPartition implements Comparable<AssignedPartition> {
@@ -204,7 +216,6 @@
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
logPrefix = assignorConfiguration.logPrefix();
- log = new LogContext(logPrefix).logger(getClass());
usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);
final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();