Merge branch 'trunk' into 0.10.0
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c457c83..5576431 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -584,7 +584,8 @@
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
-            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
             this.subscriptions = new SubscriptionState(offsetResetStrategy);
             List<PartitionAssignor> assignors = config.getConfiguredInstances(
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c79d8e7..496a114 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -17,6 +17,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -217,13 +218,25 @@
             }
 
             RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
+            future.addListener(new RequestFutureListener<ByteBuffer>() {
+                @Override
+                public void onSuccess(ByteBuffer value) {
+                    // handle join completion in the listener so that it is invoked even if the
+                    // consumer is woken up before the rebalance finishes
+                    onJoinComplete(generation, memberId, protocol, value);
+                    needsJoinPrepare = true;
+                    heartbeatTask.reset();
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    // we handle failures below after the request finishes. if the join completes
+                    // after having been woken up, the exception is ignored and we will rejoin
+                }
+            });
             client.poll(future);
 
-            if (future.succeeded()) {
-                onJoinComplete(generation, memberId, protocol, future.value());
-                needsJoinPrepare = true;
-                heartbeatTask.reset();
-            } else {
+            if (future.failed()) {
                 RuntimeException exception = future.exception();
                 if (exception instanceof UnknownMemberIdException ||
                         exception instanceof RebalanceInProgressException ||
@@ -521,6 +534,7 @@
     protected void coordinatorDead() {
         if (this.coordinator != null) {
             log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
+            client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
             this.coordinator = null;
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a364987..86b60d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -422,9 +422,6 @@
                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                     if (exception == null) {
                         reschedule(now + interval);
-                    } else if (exception instanceof SendFailedException) {
-                        log.debug("Failed to send automatic offset commit for group {}", groupId);
-                        reschedule(now);
                     } else {
                         log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
                         reschedule(now + interval);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index b70994d..4119954 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -40,12 +41,6 @@
 /**
  * Higher level consumer access to the network layer with basic support for futures and
  * task scheduling. NOT thread-safe!
- *
- * TODO: The current implementation is simplistic in that it provides a facility for queueing requests
- * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time
- * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to
- * understand, but there are opportunities to provide timeout or retry capabilities in the future.
- * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
  */
 public class ConsumerNetworkClient implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
@@ -57,17 +52,20 @@
     private final Metadata metadata;
     private final Time time;
     private final long retryBackoffMs;
+    private final long unsentExpiryMs;
     // wakeup enabled flag need to be volatile since it is allowed to be accessed concurrently
     volatile private boolean wakeupsEnabled = true;
 
     public ConsumerNetworkClient(KafkaClient client,
                                  Metadata metadata,
                                  Time time,
-                                 long retryBackoffMs) {
+                                 long retryBackoffMs,
+                                 long requestTimeoutMs) {
         this.client = client;
         this.metadata = metadata;
         this.time = time;
         this.retryBackoffMs = retryBackoffMs;
+        this.unsentExpiryMs = requestTimeoutMs;
     }
 
     /**
@@ -227,8 +225,8 @@
         // cleared or a connect finished in the poll
         trySend(now);
 
-        // fail all requests that couldn't be sent
-        failUnsentRequests();
+        // fail requests that couldn't be sent if they have expired
+        failExpiredRequests(now);
     }
 
     /**
@@ -274,29 +272,48 @@
             Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
             Node node = requestEntry.getKey();
             if (client.connectionFailed(node)) {
+                // Remove the entry before invoking the request callbacks so that callbacks which
+                // handle coordinator failures do not traverse the unsent list again while we iterate it.
+                iterator.remove();
                 for (ClientRequest request : requestEntry.getValue()) {
                     RequestFutureCompletionHandler handler =
                             (RequestFutureCompletionHandler) request.callback();
                     handler.onComplete(new ClientResponse(request, now, true, null));
                 }
-                iterator.remove();
             }
         }
     }
 
-    private void failUnsentRequests() {
-        // clear all unsent requests and fail their corresponding futures
-        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
-            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
-            while (iterator.hasNext()) {
-                ClientRequest request = iterator.next();
-                RequestFutureCompletionHandler handler =
-                        (RequestFutureCompletionHandler) request.callback();
-                handler.raise(SendFailedException.INSTANCE);
+    private void failExpiredRequests(long now) {
+        // clear all expired unsent requests and fail their corresponding futures
+        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
+            Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
+            while (requestIterator.hasNext()) {
+                ClientRequest request = requestIterator.next();
+                if (request.createdTimeMs() < now - unsentExpiryMs) {
+                    RequestFutureCompletionHandler handler =
+                            (RequestFutureCompletionHandler) request.callback();
+                    handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
+                    requestIterator.remove();
+                } else
+                    break;
+            }
+            if (requestEntry.getValue().isEmpty())
                 iterator.remove();
+        }
+    }
+
+    protected void failUnsentRequests(Node node, RuntimeException e) {
+        // clear unsent requests to node and fail their corresponding futures
+        List<ClientRequest> unsentRequests = unsent.remove(node);
+        if (unsentRequests != null) {
+            for (ClientRequest request : unsentRequests) {
+                RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+                handler.raise(e);
             }
         }
-        unsent.clear();
     }
 
     private boolean trySend(long now) {
@@ -320,7 +337,6 @@
     private void clientPoll(long timeout, long now) {
         client.poll(timeout, now);
         if (wakeupsEnabled && wakeup.get()) {
-            failUnsentRequests();
             wakeup.set(false);
             throw new WakeupException();
         }
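For intuition, the expiry check added in failExpiredRequests works out as in the following sketch; the timestamps are hypothetical and only the comparison request.createdTimeMs() < now - unsentExpiryMs comes from the patch:

    // hypothetical timeline, assuming request.timeout.ms (and hence unsentExpiryMs) = 40000 ms
    long unsentExpiryMs = 40000;
    long createdTimeMs = 100000;   // request queued at t = 100s but never sent
    long now = 139000;             // 39s later: 100000 < 139000 - 40000 is false -> request stays queued
    long later = 141000;           // 41s later: 100000 < 141000 - 40000 is true  -> future fails with TimeoutException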
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
deleted file mode 100644
index 3312a2c..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.apache.kafka.common.errors.RetriableException;
-
-/**
- * Exception used in {@link ConsumerNetworkClient} to indicate the failure
- * to transmit a request to the networking layer. This could be either because
- * the client is still connecting to the given host or its send buffer is full.
- */
-public class SendFailedException extends RetriableException {
-    public static final SendFailedException INSTANCE = new SendFailedException();
-
-    private static final long serialVersionUID = 1L;
-
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 6acc059..d60e28e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -711,13 +711,11 @@
         Integer partition = record.partition();
         if (partition != null) {
             List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
-            int numPartitions = partitions.size();
+            int lastPartition = partitions.size() - 1;
             // they have given us a partition, use it
-            if (partition < 0 || partition >= numPartitions)
-                throw new IllegalArgumentException("Invalid partition given with record: " + partition
-                                                   + " is not in the range [0..."
-                                                   + numPartitions
-                                                   + "].");
+            if (partition < 0 || partition > lastPartition) {
+                throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
+            }
             return partition;
         }
         return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
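The reworked bound is easier to see with a hypothetical case: a topic with 3 partitions has valid ids 0..2, so the message now reports the range [0...2] rather than the old, off-by-one [0...3]:

    int numPartitions = 3;                  // hypothetical partition count for the topic
    int lastPartition = numPartitions - 1;  // 2
    int partition = 3;                      // partition supplied with the record
    boolean valid = partition >= 0 && partition <= lastPartition;  // false -> IllegalArgumentException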
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
index c0949e3..554b885 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
@@ -18,6 +18,7 @@
  * not yet been created.
  */
 public class GroupCoordinatorNotAvailableException extends RetriableException {
+    public static final GroupCoordinatorNotAvailableException INSTANCE = new GroupCoordinatorNotAvailableException();
 
     private static final long serialVersionUID = 1L;
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
index d1d3b24..41e9160 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
@@ -31,7 +31,7 @@
         String key = "key";
         String value = "value";
 
-        ConsumerRecord record = new ConsumerRecord(topic, partition, offset, key, value);
+        ConsumerRecord<String, String> record = new ConsumerRecord<>(topic, partition, offset, key, value);
         assertEquals(topic, record.topic());
         assertEquals(partition, record.partition());
         assertEquals(offset, record.offset());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 623e5ef..b864d69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -35,6 +35,7 @@
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.internals.TopicConstants;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
@@ -103,7 +104,7 @@
         this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
         this.metadata = new Metadata(0, Long.MAX_VALUE);
         this.metadata.update(cluster, time.milliseconds());
-        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
         this.defaultOffsetCommitCallback = new MockCommitCallback();
@@ -323,6 +324,45 @@
     }
 
     @Test
+    public void testWakeupDuringJoin() {
+        final String consumerId = "leader";
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        // ensure metadata is up-to-date for leader
+        metadata.setTopics(Arrays.asList(topicName));
+        metadata.update(cluster, time.milliseconds());
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
+        partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
+
+        // prepare only the first half of the join and then trigger the wakeup
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+        consumerClient.wakeup();
+
+        try {
+            coordinator.ensurePartitionAssignment();
+        } catch (WakeupException e) {
+            // ignore
+        }
+
+        // now complete the second half
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        assertFalse(subscriptions.partitionAssignmentNeeded());
+        assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+        assertEquals(1, rebalanceListener.revokedCount);
+        assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+        assertEquals(1, rebalanceListener.assignedCount);
+        assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+    }
+
+    @Test
     public void testNormalJoinGroupFollower() {
         final String consumerId = "consumer";
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 1692010..f0f2a97 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -27,6 +27,8 @@
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -40,7 +42,7 @@
     private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
     private Node node = cluster.nodes().get(0);
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
-    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
 
     @Test
     public void send() {
@@ -104,6 +106,65 @@
         assertTrue(future.isDone());
     }
 
+    @Test
+    public void sendExpiry() throws InterruptedException {
+        long unsentExpiryMs = 10;
+        final AtomicBoolean isReady = new AtomicBoolean();
+        final AtomicBoolean disconnected = new AtomicBoolean();
+        client = new MockClient(time) {
+            @Override
+            public boolean ready(Node node, long now) {
+                if (isReady.get())
+                    return super.ready(node, now);
+                else
+                    return false;
+            }
+            @Override
+            public boolean connectionFailed(Node node) {
+                return disconnected.get();
+            }
+        };
+        // Queue first send, sleep long enough for this to expire and then queue second send
+        consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, unsentExpiryMs);
+        RequestFuture<ClientResponse> future1 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertEquals(1, consumerClient.pendingRequestCount(node));
+        assertFalse(future1.isDone());
+
+        time.sleep(unsentExpiryMs + 1);
+        RequestFuture<ClientResponse> future2 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        assertEquals(2, consumerClient.pendingRequestCount());
+        assertEquals(2, consumerClient.pendingRequestCount(node));
+        assertFalse(future2.isDone());
+
+        // First send should have expired and second send still pending
+        consumerClient.poll(0);
+        assertTrue(future1.isDone());
+        assertFalse(future1.succeeded());
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertEquals(1, consumerClient.pendingRequestCount(node));
+        assertFalse(future2.isDone());
+
+        // Enable send, the un-expired send should succeed on poll
+        isReady.set(true);
+        client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+        consumerClient.poll(future2);
+        ClientResponse clientResponse = future2.value();
+        HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody());
+        assertEquals(Errors.NONE.code(), response.errorCode());
+
+        // Disable ready flag to delay send and queue another send. Disconnection should remove pending send
+        isReady.set(false);
+        RequestFuture<ClientResponse> future3 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+        assertEquals(1, consumerClient.pendingRequestCount());
+        assertEquals(1, consumerClient.pendingRequestCount(node));
+        disconnected.set(true);
+        consumerClient.poll(0);
+        assertTrue(future3.isDone());
+        assertFalse(future3.succeeded());
+        assertEquals(0, consumerClient.pendingRequestCount());
+        assertEquals(0, consumerClient.pendingRequestCount(node));
+    }
 
     private HeartbeatRequest heartbeatRequest() {
         return new HeartbeatRequest("group", 1, "memberId");
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 58c3841..9002e81 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -86,7 +86,7 @@
     private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
     private Metrics metrics = new Metrics(time);
     private static final double EPSILON = 0.0001;
-    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
 
     private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
     private MemoryRecords nextRecords = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 7294ed4..57028ef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -99,7 +99,8 @@
                     config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
                     config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
-            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+                    config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
             this.coordinator = new WorkerCoordinator(this.client,
                     config.getString(DistributedConfig.GROUP_ID_CONFIG),
                     config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index abb62b9..bf33cb3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -88,7 +88,7 @@
         this.client = new MockClient(time);
         this.metadata = new Metadata(0, Long.MAX_VALUE);
         this.metadata.update(cluster, time.milliseconds());
-        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+        this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
         this.configStorage = PowerMock.createMock(KafkaConfigStorage.class);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index b857315..ef76ffc 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -20,7 +20,7 @@
 import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
 import kafka.utils.Logging
 import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, SendFailedException}
+import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.DisconnectException
@@ -43,21 +43,15 @@
   private def send(target: Node,
                    api: ApiKeys,
                    request: AbstractRequest): Struct = {
-    var now = time.milliseconds()
-    val deadline = now + requestTimeoutMs
     var future: RequestFuture[ClientResponse] = null
 
-    do {
-      future = client.send(target, api, request)
-      client.poll(future)
+    future = client.send(target, api, request)
+    client.poll(future)
 
-      if (future.succeeded())
-        return future.value().responseBody()
-
-      now = time.milliseconds()
-    } while (now < deadline && future.exception().isInstanceOf[SendFailedException])
-
-    throw future.exception()
+    if (future.succeeded())
+      return future.value().responseBody()
+    else
+      throw future.exception()
   }
 
   private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = {
@@ -244,7 +238,8 @@
       networkClient,
       metadata,
       time,
-      DefaultRetryBackoffMs)
+      DefaultRetryBackoffMs,
+      DefaultRequestTimeoutMs)
 
     new AdminClient(
       time,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f050e27..22657f4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -522,8 +522,12 @@
             getReplicaOrException(topic, partition)
 
           // decide whether to only fetch committed data (i.e. messages below high watermark)
-          val maxOffsetOpt = if (readOnlyCommitted)
-            Some(localReplica.highWatermark.messageOffset)
+          val maxOffsetOpt = if (readOnlyCommitted) {
+            val maxOffset = localReplica.highWatermark.messageOffset
+            if (offset > maxOffset)
+              throw new OffsetOutOfRangeException("Request for offset %d beyond high watermark %d when reading from only committed offsets".format(offset, maxOffset))
+            Some(maxOffset)
+          }
           else
             None
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4d75d53..3f6a275 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -228,6 +228,7 @@
   /**
    * Test reading at the boundary of the log, specifically
    * - reading from the logEndOffset should give an empty message set
+   * - reading from the maxOffset should give an empty message set
    * - reading beyond the log end offset should throw an OffsetOutOfRangeException
    */
   @Test
@@ -236,19 +237,21 @@
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
     val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
-    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes)
+    log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes)
     try {
-      log.read(0, 1024)
+      log.read(0, 1025)
       fail("Expected exception on invalid read.")
     } catch {
       case e: OffsetOutOfRangeException => "This is good."
     }
     try {
-      log.read(1025, 1000)
+      log.read(1026, 1000)
       fail("Expected exception on invalid read.")
     } catch {
       case e: OffsetOutOfRangeException => // This is good.
     }
+    assertEquals("Reading from maxOffset should produce 0 byte read.", 0, log.read(1024, 1000, Some(1024)).messageSet.sizeInBytes)
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ee14af4..c2c670e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -35,8 +35,8 @@
 import org.apache.kafka.common.utils.{MockTime => JMockTime}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.easymock.EasyMock
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
+import org.junit.Assert.{assertEquals, assertTrue, assertFalse}
+import org.junit.{Test, Before, After}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
@@ -44,17 +44,28 @@
 class ReplicaManagerTest {
 
   val topic = "test-topic"
+  val time = new MockTime()
+  val jTime = new JMockTime
+  val metrics = new Metrics
+  var zkClient : ZkClient = _
+  var zkUtils : ZkUtils = _
+    
+  @Before
+  def setUp() {
+    zkClient = EasyMock.createMock(classOf[ZkClient])
+    zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
+  }
+  
+  @After
+  def tearDown() {
+    metrics.close();
+  }
 
   @Test
   def testHighWaterMarkDirectoryMapping() {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
     try {
@@ -64,7 +75,6 @@
     } finally {
       // shutdown the replica manager upon test completion
       rm.shutdown(false)
-      metrics.close()
     }
   }
 
@@ -73,12 +83,7 @@
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
     try {
@@ -88,7 +93,6 @@
     } finally {
       // shutdown the replica manager upon test completion
       rm.shutdown(checkpointHW = false)
-      metrics.close()
     }
   }
 
@@ -96,12 +100,7 @@
   def testIllegalRequiredAcks() {
     val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), Option(this.getClass.getName))
     try {
@@ -116,7 +115,6 @@
         responseCallback = callback)
     } finally {
       rm.shutdown(checkpointHW = false)
-      metrics.close()
     }
 
     TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
@@ -127,12 +125,7 @@
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = KafkaConfig.fromProps(props)
-    val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
-    val time = new MockTime()
-    val jTime = new JMockTime
-    val metrics = new Metrics
     val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false))
 
@@ -192,7 +185,80 @@
       assertTrue(fetchCallbackFired)
     } finally {
       rm.shutdown(checkpointHW = false)
-      metrics.close()
+    }
+  }
+  
+  @Test
+  def testFetchBeyondHighWatermarkNotAllowedForConsumer() {
+    val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+    props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+    props.put("broker.id", Int.box(0))
+    val config = KafkaConfig.fromProps(props)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+    val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
+      new AtomicBoolean(false), Option(this.getClass.getName))
+    try {
+      val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1), new Broker(2, "host2", 2))
+      val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+      EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+      EasyMock.replay(metadataCache)
+      
+      val brokerList : java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
+      val brokerSet : java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
+      
+      val partition = rm.getOrCreatePartition(topic, 0)
+      partition.getOrCreateReplica(0)
+      
+      // Make this replica the leader.
+      val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0,
+        collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava)
+      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+      rm.getLeaderReplicaIfLocal(topic, 0)
+
+      def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
+      
+      // Append a couple of messages.
+      for(i <- 1 to 2)
+        rm.appendMessages(
+          timeout = 1000,
+          requiredAcks = -1,
+          internalTopicsAllowed = false,
+          messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new Message("message %d".format(i).getBytes))),
+          responseCallback = produceCallback)
+      
+      var fetchCallbackFired = false
+      var fetchError = 0
+      def fetchCallback(responseStatus: Map[TopicAndPartition, FetchResponsePartitionData]) = {
+        fetchError = responseStatus.values.head.error
+        fetchCallbackFired = true
+      }
+      
+      // Fetch a message above the high watermark as a follower
+      rm.fetchMessages(
+        timeout = 1000,
+        replicaId = 1,
+        fetchMinBytes = 1,
+        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
+        responseCallback = fetchCallback)
+
+      assertTrue(fetchCallbackFired)
+      assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
+      fetchCallbackFired = false
+      
+      // Fetch a message above the high watermark as a consumer
+      rm.fetchMessages(
+        timeout = 1000,
+        replicaId = -1,
+        fetchMinBytes = 1,
+        fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
+        responseCallback = fetchCallback)
+
+      assertTrue(fetchCallbackFired)
+      assertEquals("Should give OffsetOutOfRangeException", Errors.OFFSET_OUT_OF_RANGE.code, fetchError)
+    } finally {
+      rm.shutdown(checkpointHW = false)
     }
   }
 }
diff --git a/gradle.properties b/gradle.properties
index b058e58..0a612f6 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,7 +16,7 @@
 group=org.apache.kafka
 # NOTE: When you change this version number, you should also make sure to update
 # the version numbers in tests/kafkatest/__init__.py and kafka-merge-pr.py.
-version=0.10.0.0-SNAPSHOT
+version=0.10.1.0-SNAPSHOT
 scalaVersion=2.10.6
 task=build
 org.gradle.jvmargs=-XX:MaxPermSize=512m -Xmx1024m -Xss2m
diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py
index e124105..f26a0a9 100644
--- a/kafka-merge-pr.py
+++ b/kafka-merge-pr.py
@@ -72,7 +72,7 @@
 
 DEV_BRANCH_NAME = "trunk"
 
-DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.10.0.0")
+DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.10.1.0")
 
 def get_json(url):
     try:
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
new file mode 100644
index 0000000..83064e8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+
+/**
+ * The ForeachAction interface for performing an action on a key-value pair.
+ * Note that this action is stateless. If stateful processing is required, consider
+ * using {@link KStream#transform(TransformerSupplier, String...)} or
+ * {@link KStream#process(ProcessorSupplier, String...)} instead.
+ *
+ * @param <K>   original key type
+ * @param <V>   original value type
+ */
+public interface ForeachAction<K, V> {
+    void apply(K key, V value);
+}
+
+
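A minimal usage sketch of the new ForeachAction callback; the topic name, serdes and builder wiring below are illustrative assumptions rather than part of this patch:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.ForeachAction;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ForeachExample {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();
            // hypothetical input topic with String keys and Long values
            KStream<String, Long> clicks = builder.stream(Serdes.String(), Serdes.Long(), "clicks");

            clicks.foreach(new ForeachAction<String, Long>() {
                @Override
                public void apply(String user, Long clickCount) {
                    // stateless side effect; foreach is terminal and forwards nothing downstream
                    System.out.println(user + " -> " + clickCount);
                }
            });
        }
    }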
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index e4933cb..27475aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -64,6 +64,52 @@
     <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
     /**
+     * Print the elements of this stream to System.out.
+     *
+     * Key and value types that are not of type String, Integer, etc. should override
+     * toString() to produce meaningful output.
+     */
+    void print();
+
+
+    /**
+     * Print the elements of this stream to System.out.
+     *
+     * Key and value types that are not of type String, Integer, etc. should override
+     * toString() to produce meaningful output.
+     *
+     * @param keySerde key serde used for the key-value pairs;
+     *                 if not specified, the default serde defined in the configs will be used
+     * @param valSerde value serde used for the key-value pairs;
+     *                 if not specified, the default serde defined in the configs will be used
+     */
+    void print(Serde<K> keySerde, Serde<V> valSerde);
+
+
+    /**
+     * Write the elements of this stream to a file at the given path.
+     *
+     * Key and value types that are not of type String, Integer, etc. should override
+     * toString() to produce meaningful output.
+     *
+     * @param filePath name of the file to write to
+     */
+    void writeAsText(String filePath);
+
+    /**
+     * Write the elements of this stream to a file at the given path.
+     *
+     * Key and value types that are not of type String, Integer, etc. should override
+     * toString() to produce meaningful output.
+     *
+     * @param filePath name of the file to write to
+     * @param keySerde key serde used for the key-value pairs;
+     *                 if not specified, the default serde defined in the configs will be used
+     * @param valSerde value serde used for the key-value pairs;
+     *                 if not specified, the default serde defined in the configs will be used
+     */
+    void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
+
+    /**
      * Create a new instance of {@link KStream} by transforming each element in this stream into zero or more elements in the new stream.
      *
      * @param mapper        the instance of {@link KeyValueMapper}
@@ -101,6 +147,14 @@
     KStream<K, V> through(String topic);
 
     /**
+     * Perform an action on each element of {@link KStream}.
+     * Note that this is a terminal operation that returns void.
+     *
+     * @param action An action to perform on each element
+     */
+    void foreach(ForeachAction<K, V> action);
+
+    /**
      * Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
      * using default serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
      * This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
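A sketch of how the new debugging operators might be wired up; the topic name, file paths and serdes are illustrative assumptions:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class PrintExample {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> text = builder.stream(Serdes.String(), Serdes.String(), "text-input");

            text.print();                                   // each record goes to System.out
            text.writeAsText("/tmp/text-stream.txt");       // each record goes to the given file
            text.writeAsText("/tmp/text-stream-serdes.txt",
                    Serdes.String(), Serdes.String());      // explicit serdes instead of the configured defaults
        }
    }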
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 581ee28..bb6878f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -53,6 +53,49 @@
      */
     <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
 
+
+    /**
+     * Print the elements of this stream to System.out.
+     *
+     * Key and value types that are not of type String, Integer, etc. should override
+     * toString() to produce meaningful output.
+     */
+    void print();
+
+    /**
+     * Print the elements of this stream to System.out.
+     *
+     * Key and value types that are not of type String, Integer, etc. should override
+     * toString() to produce meaningful output.
+     *
+     * @param keySerde key serde used for the key-value pairs;
+     *                 if not specified, the default serde defined in the configs will be used
+     * @param valSerde value serde used for the key-value pairs;
+     *                 if not specified, the default serde defined in the configs will be used
+     */
+    void print(Serde<K> keySerde, Serde<V> valSerde);
+
+    /**
+     * Write the elements of this stream to a file at the given path.
+     *
+     * Key and value types that are not of type String, Integer, etc. should override
+     * toString() to produce meaningful output.
+     *
+     * @param filePath name of the file to write to
+     */
+    void writeAsText(String filePath);
+
+    /**
+     * Write the elements of this stream to a file at the given path.
+     *
+     * Key and value types that are not of type String, Integer, etc. should override
+     * toString() to produce meaningful output.
+     *
+     * @param filePath name of the file to write to
+     * @param keySerde key serde used for the key-value pairs;
+     *                 if not specified, the default serde defined in the configs will be used
+     * @param valSerde value serde used for the key-value pairs;
+     *                 if not specified, the default serde defined in the configs will be used
+     */
+    void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
+
     /**
      * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
      * using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
@@ -297,4 +340,12 @@
      * @param <K1>          the key type of the aggregated {@link KTable}
      */
     <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name);
+
+    /**
+     * Perform an action on each element of {@link KTable}.
+     * Note that this is a terminal operation that returns void.
+     *
+     * @param action An action to perform on each element
+     */
+    void foreach(ForeachAction<K, V> action);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 8069dca..5197e94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -32,7 +32,7 @@
      * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
      * that contains it is initialized.
      * <p>
-     * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+     * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
      * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
      *
      * @param context the context; may not be null
@@ -44,17 +44,18 @@
      *
      * @param key the key for the message
      * @param value the value for the message
-     * @return new value
+     * @return new value; if null, no key-value pair will be forwarded downstream
      */
     R transform(K key, V value);
 
     /**
-     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
+     * Perform any periodic operations and possibly return a new value, if this processor {@link ProcessorContext#schedule(long) schedules itself} with the context
      * during {@link #init(ProcessorContext) initialization}.
      *
      * @param timestamp the stream time when this method is being called
+     * @return new value; if null, it will not be forwarded downstream
      */
-    void punctuate(long timestamp);
+    R punctuate(long timestamp);
 
     /**
      * Close this processor and clean up any resources.
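To illustrate the new punctuate() contract (return a value to forward downstream, or null to forward nothing), a hypothetical transformer that counts records and emits the running count on each punctuation:

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class CountEmittingTransformer implements Transformer<String, String, KeyValue<String, Long>> {
        private long count = 0;

        @Override
        public void init(ProcessorContext context) {
            context.schedule(1000);  // request periodic punctuate() calls (stream time)
        }

        @Override
        public KeyValue<String, Long> transform(String key, String value) {
            count++;
            return null;  // null: nothing is forwarded downstream for this record
        }

        @Override
        public KeyValue<String, Long> punctuate(long timestamp) {
            return new KeyValue<>("count", count);  // forwarded downstream like a regular record
        }

        @Override
        public void close() { }
    }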
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 1a0679d..63214fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -47,12 +47,13 @@
     R transform(V value);
 
     /**
-     * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
+     * Perform any periodic operations and possibly return a new value, if this processor {@link ProcessorContext#schedule(long) schedules itself} with the context
      * during {@link #init(ProcessorContext) initialization}.
      *
      * @param timestamp the stream time when this method is being called
+     * @return new value; if null, it will not be forwarded downstream
      */
-    void punctuate(long timestamp);
+    R punctuate(long timestamp);
 
     /**
      * Close this processor and clean up any resources.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index e9b7cad..5ea0791 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
@@ -39,8 +40,21 @@
 
     @Override
     public byte[] serialize(String topic, Change<T> data) {
+        byte[] serializedKey;
+
         // only one of the old / new values would be not null
-        byte[] serializedKey = inner.serialize(topic, data.newValue != null ? data.newValue : data.oldValue);
+        if (data.newValue != null) {
+            if (data.oldValue != null)
+                throw new StreamsException("Both old and new values are not null (" + data.oldValue
+                        + " : " + data.newValue + ") in ChangedSerializer, which is not allowed.");
+
+            serializedKey = inner.serialize(topic, data.newValue);
+        } else {
+            if (data.oldValue == null)
+                throw new StreamsException("Both old and new values are null in ChangedSerializer, which is not allowed.");
+
+            serializedKey = inner.serialize(topic, data.oldValue);
+        }
 
         ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
         buf.put(serializedKey);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
new file mode 100644
index 0000000..2fd7ef9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamForeach<K, V> implements ProcessorSupplier<K, V> {
+
+    private final ForeachAction<K, V> action;
+
+    public KStreamForeach(ForeachAction<K, V> action) {
+        this.action = action;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamForeachProcessor();
+    }
+
+    private class KStreamForeachProcessor extends AbstractProcessor<K, V> {
+        @Override
+        public void process(K key, V value) {
+            action.apply(key, value);
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0fb3984..97a7aac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,12 +20,14 @@
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -41,7 +43,9 @@
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.Stores;
-
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.lang.reflect.Array;
 import java.util.HashSet;
 import java.util.Set;
@@ -78,9 +82,9 @@
 
     private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
 
-    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
+    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
 
-    private static final String SELECT_NAME = "KSTREAM-SELECT-";
+    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
 
     public static final String SINK_NAME = "KSTREAM-SINK-";
 
@@ -92,6 +96,8 @@
 
     private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
 
+    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
+
     public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
         super(topology, name, sourceNodes);
     }
@@ -133,6 +139,37 @@
     }
 
     @Override
+    public void print() {
+        print(null, null);
+    }
+
+    @Override
+    public void print(Serde<K> keySerde, Serde<V> valSerde) {
+        String name = topology.newName(PRINTING_NAME);
+        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde), this.name);
+    }
+
+
+    @Override
+    public void writeAsText(String filePath) {
+        writeAsText(filePath, null, null);
+    }
+
+    @Override
+    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+        String name = topology.newName(PRINTING_NAME);
+        try {
+
+            PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
+            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde), this.name);
+
+        } catch (FileNotFoundException e) {
+            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
+            throw new TopologyBuilderException(message);
+        }
+    }
+
+    @Override
     public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
         String name = topology.newName(FLATMAP_NAME);
 
@@ -201,6 +238,13 @@
     }
 
     @Override
+    public void foreach(ForeachAction<K, V> action) {
+        String name = topology.newName(FOREACH_NAME);
+
+        topology.addProcessor(name, new KStreamForeach<>(action), this.name);
+    }
+
+    @Override
     public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
         return through(keySerde, valSerde, null, topic);
     }
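
A hedged sketch of how the new print()/writeAsText() debugging operators above might be invoked (topic name, file path, and serdes are illustrative):

    // Minimal sketch; writeAsText() wraps a FileNotFoundException in a
    // TopologyBuilderException as shown in the hunk above.
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> words = builder.stream(Serdes.String(), Serdes.String(), "words");

    words.print();                                                          // print each record to stdout
    words.writeAsText("/tmp/words.txt", Serdes.String(), Serdes.String()); // or dump records to a text file
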
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 5b83b28..94e0b88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -55,8 +55,12 @@
 
         @Override
         public void process(K key, V value) {
-            context().forward(key, value);
-            window.put(key, value);
+            // if the key is null, we do not need to put the record into the window store
+            // since it will never be considered for join operations
+            if (key != null) {
+                context().forward(key, value);
+                window.put(key, value);
+            }
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index a4ac9b3..d8caf3c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -63,6 +64,10 @@
 
         @Override
         public void process(K key, V1 value) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KStream-KStream join operator with other window state store " + otherWindowName + " should not be null.");
+
             boolean needOuterJoin = KStreamKStreamJoin.this.outer;
 
             long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
index dfca019..92b9b59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
@@ -55,7 +55,11 @@
 
         @Override
         public void process(K key, V1 value) {
-            context().forward(key, joiner.apply(value, valueGetter.get(key)));
+            // if the key is null, we do not need to proceed with joining
+            // the record with the table
+            if (key != null) {
+                context().forward(key, joiner.apply(value, valueGetter.get(key)));
+            }
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 4299c66..af100a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -34,13 +35,12 @@
 
     @Override
     public Processor<K, V> get() {
-        return new KStreamTransformProcessor(transformerSupplier.get());
+        return new KStreamTransformProcessor<>(transformerSupplier.get());
     }
 
-    public static class KStreamTransformProcessor<K1, V1, K2, V2> implements Processor<K1, V1> {
+    public static class KStreamTransformProcessor<K1, V1, K2, V2> extends AbstractProcessor<K1, V1> {
 
         private final Transformer<K1, V1, KeyValue<K2, V2>> transformer;
-        private ProcessorContext context;
 
         public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>> transformer) {
             this.transformer = transformer;
@@ -48,19 +48,24 @@
 
         @Override
         public void init(ProcessorContext context) {
+            super.init(context);
             transformer.init(context);
-            this.context = context;
         }
 
         @Override
         public void process(K1 key, V1 value) {
             KeyValue<K2, V2> pair = transformer.transform(key, value);
-            context.forward(pair.key, pair.value);
+
+            if (pair != null)
+                context().forward(pair.key, pair.value);
         }
 
         @Override
         public void punctuate(long timestamp) {
-            transformer.punctuate(timestamp);
+            KeyValue<K2, V2> pair = transformer.punctuate(timestamp);
+
+            if (pair != null)
+                context().forward(pair.key, pair.value);
         }
 
         @Override
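
With this change a Transformer's punctuate() return value is forwarded downstream when it is non-null instead of being dropped, and returning null from transform() or punctuate() now forwards nothing. A hedged sketch of a transformer relying on that behavior (class name, interval, and record types are illustrative):

    // Sketch: buffers a count and emits it only from punctuate(), assuming the
    // 0.10.0-era Transformer interface (init/transform/punctuate/close).
    public class CountingTransformer implements Transformer<String, String, KeyValue<String, Long>> {
        private long count = 0;

        @Override
        public void init(ProcessorContext context) {
            context.schedule(1000); // punctuate roughly every 1000 ms of stream time
        }

        @Override
        public KeyValue<String, Long> transform(String key, String value) {
            count++;
            return null; // nothing forwarded per record, thanks to the new null check
        }

        @Override
        public KeyValue<String, Long> punctuate(long timestamp) {
            return new KeyValue<>("count@" + timestamp, count); // forwarded by KStreamTransformProcessor
        }

        @Override
        public void close() { }
    }
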
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 6f989e6..cb9aab1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -27,18 +27,18 @@
 
     private final ValueTransformerSupplier<V, R> valueTransformerSupplier;
 
-    public KStreamTransformValues(ValueTransformerSupplier valueTransformerSupplier) {
+    public KStreamTransformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier) {
         this.valueTransformerSupplier = valueTransformerSupplier;
     }
 
     @Override
     public Processor<K, V> get() {
-        return new KStreamTransformValuesProcessor(valueTransformerSupplier.get());
+        return new KStreamTransformValuesProcessor<>(valueTransformerSupplier.get());
     }
 
     public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
 
-        private final ValueTransformer valueTransformer;
+        private final ValueTransformer<V, R> valueTransformer;
         private ProcessorContext context;
 
         public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) {
@@ -58,7 +58,10 @@
 
         @Override
         public void punctuate(long timestamp) {
-            valueTransformer.punctuate(timestamp);
+            R ret = valueTransformer.punctuate(timestamp);
+
+            if (ret != null)
+                context.forward(null, ret);
         }
 
         @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 76964f9..f36cc8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -72,6 +72,11 @@
 
         @Override
         public void process(K key, V value) {
+            // if the key is null, we do not need to proceed with
+            // aggregating the record
+            if (key == null)
+                return;
+
             // first get the matching windows
             long timestamp = context().timestamp();
             Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index d532e79..6c05ce3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -69,6 +69,11 @@
 
         @Override
         public void process(K key, V value) {
+            // if the key is null, we do not need to proceed with
+            // aggregating the record
+            if (key == null)
+                return;
+
             // first get the matching windows
             long timestamp = context().timestamp();
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 156f2db..ee2c931 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,12 +19,14 @@
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Reducer;
@@ -35,6 +37,9 @@
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.Stores;
 
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.util.Collections;
 import java.util.Set;
 
@@ -68,6 +73,8 @@
 
     public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
 
+    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
+
     private static final String REDUCE_NAME = "KTABLE-REDUCE-";
 
     private static final String SELECT_NAME = "KTABLE-SELECT-";
@@ -76,6 +83,8 @@
 
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
+    private static final String FOREACH_NAME = "KTABLE-FOREACH-";
+
     public final ProcessorSupplier<?, ?> processorSupplier;
 
     private final Serde<K> keySerde;
@@ -132,6 +141,36 @@
     }
 
     @Override
+    public void print() {
+        print(null, null);
+    }
+
+    @Override
+    public void print(Serde<K> keySerde, Serde<V> valSerde) {
+        String name = topology.newName(PRINTING_NAME);
+        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde), this.name);
+    }
+
+
+    @Override
+    public void writeAsText(String filePath) {
+        writeAsText(filePath, null, null);
+    }
+
+    @Override
+    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+        String name = topology.newName(PRINTING_NAME);
+        try {
+            PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
+            topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde), this.name);
+        } catch (FileNotFoundException e) {
+            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
+            throw new TopologyBuilderException(message);
+        }
+    }
+
+
+    @Override
     public KTable<K, V> through(Serde<K> keySerde,
                                 Serde<V> valSerde,
                                 StreamPartitioner<K, V> partitioner,
@@ -142,6 +181,18 @@
     }
 
     @Override
+    public void foreach(final ForeachAction<K, V> action) {
+        String name = topology.newName(FOREACH_NAME);
+        KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() {
+            @Override
+            public void apply(K key, Change<V> value) {
+                action.apply(key, value.newValue);
+            }
+        });
+        topology.addProcessor(name, processorSupplier, this.name);
+    }
+
+    @Override
     public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
         return through(keySerde, valSerde, null, topic);
     }
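
The KTable variant above adapts the caller's action to the internal changelog format: each Change<V> is unwrapped and only the new value is handed to the action. A hedged usage sketch (topic name and types are illustrative, assuming the 0.10.0-era KStreamBuilder.table() overload):

    // Sketch: the action sees (key, newValue) pairs; old values carried in the changelog are ignored.
    KStreamBuilder builder = new KStreamBuilder();
    KTable<String, Long> counts = builder.table(Serdes.String(), Serdes.Long(), "counts-topic");

    counts.foreach(new ForeachAction<String, Long>() {
        @Override
        public void apply(String key, Long newValue) {
            System.out.println(key + " is now " + newValue);
        }
    });
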
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 6eb27b6..24c8da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable join operator should not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 00e872e..4bf45ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable left-join operator should not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 6ab0ae9..49eed53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable outer-join operator should not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = valueGetter.get(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index a6a13fc..7443d4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -62,6 +63,10 @@
 
         @Override
         public void process(K key, Change<V1> change) {
+            // the keys should never be null
+            if (key == null)
+                throw new StreamsException("Record key for KTable right-join operator should not be null.");
+
             R newValue = null;
             R oldValue = null;
             V2 value2 = valueGetter.get(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index ff69c37..142a279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
@@ -77,6 +78,10 @@
         public void process(K key, Change<V> change) {
             KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
 
+            // the selected repartition key should never be null
+            if (newPair.key == null)
+                throw new StreamsException("Record key for KTable repartition operator should not be null.");
+
             context().forward(newPair.key, new Change<>(newPair.value, null));
 
             if (change.oldValue != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
new file mode 100644
index 0000000..d1c1d8b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.io.PrintStream;
+
+
+class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
+
+    private final PrintStream printStream;
+    private Serde<?> keySerde;
+    private Serde<?> valueSerde;
+    private boolean notStandardOut;
+
+
+    KeyValuePrinter(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde) {
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        if (printStream == null) {
+            this.printStream = System.out;
+        } else {
+            this.printStream = printStream;
+            notStandardOut = true;
+        }
+    }
+
+    KeyValuePrinter(PrintStream printStream) {
+        this(printStream, null, null);
+    }
+
+    KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde) {
+        this(System.out, keySerde, valueSerde);
+    }
+
+    KeyValuePrinter() {
+        this(System.out, null, null);
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KeyValuePrinterProcessor(this.printStream, this.keySerde, this.valueSerde);
+    }
+
+
+    private class KeyValuePrinterProcessor extends AbstractProcessor<K, V> {
+        private final PrintStream printStream;
+        private Serde<?> keySerde;
+        private Serde<?> valueSerde;
+        private ProcessorContext processorContext;
+
+        private KeyValuePrinterProcessor(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde) {
+            this.printStream = printStream;
+            this.keySerde = keySerde;
+            this.valueSerde = valueSerde;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            this.processorContext = context;
+
+            if (this.keySerde == null) {
+                keySerde = this.processorContext.keySerde();
+            }
+
+            if (this.valueSerde == null) {
+                valueSerde = this.processorContext.valueSerde();
+            }
+        }
+
+        @Override
+        public void process(K key, V value) {
+            K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
+            V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer());
+
+            printStream.println(keyToPrint + " , " + valueToPrint);
+
+            this.processorContext.forward(key, value);
+        }
+
+
+        private Object maybeDeserialize(Object receivedElement, Deserializer<?> deserializer) {
+            if (receivedElement == null) {
+                return null;
+            }
+
+            if (receivedElement instanceof byte[]) {
+                return deserializer.deserialize(this.processorContext.topic(), (byte[]) receivedElement);
+            }
+
+            return receivedElement;
+        }
+
+        @Override
+        public void close() {
+            if (notStandardOut) {
+                this.printStream.close();
+            } else {
+                this.printStream.flush();
+            }
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 434996e..815b5b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -106,39 +106,57 @@
      * Forwards a key/value pair to one of the downstream processors designated by childIndex
      * @param key key
      * @param value value
+     * @param childIndex index in the list of children of this node
      */
     <K, V> void forward(K key, V value, int childIndex);
 
     /**
+     * Forwards a key/value pair to one of the downstream processors designated by the downstream processor name
+     * @param key key
+     * @param value value
+     * @param childName name of the downstream processor
+     */
+    <K, V> void forward(K key, V value, String childName);
+
+    /**
      * Requests a commit
      */
     void commit();
 
     /**
-     * Returns the topic name of the current input record
+     * Returns the topic name of the current input record; could be null if it is not
+     * available (for example, if this method is invoked from the punctuate call)
      *
      * @return the topic name
      */
     String topic();
 
     /**
-     * Returns the partition id of the current input record
+     * Returns the partition id of the current input record; could be -1 if it is not
+     * available (for example, if this method is invoked from the punctuate call)
      *
      * @return the partition id
      */
     int partition();
 
     /**
-     * Returns the offset of the current input record
+     * Returns the offset of the current input record; could be -1 if it is not
+     * available (for example, if this method is invoked from the punctuate call)
      *
      * @return the offset
      */
     long offset();
 
     /**
-     * Returns the timestamp of the current input record. The timestamp is extracted from
+     * Returns the current timestamp.
+     *
+     * If invoked while processing a record streamed from the source processor, the timestamp is that of the current input record, as extracted from
      * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
      *
+     * If invoked while processing a record that did not originate from the source processor (for example,
+     * if this method is invoked from the punctuate call), the timestamp is the current
+     * task's stream time, defined as the smallest timestamp among all its input stream partitions.
+     *
      * @return the timestamp
      */
     long timestamp();
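
The new forward(key, value, childName) overload targets a single named child instead of a child index or all children. A hedged sketch of a low-level Processor using it (the child names "even-sink" and "odd-sink" are hypothetical and would need to match names registered with the topology builder):

    import org.apache.kafka.streams.processor.AbstractProcessor;

    // Sketch: route each record to exactly one of two named children based on its key.
    public class RoutingProcessor extends AbstractProcessor<Integer, String> {
        @Override
        public void process(Integer key, String value) {
            if (key != null && key % 2 == 0) {
                context().forward(key, value, "even-sink"); // hypothetical child name
            } else {
                context().forward(key, value, "odd-sink");  // hypothetical child name
            }
        }
    }
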
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 3d8f792..ec89d47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -135,17 +135,14 @@
      * partition timestamp among all its partitions
      */
     public long timestamp() {
-        if (queuesByTime.isEmpty()) {
-            // if there is no data in all partitions, return the smallest of their last known times
-            long timestamp = Long.MAX_VALUE;
-            for (RecordQueue queue : partitionQueues.values()) {
-                if (timestamp > queue.timestamp())
-                    timestamp = queue.timestamp();
-            }
-            return timestamp;
-        } else {
-            return queuesByTime.peek().timestamp();
+        // we should always return the smallest timestamp of all partitions
+        // to avoid the group's partition time going backward
+        long timestamp = Long.MAX_VALUE;
+        for (RecordQueue queue : partitionQueues.values()) {
+            if (timestamp > queue.timestamp())
+                timestamp = queue.timestamp();
         }
+        return timestamp;
     }
 
     public int numBuffered(TopicPartition partition) {
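
A short worked illustration of why the minimum over all partitions is used (numbers are illustrative): suppose partition p0's last known timestamp is 5 and p1's is 8. Under the old logic, while only p1's queue had buffered data the group time was reported as 8, and once p0 buffered a record at timestamp 5 it dropped back to 5, so punctuation driven by this clock could see time move backward. Under the new logic the group time is min(5, 8) = 5 in both situations and can only advance as every partition advances.
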
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 888b89e..1c398ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -30,6 +30,8 @@
 
 public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
+    public static final String NONEXIST_TOPIC = "__null_topic__";
+
     private final TaskId id;
     private final StreamTask task;
     private final StreamsMetrics metrics;
@@ -118,7 +120,7 @@
         if (node == null)
             throw new TopologyBuilderException("Accessing from an unknown node");
 
-        // TODO: restore this once we fix the ValueGetter initialiation issue
+        // TODO: restore this once we fix the ValueGetter initialization issue
         //if (!node.stateStores.contains(name))
         //    throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
 
@@ -130,7 +132,12 @@
         if (task.record() == null)
             throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
 
-        return task.record().topic();
+        String topic = task.record().topic();
+
+        if (topic.equals(NONEXIST_TOPIC))
+            return null;
+        else
+            return topic;
     }
 
     @Override
@@ -168,6 +175,11 @@
     }
 
     @Override
+    public <K, V> void forward(K key, V value, String childName) {
+        task.forward(key, value, childName);
+    }
+
+    @Override
     public void commit() {
         task.needCommit();
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 6db83a1..50e3a0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -48,7 +48,7 @@
         return name;
     }
 
-    public final Processor processor() {
+    public final Processor<K, V> processor() {
         return processor;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index d7d7eee..824e20a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -43,7 +43,7 @@
                 PunctuationSchedule sched = top;
                 pq.poll();
                 punctuator.punctuate(sched.node(), timestamp);
-                pq.add(sched.next());
+                pq.add(sched.next(timestamp));
                 punctuated = true;
 
                 top = pq.peek();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index dc9a50d..98919d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -22,11 +22,11 @@
     final long interval;
 
     public PunctuationSchedule(ProcessorNode node, long interval) {
-        this(node, System.currentTimeMillis(), interval);
+        this(node, 0L, interval);
     }
 
     public PunctuationSchedule(ProcessorNode node, long time, long interval) {
-        super(node, time + interval);
+        super(node, time);
         this.interval = interval;
     }
 
@@ -34,8 +34,13 @@
         return value;
     }
 
-    public PunctuationSchedule next() {
-        return new PunctuationSchedule(value, timestamp, interval);
+    public PunctuationSchedule next(long currTimestamp) {
+        // we need to handle the first trigger specially (i.e. the stored timestamp
+        // is still the initial 0L) by rescheduling based on currTimestamp
+        if (timestamp == 0L)
+            return new PunctuationSchedule(value, currTimestamp + interval, interval);
+        else
+            return new PunctuationSchedule(value, timestamp + interval, interval);
     }
 
 }
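
A short worked trace of the new scheduling (interval value is illustrative): a schedule created with interval 1000 starts with a stored timestamp of 0, so it fires on the first punctuation check after the task's stream time becomes known. If that first punctuation fires at stream time 1234, next(1234) reschedules it for 1234 + 1000 = 2234; every later call advances by exactly the interval (2234 -> 3234 -> ...), so punctuation now tracks stream time rather than the wall-clock base used previously.
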
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 3ad06e2..468fe74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -108,46 +108,51 @@
 
     @Override
     public StateStore getStateStore(String name) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks.");
     }
 
     @Override
     public String topic() {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: topic() not supported in standby tasks.");
     }
 
     @Override
     public int partition() {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: partition() not supported in standby tasks.");
     }
 
     @Override
     public long offset() {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: offset() not supported in standby tasks.");
     }
 
     @Override
     public long timestamp() {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: timestamp() not supported in standby tasks.");
     }
 
     @Override
     public <K, V> void forward(K key, V value) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
     @Override
     public <K, V> void forward(K key, V value, int childIndex) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
+    }
+
+    @Override
+    public <K, V> void forward(K key, V value, String childName) {
+        throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
     }
 
     @Override
     public void commit() {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: commit() not supported in standby tasks.");
     }
 
     @Override
     public void schedule(long interval) {
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 61aeced..53d0a8d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -43,6 +43,8 @@
 
     private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
 
+    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
+
     private final int maxBufferedSize;
 
     private final PartitionGroup partitionGroup;
@@ -202,11 +204,11 @@
 
     /**
      * Possibly trigger registered punctuation functions if
-     * current time has reached the defined stamp
-     *
-     * @param timestamp
+     * the current partition group timestamp has reached the defined timestamp
      */
-    public boolean maybePunctuate(long timestamp) {
+    public boolean maybePunctuate() {
+        long timestamp = partitionGroup.timestamp();
+
         return punctuationQueue.mayPunctuate(timestamp, this);
     }
 
@@ -216,10 +218,13 @@
             throw new IllegalStateException("Current node is not null");
 
         currNode = node;
+        currRecord = new StampedRecord(DUMMY_RECORD, timestamp);
+
         try {
             node.processor().punctuate(timestamp);
         } finally {
             currNode = null;
+            currRecord = null;
         }
     }
 
@@ -342,4 +347,20 @@
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value, String childName) {
+        ProcessorNode thisNode = currNode;
+        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+            if (childNode.name().equals(childName)) {
+                currNode = childNode;
+                try {
+                    childNode.process(key, value);
+                } finally {
+                    currNode = thisNode;
+                }
+                break;
+            }
+        }
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c2a8e06..38dc356 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -341,8 +341,9 @@
 
             totalNumBuffered = 0;
 
+            // try to process one fetched record from each task via the topology, and also trigger punctuate
+            // functions if necessary, which may result in more records going through the topology in this loop
             if (!activeTasks.isEmpty()) {
-                // try to process one record from each task
                 for (StreamTask task : activeTasks.values()) {
                     long startProcess = time.milliseconds();
 
@@ -431,7 +432,9 @@
         try {
             long now = time.milliseconds();
 
-            if (task.maybePunctuate(now))
+            // check whether we should punctuate based on the task's partition group timestamp,
+            // which is essentially derived from the record timestamps.
+            if (task.maybePunctuate())
                 sensors.punctuateTimeSensor.record(time.milliseconds() - now);
 
         } catch (KafkaException e) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
new file mode 100644
index 0000000..6573779
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamForeachTest {
+
+    final private String topicName = "topic";
+
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    @Test
+    public void testForeach() {
+        // Given
+        List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
+            new KeyValue<>(0, "zero"),
+            new KeyValue<>(1, "one"),
+            new KeyValue<>(2, "two"),
+            new KeyValue<>(3, "three")
+        );
+
+        List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
+            new KeyValue<>(0, "ZERO"),
+            new KeyValue<>(2, "ONE"),
+            new KeyValue<>(4, "TWO"),
+            new KeyValue<>(6, "THREE")
+        );
+
+        final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
+        ForeachAction<Integer, String> action =
+            new ForeachAction<Integer, String>() {
+                @Override
+                public void apply(Integer key, String value) {
+                    actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase()));
+                }
+            };
+
+        // When
+        KStreamBuilder builder = new KStreamBuilder();
+        KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
+        stream.foreach(action);
+
+        // Then
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (KeyValue<Integer, String> record: inputRecords) {
+            driver.process(topicName, record.key, record.value);
+        }
+
+        assertEquals(expectedRecords.size(), actualRecords.size());
+        for (int i = 0; i < expectedRecords.size(); i++) {
+            KeyValue<Integer, String> expectedRecord = expectedRecords.get(i);
+            KeyValue<Integer, String> actualRecord = actualRecords.get(i);
+            assertEquals(expectedRecord, actualRecord);
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index d24ab15..19a9411 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -90,7 +90,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push two items to the other stream. this should produce two items.
             // w1 = { 0:X0, 1:X1 }
@@ -102,7 +102,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
             // push all four items to the primary stream. this should produce two items.
             // w1 = { 0:X0, 1:X1 }
@@ -114,7 +114,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
             // push all items to the other stream. this should produce six items.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -126,7 +126,7 @@
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             // push all four items to the primary stream. this should produce six items.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -138,7 +138,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
             // push two items to the other stream. this should produce six item.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
@@ -150,7 +150,7 @@
                 driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
 
         } finally {
             Utils.delete(baseDir);
@@ -195,7 +195,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
             // push two items to the other stream. this should produce two items.
             // w1 = { 0:X0, 1:X1 }
@@ -207,7 +207,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
             // push all four items to the primary stream. this should produce four items.
             // w1 = { 0:X0, 1:X1 }
@@ -219,7 +219,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
             // push all items to the other stream. this should produce six items.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -231,7 +231,7 @@
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             // push all four items to the primary stream. this should produce six items.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -243,7 +243,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
             // push two items to the other stream. this should produce six item.
             // w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3,  0:XX0, 1:XX1, 2:XX2, 3:XX3 }
@@ -255,7 +255,7 @@
                 driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+            processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
 
         } finally {
             Utils.delete(baseDir);
@@ -302,7 +302,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push two items to the other stream. this should produce two items.
             // w1 = { 0:X0, 1:X1 }
@@ -314,7 +314,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
             // clear logically
             time = 1000L;
@@ -323,7 +323,7 @@
                 driver.setTime(time + i);
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // gradually expires items in w1
             // w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -335,35 +335,35 @@
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("3:X3+YY3");
+            processor.checkAndClearProcessResult("3:X3+YY3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // go back to the time before expiration
 
@@ -373,35 +373,35 @@
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0");
+            processor.checkAndClearProcessResult("0:X0+YY0");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             // clear (logically)
             time = 2000L;
@@ -411,7 +411,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // gradually expires items in w2
             // w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
@@ -422,35 +422,35 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("3:XX3+Y3");
+            processor.checkAndClearProcessResult("3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // go back to the time before expiration
 
@@ -460,35 +460,35 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0");
+            processor.checkAndClearProcessResult("0:XX0+Y0");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
         } finally {
             Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 166e8ba..65226d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -88,7 +88,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
             // push two items to the other stream. this should produce no items.
             // w = {}
@@ -98,7 +98,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push all four items to the primary stream. this should produce four items.
             // w = { 0:Y0, 1:Y1 }
@@ -108,7 +108,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
             // push all items to the other stream. this should produce no items.
             // w = { 0:Y0, 1:Y1 }
@@ -118,7 +118,7 @@
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push all four items to the primary stream. this should produce six items.
             // w = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
@@ -128,7 +128,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
 
         } finally {
             Utils.delete(baseDir);
@@ -173,7 +173,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
             // push two items to the other stream. this should produce no items.
             // w = {}
@@ -183,7 +183,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // clear logically
             time = 1000L;
@@ -196,7 +196,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // gradually expire items in window.
             // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
@@ -207,35 +207,35 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
             // go back to the time before expiration
 
@@ -245,35 +245,35 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
 
             driver.setTime(++time);
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+            processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
 
         } finally {
             Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 8e672a2..3acb59a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -95,7 +95,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
             // push two items to the other stream. this should not produce any item.
 
@@ -103,7 +103,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -111,14 +111,14 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
 
             // push all items to the other stream. this should not produce any item
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -126,7 +126,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
 
             // push two items with null to the other stream as deletes. this should not produce any item.
 
@@ -134,7 +134,7 @@
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            processor.checkAndClearResult();
+            processor.checkAndClearProcessResult();
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -142,7 +142,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
 
         } finally {
             Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 4244de5..a0a61f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -59,7 +59,8 @@
                         }
 
                         @Override
-                        public void punctuate(long timestamp) {
+                        public KeyValue<Integer, Integer> punctuate(long timestamp) {
+                            return KeyValue.pair(-1, (int) timestamp);
                         }
 
                         @Override
@@ -80,9 +81,12 @@
             driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
         }
 
-        assertEquals(4, processor.processed.size());
+        driver.punctuate(2);
+        driver.punctuate(3);
 
-        String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"};
+        assertEquals(6, processor.processed.size());
+
+        String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
 
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], processor.processed.get(i));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 52abdf7..f5f9698 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -58,7 +58,8 @@
                         }
 
                         @Override
-                        public void punctuate(long timestamp) {
+                        public Integer punctuate(long timestamp) {
+                            return (int) timestamp;
                         }
 
                         @Override
@@ -82,7 +83,10 @@
 
         assertEquals(4, processor.processed.size());
 
-        String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
+        driver.punctuate(2);
+        driver.punctuate(3);
+
+        String[] expected = {"1:10", "10:110", "100:1110", "1000:11110", "null:2", "null:3"};
 
         for (int i = 0; i < expected.length; i++) {
             assertEquals(expected[i], processor.processed.get(i));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index e19510f..3c7a1bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -182,15 +182,15 @@
             driver.setTime(4L);
             driver.process(topic1, "A", "1");
 
-            proc1.checkAndClearResult(
+            proc1.checkAndClearProcessResult(
                     "[A@0]:0+1",
                     "[B@0]:0+2",
                     "[C@0]:0+3",
                     "[D@0]:0+4",
                     "[A@0]:0+1+1"
             );
-            proc2.checkAndClearResult();
-            proc3.checkAndClearResult(
+            proc2.checkAndClearProcessResult();
+            proc3.checkAndClearProcessResult(
                     "[A@0]:null",
                     "[B@0]:null",
                     "[C@0]:null",
@@ -209,15 +209,15 @@
             driver.setTime(9L);
             driver.process(topic1, "C", "3");
 
-            proc1.checkAndClearResult(
+            proc1.checkAndClearProcessResult(
                     "[A@0]:0+1+1+1", "[A@5]:0+1",
                     "[B@0]:0+2+2", "[B@5]:0+2",
                     "[D@0]:0+4+4", "[D@5]:0+4",
                     "[B@0]:0+2+2+2", "[B@5]:0+2+2",
                     "[C@0]:0+3+3", "[C@5]:0+3"
             );
-            proc2.checkAndClearResult();
-            proc3.checkAndClearResult(
+            proc2.checkAndClearProcessResult();
+            proc3.checkAndClearProcessResult(
                     "[A@0]:null", "[A@5]:null",
                     "[B@0]:null", "[B@5]:null",
                     "[D@0]:null", "[D@5]:null",
@@ -236,15 +236,15 @@
             driver.setTime(4L);
             driver.process(topic2, "A", "a");
 
-            proc1.checkAndClearResult();
-            proc2.checkAndClearResult(
+            proc1.checkAndClearProcessResult();
+            proc2.checkAndClearProcessResult(
                     "[A@0]:0+a",
                     "[B@0]:0+b",
                     "[C@0]:0+c",
                     "[D@0]:0+d",
                     "[A@0]:0+a+a"
             );
-            proc3.checkAndClearResult(
+            proc3.checkAndClearProcessResult(
                     "[A@0]:0+1+1+1%0+a",
                     "[B@0]:0+2+2+2%0+b",
                     "[C@0]:0+3+3%0+c",
@@ -262,15 +262,15 @@
             driver.setTime(9L);
             driver.process(topic2, "C", "c");
 
-            proc1.checkAndClearResult();
-            proc2.checkAndClearResult(
+            proc1.checkAndClearProcessResult();
+            proc2.checkAndClearProcessResult(
                     "[A@0]:0+a+a+a", "[A@5]:0+a",
                     "[B@0]:0+b+b", "[B@5]:0+b",
                     "[D@0]:0+d+d", "[D@5]:0+d",
                     "[B@0]:0+b+b+b", "[B@5]:0+b+b",
                     "[C@0]:0+c+c", "[C@5]:0+c"
             );
-            proc3.checkAndClearResult(
+            proc3.checkAndClearProcessResult(
                     "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
                     "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
                     "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 78d274e..ee26058 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -74,8 +74,8 @@
         driver.process(topic1, "A", null);
         driver.process(topic1, "B", null);
 
-        proc2.checkAndClearResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
-        proc3.checkAndClearResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
+        proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
+        proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
     }
 
     @Test
@@ -193,25 +193,25 @@
             driver.process(topic1, "B", 1);
             driver.process(topic1, "C", 1);
 
-            proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
 
             driver.process(topic1, "A", 2);
             driver.process(topic1, "B", 2);
 
-            proc1.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
-            proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+            proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+            proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
             driver.process(topic1, "A", 3);
 
-            proc1.checkAndClearResult("A:(3<-null)");
-            proc2.checkAndClearResult("A:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(3<-null)");
+            proc2.checkAndClearProcessResult("A:(null<-null)");
 
             driver.process(topic1, "A", null);
             driver.process(topic1, "B", null);
 
-            proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
-            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
 
         } finally {
             Utils.delete(stateDir);
@@ -250,25 +250,25 @@
             driver.process(topic1, "B", 1);
             driver.process(topic1, "C", 1);
 
-            proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
-            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
 
             driver.process(topic1, "A", 2);
             driver.process(topic1, "B", 2);
 
-            proc1.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
-            proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+            proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+            proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
             driver.process(topic1, "A", 3);
 
-            proc1.checkAndClearResult("A:(3<-2)");
-            proc2.checkAndClearResult("A:(null<-2)");
+            proc1.checkAndClearProcessResult("A:(3<-2)");
+            proc2.checkAndClearProcessResult("A:(null<-2)");
 
             driver.process(topic1, "A", null);
             driver.process(topic1, "B", null);
 
-            proc1.checkAndClearResult("A:(null<-3)", "B:(null<-2)");
-            proc2.checkAndClearResult("A:(null<-null)", "B:(null<-2)");
+            proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
+            proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");
 
         } finally {
             Utils.delete(stateDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
new file mode 100644
index 0000000..4b612a5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class KTableForeachTest {
+
+    final private String topicName = "topic";
+
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    @Test
+    public void testForeach() {
+        // Given
+        List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
+            new KeyValue<>(0, "zero"),
+            new KeyValue<>(1, "one"),
+            new KeyValue<>(2, "two"),
+            new KeyValue<>(3, "three")
+        );
+
+        List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
+            new KeyValue<>(0, "ZERO"),
+            new KeyValue<>(2, "ONE"),
+            new KeyValue<>(4, "TWO"),
+            new KeyValue<>(6, "THREE")
+        );
+
+        final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
+        ForeachAction<Integer, String> action =
+            new ForeachAction<Integer, String>() {
+                @Override
+                public void apply(Integer key, String value) {
+                    actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase()));
+                }
+            };
+
+        // When
+        KStreamBuilder builder = new KStreamBuilder();
+        KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName);
+        table.foreach(action);
+
+        // Then
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (KeyValue<Integer, String> record: inputRecords) {
+            driver.process(topicName, record.key, record.value);
+        }
+
+        assertEquals(expectedRecords.size(), actualRecords.size());
+        for (int i = 0; i < expectedRecords.size(); i++) {
+            KeyValue<Integer, String> expectedRecord = expectedRecords.get(i);
+            KeyValue<Integer, String> actualRecord = actualRecords.get(i);
+            assertEquals(expectedRecord, actualRecord);
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 5f30574..f6ebbe1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -100,7 +100,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:null", "1:null");
+            processor.checkAndClearProcessResult("0:null", "1:null");
             checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
 
             // push two items to the other stream. this should produce two items.
@@ -109,7 +109,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
             // push all four items to the primary stream. this should produce four items.
@@ -118,7 +118,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
             // push all items to the other stream. this should produce four items.
@@ -126,7 +126,7 @@
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -135,7 +135,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push two items with null to the other stream as deletes. this should produce two items.
@@ -144,7 +144,7 @@
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            processor.checkAndClearResult("0:null", "1:null");
+            processor.checkAndClearProcessResult("0:null", "1:null");
             checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -153,7 +153,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
             checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
 
         } finally {
@@ -195,7 +195,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -203,7 +203,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -211,14 +211,14 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -226,7 +226,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push two items with null to the other stream as deletes. this should produce two items.
 
@@ -234,7 +234,7 @@
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -242,7 +242,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
         } finally {
             Utils.delete(baseDir);
@@ -285,7 +285,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -293,7 +293,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -301,14 +301,14 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -316,7 +316,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
             // push two items with null to the other stream as deletes. this should produce two items.
 
@@ -324,7 +324,7 @@
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
+            proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -332,7 +332,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
 
         } finally {
             Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index f92c5ca..449ea05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -105,7 +105,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
             checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
 
             // push two items to the other stream. this should produce two items.
@@ -114,7 +114,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
             // push all four items to the primary stream. this should produce four items.
@@ -123,7 +123,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
 
             // push all items to the other stream. this should produce four items.
@@ -131,7 +131,7 @@
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -140,7 +140,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push two items with null to the other stream as deletes. this should produce two items.
@@ -149,7 +149,7 @@
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
             checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -158,7 +158,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
             checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
 
         } finally {
@@ -200,7 +200,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -208,7 +208,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -216,14 +216,14 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -231,7 +231,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push two items with null to the other stream as deletes. this should produce two items.
 
@@ -239,7 +239,7 @@
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -247,7 +247,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
         } finally {
             Utils.delete(baseDir);
@@ -290,7 +290,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -298,7 +298,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -306,14 +306,14 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -321,7 +321,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
             // push two items with null to the other stream as deletes. this should produce two items.
 
@@ -329,7 +329,7 @@
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+            proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -337,7 +337,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
 
         } finally {
             Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 6cc77e0..ea7476a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -100,7 +100,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
             checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
 
             // push two items to the other stream. this should produce two items.
@@ -109,7 +109,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
 
             // push all four items to the primary stream. this should produce four items.
@@ -118,7 +118,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+            processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
             checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
 
             // push all items to the other stream. this should produce four items.
@@ -126,7 +126,7 @@
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -135,7 +135,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+            processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
             checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push two items with null to the other stream as deletes. this should produce two items.
@@ -144,7 +144,7 @@
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+            processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
             checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
 
             // push all four items to the primary stream. this should produce four items.
@@ -153,7 +153,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+            processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
             checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
 
             // push middle two items to the primary stream with null. this should produce two items.
@@ -162,7 +162,7 @@
                 driver.process(topic1, expectedKeys[i], null);
             }
 
-            processor.checkAndClearResult("1:null", "2:null+YY2");
+            processor.checkAndClearProcessResult("1:null", "2:null+YY2");
             checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3"));
 
         } finally {
@@ -204,7 +204,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -212,7 +212,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -220,14 +220,14 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -235,7 +235,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
             // push two items with null to the other stream as deletes. this should produce two items.
 
@@ -243,7 +243,7 @@
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -251,7 +251,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+            proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
             // push middle two items to the primary stream with null. this should produce two items.
 
@@ -259,7 +259,7 @@
                 driver.process(topic1, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("1:(null<-null)", "2:(null+YY2<-null)");
+            proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)");
 
         } finally {
             Utils.delete(baseDir);
@@ -302,7 +302,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
 
             // push two items to the other stream. this should produce two items.
 
@@ -310,7 +310,7 @@
                 driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -318,14 +318,14 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
             // push all items to the other stream. this should produce four items.
             for (int i = 0; i < expectedKeys.length; i++) {
                 driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -333,7 +333,7 @@
                 driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
             // push two items with null to the other stream as deletes. this should produce two items.
 
@@ -341,7 +341,7 @@
                 driver.process(topic2, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+            proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
 
             // push all four items to the primary stream. this should produce four items.
 
@@ -349,7 +349,7 @@
                 driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
             }
 
-            proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+            proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
 
             // push middle two items to the primary stream with null. this should produce two items.
 
@@ -357,7 +357,7 @@
                 driver.process(topic1, expectedKeys[i], null);
             }
 
-            proc.checkAndClearResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
+            proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
 
         } finally {
             Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 9ec1258..9cafe8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -223,20 +223,20 @@
             driver.process(topic1, "B", "01");
             driver.process(topic1, "C", "01");
 
-            proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
 
             driver.process(topic1, "A", "02");
             driver.process(topic1, "B", "02");
 
-            proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+            proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
 
             driver.process(topic1, "A", "03");
 
-            proc.checkAndClearResult("A:(3<-null)");
+            proc.checkAndClearProcessResult("A:(3<-null)");
 
             driver.process(topic1, "A", null);
 
-            proc.checkAndClearResult("A:(null<-null)");
+            proc.checkAndClearProcessResult("A:(null<-null)");
 
         } finally {
             Utils.delete(stateDir);
@@ -276,20 +276,20 @@
             driver.process(topic1, "B", "01");
             driver.process(topic1, "C", "01");
 
-            proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+            proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
 
             driver.process(topic1, "A", "02");
             driver.process(topic1, "B", "02");
 
-            proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
+            proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
 
             driver.process(topic1, "A", "03");
 
-            proc.checkAndClearResult("A:(3<-2)");
+            proc.checkAndClearProcessResult("A:(3<-2)");
 
             driver.process(topic1, "A", null);
 
-            proc.checkAndClearResult("A:(null<-3)");
+            proc.checkAndClearProcessResult("A:(null<-3)");
 
         } finally {
             Utils.delete(stateDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 51276f3..7c158e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -131,21 +131,21 @@
             driver.process(topic1, "B", "01");
             driver.process(topic1, "C", "01");
 
-            proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
 
             driver.process(topic1, "A", "02");
             driver.process(topic1, "B", "02");
 
-            proc1.checkAndClearResult("A:(02<-null)", "B:(02<-null)");
+            proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
 
             driver.process(topic1, "A", "03");
 
-            proc1.checkAndClearResult("A:(03<-null)");
+            proc1.checkAndClearProcessResult("A:(03<-null)");
 
             driver.process(topic1, "A", null);
             driver.process(topic1, "B", null);
 
-            proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
+            proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
 
         } finally {
             Utils.delete(stateDir);
@@ -176,21 +176,21 @@
             driver.process(topic1, "B", "01");
             driver.process(topic1, "C", "01");
 
-            proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+            proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
 
             driver.process(topic1, "A", "02");
             driver.process(topic1, "B", "02");
 
-            proc1.checkAndClearResult("A:(02<-01)", "B:(02<-01)");
+            proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
 
             driver.process(topic1, "A", "03");
 
-            proc1.checkAndClearResult("A:(03<-02)");
+            proc1.checkAndClearProcessResult("A:(03<-02)");
 
             driver.process(topic1, "A", null);
             driver.process(topic1, "B", null);
 
-            proc1.checkAndClearResult("A:(null<-03)", "B:(null<-02)");
+            proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
 
         } finally {
             Utils.delete(stateDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
new file mode 100644
index 0000000..22948ab
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class KeyValuePrinterProcessorTest {
+
+    private String topicName = "topic";
+    private Serde<String> stringSerde = Serdes.String();
+    private Serde<byte[]> bytesSerde = Serdes.ByteArray();
+    private ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    private KStreamBuilder builder = new KStreamBuilder();
+    private PrintStream printStream = new PrintStream(baos);
+
+
+    @Test
+    public void testPrintKeyValueDefaultSerde() throws Exception {
+
+        KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printStream);
+        String[] suppliedKeys = {"foo", "bar", null};
+        String[] suppliedValues = {"value1", "value2", "value3"};
+        String[] expectedValues = {"foo , value1", "bar , value2", "null , value3"};
+
+
+        KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName);
+        stream.process(keyValuePrinter);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+        for (int i = 0; i < suppliedKeys.length; i++) {
+            driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
+        }
+
+        String[] capturedValues = new String(baos.toByteArray(), Charset.forName("UTF-8")).split("\n");
+
+        for (int i = 0; i < capturedValues.length; i++) {
+            assertEquals(expectedValues[i], capturedValues[i]);
+        }
+    }
+
+
+    @Test
+    public void testPrintKeyValueWithProvidedSerde() throws Exception {
+
+        Serde<MockObject> mockObjectSerde = Serdes.serdeFrom(new MockSerializer(), new MockDeserializer());
+        KeyValuePrinter<String, MockObject> keyValuePrinter = new KeyValuePrinter<>(printStream, stringSerde, mockObjectSerde);
+        KStream<String, MockObject> stream = builder.stream(stringSerde, mockObjectSerde, topicName);
+
+        stream.process(keyValuePrinter);
+
+        KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+        String suppliedKey = null;
+        byte[] suppliedValue = "{\"name\":\"print\", \"label\":\"test\"}".getBytes(Charset.forName("UTF-8"));
+
+        driver.process(topicName, suppliedKey, suppliedValue);
+        String expectedPrintedValue = "null , name:print label:test";
+        String capturedValue = new String(baos.toByteArray(), Charset.forName("UTF-8")).trim();
+
+        assertEquals(expectedPrintedValue, capturedValue);
+
+    }
+
+    private static class MockObject {
+        public String name;
+        public String label;
+
+        public MockObject() {
+        }
+
+        MockObject(String name, String label) {
+            this.name = name;
+            this.label = label;
+        }
+
+        @Override
+        public String toString() {
+            return "name:" + name + " label:" + label;
+        }
+    }
+
+
+    private static class MockDeserializer implements Deserializer<MockObject> {
+
+        private com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+
+        }
+
+        @Override
+        public MockObject deserialize(String topic, byte[] data) {
+            MockObject mockObject;
+            try {
+                mockObject = objectMapper.readValue(data, MockObject.class);
+            } catch (Exception e) {
+                throw new SerializationException(e);
+            }
+            return mockObject;
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+
+    private static class MockSerializer implements Serializer<MockObject> {
+        private final com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+
+        }
+
+        @Override
+        public byte[] serialize(String topic, MockObject data) {
+            try {
+                return objectMapper.writeValueAsBytes(data);
+            } catch (Exception e) {
+                throw new SerializationException("Error serializing JSON message", e);
+            }
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
+
+
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 5bf1b5e..a1c07af7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -21,7 +21,6 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -60,17 +59,17 @@
 
         // add three records with timestamps 1, 3, 5 to partition-1
         List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
 
         group.addRawRecords(partition1, list1);
 
         // add three records with timestamps 2, 4, 6 to partition-2
         List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
-            new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
-            new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+            new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
 
         group.addRawRecords(partition2, list2);
 
@@ -82,7 +81,7 @@
         StampedRecord record;
         PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
 
-        // get one record
+        // get one record, now the time should be advanced
         record = group.nextRecord(info);
         assertEquals(partition1, info.partition());
         assertEquals(1L, record.timestamp);
@@ -99,5 +98,72 @@
         assertEquals(2, group.numBuffered(partition1));
         assertEquals(2, group.numBuffered(partition2));
         assertEquals(3L, group.timestamp());
+
+        // add two records with timestamps 2, 4 to partition-1 again
+        List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue));
+
+        group.addRawRecords(partition1, list3);
+
+        assertEquals(6, group.numBuffered());
+        assertEquals(4, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(3L, group.timestamp());
+
+        // get one record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(3L, record.timestamp);
+        assertEquals(5, group.numBuffered());
+        assertEquals(3, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(3L, group.timestamp());
+
+        // get one more record, now time should be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(5L, record.timestamp);
+        assertEquals(4, group.numBuffered());
+        assertEquals(2, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(3L, group.timestamp());
+
+        // get one more record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(2L, record.timestamp);
+        assertEquals(3, group.numBuffered());
+        assertEquals(1, group.numBuffered(partition1));
+        assertEquals(2, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
+        // get one more record, now time should be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition2, info.partition());
+        assertEquals(4L, record.timestamp);
+        assertEquals(2, group.numBuffered());
+        assertEquals(1, group.numBuffered(partition1));
+        assertEquals(1, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
+        // get one more record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition1, info.partition());
+        assertEquals(4L, record.timestamp);
+        assertEquals(1, group.numBuffered());
+        assertEquals(0, group.numBuffered(partition1));
+        assertEquals(1, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
+        // get one more record, time should not be advanced
+        record = group.nextRecord(info);
+        assertEquals(partition2, info.partition());
+        assertEquals(6L, record.timestamp);
+        assertEquals(0, group.numBuffered());
+        assertEquals(0, group.numBuffered(partition1));
+        assertEquals(0, group.numBuffered(partition2));
+        assertEquals(4L, group.timestamp());
+
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index ef08176..1095fcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -157,6 +157,28 @@
     }
 
     @Test
+    public void testDrivingMultiplexByNameTopology() {
+        driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology());
+        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
+
+        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
+
+        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)");
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)");
+    }
+
+    @Test
     public void testDrivingStatefulTopology() {
         String storeName = "entries";
         driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName);
@@ -215,6 +237,13 @@
                                     .addSink("sink2", OUTPUT_TOPIC_2, "processor");
     }
 
+    protected TopologyBuilder createMultiplexByNameTopology() {
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+            .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
+            .addSink("sink0", OUTPUT_TOPIC_1, "processor")
+            .addSink("sink1", OUTPUT_TOPIC_2, "processor");
+    }
+
     protected TopologyBuilder createStatefulTopology(String storeName) {
         return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
                                     .addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
@@ -268,6 +297,33 @@
     }
 
     /**
+     * A processor that forwards slightly-modified messages to each named child.
+     * Note: the children are assumed to be named "sink{child index}", e.g., sink0, sink1, etc.
+     */
+    protected static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
+
+        private final int numChildren;
+
+        public MultiplexByNameProcessor(int numChildren) {
+            this.numChildren = numChildren;
+        }
+
+        @Override
+        public void process(String key, String value) {
+            for (int i = 0; i != numChildren; ++i) {
+                context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
+            }
+        }
+
+        @Override
+        public void punctuate(long streamTime) {
+            for (int i = 0; i != numChildren; ++i) {
+                context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i);
+            }
+        }
+    }
+
+    /**
      * A processor that stores each key-value pair in an in-memory key-value store registered with the context. When
      * {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
      */
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 33fa5c4..6014c36 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -32,6 +32,7 @@
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
@@ -46,6 +47,7 @@
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class StreamTaskTest {
@@ -58,10 +60,12 @@
     private final TopicPartition partition2 = new TopicPartition("topic2", 1);
     private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2);
 
-    private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
-    private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+    private final MockProcessorNode<Integer, Integer> processor = new MockProcessorNode<>(10L);
+
     private final ProcessorTopology topology = new ProcessorTopology(
-            Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
+            Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2, (ProcessorNode) processor),
             new HashMap<String, SourceNode>() {
                 {
                     put("topic1", source1);
@@ -94,6 +98,8 @@
     @Before
     public void setup() {
         consumer.assign(Arrays.asList(partition1, partition2));
+        source1.addChild(processor);
+        source2.addChild(processor);
     }
 
     @SuppressWarnings("unchecked")
@@ -211,6 +217,73 @@
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testMaybePunctuate() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            StreamsConfig config = createConfig(baseDir);
+            StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+
+            task.addRecords(partition1, records(
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+            ));
+
+            task.addRecords(partition2, records(
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+                    new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+            ));
+
+            assertTrue(task.maybePunctuate());
+
+            assertEquals(5, task.process());
+            assertEquals(1, source1.numReceived);
+            assertEquals(0, source2.numReceived);
+
+            assertFalse(task.maybePunctuate());
+
+            assertEquals(4, task.process());
+            assertEquals(1, source1.numReceived);
+            assertEquals(1, source2.numReceived);
+
+            assertTrue(task.maybePunctuate());
+
+            assertEquals(3, task.process());
+            assertEquals(2, source1.numReceived);
+            assertEquals(1, source2.numReceived);
+
+            assertFalse(task.maybePunctuate());
+
+            assertEquals(2, task.process());
+            assertEquals(2, source1.numReceived);
+            assertEquals(2, source2.numReceived);
+
+            assertTrue(task.maybePunctuate());
+
+            assertEquals(1, task.process());
+            assertEquals(3, source1.numReceived);
+            assertEquals(2, source2.numReceived);
+
+            assertFalse(task.maybePunctuate());
+
+            assertEquals(0, task.process());
+            assertEquals(3, source1.numReceived);
+            assertEquals(3, source2.numReceived);
+
+            assertFalse(task.maybePunctuate());
+
+            processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
+
+            task.close();
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
     private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 0c56c26..2ee8730 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -87,6 +87,21 @@
         }
     }
 
+    public void punctuate(long timestamp) {
+        setTime(timestamp);
+
+        for (ProcessorNode processor : topology.processors()) {
+            if (processor.processor() != null) {
+                currNode = processor;
+                try {
+                    processor.processor().punctuate(timestamp);
+                } finally {
+                    currNode = null;
+                }
+            }
+        }
+    }
+
     public void setTime(long timestamp) {
         context.setTime(timestamp);
     }
@@ -120,6 +135,22 @@
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value, String childName) {
+        ProcessorNode thisNode = currNode;
+        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+            if (childNode.name().equals(childName)) {
+                currNode = childNode;
+                try {
+                    childNode.process(key, value);
+                } finally {
+                    currNode = thisNode;
+                }
+                break;
+            }
+        }
+    }
+
     public Map<String, StateStore> allStateStores() {
         return context.allStateStores();
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index e57e1c7..2e2c221 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -143,7 +143,7 @@
 
     @Override
     public void schedule(long interval) {
-        throw new UnsupportedOperationException("schedule() not supported");
+        throw new UnsupportedOperationException("schedule() not supported.");
     }
 
     @Override
@@ -159,23 +159,29 @@
     }
 
     @Override
+    @SuppressWarnings("unchecked")
+    public <K, V> void forward(K key, V value, String childName) {
+        driver.forward(key, value, childName);
+    }
+
+    @Override
     public void commit() {
         throw new UnsupportedOperationException("commit() not supported.");
     }
 
     @Override
     public String topic() {
-        throw new UnsupportedOperationException("topic() not supported.");
+        return null;
     }
 
     @Override
     public int partition() {
-        throw new UnsupportedOperationException("partition() not supported.");
+        return -1;
     }
 
     @Override
     public long offset() {
-        throw new UnsupportedOperationException("offset() not supported.");
+        return -1L;
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
new file mode 100644
index 0000000..cf8a526
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
+
+    public static final String NAME = "MOCK-PROCESS-";
+    public static final AtomicInteger INDEX = new AtomicInteger(1);
+
+    public int numReceived = 0;
+
+    public final MockProcessorSupplier<K, V> supplier;
+
+    public MockProcessorNode(long scheduleInterval) {
+        this(new MockProcessorSupplier<K, V>(scheduleInterval));
+    }
+
+    private MockProcessorNode(MockProcessorSupplier<K, V> supplier) {
+        super(NAME + INDEX.getAndIncrement(), supplier.get(), Collections.<String>emptySet());
+
+        this.supplier = supplier;
+    }
+
+    @Override
+    public void process(K key, V value) {
+        this.numReceived++;
+        processor().process(key, value);
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index b402525..9cf0eb2 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.test;
 
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -30,36 +31,48 @@
     public final ArrayList<String> processed = new ArrayList<>();
     public final ArrayList<Long> punctuated = new ArrayList<>();
 
+    private final long scheduleInterval;
+
+    public MockProcessorSupplier() {
+        this(-1L);
+    }
+
+    public MockProcessorSupplier(long scheduleInterval) {
+        this.scheduleInterval = scheduleInterval;
+    }
+
     @Override
     public Processor<K, V> get() {
         return new MockProcessor();
     }
 
-    public class MockProcessor implements Processor<K, V> {
+    public class MockProcessor extends AbstractProcessor<K, V> {
 
         @Override
         public void init(ProcessorContext context) {
-            // do nothing
+            super.init(context);
+            if (scheduleInterval > 0L)
+                context.schedule(scheduleInterval);
         }
 
         @Override
         public void process(K key, V value) {
-            processed.add(key + ":" + value);
+            processed.add((key == null ? "null" : key) + ":" +
+                    (value == null ? "null" : value));
         }
 
         @Override
         public void punctuate(long streamTime) {
+            assertEquals(streamTime, context().timestamp());
+            assertEquals(null, context().topic());
+            assertEquals(-1, context().partition());
+            assertEquals(-1L, context().offset());
+
             punctuated.add(streamTime);
         }
-
-        @Override
-        public void close() {
-            // do nothing
-        }
-
     }
 
-    public void checkAndClearResult(String... expected) {
+    public void checkAndClearProcessResult(String... expected) {
         assertEquals("the number of outputs:", expected.length, processed.size());
 
         for (int i = 0; i < expected.length; i++) {
@@ -69,4 +82,14 @@
         processed.clear();
     }
 
+    public void checkAndClearPunctuateResult(long... expected) {
+        assertEquals("the number of outputs:", expected.length, punctuated.size());
+
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals("output[" + i + "]:", expected[i], (long) punctuated.get(i));
+        }
+
+        punctuated.clear();
+    }
+
 }
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index df1a612..10163a0 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -23,4 +23,4 @@
 # Instead, in trunk, the version should have a suffix of the form ".devN"
 #
 # For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
-__version__ = '0.10.0.0.dev0'
+__version__ = '0.10.1.0.dev0'
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index f2ea421..414da84 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -52,12 +52,8 @@
                num_nodes = 1
                * is_int_with_prefix recommended if num_nodes > 1, because otherwise each producer
               will produce exactly the same messages, and validation may fail to detect missing messages.
-        :param compression_types: If None, all producers will not use compression; or a list of one or
-        more compression types (including "none"). Each producer will pick a compression type
-        from the list in round-robin fashion. Example: compression_types = ["none", "snappy"] and
-        num_nodes = 3, then producer 1 and 2 will not use compression, and producer 3 will use
-        compression type = snappy. If in this example, num_nodes is 1, then first (and only)
-        producer will not use compression.
+        :param compression_types: If None, no producer uses compression; otherwise a list of
+        compression types, one per producer (an entry may be "none").
         """
         super(VerifiableProducer, self).__init__(context, num_nodes)
 
@@ -67,30 +63,39 @@
         self.throughput = throughput
         self.message_validator = message_validator
         self.compression_types = compression_types
+        if self.compression_types is not None:
+            assert len(self.compression_types) == num_nodes, "Specify one compression type per node"
 
         for node in self.nodes:
             node.version = version
         self.acked_values = []
         self.not_acked_values = []
         self.produced_count = {}
-        self.prop_file = ""
+
+
+    @property
+    def security_config(self):
+        return self.kafka.security_config.client_config()
+
+    def prop_file(self, node):
+        idx = self.idx(node)
+        prop_file = str(self.security_config)
+        if self.compression_types is not None:
+            compression_index = idx - 1
+            self.logger.info("VerifiableProducer (index = %d) will use compression type = %s", idx,
+                             self.compression_types[compression_index])
+            prop_file += "\ncompression.type=%s\n" % self.compression_types[compression_index]
+        return prop_file
 
     def _worker(self, idx, node):
         node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False)
 
         # Create and upload log properties
-        self.security_config = self.kafka.security_config.client_config(self.prop_file)
-        producer_prop_file = str(self.security_config)
         log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE)
         node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
 
         # Create and upload config file
-        if self.compression_types is not None:
-            compression_index = (idx - 1) % len(self.compression_types)
-            self.logger.info("VerifiableProducer (index = %d) will use compression type = %s", idx,
-                             self.compression_types[compression_index])
-            producer_prop_file += "\ncompression.type=%s\n" % self.compression_types[compression_index]
-
+        producer_prop_file = self.prop_file(node)
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(producer_prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file)
@@ -197,7 +202,7 @@
 
     def each_produced_at_least(self, count):
         with self.lock:
-            for idx in range(1, self.num_nodes):
+            for idx in range(1, self.num_nodes + 1):
                 if self.produced_count.get(idx) is None or self.produced_count[idx] < count:
                     return False
             return True
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index 084b19d..534f65c 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -66,6 +66,12 @@
                 self.await_all_members(consumer)
                 self.await_consumed_messages(consumer)
 
+    def setup_consumer(self, topic, **kwargs):
+        # collect verifiable consumer events since this makes debugging much easier
+        consumer = super(OffsetValidationTest, self).setup_consumer(topic, **kwargs)
+        self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
+        return consumer
+
     def test_broker_rolling_bounce(self):
         """
         Verify correct consumer behavior when the brokers are consecutively restarted.
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 25b87bd..1880d7a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -256,9 +256,16 @@
     }
 
     private static abstract class ConsumerEvent {
+        private final long timestamp = System.currentTimeMillis();
+
         @JsonProperty
         public abstract String name();
 
+        @JsonProperty
+        public long timestamp() {
+            return timestamp;
+        }
+
         @JsonProperty("class")
         public String clazz() {
             return VerifiableConsumer.class.getName();