Added acknowledgement for pub/sub Streamer - Fixes #8.

Signed-off-by: samaitra <saikat.maitra@gmail.com>
diff --git a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
index 13384d2..3a4b689 100644
--- a/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
+++ b/modules/pub-sub/src/main/java/org/apache/ignite/stream/pubsub/PubSubStreamer.java
@@ -35,6 +35,7 @@
 import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
 import com.google.cloud.pubsub.v1.stub.SubscriberStub;
 import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
 import com.google.pubsub.v1.ProjectTopicName;
 import com.google.pubsub.v1.PubsubMessage;
 import com.google.pubsub.v1.PullRequest;
@@ -224,9 +225,18 @@
 
                     PullResponse pullResponse = subscriberStub.pullCallable().call(pullRequest);
 
+                    List<String> ackIds = new ArrayList<>();
                     for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
                         addMessage(message.getMessage());
+                        ackIds.add(message.getAckId());
                     }
+
+                    AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
+                                                                              .setSubscription(subscriptionName)
+                                                                              .addAllAckIds(ackIds)
+                                                                              .build();
+
+                    subscriberStub.acknowledgeCallable().call(acknowledgeRequest);
                 }
             } finally {
                 subscriberStub.close();
diff --git a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
index 20fe767..714961c 100644
--- a/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
+++ b/modules/pub-sub/src/test/java/org/apache/ignite/stream/pubsub/MockPubSubServer.java
@@ -24,15 +24,15 @@
 import com.google.api.gax.rpc.FixedTransportChannelProvider;
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.protobuf.Empty;
+import com.google.pubsub.v1.AcknowledgeRequest;
 import com.google.pubsub.v1.PubsubMessage;
 import com.google.pubsub.v1.PullRequest;
 import com.google.pubsub.v1.PullResponse;
 import com.google.pubsub.v1.ReceivedMessage;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
@@ -68,7 +68,6 @@
     public static final int MESSAGES_PER_REQUEST = 10;
 
     private final Map<String, Publisher> publishers = new HashMap<>();
-    private final List<PubsubMessage> topicMessages = new ArrayList<>();
     private final Queue<PubsubMessage> blockingQueue = new LinkedBlockingDeque<>();
 
     public SubscriberStubSettings createSubscriberStub() throws IOException {
@@ -87,10 +86,31 @@
     @NotNull
     private ManagedChannel managedChannel() {
         ManagedChannel managedChannel = Mockito.mock(ManagedChannel.class);
-        when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la) -> clientCall());
+        when(managedChannel.newCall(any(MethodDescriptor.class),any(CallOptions.class))).thenAnswer((la) -> {
+            MethodDescriptor methodDescriptor = (MethodDescriptor) la.getArguments()[0];
+            if(methodDescriptor.getFullMethodName().equals("google.pubsub.v1.Subscriber/Acknowledge")) {
+                return acknowledgeCall();
+            }
+
+            return clientCall();
+        });
         return managedChannel;
     }
 
+    private ClientCall<AcknowledgeRequest, Empty> acknowledgeCall() {
+        ClientCall<AcknowledgeRequest, Empty> clientCall = Mockito.mock(ClientCall.class);
+        doAnswer(iom -> {
+                     Object[] arguments = iom.getArguments();
+                     ClientCall.Listener<Empty> listener = (ClientCall.Listener<Empty>) arguments[0];
+                     listener.onMessage(Empty.getDefaultInstance());
+                     Metadata metadata = (Metadata) arguments[1];
+                     listener.onClose(Status.OK, metadata);
+                     return null;
+                 }
+                ).when(clientCall).start(any(ClientCall.Listener.class),any(Metadata.class));
+        return clientCall;
+    }
+
     private ClientCall<PullRequest, PullResponse> clientCall() {
         ClientCall<PullRequest, PullResponse> clientCall = Mockito.mock(ClientCall.class);