blob: f91408d08678b405df49024e38c1c05321e01220 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.inlong.manager.client;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupContext;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
import org.apache.shiro.util.Assert;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Test class for binlog to kafka.
*/
@Slf4j
@Disabled
class Binlog2KafkaExample extends BaseExample {
@Test
void testCreateGroupForKafka() {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setWriteTimeout(10);
configuration.setReadTimeout(10);
configuration.setConnectTimeout(10);
configuration.setTimeUnit(TimeUnit.SECONDS);
configuration.setAuthentication(super.getInlongAuth());
InlongClient inlongClient = InlongClient.create(super.getServiceUrl(), configuration);
InlongGroupInfo groupInfo = super.createGroupInfo();
try {
InlongGroup group = inlongClient.forGroup(groupInfo);
InlongStreamBuilder streamBuilder = group.createStream(createStreamInfo());
streamBuilder.source(createMysqlSource());
streamBuilder.sink(createKafkaSink());
streamBuilder.initOrUpdate();
// start group
InlongGroupContext inlongGroupContext = group.init();
Assert.notNull(inlongGroupContext);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
void testStopGroup() {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setWriteTimeout(10);
configuration.setReadTimeout(10);
configuration.setConnectTimeout(10);
configuration.setTimeUnit(TimeUnit.SECONDS);
configuration.setAuthentication(super.getInlongAuth());
InlongClient inlongClient = InlongClient.create(super.getServiceUrl(), configuration);
InlongGroupInfo groupInfo = createGroupInfo();
try {
InlongGroup group = inlongClient.forGroup(groupInfo);
InlongGroupContext groupContext = group.delete(true);
Assert.notNull(groupContext);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
void testSuspendGroup() {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setWriteTimeout(10);
configuration.setReadTimeout(10);
configuration.setConnectTimeout(10);
configuration.setTimeUnit(TimeUnit.SECONDS);
configuration.setAuthentication(super.getInlongAuth());
InlongClient inlongClient = InlongClient.create(super.getServiceUrl(), configuration);
InlongGroupInfo groupInfo = createGroupInfo();
try {
InlongGroup group = inlongClient.forGroup(groupInfo);
InlongGroupContext groupContext = group.suspend(true);
Assert.notNull(groupContext);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
void testRestartGroup() {
ClientConfiguration configuration = new ClientConfiguration();
configuration.setWriteTimeout(10);
configuration.setReadTimeout(10);
configuration.setConnectTimeout(10);
configuration.setTimeUnit(TimeUnit.SECONDS);
configuration.setAuthentication(super.getInlongAuth());
InlongClient inlongClient = InlongClient.create(super.getServiceUrl(), configuration);
InlongGroupInfo groupInfo = createGroupInfo();
try {
InlongGroup group = inlongClient.forGroup(groupInfo);
InlongGroupContext groupContext = group.restart(true);
Assert.notNull(groupContext);
} catch (Exception e) {
e.printStackTrace();
}
}
private MySQLBinlogSource createMysqlSource() {
MySQLBinlogSource binlogSource = new MySQLBinlogSource();
binlogSource.setDatabaseWhiteList("{db.name}");
binlogSource.setHostname("{db.url}");
binlogSource.setUser("{user}");
binlogSource.setPassword("{password}");
binlogSource.setSourceName("{mysql.source.name}");
binlogSource.setAllMigration(true);
return binlogSource;
}
private KafkaSink createKafkaSink() {
KafkaSink kafkaSink = new KafkaSink();
kafkaSink.setBootstrapServers("{kafka.bootstrap}");
kafkaSink.setTopicName("{kafka.topic}");
kafkaSink.setEnableCreateResource(0);
kafkaSink.setSinkName("{kafka.sink.name}");
Map<String, Object> properties = new HashMap<>();
// Not needed if kafka cluster is not set
properties.put("transaction.timeout.ms", 9000000);
kafkaSink.setProperties(properties);
return kafkaSink;
}
}