blob: d0c481569a7c3675fc10f9c7c320669b4982a950 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
import static org.apache.seatunnel.common.PropertiesUtil.getEnum;
import static org.apache.seatunnel.common.PropertiesUtil.setOption;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_RESET_MODE;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_MODE;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode.LATEST;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StopMode.NEVER;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicListDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import com.google.auto.service.AutoService;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.regex.Pattern;
/**
 * SeaTunnel source connector that reads records from Apache Pulsar topics.
 *
 * <p>{@link #prepare(Config)} translates the plugin configuration into the admin, client and
 * consumer configs, the start/stop cursors, the partition discoverer and the deserialization
 * schema. Those are then handed to the {@link PulsarSourceReader} and
 * {@link PulsarSplitEnumerator} instances created by this source.
 *
 * @param <T> type of the records produced by the deserialization schema
 */
@AutoService(SeaTunnelSource.class)
public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit, PulsarSplitEnumeratorState> {

    /** Plugin name returned by {@link #getPluginName()}. */
    public static final String IDENTIFIER = "pulsar";

    private DeserializationSchema<T> deserialization;

    private PulsarAdminConfig adminConfig;
    private PulsarClientConfig clientConfig;
    private PulsarConsumerConfig consumerConfig;
    private PulsarDiscoverer partitionDiscoverer;
    private long partitionDiscoveryIntervalMs;
    private StartCursor startCursor;
    private StopCursor stopCursor;

    protected int pollTimeout;
    protected long pollInterval;
    protected int batchSize;

    @Override
    public String getPluginName() {
        return IDENTIFIER;
    }

    /**
     * Validates the plugin configuration and initializes every field of this source.
     *
     * @param config plugin configuration; must contain the subscription name, the client service
     *               URL and the admin service URL
     * @throws PrepareFailException     if one of the mandatory options is missing
     * @throws IllegalArgumentException if the cursor or topic options are inconsistent, or if a
     *                                  bounded stop cursor is combined with periodic discovery on
     *                                  a topic pattern
     */
    @SuppressWarnings("checkstyle:MagicNumber")
    @Override
    public void prepare(Config config) throws PrepareFailException {
        CheckResult result = CheckConfigUtil.checkAllExists(config, SUBSCRIPTION_NAME, CLIENT_SERVICE_URL, ADMIN_SERVICE_URL);
        if (!result.isSuccess()) {
            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
        }

        // admin config
        PulsarAdminConfig.Builder adminConfigBuilder = PulsarAdminConfig.builder()
            .adminUrl(config.getString(ADMIN_SERVICE_URL));
        setOption(config, AUTH_PLUGIN_CLASS, config::getString, adminConfigBuilder::authPluginClassName);
        setOption(config, AUTH_PARAMS, config::getString, adminConfigBuilder::authParams);
        this.adminConfig = adminConfigBuilder.build();

        // client config (shares the auth options with the admin config)
        PulsarClientConfig.Builder clientConfigBuilder = PulsarClientConfig.builder()
            .serviceUrl(config.getString(CLIENT_SERVICE_URL));
        setOption(config, AUTH_PLUGIN_CLASS, config::getString, clientConfigBuilder::authPluginClassName);
        setOption(config, AUTH_PARAMS, config::getString, clientConfigBuilder::authParams);
        this.clientConfig = clientConfigBuilder.build();

        // consumer config
        PulsarConsumerConfig.Builder consumerConfigBuilder = PulsarConsumerConfig.builder()
            .subscriptionName(config.getString(SUBSCRIPTION_NAME));
        this.consumerConfig = consumerConfigBuilder.build();

        // source properties; the third argument is presumably the fallback used by setOption
        // when the option is absent (see PropertiesUtil)
        setOption(config,
            TOPIC_DISCOVERY_INTERVAL,
            -1L,
            config::getLong,
            v -> this.partitionDiscoveryIntervalMs = v);
        setOption(config,
            POLL_TIMEOUT,
            100,
            config::getInt,
            v -> this.pollTimeout = v);
        setOption(config,
            POLL_INTERVAL,
            50L,
            config::getLong,
            v -> this.pollInterval = v);
        setOption(config,
            POLL_BATCH_SIZE,
            500,
            config::getInt,
            v -> this.batchSize = v);

        setStartCursor(config);
        setStopCursor(config);
        setPartitionDiscoverer(config);
        setDeserialization(config);

        // Periodic discovery on a topic pattern is rejected for bounded reads.
        if ((partitionDiscoverer instanceof TopicPatternDiscoverer)
            && partitionDiscoveryIntervalMs > 0
            && Boundedness.BOUNDED == stopCursor.getBoundedness()) {
            throw new IllegalArgumentException("Bounded streams do not support dynamic partition discovery.");
        }
    }

    /**
     * Ensures that a timestamp option is present and non-blank.
     *
     * <p>{@code Config#getString} throws for a missing path, so the presence check has to come
     * first; otherwise the descriptive error below would never be reached.
     *
     * @param config       plugin configuration
     * @param timestampKey option holding the timestamp
     * @param modeKey      option that selected the timestamp mode (only used in the message)
     * @throws IllegalArgumentException if the timestamp option is absent or blank
     */
    private static void requireTimestamp(Config config, String timestampKey, String modeKey) {
        if (!config.hasPath(timestampKey) || StringUtils.isBlank(config.getString(timestampKey))) {
            throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", timestampKey, modeKey));
        }
    }

    /**
     * Resolves the start cursor from the configured startup mode (default {@code LATEST}).
     *
     * @throws IllegalArgumentException if the mode is unsupported or the timestamp mode is
     *                                  selected without a startup timestamp
     */
    private void setStartCursor(Config config) {
        StartMode startMode = getEnum(config, CURSOR_STARTUP_MODE, StartMode.class, LATEST);
        switch (startMode) {
            case EARLIEST:
                this.startCursor = StartCursor.earliest();
                break;
            case LATEST:
                this.startCursor = StartCursor.latest();
                break;
            case SUBSCRIPTION:
                SubscriptionStartCursor.CursorResetStrategy resetStrategy = getEnum(config,
                    CURSOR_RESET_MODE,
                    SubscriptionStartCursor.CursorResetStrategy.class,
                    SubscriptionStartCursor.CursorResetStrategy.LATEST);
                this.startCursor = StartCursor.subscription(resetStrategy);
                break;
            case TIMESTAMP:
                requireTimestamp(config, CURSOR_STARTUP_TIMESTAMP, CURSOR_STARTUP_MODE);
                setOption(config, CURSOR_STARTUP_TIMESTAMP, config::getLong, timestamp -> this.startCursor = StartCursor.timestamp(timestamp));
                break;
            default:
                throw new IllegalArgumentException(String.format("The %s mode is not supported.", startMode));
        }
    }

    /**
     * Resolves the stop cursor from the configured stop mode (default {@code NEVER}).
     *
     * @throws IllegalArgumentException if the mode is unsupported or the timestamp mode is
     *                                  selected without a stop timestamp
     */
    private void setStopCursor(Config config) {
        SourceProperties.StopMode stopMode = getEnum(config, CURSOR_STOP_MODE, SourceProperties.StopMode.class, NEVER);
        switch (stopMode) {
            case LATEST:
                this.stopCursor = StopCursor.latest();
                break;
            case NEVER:
                this.stopCursor = StopCursor.never();
                break;
            case TIMESTAMP:
                requireTimestamp(config, CURSOR_STOP_TIMESTAMP, CURSOR_STOP_MODE);
                // Read the STOP timestamp here; the startup timestamp belongs to the start cursor.
                setOption(config, CURSOR_STOP_TIMESTAMP, config::getLong, timestamp -> this.stopCursor = StopCursor.timestamp(timestamp));
                break;
            default:
                throw new IllegalArgumentException(String.format("The %s mode is not supported.", stopMode));
        }
    }

    /**
     * Builds the partition discoverer from either the comma-separated topic list or the topic
     * pattern. Exactly one of the two options must be set.
     *
     * @throws IllegalArgumentException if both options are set or neither is
     */
    private void setPartitionDiscoverer(Config config) {
        // The two options are mutually exclusive, so either may legitimately be absent;
        // check for presence before reading to avoid a missing-path exception.
        String topic = config.hasPath(TOPIC) ? config.getString(TOPIC) : null;
        if (StringUtils.isNotBlank(topic)) {
            this.partitionDiscoverer = new TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ",")));
        }
        String topicPattern = config.hasPath(TOPIC_PATTERN) ? config.getString(TOPIC_PATTERN) : null;
        if (StringUtils.isNotBlank(topicPattern)) {
            if (this.partitionDiscoverer != null) {
                throw new IllegalArgumentException(String.format("The properties '%s' and '%s' is exclusive.", TOPIC, TOPIC_PATTERN));
            }
            this.partitionDiscoverer = new TopicPatternDiscoverer(Pattern.compile(topicPattern));
        }
        if (this.partitionDiscoverer == null) {
            throw new IllegalArgumentException(String.format("The properties '%s' or '%s' is required.", TOPIC, TOPIC_PATTERN));
        }
    }

    /**
     * Creates the deserialization schema from the schema section of the configuration.
     *
     * <p>Only JSON is wired in for now, so no format option is read here.
     */
    @SuppressWarnings("unchecked")
    private void setDeserialization(Config config) {
        // TODO: format SPI
        SeaTunnelRowType rowType = SeaTunnelSchema.buildWithConfig(config.getConfig(SeaTunnelSchema.SCHEMA)).getSeaTunnelRowType();
        deserialization = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
    }

    /**
     * Reports the source as unbounded when the stop cursor is a {@link NeverStopCursor}, and as
     * bounded otherwise.
     */
    @Override
    public Boundedness getBoundedness() {
        return this.stopCursor instanceof NeverStopCursor ? Boundedness.UNBOUNDED : Boundedness.BOUNDED;
    }

    @Override
    public SeaTunnelDataType<T> getProducedType() {
        return deserialization.getProducedType();
    }

    /** Creates a reader configured with the client/consumer settings and polling options. */
    @Override
    public SourceReader<T, PulsarPartitionSplit> createReader(SourceReader.Context readerContext) throws Exception {
        return new PulsarSourceReader<>(readerContext,
            clientConfig,
            consumerConfig,
            startCursor,
            deserialization,
            pollTimeout,
            pollInterval,
            batchSize);
    }

    /** Creates a fresh split enumerator with no previously assigned partitions. */
    @Override
    public SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> createEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> enumeratorContext) throws Exception {
        return new PulsarSplitEnumerator(
            enumeratorContext,
            adminConfig,
            partitionDiscoverer,
            partitionDiscoveryIntervalMs,
            startCursor,
            stopCursor,
            consumerConfig.getSubscriptionName());
    }

    /** Re-creates the split enumerator, seeding it with the partitions stored in the checkpoint. */
    @Override
    public SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> restoreEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> enumeratorContext, PulsarSplitEnumeratorState checkpointState) throws Exception {
        return new PulsarSplitEnumerator(
            enumeratorContext,
            adminConfig,
            partitionDiscoverer,
            partitionDiscoveryIntervalMs,
            startCursor,
            stopCursor,
            consumerConfig.getSubscriptionName(),
            checkpointState.assignedPartitions());
    }
}