blob: 02f1e62426af649d997096f1912aa498b079c12d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.offload.filesystem.impl;
import static org.apache.bookkeeper.mledger.offload.OffloadUtils.buildLedgerMetadataFormat;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.netty.util.Recycler;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link LedgerOffloader} implementation that offloads closed ledgers into a Hadoop
 * {@link FileSystem} (HDFS or local filesystem) as a {@link MapFile} keyed by entry id.
 *
 * <p>Layout: each offloaded ledger is written to
 * {@code <storageBasePath>/<managedLedgerName>/<ledgerId>-<uuid>} with the serialized
 * ledger metadata stored under the reserved key {@link #METADATA_KEY_INDEX}.
 */
public class FileSystemManagedLedgerOffloader implements LedgerOffloader {

    private static final Logger log = LoggerFactory.getLogger(FileSystemManagedLedgerOffloader.class);
    private static final String STORAGE_BASE_PATH = "storageBasePath";
    private static final String DRIVER_NAMES = "filesystem";
    private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";
    // Reserved MapFile key under which the serialized ledger metadata is stored;
    // real entry ids are always >= 0 so -1 can never collide with entry data.
    static final long METADATA_KEY_INDEX = -1;
    // Number of entries fetched from bookies in a single read round.
    private static final long ENTRIES_PER_READ = 100;

    private final Configuration configuration;
    private final String driverName;
    // May be null when "hadoop.tmp.dir" is unset; getStoragePath() tolerates that.
    private final String storageBasePath;
    private final FileSystem fileSystem;
    // Caller-owned scheduler used to order offload/read tasks per ledger id; not closed here.
    private final OrderedScheduler scheduler;
    // Internally-owned scheduler driving the write side of an offload; shut down in close().
    private final OrderedScheduler assignmentScheduler;
    private final OffloadPoliciesImpl offloadPolicies;

    /**
     * Returns whether the given driver name selects this offloader implementation.
     */
    public static boolean driverSupported(String driver) {
        return DRIVER_NAMES.equals(driver);
    }

    @Override
    public String getOffloadDriverName() {
        return driverName;
    }

    /**
     * Factory method building an offloader from the given offload policies.
     *
     * @throws IOException if the Hadoop {@link FileSystem} cannot be initialized
     */
    public static FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl conf,
                                                          OrderedScheduler scheduler) throws IOException {
        return new FileSystemManagedLedgerOffloader(conf, scheduler);
    }

    private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedScheduler scheduler) throws IOException {
        this.offloadPolicies = conf;
        this.configuration = new Configuration();
        // Optional comma-separated list of Hadoop profile files (e.g. core-site.xml) to load.
        if (conf.getFileSystemProfilePath() != null) {
            String[] paths = conf.getFileSystemProfilePath().split(",");
            for (String profilePath : paths) {
                configuration.addResource(new Path(profilePath));
            }
        }
        // Null-check first for readability (the original order only worked because
        // "".equals(null) happens to be false).
        if (conf.getFileSystemURI() != null && !"".equals(conf.getFileSystemURI())) {
            configuration.set("fs.defaultFS", conf.getFileSystemURI());
        }
        // Provide default FileSystem bindings unless the loaded profiles already set them.
        if (configuration.get("fs.hdfs.impl") == null) {
            this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        }
        if (configuration.get("fs.file.impl") == null) {
            this.configuration.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
        }
        // Ensure Hadoop resolves classes through the offloader's (NAR) classloader.
        this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
        this.driverName = conf.getManagedLedgerOffloadDriver();
        this.storageBasePath = configuration.get("hadoop.tmp.dir");
        this.scheduler = scheduler;
        this.fileSystem = FileSystem.get(configuration);
        this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
                .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                .name("offload-assignment").build();
    }

    /**
     * Test-only constructor that points the offloader at an explicit HDFS URI and base dir.
     */
    @VisibleForTesting
    public FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf,
                                            OrderedScheduler scheduler,
                                            String testHDFSPath,
                                            String baseDir) throws IOException {
        this.offloadPolicies = conf;
        this.configuration = new Configuration();
        this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        this.configuration.set("fs.defaultFS", testHDFSPath);
        this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
        this.driverName = conf.getManagedLedgerOffloadDriver();
        this.configuration.set("hadoop.tmp.dir", baseDir);
        this.storageBasePath = baseDir;
        this.scheduler = scheduler;
        this.fileSystem = FileSystem.get(configuration);
        this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder()
                .numThreads(conf.getManagedLedgerOffloadMaxThreads())
                .name("offload-assignment").build();
    }

    @Override
    public Map<String, String> getOffloadDriverMetadata() {
        // storageBasePath may legitimately be null (no "hadoop.tmp.dir" configured);
        // ImmutableMap rejects null values, so encode it as the string "null".
        String path = storageBasePath == null ? "null" : storageBasePath;
        return ImmutableMap.of(
                STORAGE_BASE_PATH, path
        );
    }

    /**
     * Offloads a closed ledger to the filesystem. The ledger metadata is stored under
     * index {@link #METADATA_KEY_INDEX} (-1) in the resulting MapFile.
     *
     * @return a future completed when the whole ledger has been written and flushed,
     *         or completed exceptionally on the first read/write failure
     */
    @Override
    public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        scheduler.chooseThread(readHandle.getId()).submit(
                new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath,
                        configuration, assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds()));
        return promise;
    }

    /**
     * Reads the ledger in rounds of {@link #ENTRIES_PER_READ} entries and hands each
     * batch to a {@link FileSystemWriter} task; a semaphore bounds how many batches
     * may be prefetched into memory ahead of the writer.
     */
    private static class LedgerReader implements Runnable {

        private final ReadHandle readHandle;
        private final UUID uuid;
        private final Map<String, String> extraMetadata;
        private final CompletableFuture<Void> promise;
        private final String storageBasePath;
        private final Configuration configuration;
        // First write failure observed by any FileSystemWriter task; volatile so the
        // reader loop sees it promptly and stops submitting further batches.
        volatile Exception fileSystemWriteException = null;
        private final OrderedScheduler assignmentScheduler;
        private final int managedLedgerOffloadPrefetchRounds;

        private LedgerReader(ReadHandle readHandle,
                             UUID uuid,
                             Map<String, String> extraMetadata,
                             CompletableFuture<Void> promise,
                             String storageBasePath,
                             Configuration configuration,
                             OrderedScheduler assignmentScheduler,
                             int managedLedgerOffloadPrefetchRounds) {
            this.readHandle = readHandle;
            this.uuid = uuid;
            this.extraMetadata = extraMetadata;
            this.promise = promise;
            this.storageBasePath = storageBasePath;
            this.configuration = configuration;
            this.assignmentScheduler = assignmentScheduler;
            this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds;
        }

        @Override
        public void run() {
            // Only closed, non-empty ledgers may be offloaded.
            if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
                promise.completeExceptionally(
                        new IllegalArgumentException("An empty or open ledger should never be offloaded"));
                return;
            }
            long ledgerId = readHandle.getId();
            String storagePath = getStoragePath(storageBasePath, extraMetadata.get(MANAGED_LEDGER_NAME));
            String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
            LongWritable key = new LongWritable();
            BytesWritable value = new BytesWritable();
            MapFile.Writer dataWriter = null;
            try {
                dataWriter = new MapFile.Writer(configuration,
                        new Path(dataFilePath),
                        MapFile.Writer.keyClass(LongWritable.class),
                        MapFile.Writer.valueClass(BytesWritable.class));
                // Store the ledgerMetadata under the reserved -1 index before any entry.
                key.set(METADATA_KEY_INDEX);
                byte[] ledgerMetadata = buildLedgerMetadataFormat(readHandle.getLedgerMetadata());
                value.set(ledgerMetadata, 0, ledgerMetadata.length);
                dataWriter.append(key, value);
                AtomicLong haveOffloadEntryNumber = new AtomicLong(0);
                long needToOffloadFirstEntryNumber = 0;
                CountDownLatch countDownLatch;
                // Avoid prefetching too much data into memory: at most
                // managedLedgerOffloadPrefetchRounds batches may be in flight.
                Semaphore semaphore = new Semaphore(managedLedgerOffloadPrefetchRounds);
                do {
                    long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1,
                            readHandle.getLastAddConfirmed());
                    log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end);
                    LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get();
                    semaphore.acquire();
                    countDownLatch = new CountDownLatch(1);
                    // All batches for this ledger hash to the same assignment thread, so
                    // they run in submission order; awaiting the latch of the LAST batch
                    // therefore implies every earlier batch has completed.
                    assignmentScheduler.chooseThread(ledgerId)
                            .submit(FileSystemWriter.create(ledgerEntriesOnce,
                                    dataWriter, semaphore, countDownLatch, haveOffloadEntryNumber, this));
                    needToOffloadFirstEntryNumber = end + 1;
                } while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed()
                        && fileSystemWriteException == null);
                countDownLatch.await();
                if (fileSystemWriteException != null) {
                    throw fileSystemWriteException;
                }
                // Close (and thereby flush) the MapFile before signalling success, so a
                // subsequent readOffloaded() never races an unflushed file.
                IOUtils.closeStream(dataWriter);
                promise.complete(null);
            } catch (Exception e) {
                log.error("Exception when get CompletableFuture<LedgerEntries> : ManagerLedgerName: {}, "
                        + "LedgerId: {}, UUID: {} ", extraMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, e);
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // Fix: close the writer on the failure path too; the original leaked the
                // underlying output streams. closeStream is null-safe and swallows close
                // errors; a still-running writer task hitting the closed stream merely
                // records fileSystemWriteException, which is harmless at this point.
                IOUtils.closeStream(dataWriter);
                promise.completeExceptionally(e);
            }
        }
    }

    /**
     * Recyclable task that appends one batch of ledger entries to the shared MapFile
     * writer. Exactly one of these runs at a time per ledger (single ordered thread).
     */
    private static class FileSystemWriter implements Runnable {

        private LedgerEntries ledgerEntriesOnce;
        private final LongWritable key = new LongWritable();
        private final BytesWritable value = new BytesWritable();
        private MapFile.Writer dataWriter;
        private CountDownLatch countDownLatch;
        private AtomicLong haveOffloadEntryNumber;
        private LedgerReader ledgerReader;
        private Semaphore semaphore;
        private final Recycler.Handle<FileSystemWriter> recyclerHandle;

        private FileSystemWriter(Recycler.Handle<FileSystemWriter> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }

        private static final Recycler<FileSystemWriter> RECYCLER = new Recycler<FileSystemWriter>() {
            @Override
            protected FileSystemWriter newObject(Recycler.Handle<FileSystemWriter> handle) {
                return new FileSystemWriter(handle);
            }
        };

        // Null out every reference before returning the object to the pool so recycled
        // instances cannot pin entry buffers or writers in memory.
        private void recycle() {
            this.dataWriter = null;
            this.countDownLatch = null;
            this.haveOffloadEntryNumber = null;
            this.ledgerReader = null;
            this.ledgerEntriesOnce = null;
            this.semaphore = null;
            recyclerHandle.recycle(this);
        }

        public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce,
                                              MapFile.Writer dataWriter,
                                              Semaphore semaphore,
                                              CountDownLatch countDownLatch,
                                              AtomicLong haveOffloadEntryNumber,
                                              LedgerReader ledgerReader) {
            FileSystemWriter writer = RECYCLER.get();
            writer.ledgerReader = ledgerReader;
            writer.dataWriter = dataWriter;
            writer.countDownLatch = countDownLatch;
            writer.haveOffloadEntryNumber = haveOffloadEntryNumber;
            writer.ledgerEntriesOnce = ledgerEntriesOnce;
            writer.semaphore = semaphore;
            return writer;
        }

        @Override
        public void run() {
            try {
                // Skip the batch entirely once a previous write failed.
                if (ledgerReader.fileSystemWriteException == null) {
                    for (LedgerEntry entry : ledgerEntriesOnce) {
                        key.set(entry.getEntryId());
                        try {
                            // Fix: call getEntryBytes() once — it copies the entry buffer
                            // on every call, so the original paid for two copies per entry.
                            byte[] entryBytes = entry.getEntryBytes();
                            value.set(entryBytes, 0, entryBytes.length);
                            dataWriter.append(key, value);
                        } catch (IOException e) {
                            // Record the first failure; the reader loop observes it and stops.
                            ledgerReader.fileSystemWriteException = e;
                            break;
                        }
                        haveOffloadEntryNumber.incrementAndGet();
                    }
                }
            } finally {
                // Fix: always release resources, even on an unexpected RuntimeException;
                // otherwise the reader would block forever on countDownLatch.await() and
                // the prefetch permit plus the entry buffers would leak.
                countDownLatch.countDown();
                ledgerEntriesOnce.close();
                semaphore.release();
                this.recycle();
            }
        }
    }

    /**
     * Opens a previously offloaded ledger for reading.
     */
    @Override
    public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid,
                                                       Map<String, String> offloadDriverMetadata) {
        CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
        String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
        String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid);
        scheduler.chooseThread(ledgerId).submit(() -> {
            MapFile.Reader reader = null;
            try {
                reader = new MapFile.Reader(new Path(dataFilePath),
                        configuration);
                promise.complete(FileStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
                        reader,
                        ledgerId));
            } catch (Throwable t) {
                log.error("Failed to open FileStoreBackedReadHandleImpl: ManagerLedgerName: {}, "
                        + "LegerId: {}, UUID: {}", offloadDriverMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, t);
                // Fix: don't leak the MapFile.Reader when open() fails after the reader
                // was constructed; closeStream is null-safe.
                IOUtils.closeStream(reader);
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    // Directory for one managed ledger; tolerates a null base path (no "hadoop.tmp.dir").
    private static String getStoragePath(String storageBasePath, String managedLedgerName) {
        return storageBasePath == null ? managedLedgerName + "/" : storageBasePath + "/" + managedLedgerName + "/";
    }

    // MapFile path for one offloaded ledger: <storagePath><ledgerId>-<uuid>.
    private static String getDataFilePath(String storagePath, long ledgerId, UUID uuid) {
        return storagePath + ledgerId + "-" + uuid.toString();
    }

    /**
     * Deletes the offloaded data for a ledger. Deleting a non-existent path is treated
     * as success (FileSystem.delete returns false), keeping the operation idempotent.
     */
    @Override
    public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid, Map<String, String> offloadDriverMetadata) {
        String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME));
        String dataFilePath = getDataFilePath(storagePath, ledgerId, uid);
        CompletableFuture<Void> promise = new CompletableFuture<>();
        try {
            fileSystem.delete(new Path(dataFilePath), true);
            promise.complete(null);
        } catch (IOException e) {
            log.error("Failed to delete Offloaded: ", e);
            promise.completeExceptionally(e);
        }
        return promise;
    }

    @Override
    public OffloadPoliciesImpl getOffloadPolicies() {
        return offloadPolicies;
    }

    @Override
    public void close() {
        if (fileSystem != null) {
            try {
                fileSystem.close();
            } catch (Exception e) {
                log.error("FileSystemManagedLedgerOffloader close failed!", e);
            }
        }
        // Fix: shut down the internally-created assignment scheduler, otherwise its
        // "offload-assignment" threads leak. The caller-provided scheduler is not ours
        // to close.
        if (assignmentScheduler != null) {
            assignmentScheduler.shutdown();
        }
    }
}