Avoid to create map object for Flume source (#4369)

diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
index 3e707ed..680bd7d 100644
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/AbstractSink.java
@@ -26,7 +26,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,18 +40,18 @@
 
     public abstract T extractValue(Record<T> record);
 
-    protected static BlockingQueue<Map<String, Object>> records;
+    protected static BlockingQueue<Object> records;
 
     protected FlumeConnector flumeConnector;
 
-    public static BlockingQueue<Map<String, Object>> getQueue() {
+    public static BlockingQueue<Object> getQueue() {
         return records;
     }
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
 
-        records = new LinkedBlockingQueue<Map<String, Object>>();
+        records = new LinkedBlockingQueue<>();
 
         FlumeConfig flumeConfig = FlumeConfig.load(config);
 
@@ -64,9 +63,7 @@
     public void write(Record<T> record) {
         try {
             T message = extractValue(record);
-            Map<String, Object> m = new HashMap();
-            m.put("body", message);
-            records.put(m);
+            records.put(message);
             record.ack();
         } catch (InterruptedException e) {
             record.fail();
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java
index a1be6e7..51bc59e 100644
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/sink/SourceOfFlume.java
@@ -18,25 +18,19 @@
  */
 package org.apache.pulsar.io.flume.sink;
 
-import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.conf.BatchSizeSupported;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.AbstractPollableSource;
-import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
-
-import com.google.common.base.Optional;
-
 import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.BATCH_SIZE;
 
 
@@ -55,9 +49,6 @@
 
     private final List<Event> eventList = new ArrayList<Event>();
 
-    private Optional<SpecificDatumReader<AvroFlumeEvent>> reader = Optional.absent();
-
-
     @Override
     public synchronized void doStart() {
         log.info("start source of flume ...");
@@ -87,10 +78,10 @@
 
             while (eventList.size() < this.getBatchSize() &&
                     System.currentTimeMillis() < maxBatchEndTime) {
-                BlockingQueue<Map<String, Object>> blockingQueue = StringSink.getQueue();
+                BlockingQueue<Object> blockingQueue = StringSink.getQueue();
                 while (blockingQueue != null && !blockingQueue.isEmpty()) {
-                    Map<String, Object> message = blockingQueue.take();
-                    eventBody = message.get("body").toString();
+                    Object message = blockingQueue.take();
+                    eventBody = message.toString();
                     event = EventBuilder.withBody(eventBody.getBytes());
                     eventList.add(event);
                 }
@@ -104,7 +95,7 @@
             return Status.BACKOFF;
 
         } catch (Exception e) {
-            log.error("Flume Source EXCEPTION, {}", e);
+            log.error("Flume Source EXCEPTION", e);
             counter.incrementEventReadOrChannelFail(e);
             return Status.BACKOFF;
         }