blob: 5221b15d5d9c61759c5f9d5e09cbfe39fbb043fe [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.offload.filesystem.impl;
import static org.apache.bookkeeper.mledger.offload.OffloadUtils.parseLedgerMetadata;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileStoreBackedReadHandleImpl implements ReadHandle {
private static final Logger log = LoggerFactory.getLogger(FileStoreBackedReadHandleImpl.class);
private final ExecutorService executor;
private final MapFile.Reader reader;
private final long ledgerId;
private final LedgerMetadata ledgerMetadata;
private FileStoreBackedReadHandleImpl(ExecutorService executor,
MapFile.Reader reader,
long ledgerId) throws IOException {
this.ledgerId = ledgerId;
this.executor = executor;
this.reader = reader;
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
try {
key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX);
reader.get(key, value);
this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes());
} catch (IOException e) {
log.error("Fail to read LedgerMetadata for ledgerId {}",
ledgerId);
throw new IOException("Fail to read LedgerMetadata for ledgerId " + key.get());
}
}
@Override
public long getId() {
return ledgerId;
}
@Override
public LedgerMetadata getLedgerMetadata() {
return ledgerMetadata;
}
@Override
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> promise = new CompletableFuture<>();
executor.submit(() -> {
try {
reader.close();
} catch (IOException t) {
promise.completeExceptionally(t);
}
});
return promise;
}
@Override
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
if (log.isDebugEnabled()) {
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
}
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.submit(() -> {
if (firstEntry > lastEntry
|| firstEntry < 0
|| lastEntry > getLastAddConfirmed()) {
promise.completeExceptionally(new BKException.BKIncorrectParameterException());
return;
}
long entriesToRead = (lastEntry - firstEntry) + 1;
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
long nextExpectedId = firstEntry;
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
try {
key.set(nextExpectedId - 1);
reader.seek(key);
while (entriesToRead > 0) {
reader.next(key, value);
int length = value.getLength();
long entryId = key.get();
if (entryId == nextExpectedId) {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(length, length);
entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
buf.writeBytes(value.copyBytes());
entriesToRead--;
nextExpectedId++;
} else if (entryId > lastEntry) {
log.info("Expected to read {}, but read {}, which is greater than last entry {}",
nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
}
}
promise.complete(LedgerEntriesImpl.create(entries));
} catch (Throwable t) {
promise.completeExceptionally(t);
entries.forEach(LedgerEntry::close);
}
});
return promise;
}
@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
return readAsync(firstEntry, lastEntry);
}
@Override
public CompletableFuture<Long> readLastAddConfirmedAsync() {
return CompletableFuture.completedFuture(getLastAddConfirmed());
}
@Override
public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
return CompletableFuture.completedFuture(getLastAddConfirmed());
}
@Override
public long getLastAddConfirmed() {
return getLedgerMetadata().getLastEntryId();
}
@Override
public long getLength() {
return getLedgerMetadata().getLength();
}
@Override
public boolean isClosed() {
return getLedgerMetadata().isClosed();
}
@Override
public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
long timeOutInMillis,
boolean parallel) {
CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>();
promise.completeExceptionally(new UnsupportedOperationException());
return promise;
}
public static ReadHandle open(ScheduledExecutorService executor,
MapFile.Reader reader,
long ledgerId) throws IOException {
return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId);
}
}