Added acknowledgement for pub/sub Streamer - Fixes #8.
Signed-off-by: samaitra <saikat.maitra@gmail.com>
diff --git a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
index 13384d2..3a4b689 100644
--- a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
+++ b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
@@ -35,6 +35,7 @@
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
@@ -224,9 +225,18 @@
PullResponse pullResponse = subscriberStub.pullCallable().call(pullRequest);
+ List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
addMessage(message.getMessage());
+ ackIds.add(message.getAckId());
}
+
+ AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
+ .setSubscription(subscriptionName)
+ .addAllAckIds(ackIds)
+ .build();
+
+ subscriberStub.acknowledgeCallable().call(acknowledgeRequest);
}
} finally {
subscriberStub.close();
diff --git a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
index 20fe767..714961c 100644
--- a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
+++ b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
@@ -24,15 +24,15 @@
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.protobuf.Empty;
+import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
@@ -68,7 +68,6 @@
public static final int MESSAGES_PER_REQUEST = 10;
private final Map<String, Publisher> publishers = new HashMap<>();
- private final List<PubsubMessage> topicMessages = new ArrayList<>();
private final Queue<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
public SubscriberStubSettings createSubscriberStub() throws IOException {
@@ -87,10 +86,31 @@
@NotNull
private ManagedChannel managedChannel() {
ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class);
- when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la) -> clientCall());
+ when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la) -> {
+ MethodDescriptor methodDescriptor = (MethodDescriptor) la.getArguments()[0];
+ if(methodDescriptor.getFullMethodName().equals("google.pubsub.v1.Subscriber/Acknowledge")) {
+ return acknowledgeCall();
+ }
+
+ return clientCall();
+ });
return managedChannel;
}
+ private ClientCall<AcknowledgeRequest, Empty> acknowledgeCall() {
+ ClientCall<AcknowledgeRequest, Empty> clientCall = Mockito.mock(ClientCall.class);
+ doAnswer(iom -> {
+ Object[] arguments = iom.getArguments();
+ ClientCall.Listener<Empty> listener = (ClientCall.Listener<Empty>) arguments[0];
+ listener.onMessage(Empty.getDefaultInstance());
+ Metadata metadata = (Metadata) arguments[1];
+ listener.onClose(Status.OK, metadata);
+ return null;
+ }
+ ).when(clientCall).start(any(ClientCall.Listener.class),any(Metadata.class));
+ return clientCall;
+ }
+
private ClientCall<PullRequest, PullResponse> clientCall() {
ClientCall<PullRequest, PullResponse> clientCall = Mockito.mock(ClientCall.class);