blob: e905884ddd05433548ee5ff3c07f26674a44e79f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.options.GetOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BlobStoreBackedInputStreamImpl extends BackedInputStream {
private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedInputStreamImpl.class);
private final BlobStore blobStore;
private final String bucket;
private final String key;
private final VersionCheck versionCheck;
private final ByteBuf buffer;
private final long objectLen;
private final int bufferSize;
private long cursor;
private long bufferOffsetStart;
private long bufferOffsetEnd;
public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key,
VersionCheck versionCheck,
long objectLen, int bufferSize) {
this.blobStore = blobStore;
this.bucket = bucket;
this.key = key;
this.versionCheck = versionCheck;
this.buffer = PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize);
this.objectLen = objectLen;
this.bufferSize = bufferSize;
this.cursor = 0;
this.bufferOffsetStart = this.bufferOffsetEnd = -1;
}
/**
* Refill the buffered input if it is empty.
* @return true if there are bytes to read, false otherwise
*/
private boolean refillBufferIfNeeded() throws IOException {
if (buffer.readableBytes() == 0) {
if (cursor >= objectLen) {
return false;
}
long startRange = cursor;
long endRange = Math.min(cursor + bufferSize - 1,
objectLen - 1);
try {
Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange));
versionCheck.check(key, blob);
try (InputStream stream = blob.getPayload().openStream()) {
buffer.clear();
bufferOffsetStart = startRange;
bufferOffsetEnd = endRange;
long bytesRead = endRange - startRange + 1;
int bytesToCopy = (int) bytesRead;
while (bytesToCopy > 0) {
bytesToCopy -= buffer.writeBytes(stream, bytesToCopy);
}
cursor += buffer.readableBytes();
}
} catch (Throwable e) {
throw new IOException("Error reading from BlobStore", e);
}
}
return true;
}
@Override
public int read() throws IOException {
if (refillBufferIfNeeded()) {
return buffer.readUnsignedByte();
} else {
return -1;
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (refillBufferIfNeeded()) {
int bytesToRead = Math.min(len, buffer.readableBytes());
buffer.readBytes(b, off, bytesToRead);
return bytesToRead;
} else {
return -1;
}
}
@Override
public void seek(long position) {
log.debug("Seeking to {} on {}/{}, current position {} (bufStart:{}, bufEnd:{})",
position, bucket, key, cursor, bufferOffsetStart, bufferOffsetEnd);
if (position >= bufferOffsetStart && position <= bufferOffsetEnd) {
long newIndex = position - bufferOffsetStart;
buffer.readerIndex((int) newIndex);
} else {
this.cursor = position;
buffer.clear();
}
}
@Override
public void seekForward(long position) throws IOException {
if (position >= cursor) {
seek(position);
} else {
throw new IOException(String.format("Error seeking, new position %d < current position %d",
position, cursor));
}
}
@Override
public void close() {
buffer.release();
}
@Override
public int available() throws IOException {
return (int) (objectLen - cursor) + buffer.readableBytes();
}
}