[issues-21] support SQL92 filter mod.
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 8b338e3..b3c81e5 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -139,7 +139,7 @@
         String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG);
         String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
         Validate.isTrue(
-                StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql),
+                !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
                 "Consumer tag and sql can not set value at the same time");
 
         this.enableCheckpoint =
diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
index a0b4e29..801814a 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.rocketmq.flink.source;
 
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
 import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumState;
 import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumStateSerializer;
 import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator;
@@ -46,6 +47,9 @@
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.UserCodeClassLoader;
 
+import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.function.Supplier;
 
 /** The Source implementation of RocketMQ. */
@@ -81,10 +85,13 @@
             long partitionDiscoveryIntervalMs,
             Boundedness boundedness,
             RocketMQDeserializationSchema<OUT> deserializationSchema) {
+        Validate.isTrue(
+                !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
+                "Consumer tag and sql can not set value at the same time");
         this.topic = topic;
         this.consumerGroup = consumerGroup;
         this.nameServerAddress = nameServerAddress;
-        this.tag = tag;
+        this.tag = StringUtils.isEmpty(tag) ? RocketMQConfig.DEFAULT_CONSUMER_TAG : tag;
         this.sql = sql;
         this.stopInMs = stopInMs;
         this.startTime = startTime;
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index 531c64c..f7af900 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -164,18 +164,18 @@
                         recordsBySplits.prepareForRead();
                         return recordsBySplits;
                     }
-                    if (StringUtils.isNotEmpty(tag)) {
+                    if (StringUtils.isNotEmpty(sql)) {
                         pullResult =
                                 consumer.pull(
                                         messageQueue,
-                                        tag,
+                                        MessageSelector.bySql(sql),
                                         messageOffset,
                                         MAX_MESSAGE_NUMBER_PER_BLOCK);
                     } else {
                         pullResult =
                                 consumer.pull(
                                         messageQueue,
-                                        MessageSelector.bySql(sql),
+                                        tag,
                                         messageOffset,
                                         MAX_MESSAGE_NUMBER_PER_BLOCK);
                     }