[issues-21] support SQL92 filter mod.
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 8b338e3..b3c81e5 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -139,7 +139,7 @@
String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG);
String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
Validate.isTrue(
- StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql),
+ !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
"Consumer tag and sql can not set value at the same time");
this.enableCheckpoint =
diff --git a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
index a0b4e29..801814a 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/RocketMQSource.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.flink.source;
+import org.apache.rocketmq.flink.legacy.RocketMQConfig;
import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumState;
import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumStateSerializer;
import org.apache.rocketmq.flink.source.enumerator.RocketMQSourceEnumerator;
@@ -46,6 +47,9 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.function.Supplier;
/** The Source implementation of RocketMQ. */
@@ -81,10 +85,13 @@
long partitionDiscoveryIntervalMs,
Boundedness boundedness,
RocketMQDeserializationSchema<OUT> deserializationSchema) {
+ Validate.isTrue(
+ !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
+ "Consumer tag and sql can not set value at the same time");
this.topic = topic;
this.consumerGroup = consumerGroup;
this.nameServerAddress = nameServerAddress;
- this.tag = tag;
+ this.tag = StringUtils.isEmpty(tag) ? RocketMQConfig.DEFAULT_CONSUMER_TAG : tag;
this.sql = sql;
this.stopInMs = stopInMs;
this.startTime = startTime;
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index 531c64c..f7af900 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -164,18 +164,18 @@
recordsBySplits.prepareForRead();
return recordsBySplits;
}
- if (StringUtils.isNotEmpty(tag)) {
+ if (StringUtils.isNotEmpty(sql)) {
pullResult =
consumer.pull(
messageQueue,
- tag,
+ MessageSelector.bySql(sql),
messageOffset,
MAX_MESSAGE_NUMBER_PER_BLOCK);
} else {
pullResult =
consumer.pull(
messageQueue,
- MessageSelector.bySql(sql),
+ tag,
messageOffset,
MAX_MESSAGE_NUMBER_PER_BLOCK);
}