blob: 3909a89cf5eb213aa3a2fd52425192a3e441e3b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.impl;
import com.google.common.collect.MapMaker;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@Slf4j
public class LocalMemoryMetadataStore extends AbstractMetadataStore implements MetadataStoreExtended {
static final String MEMORY_SCHEME = "memory";
static final String MEMORY_SCHEME_IDENTIFIER = "memory:";
@Data
private static class Value {
final long version;
final byte[] data;
final long createdTimestamp;
final long modifiedTimestamp;
final boolean ephemeral;
}
private final NavigableMap<String, Value> map;
private final AtomicLong sequentialIdGenerator;
private MetadataEventSynchronizer synchronizer;
private static final Map<String, NavigableMap<String, Value>> STATIC_MAPS = new MapMaker()
.weakValues().makeMap();
// Manage all instances to facilitate registration to the same listener
private static final Map<String, Set<AbstractMetadataStore>> STATIC_INSTANCE = new MapMaker()
.weakValues().makeMap();
private static final Map<String, AtomicLong> STATIC_ID_GEN_MAP = new MapMaker()
.weakValues().makeMap();
public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
throws MetadataStoreException {
super(metadataStoreConfig.getMetadataStoreName());
String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
// Local means a private data set
// update synchronizer and register sync listener
updateMetadataEventSynchronizer(metadataStoreConfig.getSynchronizer());
if ("local".equals(name)) {
map = new TreeMap<>();
sequentialIdGenerator = new AtomicLong();
} else {
// Use a reference from a shared data set
map = STATIC_MAPS.computeIfAbsent(name, __ -> new TreeMap<>());
STATIC_INSTANCE.compute(name, (key, value) -> {
if (value == null) {
value = new HashSet<>();
}
value.forEach(v -> {
registerListener(v);
v.registerListener(this);
});
value.add(this);
return value;
});
sequentialIdGenerator = STATIC_ID_GEN_MAP.computeIfAbsent(name, __ -> new AtomicLong());
log.info("Created LocalMemoryDataStore for '{}'", name);
}
}
@Override
public CompletableFuture<Optional<GetResult>> storeGet(String path) {
synchronized (map) {
Value v = map.get(path);
if (v != null) {
return FutureUtils.value(
Optional.of(
new GetResult(v.data, new Stat(path, v.version, v.createdTimestamp, v.modifiedTimestamp,
v.isEphemeral(), true))));
} else {
return FutureUtils.value(Optional.empty());
}
}
}
@Override
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
synchronized (map) {
String firstKey = path.equals("/") ? path : path + "/";
String lastKey = path.equals("/") ? "0" : path + "0"; // '0' is lexicographically just after '/'
Set<String> children = new TreeSet<>();
map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
String relativePath = key.replaceFirst(firstKey, "");
// Only return first-level children
String child = relativePath.split("/", 2)[0];
children.add(child);
});
return FutureUtils.value(new ArrayList<>(children));
}
}
@Override
public CompletableFuture<Boolean> existsFromStore(String path) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
synchronized (map) {
Value v = map.get(path);
return FutureUtils.value(v != null);
}
}
@Override
public CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
synchronized (map) {
boolean hasVersion = optExpectedVersion.isPresent();
int expectedVersion = optExpectedVersion.orElse(-1L).intValue();
if (options.contains(CreateOption.Sequential)) {
path += Long.toString(sequentialIdGenerator.getAndIncrement());
}
long now = System.currentTimeMillis();
if (hasVersion && expectedVersion == -1) {
Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral));
Value existingValue = map.putIfAbsent(path, newValue);
if (existingValue != null) {
return FutureUtils.exception(new BadVersionException(""));
} else {
receivedNotification(new Notification(NotificationType.Created, path));
notifyParentChildrenChanged(path);
return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true));
}
} else {
Value existingValue = map.get(path);
long existingVersion = existingValue != null ? existingValue.version : -1;
if (hasVersion && expectedVersion != existingVersion) {
return FutureUtils.exception(new BadVersionException(""));
} else {
long newVersion = existingValue != null ? existingValue.version + 1 : 0;
long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now;
Value newValue = new Value(newVersion, data, createdTimestamp, now,
options.contains(CreateOption.Ephemeral));
map.put(path, newValue);
NotificationType type =
existingValue == null ? NotificationType.Created : NotificationType.Modified;
receivedNotification(new Notification(type, path));
if (type == NotificationType.Created) {
notifyParentChildrenChanged(path);
}
return FutureUtils
.value(new Stat(path, newValue.version, newValue.createdTimestamp,
newValue.modifiedTimestamp,
false, true));
}
}
}
}
@Override
public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpectedVersion) {
if (!isValidPath(path)) {
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
synchronized (map) {
Value value = map.get(path);
if (value == null) {
return FutureUtils.exception(new NotFoundException(""));
} else if (optExpectedVersion.isPresent() && optExpectedVersion.get() != value.version) {
return FutureUtils.exception(new BadVersionException(""));
} else {
map.remove(path);
receivedNotification(new Notification(NotificationType.Deleted, path));
notifyParentChildrenChanged(path);
return FutureUtils.value(null);
}
}
}
@Override
public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
return Optional.ofNullable(synchronizer);
}
@Override
public void updateMetadataEventSynchronizer(MetadataEventSynchronizer synchronizer) {
this.synchronizer = synchronizer;
registerSyncListener(Optional.ofNullable(synchronizer));
}
@Override
public void close() throws Exception {
if (isClosed.compareAndSet(false, true)) {
super.close();
}
}
}
class MemoryMetadataStoreProvider implements MetadataStoreProvider {
@Override
public String urlScheme() {
return LocalMemoryMetadataStore.MEMORY_SCHEME;
}
@Override
public MetadataStore create(String metadataURL, MetadataStoreConfig metadataStoreConfig,
boolean enableSessionWatcher) throws MetadataStoreException {
return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
}
}