[Issue 6024][pulsar_storm] PulsarSpout emit to multiple streams (#6039)
Fixes #6024
### Motivation
This is all described in detail in https://github.com/apache/pulsar/issues/6024, but in short, an insurmountable obstacle to using Pulsar in our storm topology is the fact that `PulsarSpout` only emits to the "default" stream. In our environment, we need to emit on different streams based on the content of each received message. This change extends `PulsarSpout` to recognize a `Values` extension that specifies an alternate output stream, and uses that stream when given.
### Modifications
A new `PulsarTuple` class is added. It extends `Values` and adds a method to return the output stream.
When emitting a tuple after calling `toValues(msg)`, `PulsarSpout` checks if the returned `Values` is a `PulsarTuple`. If so, it emits to the designated stream, otherwise it emits as before.
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
index 4291fa8..92e127c 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
@@ -33,12 +33,12 @@
* @param msg
* @return
*/
- public Values toValues(Message<byte[]> msg);
+ Values toValues(Message<byte[]> msg);
/**
* Declare the output schema for the spout.
*
* @param declarer
*/
- public void declareOutputFields(OutputFieldsDeclarer declarer);
+ void declareOutputFields(OutputFieldsDeclarer declarer);
}
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 773cb2b..63887a4 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -326,7 +326,11 @@
}
ack(msg);
} else {
- collector.emit(values, msg);
+ if (values instanceof PulsarTuple) {
+ collector.emit(((PulsarTuple) values).getOutputStream(), values, msg);
+ } else {
+ collector.emit(values, msg);
+ }
++messagesEmitted;
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId());
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
new file mode 100644
index 0000000..b000827
--- /dev/null
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarTuple.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.storm;
+
+
+import org.apache.storm.tuple.Values;
+
+/**
+ * Returned by MessageToValuesMapper, this specifies the Values
+ * for an output tuple and the stream it should be sent to.
+ */
+public class PulsarTuple extends Values {
+
+ protected final String outputStream;
+
+ public PulsarTuple(String outStream, Object ... values) {
+ super(values);
+ outputStream = outStream;
+ }
+
+ /**
+ * Return stream the tuple should be emitted on.
+ *
+ * @return String
+ */
+ public String getOutputStream() {
+ return outputStream;
+ }
+}
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
index f8a1aa0..ba67051 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java
@@ -22,10 +22,13 @@
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
@@ -47,6 +50,7 @@
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Values;
+import org.mockito.ArgumentCaptor;
import org.testng.annotations.Test;
import com.google.common.collect.Maps;
@@ -93,7 +97,16 @@
}
@Test
+ public void testPulsarTuple() throws Exception {
+ testPulsarSpout(true);
+ }
+
+ @Test
public void testPulsarSpout() throws Exception {
+ testPulsarSpout(false);
+ }
+
+ public void testPulsarSpout(boolean pulsarTuple) throws Exception {
PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
conf.setServiceUrl("http://localhost:8080");
conf.setSubscriptionName("sub1");
@@ -105,7 +118,15 @@
@Override
public Values toValues(Message<byte[]> msg) {
called.set(true);
- return new Values("test");
+ if ("message to be dropped".equals(new String(msg.getData()))) {
+ return null;
+ }
+ String val = new String(msg.getData());
+ if (val.startsWith("stream:")) {
+ String stream = val.split(":")[1];
+ return new PulsarTuple(stream, val);
+ }
+ return new Values(val);
}
@Override
@@ -114,6 +135,8 @@
});
+ String msgContent = pulsarTuple ? "stream:pstream" : "test";
+
ClientBuilder builder = spy(new ClientBuilderImpl());
PulsarSpout spout = spy(new PulsarSpout(conf, builder));
TopologyContext context = mock(TopologyContext.class);
@@ -131,7 +154,7 @@
when(client.getSharedConsumer(any())).thenReturn(consumer);
instances.put(componentId, client);
- Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), "test".getBytes(), Schema.BYTES);
+ Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), msgContent.getBytes(), Schema.BYTES);
when(consumer.receive(anyInt(), any())).thenReturn(msg);
spout.open(config, context, collector);
@@ -139,6 +162,14 @@
assertTrue(called.get());
verify(consumer, atLeast(1)).receive(anyInt(), any());
+ ArgumentCaptor<Values> capt = ArgumentCaptor.forClass(Values.class);
+ if (pulsarTuple) {
+ verify(collector, times(1)).emit(eq("pstream"), capt.capture(), eq(msg));
+ } else {
+ verify(collector, times(1)).emit(capt.capture(), eq(msg));
+ }
+ Values vals = capt.getValue();
+ assertEquals(msgContent, vals.get(0));
}
}