| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.bookkeeper.mledger.offload.jcloud.impl; |
| |
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
| |
| public class BlobStoreBackedReadHandleImpl implements ReadHandle { |
| private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class); |
| private static final int CACHE_TTL_SECONDS = |
| Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 30 * 60); |
| |
| private final long ledgerId; |
| private final OffloadIndexBlock index; |
| private final BackedInputStream inputStream; |
| private final DataInputStream dataStream; |
| private final ExecutorService executor; |
| // this Cache is accessed only by one thread |
| private final Cache<Long, Long> entryOffsets = CacheBuilder |
| .newBuilder() |
| .expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS) |
| .build(); |
| |
| enum State { |
| Opened, |
| Closed |
| } |
| |
| private State state = null; |
| |
| private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, |
| BackedInputStream inputStream, ExecutorService executor) { |
| this.ledgerId = ledgerId; |
| this.index = index; |
| this.inputStream = inputStream; |
| this.dataStream = new DataInputStream(inputStream); |
| this.executor = executor; |
| state = State.Opened; |
| } |
| |
| @Override |
| public long getId() { |
| return ledgerId; |
| } |
| |
| @Override |
| public LedgerMetadata getLedgerMetadata() { |
| return index.getLedgerMetadata(); |
| } |
| |
| @Override |
| public CompletableFuture<Void> closeAsync() { |
| CompletableFuture<Void> promise = new CompletableFuture<>(); |
| executor.execute(() -> { |
| try { |
| index.close(); |
| inputStream.close(); |
| entryOffsets.invalidateAll(); |
| state = State.Closed; |
| promise.complete(null); |
| } catch (IOException t) { |
| promise.completeExceptionally(t); |
| } |
| }); |
| return promise; |
| } |
| |
| @Override |
| public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) { |
| if (log.isDebugEnabled()) { |
| log.debug("Ledger {}: reading {} - {} ({} entries}", |
| getId(), firstEntry, lastEntry, (1 + lastEntry - firstEntry)); |
| } |
| CompletableFuture<LedgerEntries> promise = new CompletableFuture<>(); |
| executor.execute(() -> { |
| List<LedgerEntry> entries = new ArrayList<LedgerEntry>(); |
| boolean seeked = false; |
| try { |
| if (state == State.Closed) { |
| log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", |
| ledgerId, firstEntry, lastEntry); |
| throw new BKException.BKUnexpectedConditionException(); |
| } |
| if (firstEntry > lastEntry |
| || firstEntry < 0 |
| || lastEntry > getLastAddConfirmed()) { |
| promise.completeExceptionally(new BKException.BKIncorrectParameterException()); |
| return; |
| } |
| long entriesToRead = (lastEntry - firstEntry) + 1; |
| long nextExpectedId = firstEntry; |
| |
| // checking the data stream has enough data to read to avoid throw EOF exception when reading data. |
| // 12 bytes represent the stream have the length and entryID to read. |
| if (dataStream.available() < 12) { |
| log.warn("There hasn't enough data to read, current available data has {} bytes," |
| + " seek to the first entry {} to avoid EOF exception", inputStream.available(), firstEntry); |
| seekToEntry(firstEntry); |
| } |
| |
| while (entriesToRead > 0) { |
| long currentPosition = inputStream.getCurrentPosition(); |
| int length = dataStream.readInt(); |
| if (length < 0) { // hit padding or new block |
| seekToEntry(nextExpectedId); |
| continue; |
| } |
| long entryId = dataStream.readLong(); |
| |
| if (entryId == nextExpectedId) { |
| entryOffsets.put(entryId, currentPosition); |
| ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length); |
| entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf)); |
| int toWrite = length; |
| while (toWrite > 0) { |
| toWrite -= buf.writeBytes(dataStream, toWrite); |
| } |
| entriesToRead--; |
| nextExpectedId++; |
| } else if (entryId > nextExpectedId && entryId < lastEntry) { |
| log.warn("The read entry {} is not the expected entry {} but in the range of {} - {}," |
| + " seeking to the right position", entryId, nextExpectedId, nextExpectedId, lastEntry); |
| seekToEntry(nextExpectedId); |
| } else if (entryId < nextExpectedId |
| && !index.getIndexEntryForEntry(nextExpectedId).equals(index.getIndexEntryForEntry(entryId))) { |
| log.warn("Read an unexpected entry id {} which is smaller than the next expected entry id {}" |
| + ", seeking to the right position", entryId, nextExpectedId); |
| seekToEntry(nextExpectedId); |
| } else if (entryId > lastEntry) { |
| // in the normal case, the entry id should increment in order. But if there has random access in |
| // the read method, we should allow to seek to the right position and the entry id should |
| // never over to the last entry again. |
| if (!seeked) { |
| seekToEntry(nextExpectedId); |
| seeked = true; |
| continue; |
| } |
| log.info("Expected to read {}, but read {}, which is greater than last entry {}", |
| nextExpectedId, entryId, lastEntry); |
| throw new BKException.BKUnexpectedConditionException(); |
| } else { |
| long ignore = inputStream.skip(length); |
| } |
| } |
| |
| promise.complete(LedgerEntriesImpl.create(entries)); |
| } catch (Throwable t) { |
| log.error("Failed to read entries {} - {} from the offloader in ledger {}", |
| firstEntry, lastEntry, ledgerId, t); |
| promise.completeExceptionally(t); |
| entries.forEach(LedgerEntry::close); |
| } |
| }); |
| return promise; |
| } |
| |
| private void seekToEntry(long nextExpectedId) throws IOException { |
| Long knownOffset = entryOffsets.getIfPresent(nextExpectedId); |
| if (knownOffset != null) { |
| inputStream.seek(knownOffset); |
| } else { |
| // we don't know the exact position |
| // we seek to somewhere before the entry |
| long dataOffset = index.getIndexEntryForEntry(nextExpectedId).getDataOffset(); |
| inputStream.seek(dataOffset); |
| } |
| } |
| |
| @Override |
| public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) { |
| return readAsync(firstEntry, lastEntry); |
| } |
| |
| @Override |
| public CompletableFuture<Long> readLastAddConfirmedAsync() { |
| return CompletableFuture.completedFuture(getLastAddConfirmed()); |
| } |
| |
| @Override |
| public CompletableFuture<Long> tryReadLastAddConfirmedAsync() { |
| return CompletableFuture.completedFuture(getLastAddConfirmed()); |
| } |
| |
| @Override |
| public long getLastAddConfirmed() { |
| return getLedgerMetadata().getLastEntryId(); |
| } |
| |
| @Override |
| public long getLength() { |
| return getLedgerMetadata().getLength(); |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return getLedgerMetadata().isClosed(); |
| } |
| |
| @Override |
| public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, |
| long timeOutInMillis, |
| boolean parallel) { |
| CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>(); |
| promise.completeExceptionally(new UnsupportedOperationException()); |
| return promise; |
| } |
| |
| public static ReadHandle open(ScheduledExecutorService executor, |
| BlobStore blobStore, String bucket, String key, String indexKey, |
| VersionCheck versionCheck, |
| long ledgerId, int readBufferSize, |
| LedgerOffloaderStats offloaderStats, String managedLedgerName) |
| throws IOException { |
| int retryCount = 3; |
| OffloadIndexBlock index = null; |
| IOException lastException = null; |
| // The following retry is used to avoid to some network issue cause read index file failure. |
| // If it can not recovery in the retry, we will throw the exception and the dispatcher will schedule to |
| // next read. |
| // If we use a backoff to control the retry, it will introduce a concurrent operation. |
| // We don't want to make it complicated, because in the most of case it shouldn't in the retry loop. |
| while (retryCount-- > 0) { |
| long readIndexStartTime = System.nanoTime(); |
| Blob blob = blobStore.getBlob(bucket, indexKey); |
| offloaderStats.recordReadOffloadIndexLatency(managedLedgerName, |
| System.nanoTime() - readIndexStartTime, TimeUnit.NANOSECONDS); |
| versionCheck.check(indexKey, blob); |
| OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); |
| try (InputStream payLoadStream = blob.getPayload().openStream()) { |
| index = (OffloadIndexBlock) indexBuilder.fromStream(payLoadStream); |
| } catch (IOException e) { |
| // retry to avoid the network issue caused read failure |
| log.warn("Failed to get index block from the offoaded index file {}, still have {} times to retry", |
| indexKey, retryCount, e); |
| lastException = e; |
| continue; |
| } |
| lastException = null; |
| break; |
| } |
| if (lastException != null) { |
| throw lastException; |
| } |
| |
| BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key, |
| versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName); |
| |
| return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor); |
| } |
| |
| // for testing |
| State getState() { |
| return this.state; |
| } |
| } |