blob: 39f7edd5ceed5289f585457d868d516a984821b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.impl;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ConfigOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;
import org.rocksdb.OptionsUtil;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.WriteOptions;
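/**
 * {@link MetadataStore} implementation backed by a local RocksDB {@link TransactionDB}.
 *
 * <p>Instances are obtained through {@link #get(String, MetadataStoreConfig)}, which caches one store per
 * metadata URL and reference-counts it; the underlying database is closed when the last reference is closed.
 */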
@Slf4j
public class RocksdbMetadataStore extends AbstractMetadataStore {
// URL scheme name; returned by RocksdbMetadataStoreProvider#urlScheme().
static final String ROCKSDB_SCHEME = "rocksdb";
// Prefix of a RocksDB metadata URL, i.e. the scheme followed by ':' ("rocksdb:{storePath}").
static final String ROCKSDB_SCHEME_IDENTIFIER = "rocksdb:";
// Reserved key holding the persisted counter used to name sequential nodes (written by storePut).
private static final byte[] SEQUENTIAL_ID_KEY = toBytes("__metadata_sequentialId_key");
// Reserved key holding the last instance id handed out (read and bumped by loadInstanceId).
private static final byte[] INSTANCE_ID_KEY = toBytes("__metadata_instanceId_key");
// Id of this store instance; recorded as the owner of every value this instance writes.
private final long instanceId;
// In-memory counter appended to the path of nodes created with CreateOption.Sequential.
private final AtomicLong sequentialIdGenerator;
private final TransactionDB db;
// Read lock is taken by every data operation; the write lock is taken by close().
private final ReentrantReadWriteLock dbStateLock;
// Write options; the sync flag comes from MetadataStoreConfig#isFsyncEnable().
private final WriteOptions writeOptions;
// Read options with fillCache=true.
private final ReadOptions optionCache;
// Read options with fillCache=false.
private final ReadOptions optionDontCache;
// May be null; replaced through updateMetadataEventSynchronizer().
private MetadataEventSynchronizer synchronizer;
// Number of outstanding references handed out by get(); accessed under this instance's monitor
// (synchronized block in get(), synchronized close()).
private int referenceCount = 1;
// Live store instances keyed by the metadata store URI passed to get().
private static final Map<String, RocksdbMetadataStore> instancesCache = new ConcurrentHashMap<>();
/**
 * Returns the store for the given URI, reusing a cached instance when one is still referenced.
 *
 * <p>When a cached instance with a positive reference count exists, its count is incremented and the
 * instance is returned. Otherwise a new store is constructed, its synchronizer is set from the
 * configuration, and it replaces any entry cached under the same URI.
 *
 * <p>Note: the cache lookup and the creation of a new instance are not performed atomically.
 *
 * @param metadataStoreUri URI identifying the store, also used as the cache key
 * @param conf configuration used when a new instance has to be created
 * @return a shared or newly created store
 * @throws MetadataStoreException if a new store cannot be created
 */
public static RocksdbMetadataStore get(String metadataStoreUri, MetadataStoreConfig conf)
        throws MetadataStoreException {
    final RocksdbMetadataStore cached = instancesCache.get(metadataStoreUri);
    if (cached != null) {
        synchronized (cached) {
            if (cached.referenceCount > 0) {
                // Still open: hand out one more reference to the same instance.
                cached.referenceCount += 1;
                return cached;
            }
        }
    }
    // Nothing reusable: build a fresh instance and publish it in the cache.
    final RocksdbMetadataStore created = new RocksdbMetadataStore(metadataStoreUri, conf);
    // Install the synchronizer from the config, which also registers the sync listener.
    created.updateMetadataEventSynchronizer(conf.getSynchronizer());
    instancesCache.put(metadataStoreUri, created);
    return created;
}
/**
 * Value persisted in RocksDB for a single metadata path: a fixed-layout header followed by the payload.
 *
 * <p>Serialized layout, as written by {@link #serialize()}: header size (int), format version (int),
 * version, owner, created timestamp, modified timestamp (one long each), ephemeral flag (one byte),
 * then the raw data bytes.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
static class MetaValue {
    /** Size in bytes of the V1 header: two ints, four longs and one flag byte. */
    private static final int HEADER_SIZE = 4 + 4 + 8 + 8 + 8 + 8 + 1;
    private static final int FORMAT_VERSION_V1 = 1;
    long version;
    long owner;
    long createdTimestamp;
    long modifiedTimestamp;
    boolean ephemeral;
    byte[] data;

    /**
     * Serializes this value as the V1 header followed by {@code data}.
     *
     * <p>Note: we can only add new fields, but not change or remove existing fields.
     *
     * @return a new array of {@code HEADER_SIZE + data.length} bytes
     */
    public byte[] serialize() {
        byte[] result = new byte[HEADER_SIZE + data.length];
        ByteBuffer buffer = ByteBuffer.wrap(result);
        buffer.putInt(HEADER_SIZE);
        buffer.putInt(FORMAT_VERSION_V1);
        buffer.putLong(version);
        buffer.putLong(owner);
        buffer.putLong(createdTimestamp);
        buffer.putLong(modifiedTimestamp);
        buffer.put((byte) (ephemeral ? 1 : 0));
        buffer.put(data);
        return result;
    }

    /**
     * Parses a value previously produced by {@link #serialize()}.
     *
     * <p>The payload starts at the offset given by the stored header size, so headers larger than
     * the V1 header are accepted and their extra bytes are skipped.
     *
     * @param dataBytes serialized bytes, may be {@code null}
     * @return the parsed value, or {@code null} when {@code dataBytes} is {@code null}
     * @throws MetadataStoreException if the bytes are too short, the declared header size is smaller
     *         than the V1 header or larger than the input, or the format version is not supported
     */
    public static MetaValue parse(byte[] dataBytes) throws MetadataStoreException {
        if (dataBytes == null) {
            return null;
        }
        if (dataBytes.length < 4) {
            throw new MetadataStoreException("Invalid MetaValue data, size=" + dataBytes.length);
        }
        ByteBuffer buffer = ByteBuffer.wrap(dataBytes);
        int headerSize = buffer.getInt();
        // Every supported format carries at least the V1 fields. Rejecting smaller (or negative)
        // header sizes here keeps the reads below inside the buffer and prevents the final
        // position(headerSize) from rewinding into the header.
        if (headerSize < HEADER_SIZE) {
            throw new MetadataStoreException(
                    String.format("Invalid MetaValue data, header size too small. expect at least %d, actual %d",
                            HEADER_SIZE, headerSize));
        }
        if (dataBytes.length < headerSize) {
            throw new MetadataStoreException(
                    String.format("Invalid MetaValue data, no enough header data. expect %d, actual %d",
                            headerSize, dataBytes.length));
        }
        int formatVersion = buffer.getInt();
        if (formatVersion < FORMAT_VERSION_V1) {
            throw new MetadataStoreException("Invalid MetaValue format version=" + formatVersion);
        }
        MetaValue metaValue = new MetaValue();
        metaValue.version = buffer.getLong();
        metaValue.owner = buffer.getLong();
        metaValue.createdTimestamp = buffer.getLong();
        metaValue.modifiedTimestamp = buffer.getLong();
        metaValue.ephemeral = buffer.get() > 0;
        // Skip any header fields added by newer format versions.
        buffer.position(headerSize);
        metaValue.data = new byte[buffer.remaining()];
        buffer.get(metaValue.data);
        return metaValue;
    }
}
/**
 * Encodes a string as UTF-8 bytes.
 *
 * @param s the string to encode
 * @return the UTF-8 encoding of {@code s}
 */
@VisibleForTesting
static byte[] toBytes(String s) {
    final byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    return utf8;
}
/**
 * Decodes UTF-8 bytes into a string.
 *
 * @param bytes the UTF-8 encoded bytes
 * @return the decoded string
 */
@VisibleForTesting
static String toString(byte[] bytes) {
    final String decoded = new String(bytes, StandardCharsets.UTF_8);
    return decoded;
}
/**
 * Encodes a long as an 8-byte array using {@link ByteBuffer}'s default byte order.
 *
 * @param value the value to encode
 * @return a new 8-byte array holding {@code value}
 */
@VisibleForTesting
static byte[] toBytes(long value) {
    final byte[] encoded = new byte[8];
    ByteBuffer.wrap(encoded).putLong(value);
    return encoded;
}
/**
 * Decodes a long from the first 8 bytes of the array, the inverse of {@code toBytes(long)}.
 *
 * @param bytes array holding at least 8 bytes
 * @return the decoded value
 */
@VisibleForTesting
static long toLong(byte[] bytes) {
    final ByteBuffer view = ByteBuffer.wrap(bytes);
    return view.getLong();
}
/** URL this store was opened with; used as the key when removing the instance from the cache. */
private final String metadataUrl;

/**
 * Opens the RocksDB database located at the path encoded in {@code metadataURL} and loads the
 * persisted sequential-id counter and instance id.
 *
 * <p>The data directory is created if missing and its permissions are set to {@code rwxr-x---}.
 *
 * @param metadataURL format "rocksdb:{storePath}"
 * @param metadataStoreConfig store configuration (store name, optional RocksDB config file, fsync flag)
 * @throws MetadataStoreException if the native library cannot be loaded, the directory cannot be
 *         prepared, the database cannot be opened, or the persisted state cannot be loaded
 */
private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
        throws MetadataStoreException {
    super(metadataStoreConfig.getMetadataStoreName());
    this.metadataUrl = metadataURL;
    // Created first: the failure path below calls close(), which takes this lock's write lock.
    this.dbStateLock = new ReentrantReadWriteLock();
    try {
        RocksDB.loadLibrary();
    } catch (Throwable t) {
        throw new MetadataStoreException("Failed to load RocksDB JNI library", t);
    }
    String dataDir = metadataURL.substring(ROCKSDB_SCHEME_IDENTIFIER.length());
    Path dataPath = FileSystems.getDefault().getPath(dataDir);
    try {
        Files.createDirectories(dataPath);
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-x---");
        Files.setPosixFilePermissions(dataPath, perms);
    } catch (IOException e) {
        throw new MetadataStoreException("Fail to create RocksDB file directory", e);
    }
    db = openDB(dataPath.toString(), metadataStoreConfig.getConfigFilePath());
    this.writeOptions = new WriteOptions().setSync(metadataStoreConfig.isFsyncEnable());
    this.optionCache = new ReadOptions().setFillCache(true);
    this.optionDontCache = new ReadOptions().setFillCache(false);
    try {
        sequentialIdGenerator = loadSequentialIdGenerator();
        instanceId = loadInstanceId();
    } catch (RocksDBException exception) {
        log.error("Error while init metastore state", exception);
        MetadataStoreException initFailure =
                new MetadataStoreException("Error init metastore state", exception);
        try {
            // Release the database and option handles opened above.
            close();
        } catch (Throwable closeFailure) {
            // Keep the original cause visible; a close failure must not mask it.
            initFailure.addSuppressed(closeFailure);
        }
        throw initFailure;
    }
    log.info("new RocksdbMetadataStore,url={},instanceId={}", metadataURL, instanceId);
}
/**
 * Computes this instance's id and persists it under {@code INSTANCE_ID_KEY}.
 *
 * <p>The id is the stored value plus one, or 0 when no value is stored yet.
 *
 * @return the id assigned to this instance
 * @throws RocksDBException if reading or writing the key fails
 */
private long loadInstanceId() throws RocksDBException {
    final byte[] stored = db.get(optionDontCache, INSTANCE_ID_KEY);
    final long assignedId = (stored == null) ? 0 : toLong(stored) + 1;
    db.put(writeOptions, INSTANCE_ID_KEY, toBytes(assignedId));
    return assignedId;
}
/**
 * Restores the counter used to name sequential nodes from {@code SEQUENTIAL_ID_KEY}.
 *
 * <p>When no counter has been persisted yet, the initial value 0 is written under
 * {@code SEQUENTIAL_ID_KEY}. It must not be written under {@code INSTANCE_ID_KEY}: that would
 * overwrite the persisted instance id each time the store is opened before any sequential node
 * has been created.
 *
 * @return a generator positioned at the persisted value, or at 0 for a database without one
 * @throws RocksDBException if reading or writing the key fails
 */
private AtomicLong loadSequentialIdGenerator() throws RocksDBException {
    AtomicLong generator = new AtomicLong(0);
    byte[] value = db.get(optionDontCache, SEQUENTIAL_ID_KEY);
    if (value != null) {
        generator.set(toLong(value));
    } else {
        db.put(writeOptions, SEQUENTIAL_ID_KEY, toBytes(generator.get()));
    }
    return generator;
}
/**
 * Opens the transactional RocksDB database at {@code dataPath}.
 *
 * <p>When {@code configFilePath} is given, the DB options and column family descriptors are loaded
 * from that file. Otherwise default options are used with create-if-missing enabled and logging
 * configured by {@link #configLog(Options)}. All option objects are released before returning,
 * including when loading or opening fails.
 *
 * <p>NOTE(review): the column family handles returned by the open call are not retained by this
 * method, matching the previous behaviour.
 *
 * @param dataPath directory of the database
 * @param configFilePath optional RocksDB options file, may be {@code null}
 * @return the opened database
 * @throws MetadataStoreException if the options cannot be loaded or the database cannot be opened
 */
private TransactionDB openDB(String dataPath, String configFilePath) throws MetadataStoreException {
    try (TransactionDBOptions transactionDBOptions = new TransactionDBOptions()) {
        if (configFilePath != null) {
            try (DBOptions dbOptions = new DBOptions();
                 ConfigOptions configOptions = new ConfigOptions()) {
                List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
                OptionsUtil.loadOptionsFromFile(configOptions, configFilePath, dbOptions, cfDescriptors);
                log.info("Load options from configFile({}), CF.size={},dbConfig={}", configFilePath,
                        cfDescriptors.size(), dbOptions);
                if (log.isDebugEnabled()) {
                    for (ColumnFamilyDescriptor cfDescriptor : cfDescriptors) {
                        log.debug("CF={},Options={}", cfDescriptor.getName(), cfDescriptor.getOptions().toString());
                    }
                }
                List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
                return TransactionDB.open(dbOptions, transactionDBOptions, dataPath, cfDescriptors, cfHandles);
            }
        } else {
            try (Options options = new Options()) {
                options.setCreateIfMissing(true);
                configLog(options);
                return TransactionDB.open(options, transactionDBOptions, dataPath);
            }
        }
    } catch (RocksDBException e) {
        throw new MetadataStoreException("Error open RocksDB database", e);
    } catch (Throwable t) {
        throw MetadataStoreException.wrap(t);
    }
}
/**
 * Applies RocksDB logging settings derived from system properties to {@code options}.
 *
 * <p>{@code pulsar.log.dir}, when non-empty, selects the parent of the {@code rocksdb-log}
 * directory, which is created and restricted to {@code rwxr-x---}. {@code pulsar.log.level}
 * (default {@code info}) selects the RocksDB info log level; unknown values only produce a warning.
 * Log files roll daily and 30 files are kept.
 *
 * @param options the options to configure
 * @throws IOException if the log directory cannot be created or its permissions cannot be set
 */
private void configLog(Options options) throws IOException {
    // Log directory
    final String baseLogDir = System.getProperty("pulsar.log.dir", "");
    if (!baseLogDir.isEmpty()) {
        final Path rocksLogDir = FileSystems.getDefault().getPath(baseLogDir + "/rocksdb-log");
        Files.createDirectories(rocksLogDir);
        options.setDbLogDir(rocksLogDir.toString());
        Files.setPosixFilePermissions(rocksLogDir, PosixFilePermissions.fromString("rwxr-x---"));
    }
    // Log level
    final String level = System.getProperty("pulsar.log.level", "info");
    if ("debug".equals(level)) {
        options.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL);
    } else if ("info".equals(level)) {
        options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
    } else if ("warn".equals(level)) {
        options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
    } else if ("error".equals(level)) {
        options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
    } else {
        log.warn("Unrecognized RockDB log level: {}", level);
    }
    // Retention: one file per day, 30 files kept
    options.setKeepLogFileNum(30);
    options.setLogFileTimeToRoll(TimeUnit.DAYS.toSeconds(1));
}
/**
 * Releases one reference to this store and closes the database when the last one is released.
 *
 * <p>On the final release the instance is removed from the cache and, under the write lock, the
 * database and the read/write option handles are closed before delegating to the superclass.
 *
 * @throws MetadataStoreException if closing any of the resources fails
 */
@Override
public synchronized void close() throws MetadataStoreException {
    referenceCount--;
    if (referenceCount > 0) {
        // We close it only when the last reference is closed;
        return;
    }
    instancesCache.remove(this.metadataUrl, this);
    if (!isClosed.compareAndSet(false, true)) {
        return;
    }
    // Acquire before entering the try so the finally block never unlocks a lock that is not held.
    dbStateLock.writeLock().lock();
    try {
        log.info("close.instanceId={}", instanceId);
        db.close();
        writeOptions.close();
        optionCache.close();
        optionDontCache.close();
        super.close();
    } catch (Throwable throwable) {
        throw MetadataStoreException.wrap(throwable);
    } finally {
        dbStateLock.writeLock().unlock();
    }
}
/**
 * Reads the value stored at {@code path}.
 *
 * <p>An ephemeral value whose owner is not this instance is treated as absent and a delete is
 * issued for it.
 *
 * @param path the path to read
 * @return a completed future holding the value and its stat, or empty when nothing is stored;
 *         a failed future wrapping any error
 */
@Override
public CompletableFuture<Optional<GetResult>> storeGet(String path) {
    if (log.isDebugEnabled()) {
        log.debug("getFromStore.path={},instanceId={}", path, instanceId);
    }
    try {
        dbStateLock.readLock().lock();
        final byte[] raw = db.get(optionCache, toBytes(path));
        if (raw == null) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
        final MetaValue stored = MetaValue.parse(raw);
        final boolean ownedByThisInstance = stored.owner == instanceId;
        if (stored.ephemeral && !ownedByThisInstance) {
            // Left behind by another instance: remove it and report it as missing.
            delete(path, Optional.empty());
            return CompletableFuture.completedFuture(Optional.empty());
        }
        final byte[] payload = stored.getData();
        final Stat stat = new Stat(path,
                stored.getVersion(),
                stored.getCreatedTimestamp(),
                stored.getModifiedTimestamp(),
                stored.ephemeral,
                ownedByThisInstance);
        return CompletableFuture.completedFuture(Optional.of(new GetResult(payload, stat)));
    } catch (Throwable e) {
        return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
    } finally {
        dbStateLock.readLock().unlock();
    }
}
/**
 * Lists the names of the direct children of {@code path}, sorted in natural order.
 *
 * <p>Keys are scanned starting at {@code path + "/"} (or {@code "/"} for the root) for as long as
 * they carry that prefix. The first segment after the prefix of every such key is a child name.
 * Ephemeral values owned by another instance are skipped and a delete is issued for them.
 *
 * @param path the parent path
 * @return a completed future with the sorted child names; a failed future wrapping any error
 */
@Override
protected CompletableFuture<List<String>> getChildrenFromStore(String path) {
    if (log.isDebugEnabled()) {
        log.debug("getChildrenFromStore.path={},instanceId={}", path, instanceId);
    }
    // Acquire before entering the try so the finally block never unlocks a lock that is not held.
    dbStateLock.readLock().lock();
    try (RocksIterator iterator = db.newIterator(optionDontCache)) {
        Set<String> result = new HashSet<>();
        String prefix = path.equals("/") ? path : path + "/";
        for (iterator.seek(toBytes(prefix)); iterator.isValid(); iterator.next()) {
            String currentPath = toString(iterator.key());
            // Keys sharing the prefix are contiguous in the iterator. Testing the prefix directly
            // avoids comparing Java strings (UTF-16 order) against RocksDB's byte-wise key order.
            if (!currentPath.startsWith(prefix)) {
                //End of sub paths.
                break;
            }
            byte[] value = iterator.value();
            if (value == null) {
                continue;
            }
            MetaValue metaValue = MetaValue.parse(value);
            if (metaValue.ephemeral && metaValue.owner != instanceId) {
                delete(currentPath, Optional.empty());
                continue;
            }
            String relativePath = currentPath.substring(prefix.length());
            if (relativePath.isEmpty()) {
                // The key is the prefix itself, not a child of it.
                continue;
            }
            // Keep only the first segment so deeper descendants map to their direct-child ancestor.
            String child = relativePath.split("/", 2)[0];
            result.add(child);
        }
        List<String> arrayResult = new ArrayList<>(result);
        arrayResult.sort(Comparator.naturalOrder());
        return CompletableFuture.completedFuture(arrayResult);
    } catch (Throwable e) {
        return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
    } finally {
        dbStateLock.readLock().unlock();
    }
}
/**
 * Reports whether any value is stored under {@code path}.
 *
 * <p>NOTE(review): unlike {@code storeGet}, this does not hide ephemeral values owned by another
 * instance — confirm whether that difference is intended.
 *
 * @param path the path to check
 * @return a completed future with {@code true} when a value is stored; a failed future wrapping
 *         any error
 */
@Override
protected CompletableFuture<Boolean> existsFromStore(String path) {
    if (log.isDebugEnabled()) {
        log.debug("existsFromStore.path={},instanceId={}", path, instanceId);
    }
    try {
        dbStateLock.readLock().lock();
        final byte[] raw = db.get(optionDontCache, toBytes(path));
        final boolean found = raw != null;
        if (found && log.isDebugEnabled()) {
            // The value is parsed only for the debug output.
            log.debug("Get data from db:{}.", MetaValue.parse(raw));
        }
        return CompletableFuture.completedFuture(found);
    } catch (Throwable e) {
        return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
    } finally {
        dbStateLock.readLock().unlock();
    }
}
/**
 * Deletes the value stored at {@code path} inside a transaction.
 *
 * <p>After a successful commit a {@code Deleted} notification is emitted for the path and the
 * parent is notified that its children changed.
 *
 * @param path the path to delete
 * @param expectedVersion when present, the version the stored value must currently have
 * @return a completed future on success; a failed future wrapping a not-found error when nothing is
 *         stored, a bad-version error on version mismatch, or any other failure
 */
@Override
protected CompletableFuture<Void> storeDelete(String path, Optional<Long> expectedVersion) {
    if (log.isDebugEnabled()) {
        log.debug("storeDelete.path={},instanceId={}", path, instanceId);
    }
    try {
        dbStateLock.readLock().lock();
        try (Transaction txn = db.beginTransaction(writeOptions)) {
            final byte[] key = toBytes(path);
            // Read through the transaction so the key is locked until commit.
            final byte[] currentBytes = txn.getForUpdate(optionDontCache, key, true);
            final MetaValue current = MetaValue.parse(currentBytes);
            if (current == null) {
                throw new MetadataStoreException.NotFoundException(String.format("path %s not found.", path));
            }
            if (expectedVersion.isPresent()) {
                final Long expected = expectedVersion.get();
                if (!expected.equals(current.getVersion())) {
                    throw new MetadataStoreException.BadVersionException(
                            String.format("Version mismatch, actual=%s, expect=%s", current.getVersion(),
                                    expected));
                }
            }
            txn.delete(key);
            txn.commit();
            receivedNotification(new Notification(NotificationType.Deleted, path));
            notifyParentChildrenChanged(path);
            return CompletableFuture.completedFuture(null);
        }
    } catch (Throwable e) {
        if (log.isDebugEnabled()) {
            log.debug("error in storeDelete,path={}", path, e);
        }
        return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
    } finally {
        dbStateLock.readLock().unlock();
    }
}
/**
 * Creates or updates the value stored at {@code path} inside a transaction.
 *
 * <p>When no value exists a new one is created with version 0; otherwise the existing value's
 * version is incremented. The create options are only consulted when a value is created. After
 * the commit a Created or Modified notification is emitted, and on creation the parent is
 * notified that its children changed.
 *
 * @param path the path to write; for sequential creation a counter value is appended to it
 * @param data the payload to store
 * @param expectedVersion when present, the version the stored value must currently have; -1 is
 *        only satisfied when no value exists
 * @param options create options ({@code Ephemeral}, {@code Sequential})
 * @return a completed future with the stat of the written value; a failed future wrapping a
 *         bad-version error on version mismatch, or any other failure
 */
@Override
protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> expectedVersion,
EnumSet<CreateOption> options) {
if (log.isDebugEnabled()) {
log.debug("storePut.path={},instanceId={}", path, instanceId);
}
try {
dbStateLock.readLock().lock();
try (Transaction transaction = db.beginTransaction(writeOptions)) {
byte[] pathBytes = toBytes(path);
// Read through the transaction; this is issued on the path as passed in, before any
// sequential suffix is appended.
byte[] oldValueData = transaction.getForUpdate(optionDontCache, pathBytes, true);
MetaValue metaValue = MetaValue.parse(oldValueData);
if (expectedVersion.isPresent()) {
// Mismatch when the node is missing and the caller did not pass -1, or when the node exists
// with a different version (which includes the caller passing -1 for an existing node).
if (metaValue == null && expectedVersion.get() != -1
|| metaValue != null && !expectedVersion.get().equals(metaValue.getVersion())) {
throw new MetadataStoreException.BadVersionException(
String.format("Version mismatch, actual=%s, expect=%s",
metaValue == null ? null : metaValue.getVersion(), expectedVersion.get()));
}
}
boolean created = false;
long timestamp = System.currentTimeMillis();
if (metaValue == null) {
// create new node
metaValue = new MetaValue();
metaValue.version = 0;
metaValue.createdTimestamp = timestamp;
metaValue.ephemeral = options.contains(CreateOption.Ephemeral);
if (options.contains(CreateOption.Sequential)) {
// Append the next counter value to the path and persist the advanced counter in the
// same transaction.
path += sequentialIdGenerator.getAndIncrement();
pathBytes = toBytes(path);
transaction.put(SEQUENTIAL_ID_KEY, toBytes(sequentialIdGenerator.get()));
}
created = true;
} else {
// update old node
metaValue.version++;
}
metaValue.modifiedTimestamp = timestamp;
// This instance becomes the owner of the value, also when updating an existing node.
metaValue.owner = instanceId;
metaValue.data = data;
// Write under the final path (including the sequential suffix, if any).
transaction.put(pathBytes, metaValue.serialize());
transaction.commit();
receivedNotification(
new Notification(created ? NotificationType.Created : NotificationType.Modified, path));
if (created) {
notifyParentChildrenChanged(path);
}
return CompletableFuture.completedFuture(
new Stat(path, metaValue.version, metaValue.createdTimestamp, metaValue.modifiedTimestamp,
metaValue.ephemeral, true));
}
} catch (Throwable e) {
if (log.isDebugEnabled()) {
log.debug("error in storePut,path={}", path, e);
}
return FutureUtil.failedFuture(MetadataStoreException.wrap(e));
} finally {
dbStateLock.readLock().unlock();
}
}
/**
 * Returns the synchronizer currently attached to this store.
 *
 * @return the synchronizer, or empty when none has been set
 */
@Override
public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
    final MetadataEventSynchronizer current = synchronizer;
    return Optional.ofNullable(current);
}
/**
 * Replaces the synchronizer attached to this store and registers the sync listener for it.
 *
 * @param synchronizer the new synchronizer, may be {@code null}
 */
@Override
public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) {
    this.synchronizer = synchronizer;
    final Optional<MetadataEventSynchronizer> maybeSynchronizer = Optional.ofNullable(synchronizer);
    registerSyncListener(maybeSynchronizer);
}
}
/**
 * {@link MetadataStoreProvider} that serves the {@code rocksdb} URL scheme by handing out
 * {@link RocksdbMetadataStore} instances.
 */
class RocksdbMetadataStoreProvider implements MetadataStoreProvider {

    /** Returns the URL scheme handled by this provider. */
    @Override
    public String urlScheme() {
        final String scheme = RocksdbMetadataStore.ROCKSDB_SCHEME;
        return scheme;
    }

    /**
     * Returns the store for {@code metadataURL} via {@link RocksdbMetadataStore#get}.
     * The {@code enableSessionWatcher} flag is not used by this provider.
     */
    @Override
    public MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig,
                                boolean enableSessionWatcher) throws MetadataStoreException {
        final RocksdbMetadataStore store = RocksdbMetadataStore.get(metadataURL, metadataStoreConfig);
        return store;
    }
}