[Issue 6024][pulsar_storm] PulsarSpout emit to multiple streams (#6039)

Fixes #6024

### Motivation
This is all described in detail in https://github.com/apache/pulsar/issues/6024, but in short, an insurmountable obstacle to using Pulsar in our storm topology is the fact that `PulsarSpout` only emits to the "default" stream. In our environment, we need to emit on different streams based on the content of each received message. This change extends `PulsarSpout` to recognize a `Values` extension that specifies an alternate output stream, and uses that stream when given.

### Modifications
A new `PulsarTuple` class is added. It extends `Values` and adds a method to return the output stream.

When emitting a tuple after calling `toValues(msg)`, `PulsarSpout` checks if the returned `Values` is a `PulsarTuple`. If so, it emits to the designated stream, otherwise it emits as before.
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
index 4291fa8..92e127c 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
@@ -33,12 +33,12 @@
      * @param msg
      * @return
      */
-    public Values toValues(Message<byte[]> msg);
+    Values toValues(Message<byte[]> msg);
 
     /**
      * Declare the output schema for the spout.
      *
      * @param declarer
      */
-    public void declareOutputFields(OutputFieldsDeclarer declarer);
+    void declareOutputFields(OutputFieldsDeclarer declarer);
 }
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 773cb2b..63887a4 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -326,7 +326,11 @@
                 }
                 ack(msg);
             } else {
-                collector.emit(values, msg);
+                if (values instanceof PulsarTuple) {
+                    collector.emit(((PulsarTuple) values).getOutputStream(), values, msg);
+                } else {
+                    collector.emit(values, msg);
+                }
                 ++messagesEmitted;
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId());
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
new file mode 100644
index 0000000..b000827
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+
+import org.apache.storm.tuple.Values;
+
+/**
+ * Returned by MessageToValuesMapper, this specifies the Values
+ * for an output tuple and the stream it should be sent to.
+ */
+public class PulsarTuple extends Values {
+
+    protected final String outputStream;
+
+    public PulsarTuple(String outStream, Object ... values) {
+        super(values);
+        outputStream = outStream;
+    }
+
+    /**
+     * Return stream the tuple should be emitted on.
+     *
+     * @return String
+     */
+    public String getOutputStream() {
+        return outputStream;
+    }
+}
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index f8a1aa0..ba67051 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -22,10 +22,13 @@
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
 import java.lang.reflect.Field;
@@ -47,6 +50,7 @@
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Values;
+import org.mockito.ArgumentCaptor;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Maps;
@@ -93,7 +97,16 @@
     }
 
     @Test
+    public void testPulsarTuple() throws Exception {
+        testPulsarSpout(true);
+    }
+
+    @Test
     public void testPulsarSpout() throws Exception {
+        testPulsarSpout(false);
+    }
+
+    public void testPulsarSpout(boolean pulsarTuple) throws Exception {
         PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
         conf.setServiceUrl("http://localhost:8080");
         conf.setSubscriptionName("sub1");
@@ -105,7 +118,15 @@
             @Override
             public Values toValues(Message<byte[]> msg) {
                 called.set(true);
-                return new Values("test");
+                if ("message to be dropped".equals(new String(msg.getData()))) {
+                    return null;
+                }
+                String val = new String(msg.getData());
+                if (val.startsWith("stream:")) {
+                    String stream = val.split(":")[1];
+                    return new PulsarTuple(stream, val);
+                }
+                return new Values(val);
             }
 
             @Override
@@ -114,6 +135,8 @@
 
         });
 
+        String msgContent = pulsarTuple ? "stream:pstream" : "test";
+
         ClientBuilder builder = spy(new ClientBuilderImpl());
         PulsarSpout spout = spy(new PulsarSpout(conf, builder));
         TopologyContext context = mock(TopologyContext.class);
@@ -131,7 +154,7 @@
         when(client.getSharedConsumer(any())).thenReturn(consumer);
         instances.put(componentId, client);
 
-        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), "test".getBytes(), Schema.BYTES);
+        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), msgContent.getBytes(), Schema.BYTES);
         when(consumer.receive(anyInt(), any())).thenReturn(msg);
 
         spout.open(config, context, collector);
@@ -139,6 +162,14 @@
 
         assertTrue(called.get());
         verify(consumer, atLeast(1)).receive(anyInt(), any());
+        ArgumentCaptor<Values> capt = ArgumentCaptor.forClass(Values.class);
+        if (pulsarTuple) {
+            verify(collector, times(1)).emit(eq("pstream"), capt.capture(), eq(msg));
+        } else {
+            verify(collector, times(1)).emit(capt.capture(), eq(msg));
+        }
+        Values vals = capt.getValue();
+        assertEquals(msgContent, vals.get(0));
     }
 
 }