/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle.OfferEntryResult;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.OffloadSegmentInfoImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock.IndexInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.BlobStoreLocation;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.BlobBuilder;
import org.jclouds.blobstore.domain.MultipartPart;
import org.jclouds.blobstore.domain.MultipartUpload;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
import org.jclouds.io.Payload;
import org.jclouds.io.Payloads;
import org.jclouds.io.payloads.InputStreamPayload;
/**
* Tiered Storage Offloader that is backed by a JCloud Blob Store.
* <p>
* The constructor takes an instance of TieredStorageConfiguration, which
* contains all of the configuration data necessary to connect to a JCloud
* Provider service.
* </p>
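 * <p>
 * A minimal usage sketch (the {@code TieredStorageConfiguration.create} factory shown
 * here is an assumption for illustration; build the configuration however your
 * deployment provides it):
 * </p>
 * <pre>{@code
 * TieredStorageConfiguration config = TieredStorageConfiguration.create(props);
 * OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
 *         .name("offloader").numThreads(2).build();
 * BlobStoreManagedLedgerOffloader offloader =
 *         BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler);
 * // offload a closed ledger; the future completes once both the data and
 * // index blocks have been written to the blob store
 * offloader.offload(readHandle, UUID.randomUUID(), extraMetadata).join();
 * }</pre>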
*/
@Slf4j
public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
private final OrderedScheduler scheduler;
private final TieredStorageConfiguration config;
private final Location writeLocation;
// metadata to be stored as part of the offloaded ledger metadata
private final Map<String, String> userMetadata;
private final ConcurrentMap<BlobStoreLocation, BlobStore> blobStores = new ConcurrentHashMap<>();
private OffloadSegmentInfoImpl segmentInfo;
    private final AtomicLong bufferLength = new AtomicLong(0);
    private final AtomicLong segmentLength = new AtomicLong(0);
    private final long maxBufferLength;
    private final ConcurrentLinkedQueue<Entry> offloadBuffer = new ConcurrentLinkedQueue<>();
private CompletableFuture<OffloadResult> offloadResult;
private volatile PositionImpl lastOfferedPosition = PositionImpl.LATEST;
private final Duration maxSegmentCloseTime;
private final long minSegmentCloseTimeMillis;
private final long segmentBeginTimeMillis;
private final long maxSegmentLength;
private final int streamingBlockSize;
private volatile ManagedLedger ml;
private OffloadIndexBlockV2Builder streamingIndexBuilder;
public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config,
Map<String, String> userMetadata,
OrderedScheduler scheduler) throws IOException {
return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata);
}
BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, OrderedScheduler scheduler,
Map<String, String> userMetadata) {
this.scheduler = scheduler;
this.userMetadata = userMetadata;
this.config = config;
this.streamingBlockSize = config.getMinBlockSizeInBytes();
this.maxSegmentCloseTime = Duration.ofSeconds(config.getMaxSegmentTimeInSecond());
this.maxSegmentLength = config.getMaxSegmentSizeInBytes();
this.minSegmentCloseTimeMillis = Duration.ofSeconds(config.getMinSegmentTimeInSecond()).toMillis();
        // ensure the buffer can hold enough content to fill a block
this.maxBufferLength = Math.max(config.getWriteBufferSizeInBytes(), config.getMinBlockSizeInBytes());
this.segmentBeginTimeMillis = System.currentTimeMillis();
if (!Strings.isNullOrEmpty(config.getRegion())) {
this.writeLocation = new LocationBuilder()
.scope(LocationScope.REGION)
.id(config.getRegion())
.description(config.getRegion())
.build();
} else {
this.writeLocation = null;
}
log.info("Constructor offload driver: {}, host: {}, container: {}, region: {} ",
config.getProvider().getDriver(), config.getServiceEndpoint(),
config.getBucket(), config.getRegion());
blobStores.putIfAbsent(config.getBlobStoreLocation(), config.getBlobStore());
log.info("The ledger offloader was created.");
}
@Override
public String getOffloadDriverName() {
return config.getDriver();
}
@Override
public Map<String, String> getOffloadDriverMetadata() {
return config.getOffloadDriverMetadata();
}
    /**
     * Uploads the data blocks associated with the given ReadHandle using a multipart upload,
     * creating an index entry for each uploaded data block. Two objects are written for the
     * ledger: a data block under {@code DataBlockUtils.dataBlockOffloadKey(ledgerId, uuid)}
     * and an index block under {@code DataBlockUtils.indexBlockOffloadKey(ledgerId, uuid)}.
     */
@Override
public CompletableFuture<Void> offload(ReadHandle readHandle,
UUID uuid,
Map<String, String> extraMetadata) {
final BlobStore writeBlobStore = blobStores.get(config.getBlobStoreLocation());
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(readHandle.getId()).submit(() -> {
if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
promise.completeExceptionally(
new IllegalArgumentException("An empty or open ledger should never be offloaded"));
return;
}
OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
.withLedgerMetadata(readHandle.getLedgerMetadata())
.withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(readHandle.getId(), uuid);
String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(readHandle.getId(), uuid);
MultipartUpload mpu = null;
List<MultipartPart> parts = Lists.newArrayList();
// init multi part upload for data block.
try {
BlobBuilder blobBuilder = writeBlobStore.blobBuilder(dataBlockKey);
DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
Blob blob = blobBuilder.build();
mpu = writeBlobStore.initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
} catch (Throwable t) {
promise.completeExceptionally(t);
return;
}
long dataObjectLength = 0;
// start multi part upload for data block.
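            // Each iteration packs entries starting at startEntry into one block of at most
            // maxBlockSizeInBytes, uploads it as a single multipart part, and records the
            // (startEntry, partId, blockSize) mapping in the index builder.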
try {
long startEntry = 0;
int partId = 1;
long entryBytesWritten = 0;
while (startEntry <= readHandle.getLastAddConfirmed()) {
int blockSize = BlockAwareSegmentInputStreamImpl
.calculateBlockSize(config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten);
try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl(
readHandle, startEntry, blockSize)) {
Payload partPayload = Payloads.newInputStreamPayload(blockStream);
partPayload.getContentMetadata().setContentLength((long) blockSize);
partPayload.getContentMetadata().setContentType("application/octet-stream");
parts.add(writeBlobStore.uploadMultipartPart(mpu, partId, partPayload));
log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
config.getBucket(), dataBlockKey, partId, mpu.id());
indexBuilder.addBlock(startEntry, partId, blockSize);
if (blockStream.getEndEntryId() != -1) {
startEntry = blockStream.getEndEntryId() + 1;
} else {
// could not read entry from ledger.
break;
}
entryBytesWritten += blockStream.getBlockEntryBytesCount();
partId++;
}
dataObjectLength += blockSize;
}
writeBlobStore.completeMultipartUpload(mpu, parts);
mpu = null;
} catch (Throwable t) {
try {
if (mpu != null) {
writeBlobStore.abortMultipartUpload(mpu);
}
} catch (Throwable throwable) {
log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.",
config.getBucket(), dataBlockKey, mpu.id(), throwable);
}
promise.completeExceptionally(t);
return;
}
// upload index block
try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
IndexInputStream indexStream = index.toStream()) {
// write the index block
BlobBuilder blobBuilder = writeBlobStore.blobBuilder(indexBlockKey);
DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
Payload indexPayload = Payloads.newInputStreamPayload(indexStream);
indexPayload.getContentMetadata().setContentLength((long) indexStream.getStreamSize());
indexPayload.getContentMetadata().setContentType("application/octet-stream");
Blob blob = blobBuilder
.payload(indexPayload)
.contentLength((long) indexStream.getStreamSize())
.build();
writeBlobStore.putBlob(config.getBucket(), blob);
promise.complete(null);
} catch (Throwable t) {
try {
writeBlobStore.removeBlob(config.getBucket(), dataBlockKey);
} catch (Throwable throwable) {
log.error("Failed deleteObject in bucket - {} with key - {}.",
config.getBucket(), dataBlockKey, throwable);
}
promise.completeExceptionally(t);
return;
}
});
return promise;
}
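    // State for the streaming offload path; an offloader instance supports at most one
    // streaming segment at a time (enforced in streamingOffload below).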
BlobStore blobStore;
String streamingDataBlockKey;
String streamingDataIndexKey;
MultipartUpload streamingMpu = null;
List<MultipartPart> streamingParts = Lists.newArrayList();
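    /**
     * Begins a streaming offload of the given managed ledger; entries are offered through
     * the returned {@link OffloadHandle} and buffered until a block can be built.
     * <p>
     * A minimal sketch of driving the handle (the caller-side loop is illustrative only):
     * </p>
     * <pre>{@code
     * OffloadHandle handle = offloader
     *         .streamingOffload(ml, UUID.randomUUID(), beginLedger, beginEntry, driverMeta)
     *         .get();
     * while (handle.offerEntry(entry) == OfferEntryResult.SUCCESS) {
     *     // feed entries until the buffer fills or the segment closes
     * }
     * handle.close();
     * OffloadResult result = handle.getOffloadResultAsync().get();
     * }</pre>
     */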
@Override
public CompletableFuture<OffloadHandle> streamingOffload(@NonNull ManagedLedger ml, UUID uuid, long beginLedger,
long beginEntry,
Map<String, String> driverMetadata) {
        if (this.ml != null) {
            log.error("streamingOffload should only be called once");
            final CompletableFuture<OffloadHandle> result = new CompletableFuture<>();
            result.completeExceptionally(new RuntimeException("streamingOffload should only be called once"));
            return result;
        }
this.ml = ml;
this.segmentInfo = new OffloadSegmentInfoImpl(uuid, beginLedger, beginEntry, config.getDriver(),
driverMetadata);
log.debug("begin offload with {}:{}", beginLedger, beginEntry);
this.offloadResult = new CompletableFuture<>();
blobStore = blobStores.get(config.getBlobStoreLocation());
streamingIndexBuilder = OffloadIndexBlockV2Builder.create();
streamingDataBlockKey = segmentInfo.uuid.toString();
streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid);
BlobBuilder blobBuilder = blobStore.blobBuilder(streamingDataBlockKey);
DataBlockUtils.addVersionInfo(blobBuilder, userMetadata);
Blob blob = blobBuilder.build();
streamingMpu = blobStore
.initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions());
scheduler.chooseThread(segmentInfo).execute(() -> {
log.info("start offloading segment: {}", segmentInfo);
streamingOffloadLoop(1, 0);
});
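        // Force-close the segment after maxSegmentCloseTime so a slow producer cannot
        // keep it open indefinitely.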
scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS);
return CompletableFuture.completedFuture(new OffloadHandle() {
@Override
public Position lastOffered() {
return BlobStoreManagedLedgerOffloader.this.lastOffered();
}
@Override
public CompletableFuture<Position> lastOfferedAsync() {
return CompletableFuture.completedFuture(lastOffered());
}
@Override
public OfferEntryResult offerEntry(Entry entry) {
return BlobStoreManagedLedgerOffloader.this.offerEntry(entry);
}
@Override
public CompletableFuture<OfferEntryResult> offerEntryAsync(Entry entry) {
return CompletableFuture.completedFuture(offerEntry(entry));
}
@Override
public CompletableFuture<OffloadResult> getOffloadResultAsync() {
return BlobStoreManagedLedgerOffloader.this.getOffloadResultAsync();
}
@Override
public boolean close() {
return BlobStoreManagedLedgerOffloader.this.closeSegment();
}
});
}
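    /**
     * Drains the offload buffer into data blocks. Once the segment is closed or the
     * buffered bytes reach the streaming block size, packs buffered entries of the same
     * ledger into one block, uploads it as the next multipart part, and recurses with the
     * next part id. When the segment is closed and the buffer is empty, uploads the index
     * and completes the offload result; otherwise it re-schedules itself after a short
     * delay until enough data arrives.
     */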
private void streamingOffloadLoop(int partId, int dataObjectLength) {
log.debug("streaming offload loop {} {}", partId, dataObjectLength);
        if (segmentInfo.isClosed() && offloadBuffer.isEmpty()) {
            // buildIndexAndCompleteResult completes offloadResult itself,
            // either normally or exceptionally
            buildIndexAndCompleteResult(dataObjectLength);
} else if ((segmentInfo.isClosed() && !offloadBuffer.isEmpty())
// last time to build and upload block
|| bufferLength.get() >= streamingBlockSize
// buffer size full, build and upload block
) {
List<Entry> entries = new LinkedList<>();
int blockEntrySize = 0;
final Entry firstEntry = offloadBuffer.poll();
entries.add(firstEntry);
long blockLedgerId = firstEntry.getLedgerId();
long blockEntryId = firstEntry.getEntryId();
while (!offloadBuffer.isEmpty() && offloadBuffer.peek().getLedgerId() == blockLedgerId
&& blockEntrySize <= streamingBlockSize) {
final Entry entryInBlock = offloadBuffer.poll();
final int entrySize = entryInBlock.getLength();
bufferLength.addAndGet(-entrySize);
blockEntrySize += entrySize;
entries.add(entryInBlock);
}
final int blockSize = BufferedOffloadStream
.calculateBlockSize(streamingBlockSize, entries.size(), blockEntrySize);
            buildBlockAndUpload(blockSize, entries, blockLedgerId, blockEntryId, partId);
            if (offloadResult.isCompletedExceptionally()) {
                // the upload failed and the multipart upload was aborted; stop looping
                return;
            }
            streamingOffloadLoop(partId + 1, dataObjectLength + blockSize);
} else {
log.debug("not enough data, delay schedule for part: {} length: {}", partId, dataObjectLength);
scheduler.chooseThread(segmentInfo)
.schedule(() -> {
streamingOffloadLoop(partId, dataObjectLength);
}, 100, TimeUnit.MILLISECONDS);
}
}
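    /**
     * Wraps the given entries in a BufferedOffloadStream, uploads it as multipart part
     * {@code partId}, and records the block plus its ledger metadata in the index builder.
     * On failure, aborts the multipart upload and completes the offload result
     * exceptionally.
     */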
private void buildBlockAndUpload(int blockSize, List<Entry> entries, long blockLedgerId, long beginEntryId,
int partId) {
try (final BufferedOffloadStream payloadStream = new BufferedOffloadStream(blockSize, entries,
blockLedgerId, beginEntryId)) {
log.debug("begin upload payload: {} {}", blockLedgerId, beginEntryId);
Payload partPayload = Payloads.newInputStreamPayload(payloadStream);
partPayload.getContentMetadata().setContentType("application/octet-stream");
streamingParts.add(blobStore.uploadMultipartPart(streamingMpu, partId, partPayload));
streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
streamingIndexBuilder.addBlock(blockLedgerId, beginEntryId, partId, blockSize);
final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ml.getLedgerInfo(blockLedgerId).get();
final MLDataFormats.ManagedLedgerInfo.LedgerInfo.Builder ledgerInfoBuilder = MLDataFormats.ManagedLedgerInfo.LedgerInfo
.newBuilder();
if (ledgerInfo != null) {
ledgerInfoBuilder.mergeFrom(ledgerInfo);
}
if (ledgerInfoBuilder.getEntries() == 0) {
//ledger unclosed, use last entry id of the block
ledgerInfoBuilder.setEntries(payloadStream.getEndEntryId() + 1);
}
streamingIndexBuilder.addLedgerMeta(blockLedgerId, ledgerInfoBuilder.build());
log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
config.getBucket(), streamingDataBlockKey, partId, streamingMpu.id());
} catch (Throwable e) {
blobStore.abortMultipartUpload(streamingMpu);
offloadResult.completeExceptionally(e);
return;
}
}
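    /**
     * Completes the multipart upload for the data object, then builds the V2 index,
     * writes it to a separate blob, and completes the offload result.
     */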
private void buildIndexAndCompleteResult(long dataObjectLength) {
try {
blobStore.completeMultipartUpload(streamingMpu, streamingParts);
            streamingIndexBuilder.withDataObjectLength(dataObjectLength);
            streamingIndexBuilder.withDataBlockHeaderLength(StreamingDataBlockHeaderImpl.getDataStartOffset());
            final OffloadIndexBlockV2 index = streamingIndexBuilder.buildV2();
            final IndexInputStream indexStream = index.toStream();
            final BlobBuilder indexBlobBuilder = blobStore.blobBuilder(streamingDataIndexKey);
DataBlockUtils.addVersionInfo(indexBlobBuilder, userMetadata);
try (final InputStreamPayload indexPayLoad = Payloads.newInputStreamPayload(indexStream)) {
indexPayLoad.getContentMetadata().setContentLength(indexStream.getStreamSize());
indexPayLoad.getContentMetadata().setContentType("application/octet-stream");
final Blob indexBlob = indexBlobBuilder.payload(indexPayLoad)
.contentLength(indexStream.getStreamSize())
.build();
blobStore.putBlob(config.getBucket(), indexBlob);
final OffloadResult result = segmentInfo.result();
offloadResult.complete(result);
log.debug("offload segment completed {}", result);
} catch (Exception e) {
log.error("streaming offload failed", e);
offloadResult.completeExceptionally(e);
}
} catch (Exception e) {
log.error("streaming offload failed", e);
offloadResult.completeExceptionally(e);
}
}
private CompletableFuture<OffloadResult> getOffloadResultAsync() {
return this.offloadResult;
}
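    /**
     * Copies the entry into the offload buffer unless the segment is closed or the buffer
     * is full; closes the segment once both the size and minimum-age thresholds are met.
     */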
private synchronized OfferEntryResult offerEntry(Entry entry) {
if (segmentInfo.isClosed()) {
log.debug("Segment already closed {}", segmentInfo);
return OfferEntryResult.FAIL_SEGMENT_CLOSED;
        } else if (maxBufferLength <= bufferLength.get()) {
            // the buffer is allowed to overfill maxBufferLength slightly with the last
            // offered entry, so there is always enough content to build a block
            return OfferEntryResult.FAIL_BUFFER_FULL;
} else {
final EntryImpl entryImpl = EntryImpl
.create(entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer());
offloadBuffer.add(entryImpl);
bufferLength.getAndAdd(entryImpl.getLength());
segmentLength.getAndAdd(entryImpl.getLength());
lastOfferedPosition = entryImpl.getPosition();
if (segmentLength.get() >= maxSegmentLength
&& System.currentTimeMillis() - segmentBeginTimeMillis >= minSegmentCloseTimeMillis) {
closeSegment();
}
return OfferEntryResult.SUCCESS;
}
}
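    /**
     * Marks the segment closed at the last offered position.
     *
     * @return true if this call transitioned the segment from open to closed,
     *         false if it was already closed
     */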
private synchronized boolean closeSegment() {
final boolean result = !segmentInfo.isClosed();
log.debug("close segment {} {}", lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
this.segmentInfo.closeSegment(lastOfferedPosition.getLedgerId(), lastOfferedPosition.getEntryId());
return result;
}
private PositionImpl lastOffered() {
return lastOfferedPosition;
}
    /**
     * Attempts to create a BlobStoreLocation from the values in the offloadDriverMetadata.
     * If no values are available, it falls back to the currently configured provider,
     * region, bucket, etc.
     *
     * @param offloadDriverMetadata driver metadata attached to the offloaded ledger
     * @return the blob store location to use for the read or delete operation
     */
private BlobStoreLocation getBlobStoreLocation(Map<String, String> offloadDriverMetadata) {
return (!offloadDriverMetadata.isEmpty()) ? new BlobStoreLocation(offloadDriverMetadata) :
new BlobStoreLocation(getOffloadDriverMetadata());
}
@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata) {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket();
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid);
String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid);
scheduler.chooseThread(ledgerId).submit(() -> {
try {
promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId),
readBlobstore,
readBucket, key, indexKey,
DataBlockUtils.VERSION_CHECK,
ledgerId, config.getReadBufferSizeInBytes()));
} catch (Throwable t) {
log.error("Failed readOffloaded: ", t);
promise.completeExceptionally(t);
}
});
return promise;
}
@Override
public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, MLDataFormats.OffloadContext ledgerContext,
Map<String, String> offloadDriverMetadata) {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket();
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
final List<MLDataFormats.OffloadSegment> offloadSegmentList = ledgerContext.getOffloadSegmentList();
List<String> keys = Lists.newLinkedList();
List<String> indexKeys = Lists.newLinkedList();
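        // each offloaded segment is stored as two objects: a data object named by the
        // segment UUID and an index object derived from that UUID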
offloadSegmentList.forEach(seg -> {
final UUID uuid = new UUID(seg.getUidMsb(), seg.getUidLsb());
final String key = uuid.toString();
final String indexKey = DataBlockUtils.indexBlockOffloadKey(uuid);
keys.add(key);
indexKeys.add(indexKey);
});
scheduler.chooseThread(ledgerId).submit(() -> {
try {
promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId),
readBlobstore,
readBucket, keys, indexKeys,
DataBlockUtils.VERSION_CHECK,
ledgerId, config.getReadBufferSizeInBytes()));
} catch (Throwable t) {
log.error("Failed readOffloaded: ", t);
promise.completeExceptionally(t);
}
});
return promise;
}
@Override
public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid,
Map<String, String> offloadDriverMetadata) {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket(offloadDriverMetadata);
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.chooseThread(ledgerId).submit(() -> {
try {
readBlobstore.removeBlobs(readBucket,
ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid),
DataBlockUtils.indexBlockOffloadKey(ledgerId, uid)));
promise.complete(null);
} catch (Throwable t) {
log.error("Failed delete Blob", t);
promise.completeExceptionally(t);
}
});
return promise;
}
@Override
public CompletableFuture<Void> deleteOffloaded(UUID uid, Map<String, String> offloadDriverMetadata) {
BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata);
String readBucket = bsKey.getBucket(offloadDriverMetadata);
BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation());
CompletableFuture<Void> promise = new CompletableFuture<>();
scheduler.submit(() -> {
try {
readBlobstore.removeBlobs(readBucket,
ImmutableList.of(uid.toString(),
DataBlockUtils.indexBlockOffloadKey(uid)));
promise.complete(null);
} catch (Throwable t) {
log.error("Failed delete Blob", t);
promise.completeExceptionally(t);
}
});
return promise;
}
@Override
public OffloadPoliciesImpl getOffloadPolicies() {
Properties properties = new Properties();
properties.putAll(config.getConfigProperties());
return OffloadPoliciesImpl.create(properties);
}
@Override
public void close() {
for (BlobStore readBlobStore : blobStores.values()) {
if (readBlobStore != null) {
readBlobStore.getContext().close();
}
}
}
}