blob: 538a57273f7e50fa91bb3b77a5ed03555aba6081 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.dlog.DLInputStream;
import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
import org.apache.zookeeper.KeeperException.Code;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@Slf4j
public final class WorkerUtils {
private WorkerUtils(){}
public static void uploadFileToBookkeeper(String packagePath, File sourceFile, Namespace dlogNamespace) throws IOException {
FileInputStream uploadedInputStream = new FileInputStream(sourceFile);
uploadToBookeeper(dlogNamespace, uploadedInputStream, packagePath);
}
public static void uploadToBookeeper(Namespace dlogNamespace,
InputStream uploadedInputStream,
String destPkgPath)
throws IOException {
// if the dest directory does not exist, create it.
if (dlogNamespace.logExists(destPkgPath)) {
// if the destination file exists, write a log message
log.info("Target function file already exists at '{}'. Overwriting it now", destPkgPath);
dlogNamespace.deleteLog(destPkgPath);
}
// copy the topology package to target working directory
log.info("Uploading function package to '{}'", destPkgPath);
try (DistributedLogManager dlm = dlogNamespace.openLog(destPkgPath)) {
try (AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter()){
try (OutputStream out = new DLOutputStream(dlm, writer)) {
int read = 0;
byte[] bytes = new byte[1024];
while ((read = uploadedInputStream.read(bytes)) != -1) {
out.write(bytes, 0, read);
}
out.flush();
}
}
}
}
public static void downloadFromBookkeeper(Namespace namespace,
File outputFile,
String packagePath) throws IOException {
downloadFromBookkeeper(namespace, new FileOutputStream(outputFile), packagePath);
}
public static void downloadFromBookkeeper(Namespace namespace,
OutputStream outputStream,
String packagePath) throws IOException {
log.info("Downloading {} from BK...", packagePath);
DistributedLogManager dlm = namespace.openLog(packagePath);
try (InputStream in = new DLInputStream(dlm)) {
int read = 0;
byte[] bytes = new byte[1024];
while ((read = in.read(bytes)) != -1) {
outputStream.write(bytes, 0, read);
}
outputStream.flush();
}
}
public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig) {
int numReplicas = workerConfig.getNumFunctionPackageReplicas();
DistributedLogConfiguration conf = new DistributedLogConfiguration()
.setWriteLockEnabled(false)
.setOutputBufferSize(256 * 1024) // 256k
.setPeriodicFlushFrequencyMilliSeconds(0) // disable periodical flush
.setImmediateFlushEnabled(false) // disable immediate flush
.setLogSegmentRollingIntervalMinutes(0) // disable time-based rolling
.setMaxLogSegmentBytes(Long.MAX_VALUE) // disable size-based rolling
.setExplicitTruncationByApplication(true) // no auto-truncation
.setRetentionPeriodHours(Integer.MAX_VALUE) // long retention
.setEnsembleSize(numReplicas) // replica settings
.setWriteQuorumSize(numReplicas)
.setAckQuorumSize(numReplicas)
.setUseDaemonThread(true);
conf.setProperty("bkc.allowShadedLedgerManagerFactoryClass", true);
conf.setProperty("bkc.shadedLedgerManagerFactoryClassPrefix", "dlshade.");
if (isNotBlank(workerConfig.getBookkeeperClientAuthenticationPlugin())) {
conf.setProperty("bkc.clientAuthProviderFactoryClass",
workerConfig.getBookkeeperClientAuthenticationPlugin());
if (isNotBlank(workerConfig.getBookkeeperClientAuthenticationParametersName())) {
conf.setProperty("bkc." + workerConfig.getBookkeeperClientAuthenticationParametersName(),
workerConfig.getBookkeeperClientAuthenticationParameters());
}
}
return conf;
}
public static URI initializeDlogNamespace(String zkServers, String ledgersRootPath) throws IOException {
BKDLConfig dlConfig = new BKDLConfig(zkServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(dlConfig);
URI dlogUri = URI.create(String.format("distributedlog://%s/pulsar/functions", zkServers));
try {
dlMetadata.create(dlogUri);
} catch (ZKException e) {
if (e.getKeeperExceptionCode() == Code.NODEEXISTS) {
return dlogUri;
}
throw e;
}
return dlogUri;
}
public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl) {
return getPulsarAdminClient(pulsarWebServiceUrl, null, null, null, null, null);
}
public static PulsarAdmin getPulsarAdminClient(String pulsarWebServiceUrl, String authPlugin, String authParams,
String tlsTrustCertsFilePath, Boolean allowTlsInsecureConnection,
Boolean enableTlsHostnameVerificationEnable) {
try {
PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().serviceHttpUrl(pulsarWebServiceUrl);
if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
adminBuilder.authentication(authPlugin, authParams);
}
if (isNotBlank(tlsTrustCertsFilePath)) {
adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
if (allowTlsInsecureConnection != null) {
adminBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection);
}
if (enableTlsHostnameVerificationEnable != null) {
adminBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
}
return adminBuilder.build();
} catch (PulsarClientException e) {
log.error("Error creating pulsar admin client", e);
throw new RuntimeException(e);
}
}
public static PulsarClient getPulsarClient(String pulsarServiceUrl) {
return getPulsarClient(pulsarServiceUrl, null, null, null,
null, null, null);
}
public static PulsarClient getPulsarClient(String pulsarServiceUrl, String authPlugin, String authParams,
Boolean useTls, String tlsTrustCertsFilePath,
Boolean allowTlsInsecureConnection,
Boolean enableTlsHostnameVerificationEnable) {
try {
ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
if (isNotBlank(authPlugin)
&& isNotBlank(authParams)) {
clientBuilder.authentication(authPlugin, authParams);
}
if (useTls != null) {
clientBuilder.enableTls(useTls);
}
if (allowTlsInsecureConnection != null) {
clientBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection);
}
if (isNotBlank(tlsTrustCertsFilePath)) {
clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
if (enableTlsHostnameVerificationEnable != null) {
clientBuilder.enableTlsHostnameVerification(enableTlsHostnameVerificationEnable);
}
return clientBuilder.build();
} catch (PulsarClientException e) {
log.error("Error creating pulsar client", e);
throw new RuntimeException(e);
}
}
public static FunctionStats.FunctionInstanceStats getFunctionInstanceStats(String fullyQualifiedInstanceName,
FunctionRuntimeInfo functionRuntimeInfo,
int instanceId) {
RuntimeSpawner functionRuntimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
FunctionStats.FunctionInstanceStats functionInstanceStats = new FunctionStats.FunctionInstanceStats();
if (functionRuntimeSpawner != null) {
Runtime functionRuntime = functionRuntimeSpawner.getRuntime();
if (functionRuntime != null) {
try {
InstanceCommunication.MetricsData metricsData = functionRuntime.getMetrics(instanceId).get();
functionInstanceStats.setInstanceId(instanceId);
FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData functionInstanceStatsData
= new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
functionInstanceStatsData.setReceivedTotal(metricsData.getReceivedTotal());
functionInstanceStatsData.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal());
functionInstanceStatsData.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal());
functionInstanceStatsData.setUserExceptionsTotal(metricsData.getUserExceptionsTotal());
functionInstanceStatsData.setAvgProcessLatency(metricsData.getAvgProcessLatency() == 0.0 ? null : metricsData.getAvgProcessLatency());
functionInstanceStatsData.setLastInvocation(metricsData.getLastInvocation() == 0 ? null : metricsData.getLastInvocation());
functionInstanceStatsData.oneMin.setReceivedTotal(metricsData.getReceivedTotal1Min());
functionInstanceStatsData.oneMin.setProcessedSuccessfullyTotal(metricsData.getProcessedSuccessfullyTotal1Min());
functionInstanceStatsData.oneMin.setSystemExceptionsTotal(metricsData.getSystemExceptionsTotal1Min());
functionInstanceStatsData.oneMin.setUserExceptionsTotal(metricsData.getUserExceptionsTotal1Min());
functionInstanceStatsData.oneMin.setAvgProcessLatency(metricsData.getAvgProcessLatency1Min() == 0.0 ? null : metricsData.getAvgProcessLatency1Min());
// Filter out values that are NaN
Map<String, Double> statsDataMap = metricsData.getUserMetricsMap().entrySet().stream()
.filter(stringDoubleEntry -> !stringDoubleEntry.getValue().isNaN())
.collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
functionInstanceStatsData.setUserMetrics(statsDataMap);
functionInstanceStats.setMetrics(functionInstanceStatsData);
} catch (InterruptedException | ExecutionException e) {
log.warn("Failed to collect metrics for function instance {}", fullyQualifiedInstanceName, e);
}
}
}
return functionInstanceStats;
}
public static File dumpToTmpFile(final InputStream uploadedInputStream) {
try {
File tmpFile = FunctionCommon.createPkgTempFile();
tmpFile.deleteOnExit();
Files.copy(uploadedInputStream, tmpFile.toPath(), REPLACE_EXISTING);
return tmpFile;
} catch (IOException e) {
throw new RuntimeException("Cannot create a temporary file", e);
}
}
public static boolean isFunctionCodeBuiltin(Function.FunctionDetailsOrBuilder functionDetails) {
if (functionDetails.hasSource()) {
Function.SourceSpec sourceSpec = functionDetails.getSource();
if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
return true;
}
}
if (functionDetails.hasSink()) {
Function.SinkSpec sinkSpec = functionDetails.getSink();
if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
return true;
}
}
return false;
}
}