[BAHIR-177] Fixes state recovery/size of the recovered queue

Two issues are meant to be fixed in this PR:

- As described in BAHIR-177 currently the state recovery of
Bahir operators depends on randomly generated IDs, which
basically makes it impossible to recover state properly.
The chagne has been done, so that the outStreamId is
used instead of random names.

-The size of the queue recovered in restoreQueuerState()
was equal to the actual size (number of elements) of the
snapshot queue. If the queue was empty, the method would
try to create queue with the size 0, which is currently
forbidden for the PriorityQueue in Java.

Closes #51
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
index 43d7436..ca61a0a 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
@@ -236,7 +236,7 @@
             TypeInformation<T> typeInformation =
                 SiddhiTypeFactory.getTupleTypeInformation(siddhiContext.getFinalExecutionPlan(), outStreamId);
             siddhiContext.setOutputStreamType(typeInformation);
-            return returnsInternal(siddhiContext);
+            return returnsInternal(siddhiContext, outStreamId);
         }
 
         /**
@@ -269,11 +269,11 @@
             siddhiContext.setOutputStreamType(typeInformation);
             siddhiContext.setExtensions(environment.getExtensions());
             siddhiContext.setExecutionConfig(environment.getExecutionEnvironment().getConfig());
-            return returnsInternal(siddhiContext);
+            return returnsInternal(siddhiContext, outStreamId);
         }
 
-        private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext) {
-            return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream);
+        private <T> DataStream<T> returnsInternal(SiddhiOperatorContext siddhiContext, String outStreamId) {
+            return SiddhiStreamFactory.createDataStream(siddhiContext, this.dataStream, outStreamId);
         }
     }
 }
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
index 8cb6d67..79df6ac 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
@@ -20,8 +20,10 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 
@@ -51,7 +53,11 @@
 import org.wso2.siddhi.core.SiddhiAppRuntime;
 import org.wso2.siddhi.core.SiddhiManager;
 import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.SiddhiApp;
+import org.wso2.siddhi.query.api.annotation.Annotation;
+import org.wso2.siddhi.query.api.annotation.Element;
 import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.compiler.SiddhiCompiler;
 
 /**
  * <h1>Siddhi Runtime Operator</h1>
@@ -86,10 +92,10 @@
 public abstract class AbstractSiddhiOperator<IN, OUT> extends AbstractStreamOperator<OUT>
     implements OneInputStreamOperator<IN, OUT> {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
-    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+    protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
     private static final String SIDDHI_RUNTIME_STATE_NAME = "siddhiRuntimeState";
     private static final String QUEUED_RECORDS_STATE_NAME = "queuedRecordsState";
-
+    protected final String operatorName;
     private final SiddhiOperatorContext siddhiPlan;
     private final String executionExpression;
     private final boolean isProcessingTime;
@@ -108,13 +114,13 @@
     /**
      * @param siddhiPlan Siddhi CEP  Execution Plan
      */
-    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
+    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan, String operatorName) {
         validate(siddhiPlan);
         this.executionExpression = siddhiPlan.getFinalExecutionPlan();
         this.siddhiPlan = siddhiPlan;
         this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
         this.streamRecordSerializers = new HashMap<>();
-
+        this.operatorName = operatorName;
         registerStreamRecordSerializers();
     }
 
@@ -228,7 +234,15 @@
             for (Map.Entry<String, Class<?>> entry : this.siddhiPlan.getExtensions().entrySet()) {
                 this.siddhiManager.setExtension(entry.getKey(), entry.getValue());
             }
-            this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(executionExpression);
+
+            SiddhiApp siddhiApp = SiddhiCompiler.parse(executionExpression);
+            Annotation nameAnnotation = new Annotation("Name");
+            Element element = new Element(null, operatorName);
+            List<Element> elements = new ArrayList<>();
+            elements.add(element);
+            nameAnnotation.setElements(elements);
+            siddhiApp.getAnnotations().add(nameAnnotation);
+            this.siddhiRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
             this.siddhiRuntime.start();
             registerInputAndOutput(this.siddhiRuntime);
             LOGGER.info("Siddhi {} started", siddhiRuntime.getName());
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
index 5c54ad8..0ce719c 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
@@ -36,8 +36,8 @@
  */
 public class SiddhiStreamOperator<IN, OUT> extends AbstractSiddhiOperator<Tuple2<String, IN>, OUT> {
 
-    public SiddhiStreamOperator(SiddhiOperatorContext siddhiPlan) {
-        super(siddhiPlan);
+    public SiddhiStreamOperator(SiddhiOperatorContext siddhiPlan, String operatorName) {
+        super(siddhiPlan, operatorName);
     }
 
     @Override
@@ -68,9 +68,10 @@
 
     @Override
     protected PriorityQueue<StreamRecord<Tuple2<String, IN>>> restoreQueuerState(DataInputView dataInputView) throws IOException {
-        int sizeOfQueue = dataInputView.readInt();
+        int snapshotSize = dataInputView.readInt();
+        int sizeOfQueue = snapshotSize > 0 ? snapshotSize : this.INITIAL_PRIORITY_QUEUE_CAPACITY;
         PriorityQueue<StreamRecord<Tuple2<String, IN>>> priorityQueue = new PriorityQueue<>(sizeOfQueue);
-        for (int i = 0; i < sizeOfQueue; i++) {
+        for (int i = 0; i < snapshotSize; i++) {
             String streamId = dataInputView.readUTF();
             StreamElement streamElement = getStreamRecordSerializer(streamId).deserialize(dataInputView);
             priorityQueue.offer(streamElement.<Tuple2<String, IN>>asRecord());
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
index 20ca535..d11f029 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
@@ -27,7 +27,7 @@
  */
 public class SiddhiStreamFactory {
     @SuppressWarnings("unchecked")
-    public static <OUT> DataStream<OUT> createDataStream(SiddhiOperatorContext context, DataStream<Tuple2<String, Object>> namedStream) {
-        return namedStream.transform(context.getName(), context.getOutputStreamType(), new SiddhiStreamOperator(context));
+    public static <OUT> DataStream<OUT> createDataStream(SiddhiOperatorContext context, DataStream<Tuple2<String, Object>> namedStream, String outStreamId) {
+        return namedStream.transform(context.getName(), context.getOutputStreamType(), new SiddhiStreamOperator(context,outStreamId));
     }
 }