blob: 66cdb60c01becea3a2ed2be05979dd85c4f2e32e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;
import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.ClientException;
import org.apache.bookkeeper.clients.exceptions.InternalServerException;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.common.util.Backoff.Jitter;
import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.StorageType;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
/**
* A function container implemented using a Java thread.
*/
@Slf4j
public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// Configuration of this function instance (function details, instance id, cluster name, ...).
private final InstanceConfig instanceConfig;
// Cache manager the instance registers its archive with and obtains the function class loader from.
private final FunctionCacheManager fnCache;
// Path of the user archive handed to the function cache manager in loadJars().
private final String jarFile;
// input topic consumer & output topic producer
private final PulsarClientImpl client;
// Appender created in setupLogHandler() when a log topic is configured; stays null otherwise.
private LogAppender logAppender;
// provide tables for storing states
// When this URL is null, setupStateTable() returns without creating a storage client or table.
private final String stateStorageServiceUrl;
// Storage client built in setupStateTable(); closed asynchronously in close().
@Getter(AccessLevel.PACKAGE)
private StorageClient storageClient;
// State table opened in setupStateTable(); may remain null (it is passed as-is to the context).
@Getter(AccessLevel.PACKAGE)
private Table<ByteBuf, ByteBuf> stateTable;
// The instance that executes the user function; assigned in run() and cleared in close().
private JavaInstance javaInstance;
// The throwable that terminated run(), if any.
@Getter
private Throwable deathException;
// function stats
// Assigned in run() and set back to null in close(), so readers must tolerate null.
@Getter
private ComponentStatsManager stats;
// The record most recently read from the source by the run() loop.
private Record<?> currentRecord;
// Source and sink are assigned in setupInput()/setupOutput() and cleared in close().
private Source source;
private Sink sink;
private final SecretsProvider secretsProvider;
// May be passed in as null; run() then creates a registry local to this instance.
private CollectorRegistry collectorRegistry;
// Label values built in the constructor: tenant, tenant/namespace, name, instance id, cluster, FQFN.
private final String[] metricsLabels;
// Obtained in run(); supplies the scheduled executor used by the stats manager.
private InstanceCache instanceCache;
private final org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType componentType;
// Properties computed by InstanceUtils.getProperties and handed to the built-in PulsarSource/PulsarSink.
private final Map<String, String> properties;
// Context class loader of the thread that constructed this object; restored after user code runs.
private final ClassLoader instanceClassLoader;
// Class loader for the user archive, resolved by loadJars() in setupJavaInstance().
private ClassLoader functionClassLoader;
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
PulsarClient pulsarClient,
String stateStorageServiceUrl,
SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry) {
this.instanceConfig = instanceConfig;
this.fnCache = fnCache;
this.jarFile = jarFile;
this.client = (PulsarClientImpl) pulsarClient;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.secretsProvider = secretsProvider;
this.collectorRegistry = collectorRegistry;
this.metricsLabels = new String[]{
instanceConfig.getFunctionDetails().getTenant(),
String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace()),
instanceConfig.getFunctionDetails().getName(),
String.valueOf(instanceConfig.getInstanceId()),
instanceConfig.getClusterName(),
FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())
};
this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
this.properties = InstanceUtils.getProperties(this.componentType,
FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
this.instanceConfig.getInstanceId());
// Declare function local collector registry so that it will not clash with other function instances'
// metrics collection especially in threaded mode
// In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down
this.collectorRegistry = collectorRegistry;
this.instanceClassLoader = Thread.currentThread().getContextClassLoader();
}
/**
 * Loads the user code, instantiates the function class and wires up state, context,
 * sink, source and log handling, in that order.
 *
 * NOTE: this method should be called in the instance thread, in order to make class loading work.
 *
 * @return the {@link JavaInstance} wrapping the instantiated user function
 * @throws Exception if class loading or any of the setup steps fails
 */
JavaInstance setupJavaInstance() throws Exception {
    final org.apache.pulsar.functions.proto.Function.FunctionDetails details =
            instanceConfig.getFunctionDetails();

    // initialize the thread context
    ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(details));
    ThreadContext.put("functionname", details.getName());
    ThreadContext.put("instance", instanceConfig.getInstanceName());

    log.info("Starting Java Instance {} : \n Details = {}", details.getName(), details);

    // start the function thread
    functionClassLoader = loadJars();

    // The window function executor is instantiated through the instance class loader,
    // every other class through the function class loader.
    final String userClassName = details.getClassName();
    final boolean windowExecutor = userClassName.equals(
            org.apache.pulsar.functions.windowing.WindowFunctionExecutor.class.getName());
    final ClassLoader loaderForUserClass = windowExecutor ? instanceClassLoader : functionClassLoader;
    final Object userFunction = Reflections.createInstance(userClassName, loaderForUserClass);

    final boolean supported =
            userFunction instanceof Function || userFunction instanceof java.util.function.Function;
    if (!supported) {
        throw new RuntimeException("User class must either be Function or java.util.Function");
    }

    // start the state table
    setupStateTable();

    final ContextImpl context = setupContext();

    // start the output producer
    setupOutput(context);
    // start the input consumer
    setupInput(context);
    // start any log topic handler
    setupLogHandler();

    return new JavaInstance(context, userFunction, instanceConfig);
}
/**
 * Builds the function context from the state held by this runnable, using a logger
 * named {@code function-<function name>}.
 *
 * @return a new {@link ContextImpl}
 */
ContextImpl setupContext() {
    final String loggerName = "function-" + instanceConfig.getFunctionDetails().getName();
    final Logger instanceLog = LoggerFactory.getLogger(loggerName);
    return new ContextImpl(
            instanceConfig,
            instanceLog,
            client,
            secretsProvider,
            collectorRegistry,
            metricsLabels,
            this.componentType,
            this.stats,
            stateTable);
}
/**
 * Initializes the instance thread and then loops forever: read a record, hand it to the
 * user function, and process the result. Any throwable ends the loop, is recorded in
 * {@code deathException}, and the instance is closed.
 */
@Override
public void run() {
    try {
        this.instanceCache = InstanceCache.getInstanceCache();

        if (this.collectorRegistry == null) {
            this.collectorRegistry = new CollectorRegistry();
        }
        this.stats = ComponentStatsManager.getStatsManager(
                this.collectorRegistry,
                this.metricsLabels,
                this.instanceCache.getScheduledExecutorService(),
                this.componentType);

        javaInstance = setupJavaInstance();

        for (;;) {
            currentRecord = readInput();

            // increment number of records received from source
            stats.incrTotalReceived();

            // With at-most-once semantics the record is acknowledged before it is processed.
            final boolean atMostOnce = instanceConfig.getFunctionDetails().getProcessingGuarantees()
                    == org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATMOST_ONCE;
            if (atMostOnce && instanceConfig.getFunctionDetails().getAutoAck()) {
                currentRecord.ack();
            }

            addLogTopicHandler();

            // set last invocation time
            stats.setLastInvocation(System.currentTimeMillis());
            // start time for process latency stat
            stats.processTimeStart();

            // process the message with the function class loader as the context class loader
            final Thread instanceThread = Thread.currentThread();
            instanceThread.setContextClassLoader(functionClassLoader);
            final CompletableFuture<JavaExecutionResult> pendingResult =
                    javaInstance.handleMessage(currentRecord, currentRecord.getValue());
            instanceThread.setContextClassLoader(instanceClassLoader);

            // register end time
            stats.processTimeEnd();

            removeLogTopicHandler();

            try {
                processResult(currentRecord, pendingResult);
            } catch (Exception e) {
                log.warn("Failed to process result of message {}", currentRecord, e);
                currentRecord.fail();
            }
        }
    } catch (Throwable t) {
        final String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(
                instanceConfig.getFunctionDetails().getTenant(),
                instanceConfig.getFunctionDetails().getNamespace(),
                instanceConfig.getFunctionDetails().getName(),
                instanceConfig.getInstanceId());
        log.error("[{}] Uncaught exception in Java Instance", fullyQualifiedInstanceId, t);
        deathException = t;
        if (stats != null) {
            stats.incrSysExceptions(t);
        }
    } finally {
        log.info("Closing instance");
        close();
    }
}
/**
 * Registers the user archive with the function cache manager and returns the class loader
 * the manager associates with this function. The archive is first registered as a nar; if
 * that raises {@link FileNotFoundException} it is registered as a plain jar instead.
 *
 * @return the function class loader, never {@code null}
 * @throws Exception if registration fails or no class loader is available
 */
private ClassLoader loadJars() throws Exception {
    log.info("Load JAR: {}", jarFile);
    try {
        // Let's first try to treat it as a nar archive
        fnCache.registerFunctionInstanceWithArchive(
                instanceConfig.getFunctionId(),
                instanceConfig.getInstanceName(),
                jarFile);
    } catch (FileNotFoundException notANar) {
        // create the function class loader
        fnCache.registerFunctionInstance(
                instanceConfig.getFunctionId(),
                instanceConfig.getInstanceName(),
                Arrays.asList(jarFile),
                Collections.emptyList());
    }

    final ClassLoader resolvedLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
    log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
            instanceConfig.getFunctionDetails().getName(), resolvedLoader);

    if (resolvedLoader == null) {
        throw new Exception("No function class loader available.");
    }
    return resolvedLoader;
}
/**
 * Makes sure the stream backing the state table exists, creating the namespace and/or the
 * stream when the lookup reports them missing. The lookup is retried for up to one minute.
 *
 * Creation failures are tolerated because another client may be creating the same table
 * concurrently; the next loop iteration re-checks whether the table exists. Such failures
 * are logged instead of being silently dropped, and a warning is emitted when the minute
 * elapses without the table being confirmed. No exception is raised in that case, matching
 * the previous behavior.
 *
 * @param tableNs   namespace of the state table
 * @param tableName name of the state table
 * @param settings  storage client settings used to build the admin client
 * @throws Exception if building/closing the admin client fails, on an unexpected lookup
 *                   error, or if the retry sleep is interrupted
 */
private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
    try (StorageAdminClient storageAdminClient = StorageClientBuilder.newBuilder()
            .withSettings(settings)
            .buildAdmin()) {
        StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF)
            .setInitialNumRanges(4)
            .setMinNumRanges(4)
            .setStorageType(StorageType.TABLE)
            .build();
        Stopwatch elapsedWatch = Stopwatch.createStarted();
        while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) {
            try {
                result(storageAdminClient.getStream(tableNs, tableName));
                // the table already exists, nothing left to do
                return;
            } catch (NamespaceNotFoundException nnfe) {
                try {
                    result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder()
                        .setDefaultStreamConf(streamConf)
                        .build()));
                    result(storageAdminClient.createStream(tableNs, tableName, streamConf));
                } catch (Exception e) {
                    // there might be two clients conflicting at creating table, so let's retrieve the table again
                    // to make sure the table is created.
                    log.debug("Failed to create namespace '{}' or state table '{}', re-checking whether it exists",
                            tableNs, tableName, e);
                }
            } catch (StreamNotFoundException snfe) {
                try {
                    result(storageAdminClient.createStream(tableNs, tableName, streamConf));
                } catch (Exception e) {
                    // there might be two client conflicting at creating table, so let's retrieve it to make
                    // sure the table is created.
                    log.debug("Failed to create state table '{}' in namespace '{}', re-checking whether it exists",
                            tableName, tableNs, e);
                }
            } catch (ClientException ce) {
                log.warn("Encountered issue {} on fetching state stable metadata, re-attempting in 100 milliseconds",
                    ce.getMessage());
                TimeUnit.MILLISECONDS.sleep(100);
            }
        }
        // The loop only falls through when the table was never confirmed within the time budget.
        log.warn("Could not confirm that state table '{}' exists in namespace '{}' within 1 minute",
                tableName, tableNs);
    }
}
/**
 * Sets up the state storage client and opens the function's state table. Does nothing when
 * no state storage service URL was configured.
 *
 * Opening the table is retried for up to one minute while the service answers with
 * {@link InternalServerException}. If the table still is not open afterwards, a warning is
 * logged and {@code stateTable} stays {@code null}; previously this case was silent.
 *
 * @throws Exception if table creation or opening fails with an unexpected error, or if
 *                   the retry sleep is interrupted
 */
private void setupStateTable() throws Exception {
    if (null == stateStorageServiceUrl) {
        return;
    }
    String tableNs = FunctionCommon.getStateNamespace(
        instanceConfig.getFunctionDetails().getTenant(),
        instanceConfig.getFunctionDetails().getNamespace()
    );
    String tableName = instanceConfig.getFunctionDetails().getName();
    StorageClientSettings settings = StorageClientSettings.newBuilder()
        .serviceUri(stateStorageServiceUrl)
        .clientName("function-" + tableNs + "/" + tableName)
        // configure a maximum 2 minutes jitter backoff for accessing table service
        // NOTE(review): presumably (start ms, max ms, retry limit) - confirm against Backoff.Jitter.of
        .backoffPolicy(Jitter.of(
            Type.EXPONENTIAL,
            100,
            2000,
            60
        ))
        .build();
    // we defer creation of the state table until a java instance is running here.
    createStateTable(tableNs, tableName, settings);
    log.info("Starting state table for function {}", instanceConfig.getFunctionDetails().getName());
    this.storageClient = StorageClientBuilder.newBuilder()
        .withSettings(settings)
        .withNamespace(tableNs)
        .build();
    // NOTE: this is a workaround until we bump bk version to 4.9.0
    // table might just be created above, so it might not be ready for serving traffic
    Stopwatch openSw = Stopwatch.createStarted();
    while (openSw.elapsed(TimeUnit.MINUTES) < 1) {
        try {
            this.stateTable = result(storageClient.openTable(tableName));
            break;
        } catch (InternalServerException ise) {
            log.warn("Encountered internal server on opening table '{}', re-attempt in 100 milliseconds : {}",
                tableName, ise.getMessage());
            TimeUnit.MILLISECONDS.sleep(100);
        }
    }
    if (this.stateTable == null) {
        // Surface the timeout: the context will be created with a null state table.
        log.warn("Unable to open state table '{}' within 1 minute; function {} starts without a state table",
            tableName, instanceConfig.getFunctionDetails().getName());
    }
}
/**
 * Registers the completion handling for one function invocation.
 *
 * When the invocation failed (future completed exceptionally, or the result carries a user
 * exception) the record is failed and the user-exception counter is incremented. Otherwise
 * a non-null result is written to the sink, or, when there is no result and auto-ack is
 * enabled, the record is acknowledged.
 *
 * An exception thrown inside a {@code whenComplete} callback is captured by the dependent
 * future, which nobody observes here, so it never reaches the caller's {@code catch}.
 * Failures while writing to the sink or acknowledging are therefore handled inside the
 * callback: they are logged and the record is failed, instead of leaving the record neither
 * acknowledged nor failed.
 *
 * @param srcRecord the record that was handed to the function
 * @param result    the pending result of the invocation
 * @throws Exception kept for interface compatibility
 */
private void processResult(Record srcRecord,
                           CompletableFuture<JavaExecutionResult> result) throws Exception {
    result.whenComplete((result1, throwable) -> {
        if (throwable != null || result1.getUserException() != null) {
            Throwable t = throwable != null ? throwable : result1.getUserException();
            log.warn("Encountered exception when processing message {}",
                    srcRecord, t);
            stats.incrUserExceptions(t);
            srcRecord.fail();
            return;
        }

        try {
            if (result1.getResult() != null) {
                sendOutputMessage(srcRecord, result1.getResult());
            } else if (instanceConfig.getFunctionDetails().getAutoAck()) {
                // the function doesn't produce any result or the user doesn't want the result.
                srcRecord.ack();
            }
        } catch (Exception e) {
            // Would otherwise vanish inside the discarded dependent future.
            log.warn("Failed to process result of message {}", srcRecord, e);
            srcRecord.fail();
            return;
        }

        // increment total successfully processed
        stats.incrTotalProcessedSuccessfully();
    });
}
/**
 * Writes the function output to the sink. For sinks other than {@link PulsarSink} the write
 * runs with the function class loader as context class loader; the instance class loader is
 * always restored afterwards.
 *
 * @param srcRecord the source record the output was derived from
 * @param output    the value produced by the function
 * @throws RuntimeException wrapping any exception raised by the sink
 */
private void sendOutputMessage(Record srcRecord, Object output) {
    final Thread writerThread = Thread.currentThread();
    final boolean builtInSink = this.sink instanceof PulsarSink;
    if (!builtInSink) {
        writerThread.setContextClassLoader(functionClassLoader);
    }
    try {
        this.sink.write(new SinkRecord<>(srcRecord, output));
    } catch (Exception sinkFailure) {
        log.info("Encountered exception in sink write: ", sinkFailure);
        stats.incrSinkExceptions(sinkFailure);
        throw new RuntimeException(sinkFailure);
    } finally {
        writerThread.setContextClassLoader(instanceClassLoader);
    }
}
/**
 * Reads the next record from the source. For sources other than {@link PulsarSource} the
 * read runs with the function class loader as context class loader; the instance class
 * loader is always restored afterwards.
 *
 * @return a non-null record with a non-null value
 * @throws RuntimeException         wrapping any exception raised by the source
 * @throws IllegalArgumentException if the source returned null or a record with a null value
 */
private Record readInput() {
    final Thread readerThread = Thread.currentThread();
    final boolean builtInSource = this.source instanceof PulsarSource;
    if (!builtInSource) {
        readerThread.setContextClassLoader(functionClassLoader);
    }

    Record next;
    try {
        next = this.source.read();
    } catch (Exception sourceFailure) {
        stats.incrSourceExceptions(sourceFailure);
        log.info("Encountered exception in source read: ", sourceFailure);
        throw new RuntimeException(sourceFailure);
    } finally {
        readerThread.setContextClassLoader(instanceClassLoader);
    }

    // check record is valid
    if (next == null) {
        throw new IllegalArgumentException("The record returned by the source cannot be null");
    }
    if (next.getValue() == null) {
        throw new IllegalArgumentException("The value in the record returned by the source cannot be null");
    }
    return next;
}
/**
 * Releases everything this instance holds: stats, source, sink, the java instance, the log
 * appender, the state table, the storage client and the function cache registration. Each
 * resource is nulled out after it is released, so calling this method again is harmless.
 *
 * NOTE: this method is synchronized because it is potentially called from two different places:
 * one inside the run/finally clause and one inside the ThreadRuntime::stop
 */
@Override
synchronized public void close() {
    if (stats != null) {
        stats.close();
        stats = null;
    }
    if (source != null) {
        if (!(this.source instanceof PulsarSource)) {
            Thread.currentThread().setContextClassLoader(functionClassLoader);
        }
        try {
            source.close();
        } catch (Throwable e) {
            log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
        } finally {
            Thread.currentThread().setContextClassLoader(instanceClassLoader);
        }
        source = null;
    }
    if (sink != null) {
        if (!(this.sink instanceof PulsarSink)) {
            Thread.currentThread().setContextClassLoader(functionClassLoader);
        }
        try {
            sink.close();
        } catch (Throwable e) {
            // report the sink's class name (this used to log the source's class name)
            log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSink().getClassName(), e);
        } finally {
            Thread.currentThread().setContextClassLoader(instanceClassLoader);
        }
        sink = null;
    }
    if (null != javaInstance) {
        javaInstance.close();
        javaInstance = null;
    }
    if (logAppender != null) {
        // The appender may still be attached if the run loop died between adding and removing
        // it; detach it and stop it so it does not outlive the instance.
        removeLogTopicHandler();
        logAppender.stop();
        logAppender = null;
    }
    // kill the state table
    if (null != stateTable) {
        stateTable.close();
        stateTable = null;
    }
    if (null != storageClient) {
        storageClient.closeAsync()
            .exceptionally(cause -> {
                log.warn("Failed to close state storage client", cause);
                return null;
            });
        storageClient = null;
    }
    if (instanceCache != null) {
        // once the thread quits, clean up the instance
        fnCache.unregisterFunctionInstance(
                instanceConfig.getFunctionId(),
                instanceConfig.getInstanceName());
        log.info("Unloading JAR files for function {}", instanceConfig);
        instanceCache = null;
    }
}
/**
 * Returns the current metrics and then resets the stats.
 *
 * The stats manager is only present between {@link #run()} starting and {@link #close()};
 * outside that window the reset is skipped instead of failing with a
 * {@link NullPointerException}, in line with the other metrics accessors of this class.
 *
 * @return a snapshot of the metrics taken before the reset
 */
public InstanceCommunication.MetricsData getAndResetMetrics() {
    InstanceCommunication.MetricsData metricsData = getMetrics();
    // Read the field once: close() may null it out from another thread.
    ComponentStatsManager currentStats = stats;
    if (currentStats != null) {
        currentStats.reset();
    }
    return metricsData;
}
/**
 * Builds a metrics snapshot from the instance stats plus, when a java instance exists and
 * reports any, the user-defined metrics.
 *
 * @return the assembled metrics data
 */
public InstanceCommunication.MetricsData getMetrics() {
    final InstanceCommunication.MetricsData.Builder metrics = createMetricsDataBuilder();
    if (javaInstance == null) {
        return metrics.build();
    }
    final Map<String, Double> userDefined = javaInstance.getMetrics();
    if (userDefined != null) {
        metrics.putAllUserMetrics(userDefined);
    }
    return metrics.build();
}
/**
 * Resets the instance stats and the user-defined metrics.
 *
 * Both the stats manager and the java instance are only present while the instance is
 * running; whichever is missing is skipped instead of failing with a
 * {@link NullPointerException}.
 */
public void resetMetrics() {
    // Read each field once: close() may null them out from another thread.
    ComponentStatsManager currentStats = stats;
    if (currentStats != null) {
        currentStats.reset();
    }
    JavaInstance currentInstance = javaInstance;
    if (currentInstance != null) {
        currentInstance.resetMetrics();
    }
}
/**
 * Creates a metrics builder pre-populated from the instance stats. When no stats manager is
 * present the builder is returned empty.
 *
 * @return the metrics data builder
 */
private Builder createMetricsDataBuilder() {
    final InstanceCommunication.MetricsData.Builder metrics = InstanceCommunication.MetricsData.newBuilder();
    if (stats == null) {
        return metrics;
    }

    // totals since start
    metrics.setProcessedSuccessfullyTotal((long) stats.getTotalProcessedSuccessfully())
            .setSystemExceptionsTotal((long) stats.getTotalSysExceptions())
            .setUserExceptionsTotal((long) stats.getTotalUserExceptions())
            .setReceivedTotal((long) stats.getTotalRecordsReceived())
            .setAvgProcessLatency(stats.getAvgProcessLatency())
            .setLastInvocation((long) stats.getLastInvocation());

    // one-minute variants
    metrics.setProcessedSuccessfullyTotal1Min((long) stats.getTotalProcessedSuccessfully1min())
            .setSystemExceptionsTotal1Min((long) stats.getTotalSysExceptions1min())
            .setUserExceptionsTotal1Min((long) stats.getTotalUserExceptions1min())
            .setReceivedTotal1Min((long) stats.getTotalRecordsReceived1min())
            .setAvgProcessLatency1Min(stats.getAvgProcessLatency1min());

    return metrics;
}
/**
 * Builds the function status from the instance stats: counters, the latest user, system,
 * source and sink exceptions, the average latency and the last invocation time. When no
 * stats manager is present the builder is returned empty.
 *
 * @return the function status builder
 */
public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
    final InstanceCommunication.FunctionStatus.Builder status =
            InstanceCommunication.FunctionStatus.newBuilder();
    if (stats == null) {
        return status;
    }

    status.setNumReceived((long) stats.getTotalRecordsReceived());
    status.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());

    status.setNumUserExceptions((long) stats.getTotalUserExceptions());
    stats.getLatestUserExceptions().forEach(status::addLatestUserExceptions);

    status.setNumSystemExceptions((long) stats.getTotalSysExceptions());
    stats.getLatestSystemExceptions().forEach(status::addLatestSystemExceptions);

    stats.getLatestSourceExceptions().forEach(status::addLatestSourceExceptions);
    stats.getLatestSinkExceptions().forEach(status::addLatestSinkExceptions);

    status.setAverageLatency(stats.getAvgProcessLatency());
    status.setLastInvocationTime((long) stats.getLastInvocation());
    return status;
}
/**
 * Creates and starts the log appender when the function details name a non-empty log topic;
 * otherwise leaves {@code logAppender} untouched.
 */
private void setupLogHandler() {
    final String logTopic = instanceConfig.getFunctionDetails().getLogTopic();
    if (StringUtils.isEmpty(logTopic)) {
        return;
    }
    logAppender = new LogAppender(
            client,
            instanceConfig.getFunctionDetails().getLogTopic(),
            FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
    logAppender.start();
}
/**
 * Attaches the log appender, if one was set up, to the log4j configuration, to every
 * configured logger and to the root logger.
 */
private void addLogTopicHandler() {
    if (logAppender == null) {
        return;
    }
    final Configuration log4jConfig = LoggerContext.getContext(false).getConfiguration();
    log4jConfig.addAppender(logAppender);
    log4jConfig.getLoggers().values()
            .forEach(loggerConfig -> loggerConfig.addAppender(logAppender, null, null));
    log4jConfig.getRootLogger().addAppender(logAppender, null, null);
}
/**
 * Detaches the log appender, if one was set up, from every configured logger and from the
 * root logger.
 */
private void removeLogTopicHandler() {
    if (logAppender == null) {
        return;
    }
    final Configuration log4jConfig = LoggerContext.getContext(false).getConfiguration();
    final String appenderName = logAppender.getName();
    log4jConfig.getLoggers().values()
            .forEach(loggerConfig -> loggerConfig.removeAppender(appenderName));
    log4jConfig.getRootLogger().removeAppender(appenderName);
}
/**
 * Creates the source for this instance and opens it. When the source spec names no class,
 * the built-in {@link PulsarSource} is used; otherwise the named class is instantiated
 * through the function class loader. Custom sources are opened with the function class
 * loader as context class loader; the instance class loader is always restored afterwards.
 *
 * @param contextImpl the context handed to the source on open
 * @throws Exception if the source cannot be created or fails to open
 */
public void setupInput(ContextImpl contextImpl) throws Exception {
    final SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource();

    // If source classname is not set, we default pulsar source
    final Object candidate;
    if (sourceSpec.getClassName().isEmpty()) {
        final PulsarSourceConfig pulsarSourceConfig = buildPulsarSourceConfig(sourceSpec);
        candidate = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader);
    } else {
        candidate = Reflections.createInstance(sourceSpec.getClassName(), this.functionClassLoader);
    }

    if (!(candidate instanceof Source)) {
        throw new RuntimeException("Source does not implement correct interface");
    }
    final Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Source.class, candidate.getClass());
    assert typeArgs.length > 0;
    this.source = (Source<?>) candidate;

    final Thread openerThread = Thread.currentThread();
    if (!(this.source instanceof PulsarSource)) {
        openerThread.setContextClassLoader(this.functionClassLoader);
    }
    try {
        final Map<String, Object> sourceConfigs;
        if (sourceSpec.getConfigs().isEmpty()) {
            sourceConfigs = new HashMap<>();
        } else {
            sourceConfigs = new Gson().fromJson(sourceSpec.getConfigs(),
                    new TypeToken<Map<String, Object>>() {
                    }.getType());
        }
        this.source.open(sourceConfigs, contextImpl);
    } catch (Exception openFailure) {
        log.error("Source open produced uncaught exception: ", openFailure);
        throw openFailure;
    } finally {
        openerThread.setContextClassLoader(this.instanceClassLoader);
    }
}

/**
 * Translates the source spec into the configuration of the built-in Pulsar source.
 */
private PulsarSourceConfig buildPulsarSourceConfig(SourceSpec sourceSpec) {
    final PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();

    // per-topic consumer settings taken from the input specs
    sourceSpec.getInputSpecsMap().forEach((topic, conf) -> {
        final ConsumerConfig consumerConfig =
                ConsumerConfig.builder().isRegexPattern(conf.getIsRegexPattern()).build();
        // a schema type takes precedence over a serde class name
        if (StringUtils.isNotEmpty(conf.getSchemaType())) {
            consumerConfig.setSchemaType(conf.getSchemaType());
        } else if (StringUtils.isNotEmpty(conf.getSerdeClassName())) {
            consumerConfig.setSerdeClassName(conf.getSerdeClassName());
        }
        if (conf.hasReceiverQueueSize()) {
            consumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize().getValue());
        }
        pulsarSourceConfig.getTopicSchema().put(topic, consumerConfig);
    });

    // topic -> serde mapping; these entries replace input-spec entries for the same topic
    sourceSpec.getTopicsToSerDeClassNameMap().forEach((topic, serde) -> {
        final ConsumerConfig serdeOnlyConfig = ConsumerConfig.builder()
                .serdeClassName(serde)
                .isRegexPattern(false)
                .build();
        pulsarSourceConfig.getTopicSchema().put(topic, serdeOnlyConfig);
    });

    if (StringUtils.isNotEmpty(sourceSpec.getTopicsPattern())) {
        pulsarSourceConfig.getTopicSchema().get(sourceSpec.getTopicsPattern()).setRegexPattern(true);
    }

    if (StringUtils.isNotBlank(sourceSpec.getSubscriptionName())) {
        pulsarSourceConfig.setSubscriptionName(sourceSpec.getSubscriptionName());
    } else {
        pulsarSourceConfig.setSubscriptionName(
                InstanceUtils.getDefaultSubscriptionName(instanceConfig.getFunctionDetails()));
    }

    pulsarSourceConfig.setProcessingGuarantees(
            FunctionConfig.ProcessingGuarantees.valueOf(
                    this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));

    // anything other than EARLIEST maps to Latest
    switch (sourceSpec.getSubscriptionPosition()) {
        case EARLIEST:
            pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
            break;
        default:
            pulsarSourceConfig.setSubscriptionPosition(SubscriptionInitialPosition.Latest);
            break;
    }

    // anything other than FAILOVER maps to Shared
    switch (sourceSpec.getSubscriptionType()) {
        case FAILOVER:
            pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
            break;
        default:
            pulsarSourceConfig.setSubscriptionType(SubscriptionType.Shared);
            break;
    }

    pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName());

    if (sourceSpec.getTimeoutMs() > 0) {
        pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
    }

    if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) {
        pulsarSourceConfig.setMaxMessageRetries(
                this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
        pulsarSourceConfig.setDeadLetterTopic(
                this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
    }

    return pulsarSourceConfig;
}
/**
 * Creates the sink for this instance and opens it. When the sink spec names a class, that
 * class is instantiated through the function class loader. Otherwise the built-in
 * {@link PulsarSink} is used, or {@link PulsarSinkDisable} when no output topic is set.
 * Sinks other than {@link PulsarSink} are opened with the function class loader as context
 * class loader; the instance class loader is always restored afterwards.
 *
 * @param contextImpl the context handed to the sink on open
 * @throws Exception if the sink cannot be created or fails to open
 */
public void setupOutput(ContextImpl contextImpl) throws Exception {
    final SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink();

    // If sink classname is not set, we default pulsar sink
    final Object candidate;
    if (!sinkSpec.getClassName().isEmpty()) {
        candidate = Reflections.createInstance(sinkSpec.getClassName(), this.functionClassLoader);
    } else if (StringUtils.isEmpty(sinkSpec.getTopic())) {
        candidate = PulsarSinkDisable.INSTANCE;
    } else {
        final PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
        pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(
                this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
        pulsarSinkConfig.setTopic(sinkSpec.getTopic());
        pulsarSinkConfig.setForwardSourceMessageProperty(sinkSpec.getForwardSourceMessageProperty());

        // a schema type takes precedence over a serde class name
        if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) {
            pulsarSinkConfig.setSchemaType(sinkSpec.getSchemaType());
        } else if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) {
            pulsarSinkConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
        }

        pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
        candidate = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats,
                this.functionClassLoader);
    }

    if (!(candidate instanceof Sink)) {
        throw new RuntimeException("Sink does not implement correct interface");
    }
    this.sink = (Sink) candidate;

    final Thread openerThread = Thread.currentThread();
    if (!(this.sink instanceof PulsarSink)) {
        openerThread.setContextClassLoader(this.functionClassLoader);
    }
    try {
        final Map<String, Object> sinkConfigs;
        if (sinkSpec.getConfigs().isEmpty()) {
            sinkConfigs = new HashMap<>();
        } else {
            sinkConfigs = new Gson().fromJson(sinkSpec.getConfigs(),
                    new TypeToken<Map<String, Object>>() {
                    }.getType());
        }
        this.sink.open(sinkConfigs, contextImpl);
    } catch (Exception openFailure) {
        log.error("Sink open produced uncaught exception: ", openFailure);
        throw openFailure;
    } finally {
        openerThread.setContextClassLoader(this.instanceClassLoader);
    }
}
}