blob: 01d7c77683f1c3c86657804b3b1baab6423562de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.e2e.connector.rocketmq;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.apache.seatunnel.e2e.connector.rocketmq.RocketMqContainer.NAMESRV_PORT;
@Slf4j
public class RocketMqIT extends TestSuiteBase implements TestResource {
// Docker image used to start the RocketMQ container for this test suite.
private static final String IMAGE = "apache/rocketmq:4.9.4";
// Group used by the test producer and as the default group id in newConfiguration().
private static final String ROCKETMQ_GROUP = "SeaTunnel-rocketmq-group";
// Network alias given to the RocketMQ container on the shared test network.
private static final String HOST = "rocketmq-e2e";
// Schema format handed to the serializer when a test does not request TEXT explicitly.
private static final SchemaFormat DEFAULT_FORMAT = SchemaFormat.JSON;
// Field delimiter passed to every DefaultSeaTunnelRowSerializer created in this class.
private static final String DEFAULT_FIELD_DELIMITER = ",";
// Row schema shared by all serializers built in this class. The field names and the
// data types below are paired by position, and generateTestData() fills its rows in
// exactly this order, so the three lists must be kept in sync when a column is changed.
private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
new SeaTunnelRowType(
new String[] {
"id",
"c_map",
"c_array",
"c_string",
"c_boolean",
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_bytes",
"c_date",
"c_timestamp"
},
new SeaTunnelDataType[] {
BasicType.LONG_TYPE,
new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
ArrayType.BYTE_ARRAY_TYPE,
BasicType.STRING_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.INT_TYPE,
BasicType.LONG_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
new DecimalType(2, 1),
PrimitiveByteArrayType.INSTANCE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE
});
// RocketMQ container for the suite; created and started in startUp(), closed in tearDown().
private RocketMqContainer rocketMqContainer;
// Producer used to seed source topics; created in initProducer(), shut down in tearDown().
private DefaultMQProducer producer;
@BeforeAll
@Override
public void startUp() throws Exception {
    // Prepare the pieces of the container definition up front, then assemble them.
    final DockerImageName imageName = DockerImageName.parse(IMAGE);
    final Slf4jLogConsumer containerLogs =
            new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE));
    final HostPortWaitStrategy portWait =
            new HostPortWaitStrategy().withStartupTimeout(Duration.ofMinutes(2));

    this.rocketMqContainer =
            new RocketMqContainer(imageName)
                    .withNetwork(NETWORK)
                    .withNetworkAliases(HOST)
                    .withLogConsumer(containerLogs)
                    .waitingFor(portWait);

    // Bind the name server port to the same port number on the host.
    final String nameServerBinding = String.format("%s:%s", NAMESRV_PORT, NAMESRV_PORT);
    rocketMqContainer.setPortBindings(Lists.newArrayList(nameServerBinding));
    rocketMqContainer.start();
    log.info("RocketMq container started");

    initProducer();

    // Seed the default source topic with rows whose ids are 0..99.
    log.info("Write 100 records to topic test_topic_source");
    final String sourceTopic = "test_topic_source";
    final DefaultSeaTunnelRowSerializer rowSerializer =
            new DefaultSeaTunnelRowSerializer(
                    sourceTopic, SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
    generateTestData(rowSerializer::serializeRow, sourceTopic, 0, 100);
}
/**
 * Creates the shared test producer, points it at the container's name server and starts it.
 * The field is assigned before the producer is configured and started.
 */
@SneakyThrows
private void initProducer() {
    final DefaultMQProducer mqProducer = new DefaultMQProducer();
    this.producer = mqProducer;

    mqProducer.setNamesrvAddr(rocketMqContainer.getNameSrvAddr());
    // A random instance name is generated for every producer created here.
    final String instanceName = UUID.randomUUID().toString();
    mqProducer.setInstanceName(instanceName);
    mqProducer.setProducerGroup(ROCKETMQ_GROUP);
    mqProducer.setLanguage(LanguageCode.JAVA);
    mqProducer.setSendMsgTimeout(15000);

    mqProducer.start();
}
/**
 * Releases the resources created in {@code startUp()}.
 *
 * <p>The container is closed in a {@code finally} block so that a failure while shutting
 * down the producer cannot leave the container running.
 *
 * @throws Exception if shutting down the producer or closing the container fails
 */
@AfterAll
@Override
public void tearDown() throws Exception {
    try {
        if (this.producer != null) {
            this.producer.shutdown();
        }
    } finally {
        if (this.rocketMqContainer != null) {
            this.rocketMqContainer.close();
        }
    }
}
/**
 * Runs the fake-to-RocketMQ sink job and verifies what ends up in {@code test_topic}.
 *
 * @param container engine container the job is submitted to
 * @throws IOException if the job execution fails with an I/O error
 * @throws InterruptedException if the job execution is interrupted
 */
@TestTemplate
public void testSinkRocketMq(TestContainer container) throws IOException, InterruptedException {
    Container.ExecResult execResult =
            container.executeJob("/rocketmq-sink_fake_to_rocketmq.conf");
    Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

    String topicName = "test_topic";
    Map<String, String> data = getRocketMqConsumerData(topicName);
    // Check the record count first: with an empty result the iterator().next() call below
    // would throw NoSuchElementException and hide the real reason for the failure.
    Assertions.assertEquals(10, data.size());

    // The message key is parsed as a JSON object and checked for two expected fields.
    ObjectMapper objectMapper = new ObjectMapper();
    String key = data.keySet().iterator().next();
    ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
    Assertions.assertTrue(objectNode.has("c_map"));
    Assertions.assertTrue(objectNode.has("c_string"));
}
/**
 * Runs the text sink job and checks how many distinct keys are read back from
 * {@code test_text_topic}.
 */
@TestTemplate
public void testTextFormatSinkRocketMq(TestContainer container)
        throws IOException, InterruptedException {
    final String jobConfig = "/rocketmq-text-sink_fake_to_rocketmq.conf";
    final Container.ExecResult jobResult = container.executeJob(jobConfig);
    Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());

    final Map<String, String> consumed = getRocketMqConsumerData("test_text_topic");
    final int distinctKeys = consumed.size();
    Assertions.assertEquals(10, distinctKeys);
}
/**
 * Seeds {@code test_topic_text} with 100 TEXT-serialized rows, then runs the
 * text-source-to-console job and expects a zero exit code.
 */
@TestTemplate
public void testSourceRocketMqTextToConsole(TestContainer container)
        throws IOException, InterruptedException {
    final String textTopic = "test_topic_text";
    final DefaultSeaTunnelRowSerializer textSerializer =
            new DefaultSeaTunnelRowSerializer(
                    textTopic, SEATUNNEL_ROW_TYPE, SchemaFormat.TEXT, DEFAULT_FIELD_DELIMITER);
    generateTestData(textSerializer::serializeRow, textTopic, 0, 100);

    final Container.ExecResult jobResult =
            container.executeJob("/rocketmq-source_text_to_console.conf");
    final int exitCode = jobResult.getExitCode();
    Assertions.assertEquals(0, exitCode, jobResult.getStderr());
}
/**
 * Seeds {@code test_topic_text_offset_check} with 10 TEXT-serialized rows, runs the
 * offset-check job, and then verifies that the consumer group's offsets match the
 * maximum offsets of the topic's queues.
 */
@TestTemplate
@DisabledOnContainer(
        value = {},
        type = {EngineType.SPARK, EngineType.FLINK},
        disabledReason = "flink and spark won't commit offset when batch job finished")
public void testSourceRocketMqTextToConsoleWithOffsetCheck(TestContainer container)
        throws IOException, InterruptedException {
    final String offsetCheckTopic = "test_topic_text_offset_check";
    final DefaultSeaTunnelRowSerializer textSerializer =
            new DefaultSeaTunnelRowSerializer(
                    offsetCheckTopic,
                    SEATUNNEL_ROW_TYPE,
                    SchemaFormat.TEXT,
                    DEFAULT_FIELD_DELIMITER);
    generateTestData(textSerializer::serializeRow, offsetCheckTopic, 0, 10);

    final Container.ExecResult jobResult =
            container.executeJob("/rocketmq-source_tex_with_offset_check.conf");
    final int exitCode = jobResult.getExitCode();
    Assertions.assertEquals(0, exitCode, jobResult.getStderr());

    checkOffsetNoDiff(offsetCheckTopic, "SeaTunnel-Consumer-Group");
}
/**
 * Seeds {@code test_topic_json} with 100 rows serialized in the default format, then runs
 * the json-source-to-console job and expects a zero exit code.
 */
@TestTemplate
public void testSourceRocketMqJsonToConsole(TestContainer container)
        throws IOException, InterruptedException {
    final String jsonTopic = "test_topic_json";
    final DefaultSeaTunnelRowSerializer jsonSerializer =
            new DefaultSeaTunnelRowSerializer(
                    jsonTopic, SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
    generateTestData(jsonSerializer::serializeRow, jsonTopic, 0, 100);

    final Container.ExecResult jobResult =
            container.executeJob("/rocketmq-source_json_to_console.conf");
    final int exitCode = jobResult.getExitCode();
    Assertions.assertEquals(0, exitCode, jobResult.getStderr());
}
/** Runs the "latest" start-mode source job and expects a zero exit code. */
@TestTemplate
public void testRocketMqLatestToConsole(TestContainer container)
        throws IOException, InterruptedException {
    final String jobConfig = "/rocketmq/rocketmq_source_latest_to_console.conf";
    final Container.ExecResult jobResult = container.executeJob(jobConfig);
    final int exitCode = jobResult.getExitCode();
    Assertions.assertEquals(0, exitCode, jobResult.getStderr());
}
/** Runs the "earliest" start-mode source job and expects a zero exit code. */
@TestTemplate
public void testRocketMqEarliestToConsole(TestContainer container)
        throws IOException, InterruptedException {
    final String jobConfig = "/rocketmq/rocketmq_source_earliest_to_console.conf";
    final Container.ExecResult jobResult = container.executeJob(jobConfig);
    final int exitCode = jobResult.getExitCode();
    Assertions.assertEquals(0, exitCode, jobResult.getStderr());
}
/** Runs the "specific offsets" start-mode source job and expects a zero exit code. */
@TestTemplate
public void testRocketMqSpecificOffsetsToConsole(TestContainer container)
        throws IOException, InterruptedException {
    final String jobConfig = "/rocketmq/rocketmq_source_specific_offsets_to_console.conf";
    final Container.ExecResult jobResult = container.executeJob(jobConfig);
    final int exitCode = jobResult.getExitCode();
    Assertions.assertEquals(0, exitCode, jobResult.getStderr());
}
/** Runs the "timestamp" start-mode source job and expects a zero exit code. */
@TestTemplate
public void testRocketMqTimestampToConsole(TestContainer container)
        throws IOException, InterruptedException {
    final String jobConfig = "/rocketmq/rocketmq_source_timestamp_to_console.conf";
    final Container.ExecResult jobResult = container.executeJob(jobConfig);
    final int exitCode = jobResult.getExitCode();
    Assertions.assertEquals(0, exitCode, jobResult.getStderr());
}
/**
 * Seeds {@code test_topic_group} with rows whose ids are 100..149 and then delegates to
 * {@link #testRocketMqGroupOffsetsToConsole(TestContainer)} to run the group-offset job.
 */
@TestTemplate
public void testSourceRocketMqStartConfig(TestContainer container)
        throws IOException, InterruptedException {
    final String groupTopic = "test_topic_group";
    final DefaultSeaTunnelRowSerializer rowSerializer =
            new DefaultSeaTunnelRowSerializer(
                    groupTopic, SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
    generateTestData(rowSerializer::serializeRow, groupTopic, 100, 150);

    testRocketMqGroupOffsetsToConsole(container);
}
/**
 * Runs the group-offset source job and expects a zero exit code. This method carries no
 * test annotation of its own; it is invoked from {@code testSourceRocketMqStartConfig}.
 */
public void testRocketMqGroupOffsetsToConsole(TestContainer container)
        throws IOException, InterruptedException {
    final String jobConfig = "/rocketmq/rocketmq_source_group_offset_to_console.conf";
    final Container.ExecResult jobResult = container.executeJob(jobConfig);
    final int exitCode = jobResult.getExitCode();
    Assertions.assertEquals(0, exitCode, jobResult.getStderr());
}
/**
 * Produces one message per id in {@code [start, end)} to queue 0 of {@code topic}.
 *
 * <p>Every row uses the column order of {@code SEATUNNEL_ROW_TYPE}; only the id column
 * varies between rows.
 *
 * @param converter turns each generated row into the RocketMQ message to send
 * @param topic topic the messages are sent to
 * @param start first id to generate (inclusive)
 * @param end id at which generation stops (exclusive)
 */
@SneakyThrows
private void generateTestData(
        ProducerRecordConverter converter, String topic, int start, int end) {
    // The destination queue is the same for every row, so build it once.
    final MessageQueue targetQueue =
            new MessageQueue(topic, RocketMqContainer.BROKER_NAME, 0);
    for (int i = start; i < end; i++) {
        SeaTunnelRow row =
                new SeaTunnelRow(
                        new Object[] {
                            Long.valueOf(i),
                            Collections.singletonMap("key", Short.parseShort("1")),
                            new Byte[] {Byte.parseByte("1")},
                            "string",
                            Boolean.FALSE,
                            Byte.parseByte("1"),
                            Short.parseShort("1"),
                            Integer.parseInt("1"),
                            Long.parseLong("1"),
                            Float.parseFloat("1.1"),
                            Double.parseDouble("1.1"),
                            BigDecimal.valueOf(11, 1),
                            // Explicit charset: the no-arg getBytes() depends on the
                            // platform default and can differ between environments.
                            "test".getBytes(StandardCharsets.UTF_8),
                            LocalDate.now(),
                            LocalDateTime.now()
                        });
        Message message = converter.convert(row);
        producer.send(message, targetQueue);
    }
}
/**
 * Reads messages from {@code topicName} until a poll returns nothing and returns them
 * keyed by message key, with the body decoded as UTF-8.
 *
 * <p>Messages that share a key overwrite each other in the returned map. The consumer is
 * always shut down, even when reading fails part-way through.
 *
 * @param topicName topic to read from
 * @return map of message key to message body
 * @throws RuntimeException wrapping any failure raised while consuming; if the thread is
 *     interrupted, the interrupt flag is restored before the exception is thrown
 */
private Map<String, String> getRocketMqConsumerData(String topicName) {
    Map<String, String> data = new HashMap<>();
    DefaultLitePullConsumer consumer = null;
    try {
        consumer = RocketMqAdminUtil.initDefaultLitePullConsumer(newConfiguration(), false);
        consumer.start();
        // assign
        Map<MessageQueue, TopicOffset> queueOffsets =
                RetryUtils.retryWithException(
                        () ->
                                RocketMqAdminUtil.offsetTopics(
                                                newConfiguration(),
                                                Lists.newArrayList(topicName))
                                        .get(0),
                        new RetryUtils.RetryMaterial(
                                Constant.OPERATION_RETRY_TIME,
                                false,
                                exception -> exception instanceof RocketMqConnectorException,
                                Constant.OPERATION_RETRY_SLEEP));
        consumer.assign(queueOffsets.keySet());
        // seek to offset
        Map<MessageQueue, Long> currentOffsets =
                RocketMqAdminUtil.currentOffsets(
                        newConfiguration(),
                        Lists.newArrayList(topicName),
                        queueOffsets.keySet());
        for (MessageQueue mq : queueOffsets.keySet()) {
            // Fall back to the queue's minimum offset when no current offset is reported.
            long currentOffset =
                    currentOffsets.containsKey(mq)
                            ? currentOffsets.get(mq)
                            : queueOffsets.get(mq).getMinOffset();
            consumer.seek(mq, currentOffset);
        }
        while (true) {
            List<MessageExt> messages = consumer.poll(5000);
            if (messages.isEmpty()) {
                break;
            }
            for (MessageExt message : messages) {
                data.put(
                        message.getKeys(),
                        new String(message.getBody(), StandardCharsets.UTF_8));
                consumer.getOffsetStore()
                        .updateConsumeOffsetToBroker(
                                new MessageQueue(
                                        message.getTopic(),
                                        message.getBrokerName(),
                                        message.getQueueId()),
                                message.getQueueOffset(),
                                false);
            }
            consumer.commitSync();
        }
    } catch (InterruptedException ex) {
        // Restore the interrupt flag so callers up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
    } catch (Exception ex) {
        throw new RuntimeException(ex);
    } finally {
        // Release the consumer on success and on failure alike.
        if (consumer != null) {
            consumer.shutdown();
        }
    }
    log.info("Consumer {} data total {}", topicName, data.size());
    // consumer.commitSync() only submits the offset to the broker, and NameServer scans the
    // broker to update the offset every 10 seconds
    try {
        Thread.sleep(20 * 1000);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(ex);
    }
    return data;
}
/**
 * Asserts that, for every queue of {@code topicName}, the offset reported for
 * {@code consumerGroup} equals the queue's maximum offset.
 *
 * @param topicName topic whose queues are inspected
 * @param consumerGroup group id whose current offsets are compared
 */
private void checkOffsetNoDiff(String topicName, String consumerGroup) {
    final RocketMqBaseConfiguration groupConfig = newConfiguration();
    groupConfig.setGroupId(consumerGroup);

    // Only one topic is requested, so the first entry of the result is the one we need.
    final Map<MessageQueue, TopicOffset> topicOffsets =
            RocketMqAdminUtil.offsetTopics(groupConfig, Arrays.asList(topicName)).get(0);
    final Set<MessageQueue> queues = topicOffsets.keySet();
    final Map<MessageQueue, Long> groupOffsets =
            RocketMqAdminUtil.currentOffsets(groupConfig, Arrays.asList(topicName), queues);

    topicOffsets.forEach(
            (queue, topicOffset) -> {
                long maxOffset = topicOffset.getMaxOffset();
                Long consumeOffset = groupOffsets.get(queue);
                String failureMessage =
                        "Offset different,maxOffset="
                                + maxOffset
                                + ",consumeOffset="
                                + consumeOffset;
                Assertions.assertEquals(maxOffset, consumeOffset, failureMessage);
            });
}
/**
 * Builds a fresh configuration that targets the test container's name server, with ACL
 * disabled, the default test group id and a batch size of 10.
 *
 * @return a new configuration instance on every call
 */
public RocketMqBaseConfiguration newConfiguration() {
    final String nameServerAddress = rocketMqContainer.getNameSrvAddr();
    return RocketMqBaseConfiguration.newBuilder()
            .namesrvAddr(nameServerAddress)
            .groupId(ROCKETMQ_GROUP)
            .batchSize(10)
            .aclEnable(false)
            .build();
}
/**
 * Converts a generated {@link SeaTunnelRow} into the RocketMQ {@link Message} to send.
 * Used as a lambda / method-reference target by {@code generateTestData}.
 */
@FunctionalInterface
interface ProducerRecordConverter {
    /**
     * @param row row to convert
     * @return the message to send for {@code row}
     */
    Message convert(SeaTunnelRow row);
}
}