Avoid creating a map object for Flume source (#4369)
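
Previously, AbstractSink.write() allocated a single-entry HashMap (key "body") for every record and queued the map, and SourceOfFlume unwrapped that same entry on the other side. This change queues the extracted value directly, removing one allocation per record, and also fixes the exception logging in SourceOfFlume so the Throwable is passed as the final SLF4J argument and the stack trace is recorded.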
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
index 3e707ed..680bd7d 100644
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
@@ -26,7 +26,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -41,18 +40,18 @@
public abstract T extractValue(Record<T> record);
- protected static BlockingQueue<Map<String, Object>> records;
+ protected static BlockingQueue<Object> records;
protected FlumeConnector flumeConnector;
- public static BlockingQueue<Map<String, Object>> getQueue() {
+ public static BlockingQueue<Object> getQueue() {
return records;
}
@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
- records = new LinkedBlockingQueue<Map<String, Object>>();
+ records = new LinkedBlockingQueue<>();
FlumeConfig flumeConfig = FlumeConfig.load(config);
@@ -64,9 +63,7 @@
public void write(Record<T> record) {
try {
T message = extractValue(record);
- Map<String, Object> m = new HashMap();
- m.put("body", message);
- records.put(m);
+ records.put(message);
record.ack();
} catch (InterruptedException e) {
record.fail();
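
For context, a minimal sketch of the hand-off after this change (QueueHandOffSketch and its members are illustrative, not part of the patch; the real AbstractSink is generic in T and acks or fails the Pulsar Record):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Stand-in for the sink side of the queue hand-off after this change.
    public class QueueHandOffSketch {
        private static final BlockingQueue<Object> RECORDS = new LinkedBlockingQueue<>();

        // Before: a HashMap was built per record and queued under the "body" key.
        // After: the extracted value goes onto the queue directly.
        static void write(Object message) throws InterruptedException {
            RECORDS.put(message);
        }

        public static void main(String[] args) throws InterruptedException {
            write("hello");
            System.out.println(RECORDS.take()); // prints "hello"
        }
    }
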
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java
index a1be6e7..51bc59e 100644
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java
@@ -18,25 +18,19 @@
*/
package org.apache.pulsar.io.flume.sink;
-import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractPollableSource;
-import org.apache.flume.source.avro.AvroFlumeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
-
-import com.google.common.base.Optional;
-
import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.BATCH_SIZE;
@@ -55,9 +49,6 @@
private final List<Event> eventList = new ArrayList<Event>();
- private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = Optional.absent();
-
-
@Override
public synchronized void doStart() {
log.info("start source of flume ...");
@@ -87,10 +78,10 @@
while (eventList.size() < this.getBatchSize() &&
System.currentTimeMillis() < maxBatchEndTime) {
- BlockingQueue<Map<String, Object>> blockingQueue = StringSink.getQueue();
+ BlockingQueue<Object> blockingQueue = StringSink.getQueue();
while (blockingQueue != null && !blockingQueue.isEmpty()) {
- Map<String, Object> message = blockingQueue.take();
- eventBody = message.get("body").toString();
+ Object message = blockingQueue.take();
+ eventBody = message.toString();
event = EventBuilder.withBody(eventBody.getBytes());
eventList.add(event);
}
@@ -104,7 +95,7 @@
return Status.BACKOFF;
} catch (Exception e) {
- log.error("Flume Source EXCEPTION, {}", e);
+ log.error("Flume Source EXCEPTION", e);
counter.incrementEventReadOrChannelFail(e);
return Status.BACKOFF;
}
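
On the consuming side, a hedged sketch of the drain loop as simplified by this change, plus the logging fix (DrainSketch and drain() are illustrative names, not part of the patch; assumes slf4j-api on the classpath):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Mirrors the simplified SourceOfFlume path: values come off the shared
    // queue as-is, with no Map unwrapping before building the event body.
    public class DrainSketch {
        private static final Logger log = LoggerFactory.getLogger(DrainSketch.class);

        static List<byte[]> drain(BlockingQueue<Object> queue) {
            List<byte[]> bodies = new ArrayList<>();
            try {
                while (!queue.isEmpty()) {
                    Object message = queue.take();
                    bodies.add(message.toString().getBytes());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Pass the Throwable as the last argument with no "{}" placeholder:
                // SLF4J then logs the full stack trace. The old "EXCEPTION, {}" form
                // consumed the exception as a format argument and printed only its
                // toString(), losing the trace.
                log.error("Flume Source EXCEPTION", e);
            }
            return bodies;
        }
    }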