Merge branch 'trunk' into 0.10.0
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c457c83..5576431 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -584,7 +584,8 @@
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
- this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+ this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+ config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
this.subscriptions = new SubscriptionState(offsetResetStrategy);
List<PartitionAssignor> assignors = config.getConfiguredInstances(
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c79d8e7..496a114 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -17,6 +17,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -217,13 +218,25 @@
}
RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
+ future.addListener(new RequestFutureListener<ByteBuffer>() {
+ @Override
+ public void onSuccess(ByteBuffer value) {
+ // handle join completion in the callback so that the callback will be invoked
+ // even if the consumer is woken up before finishing the rebalance
+ onJoinComplete(generation, memberId, protocol, value);
+ needsJoinPrepare = true;
+ heartbeatTask.reset();
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ // we handle failures below after the request finishes. if the join completes
+ // after having been woken up, the exception is ignored and we will rejoin
+ }
+ });
client.poll(future);
- if (future.succeeded()) {
- onJoinComplete(generation, memberId, protocol, future.value());
- needsJoinPrepare = true;
- heartbeatTask.reset();
- } else {
+ if (future.failed()) {
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
@@ -521,6 +534,7 @@
protected void coordinatorDead() {
if (this.coordinator != null) {
log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
+ client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
this.coordinator = null;
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a364987..86b60d0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -422,9 +422,6 @@
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null) {
reschedule(now + interval);
- } else if (exception instanceof SendFailedException) {
- log.debug("Failed to send automatic offset commit for group {}", groupId);
- reschedule(now);
} else {
log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
reschedule(now + interval);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index b70994d..4119954 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -19,6 +19,7 @@
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -40,12 +41,6 @@
/**
* Higher level consumer access to the network layer with basic support for futures and
* task scheduling. NOT thread-safe!
- *
- * TODO: The current implementation is simplistic in that it provides a facility for queueing requests
- * prior to delivery, but it makes no effort to retry requests which cannot be sent at the time
- * {@link #poll(long)} is called. This makes the behavior of the queue predictable and easy to
- * understand, but there are opportunities to provide timeout or retry capabilities in the future.
- * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior.
*/
public class ConsumerNetworkClient implements Closeable {
private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class);
@@ -57,17 +52,20 @@
private final Metadata metadata;
private final Time time;
private final long retryBackoffMs;
+ private final long unsentExpiryMs;
// wakeup enabled flag need to be volatile since it is allowed to be accessed concurrently
volatile private boolean wakeupsEnabled = true;
public ConsumerNetworkClient(KafkaClient client,
Metadata metadata,
Time time,
- long retryBackoffMs) {
+ long retryBackoffMs,
+ long requestTimeoutMs) {
this.client = client;
this.metadata = metadata;
this.time = time;
this.retryBackoffMs = retryBackoffMs;
+ this.unsentExpiryMs = requestTimeoutMs;
}
/**
@@ -227,8 +225,8 @@
// cleared or a connect finished in the poll
trySend(now);
- // fail all requests that couldn't be sent
- failUnsentRequests();
+ // fail requests that couldn't be sent if they have expired
+ failExpiredRequests(now);
}
/**
@@ -274,29 +272,48 @@
Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
Node node = requestEntry.getKey();
if (client.connectionFailed(node)) {
+ // Remove entry before invoking request callback to avoid callbacks handling
+ // coordinator failures traversing the unsent list again.
+ iterator.remove();
for (ClientRequest request : requestEntry.getValue()) {
RequestFutureCompletionHandler handler =
(RequestFutureCompletionHandler) request.callback();
handler.onComplete(new ClientResponse(request, now, true, null));
}
- iterator.remove();
}
}
}
- private void failUnsentRequests() {
- // clear all unsent requests and fail their corresponding futures
- for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
- Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
- while (iterator.hasNext()) {
- ClientRequest request = iterator.next();
- RequestFutureCompletionHandler handler =
- (RequestFutureCompletionHandler) request.callback();
- handler.raise(SendFailedException.INSTANCE);
+ private void failExpiredRequests(long now) {
+ // clear all expired unsent requests and fail their corresponding futures
+ Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
+ Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
+ while (requestIterator.hasNext()) {
+ ClientRequest request = requestIterator.next();
+ if (request.createdTimeMs() < now - unsentExpiryMs) {
+ RequestFutureCompletionHandler handler =
+ (RequestFutureCompletionHandler) request.callback();
+ handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));
+ requestIterator.remove();
+ } else
+ break;
+ }
+ if (requestEntry.getValue().isEmpty())
iterator.remove();
+ }
+ }
+
+ protected void failUnsentRequests(Node node, RuntimeException e) {
+ // clear unsent requests to node and fail their corresponding futures
+ List<ClientRequest> unsentRequests = unsent.remove(node);
+ if (unsentRequests != null) {
+ for (ClientRequest request : unsentRequests) {
+ RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
+ handler.raise(e);
}
}
- unsent.clear();
}
private boolean trySend(long now) {
@@ -320,7 +337,6 @@
private void clientPoll(long timeout, long now) {
client.poll(timeout, now);
if (wakeupsEnabled && wakeup.get()) {
- failUnsentRequests();
wakeup.set(false);
throw new WakeupException();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
deleted file mode 100644
index 3312a2c..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.apache.kafka.common.errors.RetriableException;
-
-/**
- * Exception used in {@link ConsumerNetworkClient} to indicate the failure
- * to transmit a request to the networking layer. This could be either because
- * the client is still connecting to the given host or its send buffer is full.
- */
-public class SendFailedException extends RetriableException {
- public static final SendFailedException INSTANCE = new SendFailedException();
-
- private static final long serialVersionUID = 1L;
-
-}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 6acc059..d60e28e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -711,13 +711,11 @@
Integer partition = record.partition();
if (partition != null) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
- int numPartitions = partitions.size();
+ int lastPartition = partitions.size() - 1;
// they have given us a partition, use it
- if (partition < 0 || partition >= numPartitions)
- throw new IllegalArgumentException("Invalid partition given with record: " + partition
- + " is not in the range [0..."
- + numPartitions
- + "].");
+ if (partition < 0 || partition > lastPartition) {
+ throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
+ }
return partition;
}
return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
index c0949e3..554b885 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
@@ -18,6 +18,7 @@
* not yet been created.
*/
public class GroupCoordinatorNotAvailableException extends RetriableException {
+ public static final GroupCoordinatorNotAvailableException INSTANCE = new GroupCoordinatorNotAvailableException();
private static final long serialVersionUID = 1L;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
index d1d3b24..41e9160 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java
@@ -31,7 +31,7 @@
String key = "key";
String value = "value";
- ConsumerRecord record = new ConsumerRecord(topic, partition, offset, key, value);
+ ConsumerRecord<String, String> record = new ConsumerRecord<>(topic, partition, offset, key, value);
assertEquals(topic, record.topic());
assertEquals(partition, record.partition());
assertEquals(offset, record.offset());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 623e5ef..b864d69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -35,6 +35,7 @@
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.TopicConstants;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
@@ -103,7 +104,7 @@
this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.metadata.update(cluster, time.milliseconds());
- this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+ this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
this.metrics = new Metrics(time);
this.rebalanceListener = new MockRebalanceListener();
this.defaultOffsetCommitCallback = new MockCommitCallback();
@@ -323,6 +324,45 @@
}
@Test
+ public void testWakeupDuringJoin() {
+ final String consumerId = "leader";
+
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ // ensure metadata is up-to-date for leader
+ metadata.setTopics(Arrays.asList(topicName));
+ metadata.update(cluster, time.milliseconds());
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
+ partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
+
+ // prepare only the first half of the join and then trigger the wakeup
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+ consumerClient.wakeup();
+
+ try {
+ coordinator.ensurePartitionAssignment();
+ } catch (WakeupException e) {
+ // ignore
+ }
+
+ // now complete the second half
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertEquals(1, rebalanceListener.revokedCount);
+ assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+ assertEquals(1, rebalanceListener.assignedCount);
+ assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ }
+
+ @Test
public void testNormalJoinGroupFollower() {
final String consumerId = "consumer";
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 1692010..f0f2a97 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -27,6 +27,8 @@
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -40,7 +42,7 @@
private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
private Node node = cluster.nodes().get(0);
private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
- private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+ private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
@Test
public void send() {
@@ -104,6 +106,65 @@
assertTrue(future.isDone());
}
+ @Test
+ public void sendExpiry() throws InterruptedException {
+ long unsentExpiryMs = 10;
+ final AtomicBoolean isReady = new AtomicBoolean();
+ final AtomicBoolean disconnected = new AtomicBoolean();
+ client = new MockClient(time) {
+ @Override
+ public boolean ready(Node node, long now) {
+ if (isReady.get())
+ return super.ready(node, now);
+ else
+ return false;
+ }
+ @Override
+ public boolean connectionFailed(Node node) {
+ return disconnected.get();
+ }
+ };
+ // Queue first send, sleep long enough for this to expire and then queue second send
+ consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, unsentExpiryMs);
+ RequestFuture<ClientResponse> future1 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertEquals(1, consumerClient.pendingRequestCount(node));
+ assertFalse(future1.isDone());
+
+ time.sleep(unsentExpiryMs + 1);
+ RequestFuture<ClientResponse> future2 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+ assertEquals(2, consumerClient.pendingRequestCount());
+ assertEquals(2, consumerClient.pendingRequestCount(node));
+ assertFalse(future2.isDone());
+
+ // First send should have expired and second send still pending
+ consumerClient.poll(0);
+ assertTrue(future1.isDone());
+ assertFalse(future1.succeeded());
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertEquals(1, consumerClient.pendingRequestCount(node));
+ assertFalse(future2.isDone());
+
+ // Enable send, the un-expired send should succeed on poll
+ isReady.set(true);
+ client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+ consumerClient.poll(future2);
+ ClientResponse clientResponse = future2.value();
+ HeartbeatResponse response = new HeartbeatResponse(clientResponse.responseBody());
+ assertEquals(Errors.NONE.code(), response.errorCode());
+
+ // Disable ready flag to delay send and queue another send. Disconnection should remove pending send
+ isReady.set(false);
+ RequestFuture<ClientResponse> future3 = consumerClient.send(node, ApiKeys.METADATA, heartbeatRequest());
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertEquals(1, consumerClient.pendingRequestCount(node));
+ disconnected.set(true);
+ consumerClient.poll(0);
+ assertTrue(future3.isDone());
+ assertFalse(future3.succeeded());
+ assertEquals(0, consumerClient.pendingRequestCount());
+ assertEquals(0, consumerClient.pendingRequestCount(node));
+ }
private HeartbeatRequest heartbeatRequest() {
return new HeartbeatRequest("group", 1, "memberId");
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 58c3841..9002e81 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -86,7 +86,7 @@
private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
private Metrics metrics = new Metrics(time);
private static final double EPSILON = 0.0001;
- private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+ private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
private MemoryRecords nextRecords = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 7294ed4..57028ef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -99,7 +99,8 @@
config.getInt(CommonClientConfigs.SEND_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG), time);
- this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
+ this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+ config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
this.coordinator = new WorkerCoordinator(this.client,
config.getString(DistributedConfig.GROUP_ID_CONFIG),
config.getInt(DistributedConfig.SESSION_TIMEOUT_MS_CONFIG),
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index abb62b9..bf33cb3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -88,7 +88,7 @@
this.client = new MockClient(time);
this.metadata = new Metadata(0, Long.MAX_VALUE);
this.metadata.update(cluster, time.milliseconds());
- this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+ this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
this.metrics = new Metrics(time);
this.rebalanceListener = new MockRebalanceListener();
this.configStorage = PowerMock.createMock(KafkaConfigStorage.class);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index b857315..ef76ffc 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -20,7 +20,7 @@
import kafka.coordinator.{GroupOverview, GroupSummary, MemberSummary}
import kafka.utils.Logging
import org.apache.kafka.clients._
-import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture, SendFailedException}
+import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture}
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.errors.DisconnectException
@@ -43,21 +43,15 @@
private def send(target: Node,
api: ApiKeys,
request: AbstractRequest): Struct = {
- var now = time.milliseconds()
- val deadline = now + requestTimeoutMs
var future: RequestFuture[ClientResponse] = null
- do {
- future = client.send(target, api, request)
- client.poll(future)
+ future = client.send(target, api, request)
+ client.poll(future)
- if (future.succeeded())
- return future.value().responseBody()
-
- now = time.milliseconds()
- } while (now < deadline && future.exception().isInstanceOf[SendFailedException])
-
- throw future.exception()
+ if (future.succeeded())
+ return future.value().responseBody()
+ else
+ throw future.exception()
}
private def sendAnyNode(api: ApiKeys, request: AbstractRequest): Struct = {
@@ -244,7 +238,8 @@
networkClient,
metadata,
time,
- DefaultRetryBackoffMs)
+ DefaultRetryBackoffMs,
+ DefaultRequestTimeoutMs)
new AdminClient(
time,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f050e27..22657f4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -522,8 +522,12 @@
getReplicaOrException(topic, partition)
// decide whether to only fetch committed data (i.e. messages below high watermark)
- val maxOffsetOpt = if (readOnlyCommitted)
- Some(localReplica.highWatermark.messageOffset)
+ val maxOffsetOpt = if (readOnlyCommitted) {
+ val maxOffset = localReplica.highWatermark.messageOffset
+ if(offset > maxOffset)
+ throw new OffsetOutOfRangeException("Request for offset %d beyond high watermark %d when reading from only committed offsets".format(offset, maxOffset))
+ Some(maxOffset)
+ }
else
None
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 4d75d53..3f6a275 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -228,6 +228,7 @@
/**
* Test reading at the boundary of the log, specifically
* - reading from the logEndOffset should give an empty message set
+ * - reading from the the maxOffset should give an empty message set
* - reading beyond the log end offset should throw an OffsetOutOfRangeException
*/
@Test
@@ -236,19 +237,21 @@
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time)
- assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes)
+ log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = new Message("42".getBytes)))
+ assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1025, 1000).messageSet.sizeInBytes)
try {
- log.read(0, 1024)
+ log.read(0, 1025)
fail("Expected exception on invalid read.")
} catch {
case e: OffsetOutOfRangeException => "This is good."
}
try {
- log.read(1025, 1000)
+ log.read(1026, 1000)
fail("Expected exception on invalid read.")
} catch {
case e: OffsetOutOfRangeException => // This is good.
}
+ assertEquals("Reading from maxOffset should produce 0 byte read.", 0, log.read(1024, 1000, Some(1024)).messageSet.sizeInBytes)
}
/**
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index ee14af4..c2c670e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -35,8 +35,8 @@
import org.apache.kafka.common.utils.{MockTime => JMockTime}
import org.apache.kafka.common.{Node, TopicPartition}
import org.easymock.EasyMock
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
+import org.junit.Assert.{assertEquals, assertTrue, assertFalse}
+import org.junit.{Test, Before, After}
import scala.collection.JavaConverters._
import scala.collection.Map
@@ -44,17 +44,28 @@
class ReplicaManagerTest {
val topic = "test-topic"
+ val time = new MockTime()
+ val jTime = new JMockTime
+ val metrics = new Metrics
+ var zkClient : ZkClient = _
+ var zkUtils : ZkUtils = _
+
+ @Before
+ def setUp() {
+ zkClient = EasyMock.createMock(classOf[ZkClient])
+ zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
+ }
+
+ @After
+ def tearDown() {
+ metrics.close();
+ }
@Test
def testHighWaterMarkDirectoryMapping() {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
val config = KafkaConfig.fromProps(props)
- val zkClient = EasyMock.createMock(classOf[ZkClient])
- val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
- val time = new MockTime()
- val jTime = new JMockTime
- val metrics = new Metrics
val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false))
try {
@@ -64,7 +75,6 @@
} finally {
// shutdown the replica manager upon test completion
rm.shutdown(false)
- metrics.close()
}
}
@@ -73,12 +83,7 @@
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
- val zkClient = EasyMock.createMock(classOf[ZkClient])
- val zkUtils = ZkUtils(zkClient, false)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
- val time = new MockTime()
- val jTime = new JMockTime
- val metrics = new Metrics
val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false))
try {
@@ -88,7 +93,6 @@
} finally {
// shutdown the replica manager upon test completion
rm.shutdown(checkpointHW = false)
- metrics.close()
}
}
@@ -96,12 +100,7 @@
def testIllegalRequiredAcks() {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
val config = KafkaConfig.fromProps(props)
- val zkClient = EasyMock.createMock(classOf[ZkClient])
- val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
- val time = new MockTime()
- val jTime = new JMockTime
- val metrics = new Metrics
val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false), Option(this.getClass.getName))
try {
@@ -116,7 +115,6 @@
responseCallback = callback)
} finally {
rm.shutdown(checkpointHW = false)
- metrics.close()
}
TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
@@ -127,12 +125,7 @@
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = KafkaConfig.fromProps(props)
- val zkClient = EasyMock.createMock(classOf[ZkClient])
- val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
- val time = new MockTime()
- val jTime = new JMockTime
- val metrics = new Metrics
val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
new AtomicBoolean(false))
@@ -192,7 +185,80 @@
assertTrue(fetchCallbackFired)
} finally {
rm.shutdown(checkpointHW = false)
- metrics.close()
+ }
+ }
+
+ @Test
+ def testFetchBeyondHighWatermarkNotAllowedForConsumer() {
+ val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+ props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
+ props.put("broker.id", Int.box(0))
+ val config = KafkaConfig.fromProps(props)
+ val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
+ val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
+ new AtomicBoolean(false), Option(this.getClass.getName))
+ try {
+ val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1), new Broker(1, "host2", 2))
+ val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+ EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
+ EasyMock.replay(metadataCache)
+
+ val brokerList : java.util.List[Integer] = Seq[Integer](0, 1, 2).asJava
+ val brokerSet : java.util.Set[Integer] = Set[Integer](0, 1, 2).asJava
+
+ val partition = rm.getOrCreatePartition(topic, 0)
+ partition.getOrCreateReplica(0)
+
+ // Make this replica the leader.
+ val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0,
+ collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava,
+ Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava)
+ rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {})
+ rm.getLeaderReplicaIfLocal(topic, 0)
+
+ def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = {}
+
+ // Append a message.
+ for(i <- 1 to 2)
+ rm.appendMessages(
+ timeout = 1000,
+ requiredAcks = -1,
+ internalTopicsAllowed = false,
+ messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new Message("message %d".format(i).getBytes))),
+ responseCallback = produceCallback)
+
+ var fetchCallbackFired = false
+ var fetchError = 0
+ def fetchCallback(responseStatus: Map[TopicAndPartition, FetchResponsePartitionData]) = {
+ fetchError = responseStatus.values.head.error
+ fetchCallbackFired = true
+ }
+
+ // Fetch a message above the high watermark as a follower
+ rm.fetchMessages(
+ timeout = 1000,
+ replicaId = 1,
+ fetchMinBytes = 1,
+ fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
+ responseCallback = fetchCallback)
+
+
+ assertTrue(fetchCallbackFired)
+ assertEquals("Should not give an exception", Errors.NONE.code, fetchError)
+ fetchCallbackFired = false
+
+ // Fetch a message above the high watermark as a consumer
+ rm.fetchMessages(
+ timeout = 1000,
+ replicaId = -1,
+ fetchMinBytes = 1,
+ fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(1, 100000)),
+ responseCallback = fetchCallback)
+
+ assertTrue(fetchCallbackFired)
+ assertEquals("Should give OffsetOutOfRangeException", Errors.OFFSET_OUT_OF_RANGE.code, fetchError)
+ } finally {
+ rm.shutdown(checkpointHW = false)
}
}
}
diff --git a/gradle.properties b/gradle.properties
index b058e58..0a612f6 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -16,7 +16,7 @@
group=org.apache.kafka
# NOTE: When you change this version number, you should also make sure to update
# the version numbers in tests/kafkatest/__init__.py and kafka-merge-pr.py.
-version=0.10.0.0-SNAPSHOT
+version=0.10.1.0-SNAPSHOT
scalaVersion=2.10.6
task=build
org.gradle.jvmargs=-XX:MaxPermSize=512m -Xmx1024m -Xss2m
diff --git a/kafka-merge-pr.py b/kafka-merge-pr.py
index e124105..f26a0a9 100644
--- a/kafka-merge-pr.py
+++ b/kafka-merge-pr.py
@@ -72,7 +72,7 @@
DEV_BRANCH_NAME = "trunk"
-DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.10.0.0")
+DEFAULT_FIX_VERSION = os.environ.get("DEFAULT_FIX_VERSION", "0.10.1.0")
def get_json(url):
try:
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
new file mode 100644
index 0000000..83064e8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ForeachAction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+
+
+/**
+ * The ForeachAction interface for performing an action on a key-value pair.
+ * Note that this action is stateless. If stateful processing is required, consider
+ * using {@link KStream#transform(TransformerSupplier, String...)} or
+ * {@link KStream#process(ProcessorSupplier, String...)} instead.
+ *
+ * @param <K> original key type
+ * @param <V> original value type
+ */
+public interface ForeachAction<K, V> {
+ void apply(K key, V value);
+}
+
+
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index e4933cb..27475aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -64,6 +64,52 @@
<V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
/**
+ * Print the elements of this stream to System.out
+ *
+ * Implementors will need to override toString for keys and values that are not of
+ * type String, Integer etc to get meaningful information.
+ */
+ void print();
+
+
+ /**
+ * Print the elements of this stream to System.out
+ *
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ *
+ * Implementors will need to override toString for keys and values that are not of
+ * type String, Integer etc to get meaningful information.
+ */
+ void print(Serde<K> keySerde, Serde<V> valSerde);
+
+
+ /**
+ * Write the elements of this stream to a file at the given path.
+ *
+ * @param filePath name of file to write to
+ *
+ * Implementors will need to override toString for keys and values that are not of
+ * type String, Integer etc to get meaningful information.
+ */
+ void writeAsText(String filePath);
+
+ /**
+ * @param filePath name of file to write to
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ *
+ * Implementors will need to override toString for keys and values that are not of
+ * type String, Integer etc to get meaningful information.
+ */
+
+ void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
+
+ /**
* Create a new instance of {@link KStream} by transforming each element in this stream into zero or more elements in the new stream.
*
* @param mapper the instance of {@link KeyValueMapper}
@@ -101,6 +147,14 @@
KStream<K, V> through(String topic);
/**
+ * Perform an action on each element of {@link KStream}.
+ * Note that this is a terminal operation that returns void.
+ *
+ * @param action An action to perform on each element
+ */
+ void foreach(ForeachAction<K, V> action);
+
+ /**
* Materialize this stream to a topic, also creates a new instance of {@link KStream} from the topic
* using default serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
* This is equivalent to calling {@link #to(StreamPartitioner, String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#stream(String...)}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 581ee28..bb6878f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -53,6 +53,49 @@
*/
<V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper);
+
+ /**
+ * Print the elements of this stream to System.out
+ *
+ * Implementors will need to override toString for keys and values that are not of
+ * type String, Integer etc to get meaningful information.
+ */
+ void print();
+
+ /**
+ * Print the elements of this stream to System.out
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ *
+ * Implementors will need to override toString for keys and values that are not of
+ * type String, Integer etc to get meaningful information.
+ */
+ void print(Serde<K> keySerde, Serde<V> valSerde);
+
+ /**
+ * Write the elements of this stream to a file at the given path.
+ * @param filePath name of file to write to
+ *
+ * Implementors will need to override toString for keys and values that are not of
+ * type String, Integer etc to get meaningful information.
+ */
+ void writeAsText(String filePath);
+
+ /**
+ *
+ * @param filePath name of file to write to
+ * @param keySerde key serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ * @param valSerde value serde used to send key-value pairs,
+ * if not specified the default serde defined in the configs will be used
+ *
+ * Implementors will need to override toString for keys and values that are not of
+ * type String, Integer etc to get meaningful information.
+ */
+ void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
+
/**
* Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
* using default serializers and deserializers and producer's {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
@@ -297,4 +340,12 @@
* @param <K1> the key type of the aggregated {@link KTable}
*/
<K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, String name);
+
+ /**
+ * Perform an action on each element of {@link KTable}.
+ * Note that this is a terminal operation that returns void.
+ *
+ * @param action An action to perform on each element
+ */
+ void foreach(ForeachAction<K, V> action);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 8069dca..5197e94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -32,7 +32,7 @@
* Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
* that contains it is initialized.
* <p>
- * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
+ * If this transformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
* {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
*
* @param context the context; may not be null
@@ -44,17 +44,18 @@
*
* @param key the key for the message
* @param value the value for the message
- * @return new value
+ * @return new value; if null no key-value pair will be forwarded to down stream
*/
R transform(K key, V value);
/**
- * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
+ * Perform any periodic operations and possibly generate a key, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
* during {@link #init(ProcessorContext) initialization}.
*
* @param timestamp the stream time when this method is being called
+ * @return new value; if null it will not be forwarded to down stream
*/
- void punctuate(long timestamp);
+ R punctuate(long timestamp);
/**
* Close this processor and clean up any resources.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 1a0679d..63214fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -47,12 +47,13 @@
R transform(V value);
/**
- * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
+ * Perform any periodic operations and possibly return a new value, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
* during {@link #init(ProcessorContext) initialization}.
*
* @param timestamp the stream time when this method is being called
+ * @return new value; if null it will not be forwarded to down stream
*/
- void punctuate(long timestamp);
+ R punctuate(long timestamp);
/**
* Close this processor and clean up any resources.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index e9b7cad..5ea0791 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.StreamsException;
import java.nio.ByteBuffer;
import java.util.Map;
@@ -39,8 +40,21 @@
@Override
public byte[] serialize(String topic, Change<T> data) {
+ byte[] serializedKey;
+
// only one of the old / new values would be not null
- byte[] serializedKey = inner.serialize(topic, data.newValue != null ? data.newValue : data.oldValue);
+ if (data.newValue != null) {
+ if (data.oldValue != null)
+ throw new StreamsException("Both old and new values are not null (" + data.oldValue
+ + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
+
+ serializedKey = inner.serialize(topic, data.newValue);
+ } else {
+ if (data.oldValue == null)
+ throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
+
+ serializedKey = inner.serialize(topic, data.oldValue);
+ }
ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
buf.put(serializedKey);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
new file mode 100644
index 0000000..2fd7ef9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+class KStreamForeach<K, V> implements ProcessorSupplier<K, V> {
+
+ private final ForeachAction<K, V> action;
+
+ public KStreamForeach(ForeachAction<K, V> action) {
+ this.action = action;
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return new KStreamForeachProcessor();
+ }
+
+ private class KStreamForeachProcessor extends AbstractProcessor<K, V> {
+ @Override
+ public void process(K key, V value) {
+ action.apply(key, value);
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 0fb3984..97a7aac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -20,12 +20,14 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -41,7 +43,9 @@
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;
-
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
import java.lang.reflect.Array;
import java.util.HashSet;
import java.util.Set;
@@ -78,9 +82,9 @@
private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
- private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
+ private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
- private static final String SELECT_NAME = "KSTREAM-SELECT-";
+ private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
public static final String SINK_NAME = "KSTREAM-SINK-";
@@ -92,6 +96,8 @@
private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
+ private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
+
public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
super(topology, name, sourceNodes);
}
@@ -133,6 +139,37 @@
}
@Override
+ public void print() {
+ print(null, null);
+ }
+
+ @Override
+ public void print(Serde<K> keySerde, Serde<V> valSerde) {
+ String name = topology.newName(PRINTING_NAME);
+ topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde), this.name);
+ }
+
+
+ @Override
+ public void writeAsText(String filePath) {
+ writeAsText(filePath, null, null);
+ }
+
+ @Override
+ public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+ String name = topology.newName(PRINTING_NAME);
+ try {
+
+ PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
+ topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde), this.name);
+
+ } catch (FileNotFoundException e) {
+ String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
+ throw new TopologyBuilderException(message);
+ }
+ }
+
+ @Override
public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper) {
String name = topology.newName(FLATMAP_NAME);
@@ -201,6 +238,13 @@
}
@Override
+ public void foreach(ForeachAction<K, V> action) {
+ String name = topology.newName(FOREACH_NAME);
+
+ topology.addProcessor(name, new KStreamForeach<>(action), this.name);
+ }
+
+ @Override
public KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
return through(keySerde, valSerde, null, topic);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
index 5b83b28..94e0b88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java
@@ -55,8 +55,12 @@
@Override
public void process(K key, V value) {
- context().forward(key, value);
- window.put(key, value);
+ // if the key is null, we do not need to put the record into window store
+ // since it will never be considered for join operations
+ if (key != null) {
+ context().forward(key, value);
+ window.put(key, value);
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index a4ac9b3..d8caf3c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -63,6 +64,10 @@
@Override
public void process(K key, V1 value) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KStream-KStream join operator with other window state store " + otherWindowName + " should not be null.");
+
boolean needOuterJoin = KStreamKStreamJoin.this.outer;
long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
index dfca019..92b9b59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java
@@ -55,7 +55,11 @@
@Override
public void process(K key, V1 value) {
- context().forward(key, joiner.apply(value, valueGetter.get(key)));
+ // if the key is null, we do not need proceed joining
+ // the record with the table
+ if (key != null) {
+ context().forward(key, joiner.apply(value, valueGetter.get(key)));
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index 4299c66..af100a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -20,6 +20,7 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -34,13 +35,12 @@
@Override
public Processor<K, V> get() {
- return new KStreamTransformProcessor(transformerSupplier.get());
+ return new KStreamTransformProcessor<>(transformerSupplier.get());
}
- public static class KStreamTransformProcessor<K1, V1, K2, V2> implements Processor<K1, V1> {
+ public static class KStreamTransformProcessor<K1, V1, K2, V2> extends AbstractProcessor<K1, V1> {
private final Transformer<K1, V1, KeyValue<K2, V2>> transformer;
- private ProcessorContext context;
public KStreamTransformProcessor(Transformer<K1, V1, KeyValue<K2, V2>> transformer) {
this.transformer = transformer;
@@ -48,19 +48,24 @@
@Override
public void init(ProcessorContext context) {
+ super.init(context);
transformer.init(context);
- this.context = context;
}
@Override
public void process(K1 key, V1 value) {
KeyValue<K2, V2> pair = transformer.transform(key, value);
- context.forward(pair.key, pair.value);
+
+ if (pair != null)
+ context().forward(pair.key, pair.value);
}
@Override
public void punctuate(long timestamp) {
- transformer.punctuate(timestamp);
+ KeyValue<K2, V2> pair = transformer.punctuate(timestamp);
+
+ if (pair != null)
+ context().forward(pair.key, pair.value);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index 6f989e6..cb9aab1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -27,18 +27,18 @@
private final ValueTransformerSupplier<V, R> valueTransformerSupplier;
- public KStreamTransformValues(ValueTransformerSupplier valueTransformerSupplier) {
+ public KStreamTransformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier) {
this.valueTransformerSupplier = valueTransformerSupplier;
}
@Override
public Processor<K, V> get() {
- return new KStreamTransformValuesProcessor(valueTransformerSupplier.get());
+ return new KStreamTransformValuesProcessor<>(valueTransformerSupplier.get());
}
public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K, V> {
- private final ValueTransformer valueTransformer;
+ private final ValueTransformer<V, R> valueTransformer;
private ProcessorContext context;
public KStreamTransformValuesProcessor(ValueTransformer<V, R> valueTransformer) {
@@ -58,7 +58,10 @@
@Override
public void punctuate(long timestamp) {
- valueTransformer.punctuate(timestamp);
+ R ret = valueTransformer.punctuate(timestamp);
+
+ if (ret != null)
+ context.forward(null, ret);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 76964f9..f36cc8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -72,6 +72,11 @@
@Override
public void process(K key, V value) {
+ // if the key is null, we do not need proceed aggregating the record
+ // the record with the table
+ if (key == null)
+ return;
+
// first get the matching windows
long timestamp = context().timestamp();
Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index d532e79..6c05ce3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -69,6 +69,11 @@
@Override
public void process(K key, V value) {
+ // if the key is null, we do not need proceed aggregating the record
+ // the record with the table
+ if (key == null)
+ return;
+
// first get the matching windows
long timestamp = context().timestamp();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 156f2db..ee2c931 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -19,12 +19,14 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Reducer;
@@ -35,6 +37,9 @@
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.Stores;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
import java.util.Collections;
import java.util.Set;
@@ -68,6 +73,8 @@
public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
+ private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
+
private static final String REDUCE_NAME = "KTABLE-REDUCE-";
private static final String SELECT_NAME = "KTABLE-SELECT-";
@@ -76,6 +83,8 @@
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+ private static final String FOREACH_NAME = "KTABLE-FOREACH-";
+
public final ProcessorSupplier<?, ?> processorSupplier;
private final Serde<K> keySerde;
@@ -132,6 +141,36 @@
}
@Override
+ public void print() {
+ print(null, null);
+ }
+
+ @Override
+ public void print(Serde<K> keySerde, Serde<V> valSerde) {
+ String name = topology.newName(PRINTING_NAME);
+ topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde), this.name);
+ }
+
+
+ @Override
+ public void writeAsText(String filePath) {
+ writeAsText(filePath, null, null);
+ }
+
+ @Override
+ public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
+ String name = topology.newName(PRINTING_NAME);
+ try {
+ PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
+ topology.addProcessor(name, new KeyValuePrinter<>(printStream, keySerde, valSerde), this.name);
+ } catch (FileNotFoundException e) {
+ String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
+ throw new TopologyBuilderException(message);
+ }
+ }
+
+
+ @Override
public KTable<K, V> through(Serde<K> keySerde,
Serde<V> valSerde,
StreamPartitioner<K, V> partitioner,
@@ -142,6 +181,18 @@
}
@Override
+ public void foreach(final ForeachAction<K, V> action) {
+ String name = topology.newName(FOREACH_NAME);
+ KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() {
+ @Override
+ public void apply(K key, Change<V> value) {
+ action.apply(key, value.newValue);
+ }
+ });
+ topology.addProcessor(name, processorSupplier, this.name);
+ }
+
+ @Override
public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
return through(keySerde, valSerde, null, topic);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
index 6eb27b6..24c8da6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@
@Override
public void process(K key, Change<V1> change) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KTable join operator should not be null.");
+
R newValue = null;
R oldValue = null;
V2 value2 = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 00e872e..4bf45ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@
@Override
public void process(K key, Change<V1> change) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KTable left-join operator should not be null.");
+
R newValue = null;
R oldValue = null;
V2 value2 = null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 6ab0ae9..49eed53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -61,6 +62,10 @@
@Override
public void process(K key, Change<V1> change) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KTable outer-join operator should not be null.");
+
R newValue = null;
R oldValue = null;
V2 value2 = valueGetter.get(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index a6a13fc..7443d4a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -62,6 +63,10 @@
@Override
public void process(K key, Change<V1> change) {
+ // the keys should never be null
+ if (key == null)
+ throw new StreamsException("Record key for KTable right-join operator should not be null.");
+
R newValue = null;
R oldValue = null;
V2 value2 = valueGetter.get(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index ff69c37..142a279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
@@ -77,6 +78,10 @@
public void process(K key, Change<V> change) {
KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
+ // the selected repartition key should never be null
+ if (newPair.key == null)
+ throw new StreamsException("Record key for KTable repartition operator should not be null.");
+
context().forward(newPair.key, new Change<>(newPair.value, null));
if (change.oldValue != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
new file mode 100644
index 0000000..d1c1d8b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.io.PrintStream;
+
+
+class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
+
+ private final PrintStream printStream;
+ private Serde<?> keySerde;
+ private Serde<?> valueSerde;
+ private boolean notStandardOut;
+
+
+ KeyValuePrinter(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde) {
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ if (printStream == null) {
+ this.printStream = System.out;
+ } else {
+ this.printStream = printStream;
+ notStandardOut = true;
+ }
+ }
+
+ KeyValuePrinter(PrintStream printStream) {
+ this(printStream, null, null);
+ }
+
+ KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde) {
+ this(System.out, keySerde, valueSerde);
+ }
+
+ KeyValuePrinter() {
+ this(System.out, null, null);
+ }
+
+ @Override
+ public Processor<K, V> get() {
+ return new KeyValuePrinterProcessor(this.printStream, this.keySerde, this.valueSerde);
+ }
+
+
+ private class KeyValuePrinterProcessor extends AbstractProcessor<K, V> {
+ private final PrintStream printStream;
+ private Serde<?> keySerde;
+ private Serde<?> valueSerde;
+ private ProcessorContext processorContext;
+
+ private KeyValuePrinterProcessor(PrintStream printStream, Serde<?> keySerde, Serde<?> valueSerde) {
+ this.printStream = printStream;
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ this.processorContext = context;
+
+ if (this.keySerde == null) {
+ keySerde = this.processorContext.keySerde();
+ }
+
+ if (this.valueSerde == null) {
+ valueSerde = this.processorContext.valueSerde();
+ }
+ }
+
+ @Override
+ public void process(K key, V value) {
+ K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
+ V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer());
+
+ printStream.println(keyToPrint + " , " + valueToPrint);
+
+ this.processorContext.forward(key, value);
+ }
+
+
+ private Object maybeDeserialize(Object receivedElement, Deserializer<?> deserializer) {
+ if (receivedElement == null) {
+ return null;
+ }
+
+ if (receivedElement instanceof byte[]) {
+ return deserializer.deserialize(this.processorContext.topic(), (byte[]) receivedElement);
+ }
+
+ return receivedElement;
+ }
+
+ @Override
+ public void close() {
+ if (notStandardOut) {
+ this.printStream.close();
+ } else {
+ this.printStream.flush();
+ }
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 434996e..815b5b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -106,39 +106,57 @@
* Forwards a key/value pair to one of the downstream processors designated by childIndex
* @param key key
* @param value value
+ * @param childIndex index in list of children of this node
*/
<K, V> void forward(K key, V value, int childIndex);
/**
+ * Forwards a key/value pair to one of the downstream processors designated by the downstream processor name
+ * @param key key
+ * @param value value
+ * @param childName name of downstream processor
+ */
+ <K, V> void forward(K key, V value, String childName);
+
+ /**
* Requests a commit
*/
void commit();
/**
- * Returns the topic name of the current input record
+ * Returns the topic name of the current input record; could be null if it is not
+ * available (for example, if this method is invoked from the punctuate call)
*
* @return the topic name
*/
String topic();
/**
- * Returns the partition id of the current input record
+ * Returns the partition id of the current input record; could be -1 if it is not
+ * available (for example, if this method is invoked from the punctuate call)
*
* @return the partition id
*/
int partition();
/**
- * Returns the offset of the current input record
+ * Returns the offset of the current input record; could be -1 if it is not
+ * available (for example, if this method is invoked from the punctuate call)
*
* @return the offset
*/
long offset();
/**
- * Returns the timestamp of the current input record. The timestamp is extracted from
+ * Returns the current timestamp.
+ *
+ * If it is triggered while processing a record streamed from the source processor, timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
* {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
*
+ * If it is triggered while processing a record generated not from the source processor (for example,
+ * if this method is invoked from the punctuate call), timestamp is defined as the current
+ * task's stream time, which is defined as the smallest among all its input stream partition timestamps.
+ *
* @return the timestamp
*/
long timestamp();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 3d8f792..ec89d47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -135,17 +135,14 @@
* partition timestamp among all its partitions
*/
public long timestamp() {
- if (queuesByTime.isEmpty()) {
- // if there is no data in all partitions, return the smallest of their last known times
- long timestamp = Long.MAX_VALUE;
- for (RecordQueue queue : partitionQueues.values()) {
- if (timestamp > queue.timestamp())
- timestamp = queue.timestamp();
- }
- return timestamp;
- } else {
- return queuesByTime.peek().timestamp();
+ // we should always return the smallest timestamp of all partitions
+ // to avoid group partition time goes backward
+ long timestamp = Long.MAX_VALUE;
+ for (RecordQueue queue : partitionQueues.values()) {
+ if (timestamp > queue.timestamp())
+ timestamp = queue.timestamp();
}
+ return timestamp;
}
public int numBuffered(TopicPartition partition) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 888b89e..1c398ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -30,6 +30,8 @@
public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
+ public static final String NONEXIST_TOPIC = "__null_topic__";
+
private final TaskId id;
private final StreamTask task;
private final StreamsMetrics metrics;
@@ -118,7 +120,7 @@
if (node == null)
throw new TopologyBuilderException("Accessing from an unknown node");
- // TODO: restore this once we fix the ValueGetter initialiation issue
+ // TODO: restore this once we fix the ValueGetter initialization issue
//if (!node.stateStores.contains(name))
// throw new TopologyBuilderException("Processor " + node.name() + " has no access to StateStore " + name);
@@ -130,7 +132,12 @@
if (task.record() == null)
throw new IllegalStateException("This should not happen as topic() should only be called while a record is processed");
- return task.record().topic();
+ String topic = task.record().topic();
+
+ if (topic.equals(NONEXIST_TOPIC))
+ return null;
+ else
+ return topic;
}
@Override
@@ -168,6 +175,11 @@
}
@Override
+ public <K, V> void forward(K key, V value, String childName) {
+ task.forward(key, value, childName);
+ }
+
+ @Override
public void commit() {
task.needCommit();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 6db83a1..50e3a0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -48,7 +48,7 @@
return name;
}
- public final Processor processor() {
+ public final Processor<K, V> processor() {
return processor;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index d7d7eee..824e20a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -43,7 +43,7 @@
PunctuationSchedule sched = top;
pq.poll();
punctuator.punctuate(sched.node(), timestamp);
- pq.add(sched.next());
+ pq.add(sched.next(timestamp));
punctuated = true;
top = pq.peek();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
index dc9a50d..98919d2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationSchedule.java
@@ -22,11 +22,11 @@
final long interval;
public PunctuationSchedule(ProcessorNode node, long interval) {
- this(node, System.currentTimeMillis(), interval);
+ this(node, 0L, interval);
}
public PunctuationSchedule(ProcessorNode node, long time, long interval) {
- super(node, time + interval);
+ super(node, time);
this.interval = interval;
}
@@ -34,8 +34,13 @@
return value;
}
- public PunctuationSchedule next() {
- return new PunctuationSchedule(value, timestamp, interval);
+ public PunctuationSchedule next(long currTimestamp) {
+ // we need to special handle the case when it is firstly triggered (i.e. the timestamp
+ // is equal to the interval) by reschedule based on the currTimestamp
+ if (timestamp == 0L)
+ return new PunctuationSchedule(value, currTimestamp + interval, interval);
+ else
+ return new PunctuationSchedule(value, timestamp + interval, interval);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 3ad06e2..468fe74 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -108,46 +108,51 @@
@Override
public StateStore getStateStore(String name) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("this should not happen: getStateStore() not supported in standby tasks.");
}
@Override
public String topic() {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("this should not happen: topic() not supported in standby tasks.");
}
@Override
public int partition() {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("this should not happen: partition() not supported in standby tasks.");
}
@Override
public long offset() {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("this should not happen: offset() not supported in standby tasks.");
}
@Override
public long timestamp() {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("this should not happen: timestamp() not supported in standby tasks.");
}
@Override
public <K, V> void forward(K key, V value) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@Override
public <K, V> void forward(K key, V value, int childIndex) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
+ }
+
+ @Override
+ public <K, V> void forward(K key, V value, String childName) {
+ throw new UnsupportedOperationException("this should not happen: forward() not supported in standby tasks.");
}
@Override
public void commit() {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("this should not happen: commit() not supported in standby tasks.");
}
@Override
public void schedule(long interval) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("this should not happen: schedule() not supported in standby tasks.");
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 61aeced..53d0a8d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -43,6 +43,8 @@
private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
+ private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null);
+
private final int maxBufferedSize;
private final PartitionGroup partitionGroup;
@@ -202,11 +204,11 @@
/**
* Possibly trigger registered punctuation functions if
- * current time has reached the defined stamp
- *
- * @param timestamp
+ * current partition group timestamp has reached the defined stamp
*/
- public boolean maybePunctuate(long timestamp) {
+ public boolean maybePunctuate() {
+ long timestamp = partitionGroup.timestamp();
+
return punctuationQueue.mayPunctuate(timestamp, this);
}
@@ -216,10 +218,13 @@
throw new IllegalStateException("Current node is not null");
currNode = node;
+ currRecord = new StampedRecord(DUMMY_RECORD, timestamp);
+
try {
node.processor().punctuate(timestamp);
} finally {
currNode = null;
+ currRecord = null;
}
}
@@ -342,4 +347,20 @@
}
}
+ @SuppressWarnings("unchecked")
+ public <K, V> void forward(K key, V value, String childName) {
+ ProcessorNode thisNode = currNode;
+ for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+ if (childNode.name().equals(childName)) {
+ currNode = childNode;
+ try {
+ childNode.process(key, value);
+ } finally {
+ currNode = thisNode;
+ }
+ break;
+ }
+ }
+ }
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index c2a8e06..38dc356 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -341,8 +341,9 @@
totalNumBuffered = 0;
+ // try to process one fetch record from each task via the topology, and also trigger punctuate
+ // functions if necessary, which may result in more records going through the topology in this loop
if (!activeTasks.isEmpty()) {
- // try to process one record from each task
for (StreamTask task : activeTasks.values()) {
long startProcess = time.milliseconds();
@@ -431,7 +432,9 @@
try {
long now = time.milliseconds();
- if (task.maybePunctuate(now))
+ // check whether we should punctuate based on the task's partition group timestamp;
+ // which are essentially based on record timestamp.
+ if (task.maybePunctuate())
sensors.punctuateTimeSensor.record(time.milliseconds() - now);
} catch (KafkaException e) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
new file mode 100644
index 0000000..6573779
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamForeachTest {
+
+ final private String topicName = "topic";
+
+ final private Serde<Integer> intSerde = Serdes.Integer();
+ final private Serde<String> stringSerde = Serdes.String();
+
+ @Test
+ public void testForeach() {
+ // Given
+ List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
+ new KeyValue<>(0, "zero"),
+ new KeyValue<>(1, "one"),
+ new KeyValue<>(2, "two"),
+ new KeyValue<>(3, "three")
+ );
+
+ List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
+ new KeyValue<>(0, "ZERO"),
+ new KeyValue<>(2, "ONE"),
+ new KeyValue<>(4, "TWO"),
+ new KeyValue<>(6, "THREE")
+ );
+
+ final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
+ ForeachAction<Integer, String> action =
+ new ForeachAction<Integer, String>() {
+ @Override
+ public void apply(Integer key, String value) {
+ actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase()));
+ }
+ };
+
+ // When
+ KStreamBuilder builder = new KStreamBuilder();
+ KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
+ stream.foreach(action);
+
+ // Then
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+ for (KeyValue<Integer, String> record: inputRecords) {
+ driver.process(topicName, record.key, record.value);
+ }
+
+ assertEquals(expectedRecords.size(), actualRecords.size());
+ for (int i = 0; i < expectedRecords.size(); i++) {
+ KeyValue<Integer, String> expectedRecord = expectedRecords.get(i);
+ KeyValue<Integer, String> actualRecord = actualRecords.get(i);
+ assertEquals(expectedRecord, actualRecord);
+ }
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index d24ab15..19a9411 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -90,7 +90,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// push two items to the other stream. this should produce two items.
// w1 = { 0:X0, 1:X1 }
@@ -102,7 +102,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// push all four items to the primary stream. this should produce two items.
// w1 = { 0:X0, 1:X1 }
@@ -114,7 +114,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// push all items to the other stream. this should produce six items.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -126,7 +126,7 @@
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
// push all four items to the primary stream. this should produce six items.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -138,7 +138,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
// push two items to the other stream. this should produce six item.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
@@ -150,7 +150,7 @@
driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+ processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
} finally {
Utils.delete(baseDir);
@@ -195,7 +195,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+null", "1:X1+null");
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should produce two items.
// w1 = { 0:X0, 1:X1 }
@@ -207,7 +207,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// push all four items to the primary stream. this should produce four items.
// w1 = { 0:X0, 1:X1 }
@@ -219,7 +219,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
// push all items to the other stream. this should produce six items.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -231,7 +231,7 @@
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
// push all four items to the primary stream. this should produce six items.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -243,7 +243,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
// push two items to the other stream. this should produce six item.
// w1 = { 0:X1, 1:X1, 0:X0, 1:X1, 2:X2, 3:X3, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
@@ -255,7 +255,7 @@
driver.process(topic2, expectedKeys[i], "YYY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
+ processor.checkAndClearProcessResult("0:X0+YYY0", "0:X0+YYY0", "0:XX0+YYY0", "1:X1+YYY1", "1:X1+YYY1", "1:XX1+YYY1");
} finally {
Utils.delete(baseDir);
@@ -302,7 +302,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// push two items to the other stream. this should produce two items.
// w1 = { 0:X0, 1:X1 }
@@ -314,7 +314,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
// clear logically
time = 1000L;
@@ -323,7 +323,7 @@
driver.setTime(time + i);
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// gradually expires items in w1
// w1 = { 0:X0, 1:X1, 2:X2, 3:X3 }
@@ -335,35 +335,35 @@
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("2:X2+YY2", "3:X3+YY3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("3:X3+YY3");
+ processor.checkAndClearProcessResult("3:X3+YY3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// go back to the time before expiration
@@ -373,35 +373,35 @@
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0");
+ processor.checkAndClearProcessResult("0:X0+YY0");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
// clear (logically)
time = 2000L;
@@ -411,7 +411,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// gradually expires items in w2
// w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
@@ -422,35 +422,35 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("3:XX3+Y3");
+ processor.checkAndClearProcessResult("3:XX3+Y3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// go back to the time before expiration
@@ -460,35 +460,35 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0");
+ processor.checkAndClearProcessResult("0:XX0+Y0");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
} finally {
Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 166e8ba..65226d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -88,7 +88,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+null", "1:X1+null");
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should produce two items.
// w {}
@@ -98,7 +98,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
// w = { 0:Y0, 1:Y1 }
@@ -108,7 +108,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
// push all items to the other stream. this should produce no items.
// w = { 0:Y0, 1:Y1 }
@@ -118,7 +118,7 @@
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
// w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
@@ -128,7 +128,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
} finally {
Utils.delete(baseDir);
@@ -173,7 +173,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+null", "1:X1+null");
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should produce no items.
// w = {}
@@ -183,7 +183,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// clear logically
time = 1000L;
@@ -196,7 +196,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// gradually expire items in window.
// w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
@@ -207,35 +207,35 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
// go back to the time before expiration
@@ -245,35 +245,35 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
driver.setTime(++time);
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+ processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
} finally {
Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 8e672a2..3acb59a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -95,7 +95,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+null", "1:X1+null");
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
// push two items to the other stream. this should not produce any item.
@@ -103,7 +103,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
@@ -111,14 +111,14 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
// push all items to the other stream. this should not produce any item
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
@@ -126,7 +126,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
// push two items with null to the other stream as deletes. this should not produce any item.
@@ -134,7 +134,7 @@
driver.process(topic2, expectedKeys[i], null);
}
- processor.checkAndClearResult();
+ processor.checkAndClearProcessResult();
// push all four items to the primary stream. this should produce four items.
@@ -142,7 +142,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
} finally {
Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 4244de5..a0a61f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -59,7 +59,8 @@
}
@Override
- public void punctuate(long timestamp) {
+ public KeyValue<Integer, Integer> punctuate(long timestamp) {
+ return KeyValue.pair(-1, (int) timestamp);
}
@Override
@@ -80,9 +81,12 @@
driver.process(topicName, expectedKeys[i], expectedKeys[i] * 10);
}
- assertEquals(4, processor.processed.size());
+ driver.punctuate(2);
+ driver.punctuate(3);
- String[] expected = {"2:10", "20:110", "200:1110", "2000:11110"};
+ assertEquals(6, processor.processed.size());
+
+ String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"};
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processor.processed.get(i));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 52abdf7..f5f9698 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -58,7 +58,8 @@
}
@Override
- public void punctuate(long timestamp) {
+ public Integer punctuate(long timestamp) {
+ return (int) timestamp;
}
@Override
@@ -82,7 +83,10 @@
assertEquals(4, processor.processed.size());
- String[] expected = {"1:10", "10:110", "100:1110", "1000:11110"};
+ driver.punctuate(2);
+ driver.punctuate(3);
+
+ String[] expected = {"1:10", "10:110", "100:1110", "1000:11110", "null:2", "null:3"};
for (int i = 0; i < expected.length; i++) {
assertEquals(expected[i], processor.processed.get(i));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index e19510f..3c7a1bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -182,15 +182,15 @@
driver.setTime(4L);
driver.process(topic1, "A", "1");
- proc1.checkAndClearResult(
+ proc1.checkAndClearProcessResult(
"[A@0]:0+1",
"[B@0]:0+2",
"[C@0]:0+3",
"[D@0]:0+4",
"[A@0]:0+1+1"
);
- proc2.checkAndClearResult();
- proc3.checkAndClearResult(
+ proc2.checkAndClearProcessResult();
+ proc3.checkAndClearProcessResult(
"[A@0]:null",
"[B@0]:null",
"[C@0]:null",
@@ -209,15 +209,15 @@
driver.setTime(9L);
driver.process(topic1, "C", "3");
- proc1.checkAndClearResult(
+ proc1.checkAndClearProcessResult(
"[A@0]:0+1+1+1", "[A@5]:0+1",
"[B@0]:0+2+2", "[B@5]:0+2",
"[D@0]:0+4+4", "[D@5]:0+4",
"[B@0]:0+2+2+2", "[B@5]:0+2+2",
"[C@0]:0+3+3", "[C@5]:0+3"
);
- proc2.checkAndClearResult();
- proc3.checkAndClearResult(
+ proc2.checkAndClearProcessResult();
+ proc3.checkAndClearProcessResult(
"[A@0]:null", "[A@5]:null",
"[B@0]:null", "[B@5]:null",
"[D@0]:null", "[D@5]:null",
@@ -236,15 +236,15 @@
driver.setTime(4L);
driver.process(topic2, "A", "a");
- proc1.checkAndClearResult();
- proc2.checkAndClearResult(
+ proc1.checkAndClearProcessResult();
+ proc2.checkAndClearProcessResult(
"[A@0]:0+a",
"[B@0]:0+b",
"[C@0]:0+c",
"[D@0]:0+d",
"[A@0]:0+a+a"
);
- proc3.checkAndClearResult(
+ proc3.checkAndClearProcessResult(
"[A@0]:0+1+1+1%0+a",
"[B@0]:0+2+2+2%0+b",
"[C@0]:0+3+3%0+c",
@@ -262,15 +262,15 @@
driver.setTime(9L);
driver.process(topic2, "C", "c");
- proc1.checkAndClearResult();
- proc2.checkAndClearResult(
+ proc1.checkAndClearProcessResult();
+ proc2.checkAndClearProcessResult(
"[A@0]:0+a+a+a", "[A@5]:0+a",
"[B@0]:0+b+b", "[B@5]:0+b",
"[D@0]:0+d+d", "[D@5]:0+d",
"[B@0]:0+b+b+b", "[B@5]:0+b+b",
"[C@0]:0+c+c", "[C@5]:0+c"
);
- proc3.checkAndClearResult(
+ proc3.checkAndClearProcessResult(
"[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
"[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
"[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index 78d274e..ee26058 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -74,8 +74,8 @@
driver.process(topic1, "A", null);
driver.process(topic1, "B", null);
- proc2.checkAndClearResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
- proc3.checkAndClearResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
+ proc2.checkAndClearProcessResult("A:null", "B:2", "C:null", "D:4", "A:null", "B:null");
+ proc3.checkAndClearProcessResult("A:1", "B:null", "C:3", "D:null", "A:null", "B:null");
}
@Test
@@ -193,25 +193,25 @@
driver.process(topic1, "B", 1);
driver.process(topic1, "C", 1);
- proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+ proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
driver.process(topic1, "A", 2);
driver.process(topic1, "B", 2);
- proc1.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
- proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+ proc1.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+ proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
driver.process(topic1, "A", 3);
- proc1.checkAndClearResult("A:(3<-null)");
- proc2.checkAndClearResult("A:(null<-null)");
+ proc1.checkAndClearProcessResult("A:(3<-null)");
+ proc2.checkAndClearProcessResult("A:(null<-null)");
driver.process(topic1, "A", null);
driver.process(topic1, "B", null);
- proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
- proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
+ proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
} finally {
Utils.delete(stateDir);
@@ -250,25 +250,25 @@
driver.process(topic1, "B", 1);
driver.process(topic1, "C", 1);
- proc1.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- proc2.checkAndClearResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
+ proc1.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
driver.process(topic1, "A", 2);
driver.process(topic1, "B", 2);
- proc1.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
- proc2.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+ proc1.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+ proc2.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
driver.process(topic1, "A", 3);
- proc1.checkAndClearResult("A:(3<-2)");
- proc2.checkAndClearResult("A:(null<-2)");
+ proc1.checkAndClearProcessResult("A:(3<-2)");
+ proc2.checkAndClearProcessResult("A:(null<-2)");
driver.process(topic1, "A", null);
driver.process(topic1, "B", null);
- proc1.checkAndClearResult("A:(null<-3)", "B:(null<-2)");
- proc2.checkAndClearResult("A:(null<-null)", "B:(null<-2)");
+ proc1.checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
+ proc2.checkAndClearProcessResult("A:(null<-null)", "B:(null<-2)");
} finally {
Utils.delete(stateDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
new file mode 100644
index 0000000..4b612a5
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class KTableForeachTest {
+
+ final private String topicName = "topic";
+
+ final private Serde<Integer> intSerde = Serdes.Integer();
+ final private Serde<String> stringSerde = Serdes.String();
+
+ @Test
+ public void testForeach() {
+ // Given
+ List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
+ new KeyValue<>(0, "zero"),
+ new KeyValue<>(1, "one"),
+ new KeyValue<>(2, "two"),
+ new KeyValue<>(3, "three")
+ );
+
+ List<KeyValue<Integer, String>> expectedRecords = Arrays.asList(
+ new KeyValue<>(0, "ZERO"),
+ new KeyValue<>(2, "ONE"),
+ new KeyValue<>(4, "TWO"),
+ new KeyValue<>(6, "THREE")
+ );
+
+ final List<KeyValue<Integer, String>> actualRecords = new ArrayList<>();
+ ForeachAction<Integer, String> action =
+ new ForeachAction<Integer, String>() {
+ @Override
+ public void apply(Integer key, String value) {
+ actualRecords.add(new KeyValue<>(key * 2, value.toUpperCase()));
+ }
+ };
+
+ // When
+ KStreamBuilder builder = new KStreamBuilder();
+ KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName);
+ table.foreach(action);
+
+ // Then
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+ for (KeyValue<Integer, String> record: inputRecords) {
+ driver.process(topicName, record.key, record.value);
+ }
+
+ assertEquals(expectedRecords.size(), actualRecords.size());
+ for (int i = 0; i < expectedRecords.size(); i++) {
+ KeyValue<Integer, String> expectedRecord = expectedRecords.get(i);
+ KeyValue<Integer, String> actualRecord = actualRecords.get(i);
+ assertEquals(expectedRecord, actualRecord);
+ }
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 5f30574..f6ebbe1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -100,7 +100,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:null", "1:null");
+ processor.checkAndClearProcessResult("0:null", "1:null");
checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
// push two items to the other stream. this should produce two items.
@@ -109,7 +109,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
// push all four items to the primary stream. this should produce four items.
@@ -118,7 +118,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
// push all items to the other stream. this should produce four items.
@@ -126,7 +126,7 @@
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items.
@@ -135,7 +135,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push two items with null to the other stream as deletes. this should produce two item.
@@ -144,7 +144,7 @@
driver.process(topic2, expectedKeys[i], null);
}
- processor.checkAndClearResult("0:null", "1:null");
+ processor.checkAndClearProcessResult("0:null", "1:null");
checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items.
@@ -153,7 +153,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
+ processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
} finally {
@@ -195,7 +195,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+ proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
// push two items to the other stream. this should produce two items.
@@ -203,7 +203,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -211,14 +211,14 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)");
// push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -226,7 +226,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push two items with null to the other stream as deletes. this should produce two item.
@@ -234,7 +234,7 @@
driver.process(topic2, expectedKeys[i], null);
}
- proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+ proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -242,7 +242,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+ proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
} finally {
Utils.delete(baseDir);
@@ -285,7 +285,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)");
+ proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
// push two items to the other stream. this should produce two items.
@@ -293,7 +293,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -301,14 +301,14 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)");
// push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -316,7 +316,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
// push two items with null to the other stream as deletes. this should produce two item.
@@ -324,7 +324,7 @@
driver.process(topic2, expectedKeys[i], null);
}
- proc.checkAndClearResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
+ proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
// push all four items to the primary stream. this should produce four items.
@@ -332,7 +332,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+ proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
} finally {
Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index f92c5ca..449ea05 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -105,7 +105,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+null", "1:X1+null");
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
// push two items to the other stream. this should produce two items.
@@ -114,7 +114,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
// push all four items to the primary stream. this should produce four items.
@@ -123,7 +123,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
// push all items to the other stream. this should produce four items.
@@ -131,7 +131,7 @@
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items.
@@ -140,7 +140,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push two items with null to the other stream as deletes. this should produce two item.
@@ -149,7 +149,7 @@
driver.process(topic2, expectedKeys[i], null);
}
- processor.checkAndClearResult("0:X0+null", "1:X1+null");
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items.
@@ -158,7 +158,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
} finally {
@@ -200,7 +200,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push two items to the other stream. this should produce two items.
@@ -208,7 +208,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -216,14 +216,14 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
// push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -231,7 +231,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push two items with null to the other stream as deletes. this should produce two item.
@@ -239,7 +239,7 @@
driver.process(topic2, expectedKeys[i], null);
}
- proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -247,7 +247,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+ proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
} finally {
Utils.delete(baseDir);
@@ -290,7 +290,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push two items to the other stream. this should produce two items.
@@ -298,7 +298,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
// push all four items to the primary stream. this should produce four items.
@@ -306,14 +306,14 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
// push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
// push all four items to the primary stream. this should produce four items.
@@ -321,7 +321,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
// push two items with null to the other stream as deletes. this should produce two item.
@@ -329,7 +329,7 @@
driver.process(topic2, expectedKeys[i], null);
}
- proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+ proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
// push all four items to the primary stream. this should produce four items.
@@ -337,7 +337,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+ proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
} finally {
Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 6cc77e0..ea7476a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -100,7 +100,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+null", "1:X1+null");
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null));
// push two items to the other stream. this should produce two items.
@@ -109,7 +109,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
// push all four items to the primary stream. this should produce four items.
@@ -118,7 +118,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+ processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null"));
// push all items to the other stream. this should produce four items.
@@ -126,7 +126,7 @@
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items.
@@ -135,7 +135,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+ processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push two items with null to the other stream as deletes. this should produce two item.
@@ -144,7 +144,7 @@
driver.process(topic2, expectedKeys[i], null);
}
- processor.checkAndClearResult("0:X0+null", "1:X1+null");
+ processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
// push all four items to the primary stream. this should produce four items.
@@ -153,7 +153,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
+ processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
// push middle two items to the primary stream with null. this should produce two items.
@@ -162,7 +162,7 @@
driver.process(topic1, expectedKeys[i], null);
}
- processor.checkAndClearResult("1:null", "2:null+YY2");
+ processor.checkAndClearProcessResult("1:null", "2:null+YY2");
checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3"));
} finally {
@@ -204,7 +204,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push two items to the other stream. this should produce two items.
@@ -212,7 +212,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -220,14 +220,14 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
// push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -235,7 +235,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
// push two items with null to the other stream as deletes. this should produce two item.
@@ -243,7 +243,7 @@
driver.process(topic2, expectedKeys[i], null);
}
- proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push all four items to the primary stream. this should produce four items.
@@ -251,7 +251,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+ proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
// push middle two items to the primary stream with null. this should produce two items.
@@ -259,7 +259,7 @@
driver.process(topic1, expectedKeys[i], null);
}
- proc.checkAndClearResult("1:(null<-null)", "2:(null+YY2<-null)");
+ proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)");
} finally {
Utils.delete(baseDir);
@@ -302,7 +302,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+null<-null)", "1:(X1+null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)");
// push two items to the other stream. this should produce two items.
@@ -310,7 +310,7 @@
driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)");
// push all four items to the primary stream. this should produce four items.
@@ -318,14 +318,14 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
+ proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
// push all items to the other stream. this should produce four items.
for (int i = 0; i < expectedKeys.length; i++) {
driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
// push all four items to the primary stream. this should produce four items.
@@ -333,7 +333,7 @@
driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+ proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
// push two items with null to the other stream as deletes. this should produce two item.
@@ -341,7 +341,7 @@
driver.process(topic2, expectedKeys[i], null);
}
- proc.checkAndClearResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
+ proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)");
// push all four items to the primary stream. this should produce four items.
@@ -349,7 +349,7 @@
driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
}
- proc.checkAndClearResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+ proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
// push middle two items to the primary stream with null. this should produce two items.
@@ -357,7 +357,7 @@
driver.process(topic1, expectedKeys[i], null);
}
- proc.checkAndClearResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
+ proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
} finally {
Utils.delete(baseDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 9ec1258..9cafe8b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -223,20 +223,20 @@
driver.process(topic1, "B", "01");
driver.process(topic1, "C", "01");
- proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
driver.process(topic1, "A", "02");
driver.process(topic1, "B", "02");
- proc.checkAndClearResult("A:(2<-null)", "B:(2<-null)");
+ proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
driver.process(topic1, "A", "03");
- proc.checkAndClearResult("A:(3<-null)");
+ proc.checkAndClearProcessResult("A:(3<-null)");
driver.process(topic1, "A", null);
- proc.checkAndClearResult("A:(null<-null)");
+ proc.checkAndClearProcessResult("A:(null<-null)");
} finally {
Utils.delete(stateDir);
@@ -276,20 +276,20 @@
driver.process(topic1, "B", "01");
driver.process(topic1, "C", "01");
- proc.checkAndClearResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
driver.process(topic1, "A", "02");
driver.process(topic1, "B", "02");
- proc.checkAndClearResult("A:(2<-1)", "B:(2<-1)");
+ proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
driver.process(topic1, "A", "03");
- proc.checkAndClearResult("A:(3<-2)");
+ proc.checkAndClearProcessResult("A:(3<-2)");
driver.process(topic1, "A", null);
- proc.checkAndClearResult("A:(null<-3)");
+ proc.checkAndClearProcessResult("A:(null<-3)");
} finally {
Utils.delete(stateDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 51276f3..7c158e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -131,21 +131,21 @@
driver.process(topic1, "B", "01");
driver.process(topic1, "C", "01");
- proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+ proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
driver.process(topic1, "A", "02");
driver.process(topic1, "B", "02");
- proc1.checkAndClearResult("A:(02<-null)", "B:(02<-null)");
+ proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
driver.process(topic1, "A", "03");
- proc1.checkAndClearResult("A:(03<-null)");
+ proc1.checkAndClearProcessResult("A:(03<-null)");
driver.process(topic1, "A", null);
driver.process(topic1, "B", null);
- proc1.checkAndClearResult("A:(null<-null)", "B:(null<-null)");
+ proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
} finally {
Utils.delete(stateDir);
@@ -176,21 +176,21 @@
driver.process(topic1, "B", "01");
driver.process(topic1, "C", "01");
- proc1.checkAndClearResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+ proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
driver.process(topic1, "A", "02");
driver.process(topic1, "B", "02");
- proc1.checkAndClearResult("A:(02<-01)", "B:(02<-01)");
+ proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
driver.process(topic1, "A", "03");
- proc1.checkAndClearResult("A:(03<-02)");
+ proc1.checkAndClearProcessResult("A:(03<-02)");
driver.process(topic1, "A", null);
driver.process(topic1, "B", null);
- proc1.checkAndClearResult("A:(null<-03)", "B:(null<-02)");
+ proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
} finally {
Utils.delete(stateDir);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
new file mode 100644
index 0000000..22948ab
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class KeyValuePrinterProcessorTest {
+
+ private String topicName = "topic";
+ private Serde<String> stringSerde = Serdes.String();
+ private Serde<byte[]> bytesSerde = Serdes.ByteArray();
+ private ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ private KStreamBuilder builder = new KStreamBuilder();
+ private PrintStream printStream = new PrintStream(baos);
+
+
+ @Test
+ public void testPrintKeyValueDefaultSerde() throws Exception {
+
+ KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printStream);
+ String[] suppliedKeys = {"foo", "bar", null};
+ String[] suppliedValues = {"value1", "value2", "value3"};
+ String[] expectedValues = {"foo , value1", "bar , value2", "null , value3"};
+
+
+ KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName);
+ stream.process(keyValuePrinter);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+ for (int i = 0; i < suppliedKeys.length; i++) {
+ driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
+ }
+
+ String[] capturedValues = new String(baos.toByteArray(), Charset.forName("UTF-8")).split("\n");
+
+ for (int i = 0; i < capturedValues.length; i++) {
+ assertEquals(capturedValues[i], expectedValues[i]);
+ }
+ }
+
+
+ @Test
+ public void testPrintKeyValueWithProvidedSerde() throws Exception {
+
+ Serde<MockObject> mockObjectSerde = Serdes.serdeFrom(new MockSerializer(), new MockDeserializer());
+ KeyValuePrinter<String, MockObject> keyValuePrinter = new KeyValuePrinter<>(printStream, stringSerde, mockObjectSerde);
+ KStream<String, MockObject> stream = builder.stream(stringSerde, mockObjectSerde, topicName);
+
+ stream.process(keyValuePrinter);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder);
+
+ String suppliedKey = null;
+ byte[] suppliedValue = "{\"name\":\"print\", \"label\":\"test\"}".getBytes(Charset.forName("UTF-8"));
+
+ driver.process(topicName, suppliedKey, suppliedValue);
+ String expectedPrintedValue = "null , name:print label:test";
+ String capturedValue = new String(baos.toByteArray(), Charset.forName("UTF-8")).trim();
+
+ assertEquals(capturedValue, expectedPrintedValue);
+
+ }
+
+ private static class MockObject {
+ public String name;
+ public String label;
+
+ public MockObject() {
+ }
+
+ MockObject(String name, String label) {
+ this.name = name;
+ this.label = label;
+ }
+
+ @Override
+ public String toString() {
+ return "name:" + name + " label:" + label;
+ }
+ }
+
+
+ private static class MockDeserializer implements Deserializer<MockObject> {
+
+ private com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+
+ }
+
+ @Override
+ public MockObject deserialize(String topic, byte[] data) {
+ MockObject mockObject;
+ try {
+ mockObject = objectMapper.readValue(data, MockObject.class);
+ } catch (Exception e) {
+ throw new SerializationException(e);
+ }
+ return mockObject;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+
+
+ private static class MockSerializer implements Serializer<MockObject> {
+ private final com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+
+ }
+
+ @Override
+ public byte[] serialize(String topic, MockObject data) {
+ try {
+ return objectMapper.writeValueAsBytes(data);
+ } catch (Exception e) {
+ throw new SerializationException("Error serializing JSON message", e);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+
+
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 5bf1b5e..a1c07af7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -21,7 +21,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -60,17 +59,17 @@
// add three 3 records with timestamp 1, 3, 5 to partition-1
List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
group.addRawRecords(partition1, list1);
// add three 3 records with timestamp 2, 4, 6 to partition-2
List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
- new ConsumerRecord<>("topic", 1, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
- new ConsumerRecord<>("topic", 1, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
+ new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
group.addRawRecords(partition2, list2);
@@ -82,7 +81,7 @@
StampedRecord record;
PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
- // get one record
+ // get one record, now the time should be advanced
record = group.nextRecord(info);
assertEquals(partition1, info.partition());
assertEquals(1L, record.timestamp);
@@ -99,5 +98,72 @@
assertEquals(2, group.numBuffered(partition1));
assertEquals(2, group.numBuffered(partition2));
assertEquals(3L, group.timestamp());
+
+ // add three 3 records with timestamp 2, 4, 6 to partition-1 again
+ List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
+ new ConsumerRecord<>("topic", 1, 2L, recordKey, recordValue),
+ new ConsumerRecord<>("topic", 1, 4L, recordKey, recordValue));
+
+ group.addRawRecords(partition1, list3);
+
+ assertEquals(6, group.numBuffered());
+ assertEquals(4, group.numBuffered(partition1));
+ assertEquals(2, group.numBuffered(partition2));
+ assertEquals(3L, group.timestamp());
+
+ // get one record, time should not be advanced
+ record = group.nextRecord(info);
+ assertEquals(partition1, info.partition());
+ assertEquals(3L, record.timestamp);
+ assertEquals(5, group.numBuffered());
+ assertEquals(3, group.numBuffered(partition1));
+ assertEquals(2, group.numBuffered(partition2));
+ assertEquals(3L, group.timestamp());
+
+ // get one more record, now time should be advanced
+ record = group.nextRecord(info);
+ assertEquals(partition1, info.partition());
+ assertEquals(5L, record.timestamp);
+ assertEquals(4, group.numBuffered());
+ assertEquals(2, group.numBuffered(partition1));
+ assertEquals(2, group.numBuffered(partition2));
+ assertEquals(3L, group.timestamp());
+
+ // get one more record, time should not be advanced
+ record = group.nextRecord(info);
+ assertEquals(partition1, info.partition());
+ assertEquals(2L, record.timestamp);
+ assertEquals(3, group.numBuffered());
+ assertEquals(1, group.numBuffered(partition1));
+ assertEquals(2, group.numBuffered(partition2));
+ assertEquals(4L, group.timestamp());
+
+ // get one more record, now time should be advanced
+ record = group.nextRecord(info);
+ assertEquals(partition2, info.partition());
+ assertEquals(4L, record.timestamp);
+ assertEquals(2, group.numBuffered());
+ assertEquals(1, group.numBuffered(partition1));
+ assertEquals(1, group.numBuffered(partition2));
+ assertEquals(4L, group.timestamp());
+
+ // get one more record, time should not be advanced
+ record = group.nextRecord(info);
+ assertEquals(partition1, info.partition());
+ assertEquals(4L, record.timestamp);
+ assertEquals(1, group.numBuffered());
+ assertEquals(0, group.numBuffered(partition1));
+ assertEquals(1, group.numBuffered(partition2));
+ assertEquals(4L, group.timestamp());
+
+ // get one more record, time should not be advanced
+ record = group.nextRecord(info);
+ assertEquals(partition2, info.partition());
+ assertEquals(6L, record.timestamp);
+ assertEquals(0, group.numBuffered());
+ assertEquals(0, group.numBuffered(partition1));
+ assertEquals(0, group.numBuffered(partition2));
+ assertEquals(4L, group.timestamp());
+
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index ef08176..1095fcf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -157,6 +157,28 @@
}
@Test
+ public void testDrivingMultiplexByNameTopology() {
+ driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology());
+ driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
+
+ driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
+
+ driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER);
+ driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER);
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
+ assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)");
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)");
+ assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)");
+ }
+
+ @Test
public void testDrivingStatefulTopology() {
String storeName = "entries";
driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName);
@@ -215,6 +237,13 @@
.addSink("sink2", OUTPUT_TOPIC_2, "processor");
}
+ protected TopologyBuilder createMultiplexByNameTopology() {
+ return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
+ .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source")
+ .addSink("sink0", OUTPUT_TOPIC_1, "processor")
+ .addSink("sink1", OUTPUT_TOPIC_2, "processor");
+ }
+
protected TopologyBuilder createStatefulTopology(String storeName) {
return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC)
.addProcessor("processor", define(new StatefulProcessor(storeName)), "source")
@@ -268,6 +297,33 @@
}
/**
+ * A processor that forwards slightly-modified messages to each named child.
+ * Note: the children are assumed to be named "sink{child number}", e.g., sink1, or sink2, etc.
+ */
+ protected static class MultiplexByNameProcessor extends AbstractProcessor<String, String> {
+
+ private final int numChildren;
+
+ public MultiplexByNameProcessor(int numChildren) {
+ this.numChildren = numChildren;
+ }
+
+ @Override
+ public void process(String key, String value) {
+ for (int i = 0; i != numChildren; ++i) {
+ context().forward(key, value + "(" + (i + 1) + ")", "sink" + i);
+ }
+ }
+
+ @Override
+ public void punctuate(long streamTime) {
+ for (int i = 0; i != numChildren; ++i) {
+ context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i);
+ }
+ }
+ }
+
+ /**
* A processor that stores each key-value pair in an in-memory key-value store registered with the context. When
* {@link #punctuate(long)} is called, it outputs the total number of entries in the store.
*/
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 33fa5c4..6014c36 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -32,6 +32,7 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.Test;
@@ -46,6 +47,7 @@
import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class StreamTaskTest {
@@ -58,10 +60,12 @@
private final TopicPartition partition2 = new TopicPartition("topic2", 1);
private final Set<TopicPartition> partitions = Utils.mkSet(partition1, partition2);
- private final MockSourceNode source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
- private final MockSourceNode source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+ private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(intDeserializer, intDeserializer);
+ private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(intDeserializer, intDeserializer);
+ private final MockProcessorNode<Integer, Integer> processor = new MockProcessorNode<>(10L);
+
private final ProcessorTopology topology = new ProcessorTopology(
- Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2),
+ Arrays.asList((ProcessorNode) source1, (ProcessorNode) source2, (ProcessorNode) processor),
new HashMap<String, SourceNode>() {
{
put("topic1", source1);
@@ -94,6 +98,8 @@
@Before
public void setup() {
consumer.assign(Arrays.asList(partition1, partition2));
+ source1.addChild(processor);
+ source2.addChild(processor);
}
@SuppressWarnings("unchecked")
@@ -211,6 +217,73 @@
}
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testMaybePunctuate() throws Exception {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ StreamsConfig config = createConfig(baseDir);
+ StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+
+ task.addRecords(partition1, records(
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 30, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition1.topic(), partition1.partition(), 40, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+ ));
+
+ task.addRecords(partition2, records(
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 25, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 35, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
+ new ConsumerRecord<>(partition2.topic(), partition2.partition(), 45, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)
+ ));
+
+ assertTrue(task.maybePunctuate());
+
+ assertEquals(5, task.process());
+ assertEquals(1, source1.numReceived);
+ assertEquals(0, source2.numReceived);
+
+ assertFalse(task.maybePunctuate());
+
+ assertEquals(4, task.process());
+ assertEquals(1, source1.numReceived);
+ assertEquals(1, source2.numReceived);
+
+ assertTrue(task.maybePunctuate());
+
+ assertEquals(3, task.process());
+ assertEquals(2, source1.numReceived);
+ assertEquals(1, source2.numReceived);
+
+ assertFalse(task.maybePunctuate());
+
+ assertEquals(2, task.process());
+ assertEquals(2, source1.numReceived);
+ assertEquals(2, source2.numReceived);
+
+ assertTrue(task.maybePunctuate());
+
+ assertEquals(1, task.process());
+ assertEquals(3, source1.numReceived);
+ assertEquals(2, source2.numReceived);
+
+ assertFalse(task.maybePunctuate());
+
+ assertEquals(0, task.process());
+ assertEquals(3, source1.numReceived);
+ assertEquals(3, source2.numReceived);
+
+ assertFalse(task.maybePunctuate());
+
+ processor.supplier.checkAndClearPunctuateResult(20L, 30L, 40L);
+
+ task.close();
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
return Arrays.asList(recs);
}
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 0c56c26..2ee8730 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -87,6 +87,21 @@
}
}
+ public void punctuate(long timestamp) {
+ setTime(timestamp);
+
+ for (ProcessorNode processor : topology.processors()) {
+ if (processor.processor() != null) {
+ currNode = processor;
+ try {
+ processor.processor().punctuate(timestamp);
+ } finally {
+ currNode = null;
+ }
+ }
+ }
+ }
+
public void setTime(long timestamp) {
context.setTime(timestamp);
}
@@ -120,6 +135,22 @@
}
}
+ @SuppressWarnings("unchecked")
+ public <K, V> void forward(K key, V value, String childName) {
+ ProcessorNode thisNode = currNode;
+ for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
+ if (childNode.name().equals(childName)) {
+ currNode = childNode;
+ try {
+ childNode.process(key, value);
+ } finally {
+ currNode = thisNode;
+ }
+ break;
+ }
+ }
+ }
+
public Map<String, StateStore> allStateStores() {
return context.allStateStores();
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index e57e1c7..2e2c221 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -143,7 +143,7 @@
@Override
public void schedule(long interval) {
- throw new UnsupportedOperationException("schedule() not supported");
+ throw new UnsupportedOperationException("schedule() not supported.");
}
@Override
@@ -159,23 +159,29 @@
}
@Override
+ @SuppressWarnings("unchecked")
+ public <K, V> void forward(K key, V value, String childName) {
+ driver.forward(key, value, childName);
+ }
+
+ @Override
public void commit() {
throw new UnsupportedOperationException("commit() not supported.");
}
@Override
public String topic() {
- throw new UnsupportedOperationException("topic() not supported.");
+ return null;
}
@Override
public int partition() {
- throw new UnsupportedOperationException("partition() not supported.");
+ return -1;
}
@Override
public long offset() {
- throw new UnsupportedOperationException("offset() not supported.");
+ return -1L;
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
new file mode 100644
index 0000000..cf8a526
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
+
+ public static final String NAME = "MOCK-PROCESS-";
+ public static final AtomicInteger INDEX = new AtomicInteger(1);
+
+ public int numReceived = 0;
+
+ public final MockProcessorSupplier<K, V> supplier;
+
+ public MockProcessorNode(long scheduleInterval) {
+ this(new MockProcessorSupplier<K, V>(scheduleInterval));
+ }
+
+ private MockProcessorNode(MockProcessorSupplier<K, V> supplier) {
+ super(NAME + INDEX.getAndIncrement(), supplier.get(), Collections.<String>emptySet());
+
+ this.supplier = supplier;
+ }
+
+ @Override
+ public void process(K key, V value) {
+ this.numReceived++;
+ processor().process(key, value);
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index b402525..9cf0eb2 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -17,6 +17,7 @@
package org.apache.kafka.test;
+import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -30,36 +31,48 @@
public final ArrayList<String> processed = new ArrayList<>();
public final ArrayList<Long> punctuated = new ArrayList<>();
+ private final long scheduleInterval;
+
+ public MockProcessorSupplier() {
+ this(-1L);
+ }
+
+ public MockProcessorSupplier(long scheduleInterval) {
+ this.scheduleInterval = scheduleInterval;
+ }
+
@Override
public Processor<K, V> get() {
return new MockProcessor();
}
- public class MockProcessor implements Processor<K, V> {
+ public class MockProcessor extends AbstractProcessor<K, V> {
@Override
public void init(ProcessorContext context) {
- // do nothing
+ super.init(context);
+ if (scheduleInterval > 0L)
+ context.schedule(scheduleInterval);
}
@Override
public void process(K key, V value) {
- processed.add(key + ":" + value);
+ processed.add((key == null ? "null" : key) + ":" +
+ (value == null ? "null" : value));
}
@Override
public void punctuate(long streamTime) {
+ assertEquals(streamTime, context().timestamp());
+ assertEquals(null, context().topic());
+ assertEquals(-1, context().partition());
+ assertEquals(-1L, context().offset());
+
punctuated.add(streamTime);
}
-
- @Override
- public void close() {
- // do nothing
- }
-
}
- public void checkAndClearResult(String... expected) {
+ public void checkAndClearProcessResult(String... expected) {
assertEquals("the number of outputs:", expected.length, processed.size());
for (int i = 0; i < expected.length; i++) {
@@ -69,4 +82,14 @@
processed.clear();
}
+ public void checkAndClearPunctuateResult(long... expected) {
+ assertEquals("the number of outputs:", expected.length, punctuated.size());
+
+ for (int i = 0; i < expected.length; i++) {
+ assertEquals("output[" + i + "]:", expected[i], (long) punctuated.get(i));
+ }
+
+ processed.clear();
+ }
+
}
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index df1a612..10163a0 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -23,4 +23,4 @@
# Instead, in trunk, the version should have a suffix of the form ".devN"
#
# For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
-__version__ = '0.10.0.0.dev0'
+__version__ = '0.10.1.0.dev0'
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index f2ea421..414da84 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -52,12 +52,8 @@
num_nodes = 1
* is_int_with_prefix recommended if num_nodes > 1, because otherwise each producer
will produce exactly same messages, and validation may miss missing messages.
- :param compression_types: If None, all producers will not use compression; or a list of one or
- more compression types (including "none"). Each producer will pick a compression type
- from the list in round-robin fashion. Example: compression_types = ["none", "snappy"] and
- num_nodes = 3, then producer 1 and 2 will not use compression, and producer 3 will use
- compression type = snappy. If in this example, num_nodes is 1, then first (and only)
- producer will not use compression.
+ :param compression_types: If None, all producers will not use compression; or a list of
+ compression types, one per producer (could be "none").
"""
super(VerifiableProducer, self).__init__(context, num_nodes)
@@ -67,30 +63,39 @@
self.throughput = throughput
self.message_validator = message_validator
self.compression_types = compression_types
+ if self.compression_types is not None:
+ assert len(self.compression_types) == num_nodes, "Specify one compression type per node"
for node in self.nodes:
node.version = version
self.acked_values = []
self.not_acked_values = []
self.produced_count = {}
- self.prop_file = ""
+
+
+ @property
+ def security_config(self):
+ return self.kafka.security_config.client_config()
+
+ def prop_file(self, node):
+ idx = self.idx(node)
+ prop_file = str(self.security_config)
+ if self.compression_types is not None:
+ compression_index = idx - 1
+ self.logger.info("VerifiableProducer (index = %d) will use compression type = %s", idx,
+ self.compression_types[compression_index])
+ prop_file += "\ncompression.type=%s\n" % self.compression_types[compression_index]
+ return prop_file
def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False)
# Create and upload log properties
- self.security_config = self.kafka.security_config.client_config(self.prop_file)
- producer_prop_file = str(self.security_config)
log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE)
node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
# Create and upload config file
- if self.compression_types is not None:
- compression_index = (idx - 1) % len(self.compression_types)
- self.logger.info("VerifiableProducer (index = %d) will use compression type = %s", idx,
- self.compression_types[compression_index])
- producer_prop_file += "\ncompression.type=%s\n" % self.compression_types[compression_index]
-
+ producer_prop_file = self.prop_file(node)
self.logger.info("verifiable_producer.properties:")
self.logger.info(producer_prop_file)
node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file)
@@ -197,7 +202,7 @@
def each_produced_at_least(self, count):
with self.lock:
- for idx in range(1, self.num_nodes):
+ for idx in range(1, self.num_nodes + 1):
if self.produced_count.get(idx) is None or self.produced_count[idx] < count:
return False
return True
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index 084b19d..534f65c 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -66,6 +66,12 @@
self.await_all_members(consumer)
self.await_consumed_messages(consumer)
+ def setup_consumer(self, topic, **kwargs):
+ # collect verifiable consumer events since this makes debugging much easier
+ consumer = super(OffsetValidationTest, self).setup_consumer(topic, **kwargs)
+ self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
+ return consumer
+
def test_broker_rolling_bounce(self):
"""
Verify correct consumer behavior when the brokers are consecutively restarted.
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 25b87bd..1880d7a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -256,9 +256,16 @@
}
private static abstract class ConsumerEvent {
+ private final long timestamp = System.currentTimeMillis();
+
@JsonProperty
public abstract String name();
+ @JsonProperty
+ public long timestamp() {
+ return timestamp;
+ }
+
@JsonProperty("class")
public String clazz() {
return VerifiableConsumer.class.getName();