MINOR: Add generation to consumer assignor logs
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9838e7d..b965b91 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -68,6 +68,7 @@
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -83,6 +84,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
@@ -171,13 +173,23 @@
               metricGrpPrefix,
               time);
         this.rebalanceConfig = rebalanceConfig;
-        this.log = logContext.logger(ConsumerCoordinator.class);
+        final Supplier<String> dynamicPrefix =
+            () -> logContext.logPrefix() + "[generationId=" + generation().generationId + "] ";
+        this.log = new DynamicPrefixLogger(
+            dynamicPrefix,
+            LoggerFactory.getLogger(ConsumerCoordinator.class)
+        );
         this.metadata = metadata;
         this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch(), metadata.updateVersion());
         this.subscriptions = subscriptions;
         this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
         this.autoCommitEnabled = autoCommitEnabled;
         this.autoCommitIntervalMs = autoCommitIntervalMs;
+        for (final ConsumerPartitionAssignor assignor : assignors) {
+            if (assignor instanceof ContextualLogging) {
+                ((ContextualLogging) assignor).setLoggingContext(dynamicPrefix);
+            }
+        }
         this.assignors = assignors;
         this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
@@ -677,8 +689,8 @@
         isLeader = true;
 
         if (skipAssignment) {
-            log.info("Skipped assignment for returning static leader at generation {}. The static leader " +
-                "will continue with its existing assignment.", generation().generationId);
+            log.info("Skipped assignment for returning static leader. The static leader " +
+                "will continue with its existing assignment.");
             assignmentSnapshot = metadataSnapshot;
             return Collections.emptyMap();
         }
@@ -699,7 +711,7 @@
         // we must take the assignment snapshot after.
         assignmentSnapshot = metadataSnapshot;
 
-        log.info("Finished assignment for group at generation {}: {}", generation().generationId, assignments);
+        log.info("Finished assignment for group: {}", assignments);
 
         Map<String, ByteBuffer> groupAssignment = new HashMap<>();
         for (Map.Entry<String, Assignment> assignmentEntry : assignments.entrySet()) {
@@ -1218,7 +1230,7 @@
     private RequestFuture<Void> maybeAutoCommitOffsetsAsync() {
         if (autoCommitEnabled)
             return autoCommitOffsetsAsync();
-        return null;    
+        return null;
     }
 
     private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java
new file mode 100644
index 0000000..c076554
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ContextualLogging.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.function.Supplier;
+
+public interface ContextualLogging {
+    void setLoggingContext(Supplier<String> loggingContext);
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java
new file mode 100644
index 0000000..e2d9252
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/DynamicPrefixLogger.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.slf4j.Logger;
+import org.slf4j.Marker;
+
+import java.util.function.Supplier;
+
+public final class DynamicPrefixLogger implements Logger {
+
+    private final Supplier<String> prefix;
+    private final Logger delegate;
+
+    public DynamicPrefixLogger(final Supplier<String> prefix, final Logger delegate) {
+        this.prefix = prefix;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public String getName() {
+        return delegate.getName();
+    }
+
+    @Override
+    public boolean isTraceEnabled() {
+        return delegate.isTraceEnabled();
+    }
+
+    @Override
+    public void trace(final String msg) {
+        delegate.trace(prefix.get() + msg);
+    }
+
+    @Override
+    public void trace(final String format, final Object arg) {
+        delegate.trace(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void trace(final String format, final Object arg1, final Object arg2) {
+        delegate.trace(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void trace(final String format, final Object... arguments) {
+        delegate.trace(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void trace(final String msg, final Throwable t) {
+        delegate.trace(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isTraceEnabled(final Marker marker) {
+        return delegate.isTraceEnabled(marker);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String msg) {
+        delegate.trace(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String format, final Object arg) {
+        delegate.trace(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void trace(final Marker marker,
+                      final String format,
+                      final Object arg1,
+                      final Object arg2) {
+        delegate.trace(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String format, final Object... argArray) {
+        delegate.trace(marker, prefix.get() + format, argArray);
+    }
+
+    @Override
+    public void trace(final Marker marker, final String msg, final Throwable t) {
+        delegate.trace(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isDebugEnabled() {
+        return delegate.isDebugEnabled();
+    }
+
+    @Override
+    public void debug(final String msg) {
+        delegate.debug(prefix.get() + msg);
+    }
+
+    @Override
+    public void debug(final String format, final Object arg) {
+        delegate.debug(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void debug(final String format, final Object arg1, final Object arg2) {
+        delegate.debug(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void debug(final String format, final Object... arguments) {
+        delegate.debug(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void debug(final String msg, final Throwable t) {
+        delegate.debug(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isDebugEnabled(final Marker marker) {
+        return delegate.isDebugEnabled(marker);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String msg) {
+        delegate.debug(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String format, final Object arg) {
+        delegate.debug(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void debug(final Marker marker,
+                      final String format,
+                      final Object arg1,
+                      final Object arg2) {
+        delegate.debug(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String format, final Object... arguments) {
+        delegate.debug(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void debug(final Marker marker, final String msg, final Throwable t) {
+        delegate.debug(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isInfoEnabled() {
+        return delegate.isInfoEnabled();
+    }
+
+    @Override
+    public void info(final String msg) {
+        delegate.info(prefix.get() + msg);
+    }
+
+    @Override
+    public void info(final String format, final Object arg) {
+        delegate.info(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void info(final String format, final Object arg1, final Object arg2) {
+        delegate.info(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void info(final String format, final Object... arguments) {
+        delegate.info(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void info(final String msg, final Throwable t) {
+        delegate.info(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isInfoEnabled(final Marker marker) {
+        return delegate.isInfoEnabled(marker);
+    }
+
+    @Override
+    public void info(final Marker marker, final String msg) {
+        delegate.info(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void info(final Marker marker, final String format, final Object arg) {
+        delegate.info(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void info(final Marker marker,
+                     final String format,
+                     final Object arg1,
+                     final Object arg2) {
+        delegate.info(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void info(final Marker marker, final String format, final Object... arguments) {
+        delegate.info(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void info(final Marker marker, final String msg, final Throwable t) {
+        delegate.info(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isWarnEnabled() {
+        return delegate.isWarnEnabled();
+    }
+
+    @Override
+    public void warn(final String msg) {
+        delegate.warn(prefix.get() + msg);
+    }
+
+    @Override
+    public void warn(final String format, final Object arg) {
+        delegate.warn(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void warn(final String format, final Object... arguments) {
+        delegate.warn(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void warn(final String format, final Object arg1, final Object arg2) {
+        delegate.warn(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void warn(final String msg, final Throwable t) {
+        delegate.warn(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isWarnEnabled(final Marker marker) {
+        return delegate.isWarnEnabled(marker);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String msg) {
+        delegate.warn(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String format, final Object arg) {
+        delegate.warn(marker, prefix.get() + format, arg);
+    }
+
+    @Override
+    public void warn(final Marker marker,
+                     final String format,
+                     final Object arg1,
+                     final Object arg2) {
+        delegate.warn(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String format, final Object... arguments) {
+        delegate.warn(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void warn(final Marker marker, final String msg, final Throwable t) {
+        delegate.warn(marker, prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isErrorEnabled() {
+        return delegate.isErrorEnabled();
+    }
+
+    @Override
+    public void error(final String msg) {
+        delegate.error(prefix.get() + msg);
+    }
+
+    @Override
+    public void error(final String format, final Object arg) {
+        delegate.error(prefix.get() + format, arg);
+    }
+
+    @Override
+    public void error(final String format, final Object arg1, final Object arg2) {
+        delegate.error(prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void error(final String format, final Object... arguments) {
+        delegate.error(prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void error(final String msg, final Throwable t) {
+        delegate.error(prefix.get() + msg, t);
+    }
+
+    @Override
+    public boolean isErrorEnabled(final Marker marker) {
+        return delegate.isErrorEnabled(marker);
+    }
+
+    @Override
+    public void error(final Marker marker, final String msg) {
+        delegate.error(marker, prefix.get() + msg);
+    }
+
+    @Override
+    public void error(final Marker marker, final String format, final Object arg) {
+        delegate.error(marker, format, arg);
+    }
+
+    @Override
+    public void error(final Marker marker,
+                      final String format,
+                      final Object arg1,
+                      final Object arg2) {
+        delegate.error(marker, prefix.get() + format, arg1, arg2);
+    }
+
+    @Override
+    public void error(final Marker marker, final String format, final Object... arguments) {
+        delegate.error(marker, prefix.get() + format, arguments);
+    }
+
+    @Override
+    public void error(final Marker marker, final String msg, final Throwable t) {
+        delegate.error(marker, prefix.get() + msg, t);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 288d5d2..c68e5ee 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -21,6 +21,8 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.ContextualLogging;
+import org.apache.kafka.clients.consumer.internals.DynamicPrefixLogger;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
@@ -29,7 +31,6 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.MissingSourceTopicException;
@@ -52,6 +53,7 @@
 import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
 import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.time.Instant;
@@ -78,7 +80,6 @@
 import java.util.stream.Collectors;
 
 import static java.util.UUID.randomUUID;
-
 import static org.apache.kafka.common.utils.Utils.filterMap;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchCommittedOffsets;
 import static org.apache.kafka.streams.processor.internals.ClientUtils.fetchEndOffsetsFuture;
@@ -87,10 +88,21 @@
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
 import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
 
-public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
+public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable,
+    ContextualLogging {
 
-    private Logger log;
+    // set first via configure()
     private String logPrefix;
+    // overwritten via setLoggingContext() if it is called.
+    private Logger log = LoggerFactory.getLogger(StreamsPartitionAssignor.class);
+
+    @Override
+    public void setLoggingContext(final Supplier<String> loggingContext) {
+        this.log = new DynamicPrefixLogger(
+            () -> loggingContext.get() + logPrefix,
+            LoggerFactory.getLogger(StreamsPartitionAssignor.class)
+        );
+    }
 
     private static class AssignedPartition implements Comparable<AssignedPartition> {
 
@@ -204,7 +216,6 @@
         final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
 
         logPrefix = assignorConfiguration.logPrefix();
-        log = new LogContext(logPrefix).logger(getClass());
         usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);
 
         final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();