// blob: 6dfd8c5e766b5b7e8cf9e744563204e5aca294a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.Offloaders;
import org.apache.bookkeeper.mledger.offload.OffloadersCache;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
/**
 * Implementation of a cache for the Pulsar connector.
 *
 * <p>A single instance is created lazily by {@link #getConnectorCache(PulsarConnectorConfig)} and holds
 * the {@link ManagedLedgerFactory}, the {@link StatsProvider} and the {@link LedgerOffloader}s (one
 * default offloader plus one per namespace that supplies its own offload policies).
 * {@link #shutdown()} releases these resources and clears the singleton.
 */
public class PulsarConnectorCache {

    private static final Logger log = Logger.get(PulsarConnectorCache.class);

    // Singleton instance. This class only reads or writes it while holding the class monitor.
    @VisibleForTesting
    static PulsarConnectorCache instance;

    private final ManagedLedgerFactory managedLedgerFactory;

    private final StatsProvider statsProvider;

    // Created on first use by getOffloaderScheduler(); stays null when no offload driver is configured.
    private OrderedScheduler offloaderScheduler;

    private final OffloadersCache offloadersCache = new OffloadersCache();

    // Offloader handed out when the caller supplies no namespace-level offload policies.
    private final LedgerOffloader defaultOffloader;

    // Offloaders built from namespace-level offload policies, keyed by namespace.
    private final Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();

    private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory";
    private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver";
    private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads";

    /**
     * Builds the managed ledger factory, creates and starts the stats provider, and initializes the
     * default ledger offloader from the connector configuration.
     *
     * @param pulsarConnectorConfig connector configuration to read all settings from
     * @throws Exception if any of the underlying components fails to initialize
     */
    private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
        this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
        this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
                StatsProvider.class, getClass().getClassLoader());

        // start stats provider
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        pulsarConnectorConfig.getStatsProviderConfigs().forEach(clientConfiguration::setProperty);
        this.statsProvider.start(clientConfiguration);

        this.defaultOffloader = initManagedLedgerOffloader(
                pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig);
    }

    /**
     * Returns the shared cache, creating it from the given configuration on first use.
     *
     * <p>The configuration is only consulted when no instance exists yet; later calls return the
     * existing instance regardless of the argument.
     *
     * @param pulsarConnectorConfig configuration used to build the cache if it does not exist yet
     * @return the shared cache instance
     * @throws Exception if the cache has to be created and its construction fails
     */
    public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig)
            throws Exception {
        synchronized (PulsarConnectorCache.class) {
            if (instance == null) {
                instance = new PulsarConnectorCache(pulsarConnectorConfig);
            }
            // Return while still holding the lock so a concurrent shutdown() cannot clear the
            // field between the check above and the read of the return value.
            return instance;
        }
    }

    /**
     * Creates the managed ledger factory with a BookKeeper client configuration derived from the
     * connector configuration.
     *
     * @param pulsarConnectorConfig connector configuration providing ZooKeeper, BookKeeper and
     *                              managed ledger settings
     * @return a new managed ledger factory
     * @throws Exception if the factory cannot be created
     */
    private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
            throws Exception {
        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
                .setZkServers(pulsarConnectorConfig.getZookeeperUri())
                // The metadata service URI is built from the ZooKeeper URI with ',' replaced by ';'.
                .setMetadataServiceUri("zk://" + pulsarConnectorConfig.getZookeeperUri()
                        .replace(",", ";") + "/ledgers")
                .setClientTcpNoDelay(false)
                .setUseV2WireProtocol(pulsarConnectorConfig.getBookkeeperUseV2Protocol())
                .setExplictLacInterval(pulsarConnectorConfig.getBookkeeperExplicitInterval())
                .setStickyReadsEnabled(false)
                .setReadEntryTimeout(60)
                .setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
                .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
                .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());

        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
        managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
        managedLedgerFactoryConfig.setNumManagedLedgerWorkerThreads(
                pulsarConnectorConfig.getManagedLedgerNumWorkerThreads());
        managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(
                pulsarConnectorConfig.getManagedLedgerNumSchedulerThreads());

        return new ManagedLedgerFactoryImpl(bkClientConfiguration, pulsarConnectorConfig.getZookeeperUri(),
                managedLedgerFactoryConfig);
    }

    /**
     * Builds a managed ledger configuration whose offloader matches the given offload policies.
     *
     * <p>With {@code null} policies the default offloader is used. Otherwise the offloader cached for
     * the namespace is reused when its policies equal the given ones; if they differ, the cached
     * offloader is closed and replaced by a newly created one.
     *
     * @param namespaceName         namespace the configuration is requested for
     * @param offloadPolicies       namespace-level offload policies, or {@code null} to use the default
     * @param pulsarConnectorConfig connector configuration used when a new offloader must be created
     * @return a new managed ledger configuration with the ledger offloader set
     * @throws RuntimeException if a new offloader has to be created and its creation fails
     */
    public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, OffloadPolicies offloadPolicies,
                                                      PulsarConnectorConfig pulsarConnectorConfig) {
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        if (offloadPolicies == null) {
            managedLedgerConfig.setLedgerOffloader(this.defaultOffloader);
            return managedLedgerConfig;
        }

        LedgerOffloader ledgerOffloader = offloaderMap.compute(namespaceName, (ns, cached) -> {
            if (cached != null && Objects.equals(cached.getOffloadPolicies(), offloadPolicies)) {
                return cached;
            }
            // Policies changed (or nothing cached yet): release the stale offloader before replacing it.
            if (cached != null) {
                cached.close();
            }
            return initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig);
        });
        managedLedgerConfig.setLedgerOffloader(ledgerOffloader);
        return managedLedgerConfig;
    }

    /**
     * Returns the scheduler shared by all offloaders, creating it on first use.
     *
     * <p>The thread count is taken from the policies passed on the first call; policies passed on
     * later calls do not affect the already-created scheduler.
     *
     * @param offloadPolicies policies providing the maximum number of offload threads
     * @return the shared offloader scheduler
     */
    private synchronized OrderedScheduler getOffloaderScheduler(OffloadPolicies offloadPolicies) {
        if (this.offloaderScheduler == null) {
            this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
                    .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
                    .name("pulsar-offloader").build();
        }
        return this.offloaderScheduler;
    }

    /**
     * Creates a ledger offloader for the given policies.
     *
     * <p>When no offload driver is configured, the shared {@link NullLedgerOffloader} instance is
     * returned. Otherwise the offloaders are loaded from the configured offloaders directory and the
     * factory matching the driver creates the offloader.
     *
     * @param offloadPolicies       policies naming the offload driver and the offloaders directory
     * @param pulsarConnectorConfig connector configuration providing the NAR extraction directory
     * @return the created offloader, or {@link NullLedgerOffloader#INSTANCE} if no driver is configured
     * @throws RuntimeException wrapping any failure raised while loading or creating the offloader
     */
    private LedgerOffloader initManagedLedgerOffloader(OffloadPolicies offloadPolicies,
                                                       PulsarConnectorConfig pulsarConnectorConfig) {
        try {
            String offloadDriver = offloadPolicies.getManagedLedgerOffloadDriver();
            if (StringUtils.isBlank(offloadDriver)) {
                log.info("No ledger offloader configured, using NULL instance");
                return NullLedgerOffloader.INSTANCE;
            }

            checkNotNull(offloadPolicies.getOffloadersDirectory(),
                    "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                    offloadDriver);
            Offloaders offloaders = offloadersCache.getOrLoadOffloaders(offloadPolicies.getOffloadersDirectory(),
                    pulsarConnectorConfig.getNarExtractionDirectory());
            LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(offloadDriver);
            try {
                return offloaderFactory.create(
                        offloadPolicies,
                        ImmutableMap.of(
                                LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(),
                                PulsarVersion.getVersion(),
                                LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(),
                                PulsarVersion.getGitSha()
                        ),
                        getOffloaderScheduler(offloadPolicies));
            } catch (IOException ioe) {
                // Pass the exception as the Throwable argument so it is logged with its stack trace
                // instead of being consumed as an unused format argument.
                log.error(ioe, "Failed to create offloader: ");
                // Keep the IOException itself as the cause; using only ioe.getCause() would drop it.
                throw new RuntimeException(ioe.getMessage(), ioe);
            }
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    /**
     * Returns the managed ledger factory held by this cache.
     *
     * @return the managed ledger factory
     */
    public ManagedLedgerFactory getManagedLedgerFactory() {
        return managedLedgerFactory;
    }

    /**
     * Returns the stats provider held by this cache.
     *
     * @return the stats provider
     */
    public StatsProvider getStatsProvider() {
        return statsProvider;
    }

    /**
     * Shuts down the shared cache, if one exists, and clears the singleton so that the next
     * {@link #getConnectorCache(PulsarConnectorConfig)} call builds a fresh instance.
     *
     * <p>Stops the stats provider, shuts down the managed ledger factory and the offloader scheduler
     * (if one was ever created) and closes the offloaders cache, in that order. If any step throws,
     * the remaining steps are skipped and the singleton is left in place.
     *
     * @throws Exception if any component fails to shut down
     */
    public static void shutdown() throws Exception {
        synchronized (PulsarConnectorCache.class) {
            if (instance == null) {
                return;
            }
            instance.statsProvider.stop();
            instance.managedLedgerFactory.shutdown();
            // The scheduler is created lazily and does not exist when no offload driver was configured.
            OrderedScheduler scheduler = instance.offloaderScheduler;
            if (scheduler != null) {
                scheduler.shutdown();
            }
            instance.offloadersCache.close();
            // Drop the reference so a shut-down cache is never handed out again.
            instance = null;
        }
    }
}