//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.apache.couchdb.nouveau.lucene9;

import com.ibm.cloud.objectstorage.AmazonServiceException;
import com.ibm.cloud.objectstorage.services.s3.AmazonS3;
import com.ibm.cloud.objectstorage.services.s3.model.AbortMultipartUploadRequest;
import com.ibm.cloud.objectstorage.services.s3.model.CompleteMultipartUploadRequest;
import com.ibm.cloud.objectstorage.services.s3.model.GetObjectRequest;
import com.ibm.cloud.objectstorage.services.s3.model.InitiateMultipartUploadRequest;
import com.ibm.cloud.objectstorage.services.s3.model.ListMultipartUploadsRequest;
import com.ibm.cloud.objectstorage.services.s3.model.ListObjectsV2Request;
import com.ibm.cloud.objectstorage.services.s3.model.ListObjectsV2Result;
import com.ibm.cloud.objectstorage.services.s3.model.MultipartUploadListing;
import com.ibm.cloud.objectstorage.services.s3.model.PartETag;
import com.ibm.cloud.objectstorage.services.s3.model.UploadPartRequest;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;
import org.apache.lucene.store.BaseDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.LockReleaseFailedException;
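
/**
 * A Lucene {@link Directory} that stores each index file as an object in IBM
 * Cloud Object Storage (COS), accessed through the S3-compatible SDK. Objects
 * live under {@code bucketName/prefix/}; reads use a read-ahead buffer, writes
 * stream out as multipart uploads, and locking is implemented with COS legal
 * holds (see {@link COSLockFactory}).
 *
 * <p>A minimal usage sketch; the client construction, bucket, and prefix are
 * illustrative, and any configured {@link AmazonS3} client will do:
 *
 * <pre>{@code
 * AmazonS3 client = ...; // a configured IBM COS / S3 client
 * Directory dir = new COSDirectory(client, "my-bucket", "indexes/db1");
 * try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(analyzer))) {
 *     writer.addDocument(doc);
 * }
 * }</pre>
 */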
public final class COSDirectory extends BaseDirectory {
    private static final LockFactory COS_LOCK_FACTORY = new COSLockFactory();

    // Minimum number of bytes fetched per ranged GET; see COSIndexInput#readAhead.
    private static final int READ_AHEAD_BUFFER_SIZE = 128 * 1024;

    // Number of bytes buffered before being uploaded as one multipart part.
    private static final int WRITE_BUFFER_SIZE = 1024 * 1024;

    // Identifier of the legal hold that backs the directory lock.
    private static final String LEGAL_HOLD_ID = "COSDirectoryLock";
private final AmazonS3 s3;
private final String bucketName;
private final String prefix;
public COSDirectory(final AmazonS3 s3, final String bucketName, final String prefix) throws IOException {
this(s3, bucketName, prefix, COS_LOCK_FACTORY);
}
public COSDirectory(final AmazonS3 s3, final String bucketName, final String prefix, LockFactory lockFactory)
throws IOException {
super(lockFactory);
this.s3 = Objects.requireNonNull(s3);
this.bucketName = Objects.requireNonNull(bucketName);
this.prefix = Objects.requireNonNull(prefix);
}
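
    /**
     * Lists every file under the prefix, paging through the bucket listing.
     * S3-compatible stores return keys in lexicographic (UTF-8 binary) order,
     * which matches the sorted order Lucene expects for the ASCII file names
     * it generates.
     */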
@Override
public String[] listAll() throws IOException {
final int strip = prefix.length() + 1;
try {
final List<String> result = new LinkedList<String>();
            // Start without a continuation token; each page supplies the next one.
            String continuationToken = null;
ListObjectsV2Result listObjectsResult;
do {
var request = new ListObjectsV2Request()
.withBucketName(bucketName)
.withPrefix(prefix)
.withContinuationToken(continuationToken);
listObjectsResult = s3.listObjectsV2(request);
for (var s : listObjectsResult.getObjectSummaries()) {
result.add(s.getKey().substring(strip));
}
                // Advance with the *next* token; getContinuationToken() merely echoes
                // the request token and would page forever on a truncated listing.
                continuationToken = listObjectsResult.getNextContinuationToken();
} while (listObjectsResult.isTruncated());
return result.toArray(String[]::new);
} catch (final AmazonServiceException e) {
throw new IOException(String.format("listAll for %s/%s failed", bucketName, prefix), e);
}
}
@Override
public void deleteFile(String name) throws IOException {
try {
s3.deleteObject(bucketName, key(name));
} catch (final AmazonServiceException e) {
throw new IOException(String.format("deleteFile for %s/%s failed", bucketName, key(name)), e);
}
}
@Override
public long fileLength(String name) throws IOException {
return getObjectLength(key(name));
}
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
return new COSIndexOutput(name);
}
@Override
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
final var name = String.format("%s-%s-%s", prefix, UUID.randomUUID(), suffix);
return createOutput(name, context);
}
@Override
public void sync(Collection<String> names) throws IOException {
        // No-op: data only becomes durable when the multipart upload is completed
        // in COSIndexOutput#close, so there is nothing to fsync here.
}
@Override
public void syncMetaData() throws IOException {
// no-op
}
@Override
public void rename(String source, String dest) throws IOException {
try {
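            // COS offers no atomic rename, so emulate one with copy-then-delete.
            // This is not atomic, but Lucene only renames when publishing the
            // pending segments file, and the directory is assumed to have a
            // single writer holding the COS lock.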
s3.copyObject(bucketName, key(source), bucketName, key(dest));
s3.deleteObject(bucketName, key(source));
} catch (final AmazonServiceException e) {
throw new IOException(
String.format("Rename of %s/%s to %s/%s failed", bucketName, key(source), bucketName, key(dest)),
e);
}
}
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
final var length = getObjectLength(key(name));
return new COSIndexInput(name, 0, length);
}
@Override
public void close() throws IOException {
// no-op
}
@Override
public Set<String> getPendingDeletions() throws IOException {
return Collections.emptySet();
}
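
    /**
     * Aborts any in-progress multipart uploads under this directory's prefix,
     * e.g. parts left behind by a writer that died before calling
     * {@link IndexOutput#close}. Called when the directory lock is obtained.
     */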
public void cleanupMultipartUploads() throws IOException {
var request = new ListMultipartUploadsRequest(bucketName);
try {
MultipartUploadListing result;
do {
result = s3.listMultipartUploads(request);
for (var part : result.getMultipartUploads()) {
if (part.getKey().startsWith(prefix)) {
s3.abortMultipartUpload(
new AbortMultipartUploadRequest(bucketName, part.getKey(), part.getUploadId()));
}
}
                // Page with the *next* markers; the plain markers merely echo the
                // request and would loop forever on a truncated listing.
                request.setKeyMarker(result.getNextKeyMarker());
                request.setUploadIdMarker(result.getNextUploadIdMarker());
} while (result.isTruncated());
} catch (final AmazonServiceException e) {
throw new IOException(String.format("Failed to cleanup multipart uploads in %s/%s", bucketName, prefix), e);
}
}
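
    // Maps a Lucene file name to its full object key under this directory's prefix.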
private String key(final String name) {
return String.format("%s/%s", prefix, name);
}
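
    /**
     * An {@link IndexInput} that reads an object (or a slice of one) with
     * ranged GETs, buffering at least READ_AHEAD_BUFFER_SIZE bytes per request
     * to keep the number of round trips down.
     */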
private class COSIndexInput extends IndexInput {
private final String name;
private final long offset;
private final long length;
private long pos;
private byte[] buffer;
private long bufferOffset;
private COSIndexInput(final String name, final long offset, final long length) {
super(String.format("cos://%s/%s/%s", bucketName, prefix, name));
this.name = Objects.requireNonNull(name);
this.offset = offset;
this.length = length;
}
@Override
public void close() throws IOException {
// no-op
}
@Override
public COSIndexInput clone() {
final COSIndexInput result = new COSIndexInput(name, offset, length);
result.pos = pos;
return result;
}
@Override
public long getFilePointer() {
return pos;
}
@Override
public void seek(long pos) throws IOException {
if (pos > length) {
throw new EOFException();
}
this.pos = pos;
}
@Override
public long length() {
return length;
}
@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
            // Slice offsets are relative to this input; accumulate them so that
            // slices of slices address the correct absolute range of the object.
            return new COSIndexInput(name, this.offset + offset, length);
}
@Override
public byte readByte() throws IOException {
readAhead(1);
this.pos++;
return buffer[(int) (this.offset + this.pos - 1 - bufferOffset)];
}
@Override
public void readBytes(final byte[] b, final int offset, final int len) throws IOException {
readAhead(len);
System.arraycopy(buffer, (int) (this.offset + this.pos - bufferOffset), b, offset, len);
this.pos += len;
}
private void readAhead(final int min) throws IOException {
            // Ensure bytes [offset + pos, offset + pos + min) are buffered locally.
            // To save round trips, read ahead a full READ_AHEAD_BUFFER_SIZE unless min is even larger.
if (buffer == null
|| !(this.offset + this.pos >= this.bufferOffset
&& this.offset + this.pos + min <= this.bufferOffset + this.buffer.length)) {
final var request = new GetObjectRequest(bucketName, key(name));
                // S3 byte ranges are inclusive, hence the -1 on the end position.
                request.setRange(
                        this.offset + this.pos,
                        this.offset + this.pos + Math.max(min, READ_AHEAD_BUFFER_SIZE) - 1);
                try {
                    final var object = s3.getObject(request);
                    try (var in = object.getObjectContent()) {
                        this.buffer = in.readAllBytes();
                        this.bufferOffset = this.offset + this.pos;
                    }
                } catch (final AmazonServiceException e) {
                    throw new IOException(String.format("readAhead failed for %s/%s", bucketName, key(name)), e);
                }
                // A short read means the requested range extends past the end of the object.
                if (this.buffer.length < min) {
                    throw new EOFException();
                }
}
}
}
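
    // Returns the object's size via its metadata (a HEAD request).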
private long getObjectLength(final String key) throws IOException {
try {
return s3.getObjectMetadata(bucketName, key).getContentLength();
} catch (final AmazonServiceException e) {
throw new IOException(String.format("getObjectLength failed for %s/%s", bucketName, key), e);
}
}
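
    /**
     * Implements Lucene locking by placing a COS legal hold on the lock
     * object; the hold is released again in {@link COSLock#close}. Obtaining
     * the lock also triggers {@link COSDirectory#cleanupMultipartUploads}.
     */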
private static class COSLockFactory extends LockFactory {
@Override
public Lock obtainLock(Directory dir, String lockName) throws IOException {
if (!(dir instanceof COSDirectory)) {
throw new IllegalArgumentException(dir + " not supported by this lock factory");
}
final var cosDir = (COSDirectory) dir;
try {
cosDir.s3.addLegalHold(cosDir.bucketName, cosDir.key(lockName), LEGAL_HOLD_ID);
} catch (final AmazonServiceException e) {
throw new LockObtainFailedException(
String.format("Failed to lock %s/%s", cosDir.bucketName, cosDir.key(lockName)), e);
}
cosDir.cleanupMultipartUploads();
return new COSLock(cosDir, lockName);
}
}
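
    /**
     * An {@link IndexOutput} that streams data to COS as a multipart upload,
     * buffering WRITE_BUFFER_SIZE bytes per part. The object only becomes
     * visible once the upload is completed in {@link #close}.
     */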
private class COSIndexOutput extends IndexOutput {
private final String name;
private final String uploadId;
private final List<PartETag> partETags;
private final CRC32 crc;
private final ByteArrayOutputStream bos;
private final CheckedOutputStream cos;
private int partNumber;
private long bytesWritten;
private boolean failed;
private COSIndexOutput(final String name) throws IOException {
super(String.format("cos://%s/%s/%s", bucketName, prefix, name), name);
final var request = new InitiateMultipartUploadRequest(bucketName, key(name));
try {
final var result = s3.initiateMultipartUpload(request);
this.uploadId = result.getUploadId();
} catch (final AmazonServiceException e) {
throw new IOException(
String.format("InitiateMultipartUpload failed for %s/%s", bucketName, key(name)), e);
}
this.name = Objects.requireNonNull(name);
this.partETags = new LinkedList<PartETag>();
this.crc = new CRC32();
this.bos = new ByteArrayOutputStream(WRITE_BUFFER_SIZE);
this.cos = new CheckedOutputStream(bos, crc);
this.partNumber = 1;
}
@Override
public void close() throws IOException {
cos.close();
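            // If any part failed to upload, abort so COS can reclaim the stored
            // parts; otherwise flush the final (possibly short) part and complete
            // the upload, at which point the object becomes visible.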
if (failed) {
try {
s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key(name), uploadId));
} catch (final AmazonServiceException e) {
throw new IOException(
String.format("abortMultipartUpload failed for %s/%s", bucketName, key(name)), e);
}
} else {
flush(true);
final var request = new CompleteMultipartUploadRequest(bucketName, key(name), uploadId, partETags);
try {
s3.completeMultipartUpload(request);
} catch (final AmazonServiceException e) {
throw new IOException(
String.format("completeMultipartUpload failed for %s/%s", bucketName, key(name)), e);
}
}
}
@Override
public long getFilePointer() {
return bytesWritten;
}
@Override
public long getChecksum() throws IOException {
return crc.getValue();
}
@Override
public void writeByte(final byte b) throws IOException {
assert bos.size() < WRITE_BUFFER_SIZE : "buffer should never be full at start of method";
cos.write(b);
bytesWritten++;
flush(false);
}
@Override
public void writeBytes(final byte[] b, int offset, int length) throws IOException {
assert bos.size() < WRITE_BUFFER_SIZE : "buffer should never be full at start of method";
while (length > 0) {
var l = (int) Math.min(WRITE_BUFFER_SIZE - bos.size(), length);
cos.write(b, offset, l);
bytesWritten += l;
offset += l;
length -= l;
flush(false);
}
}
private void flush(final boolean force) throws IOException {
var bufferSize = bos.size();
if (bufferSize == WRITE_BUFFER_SIZE || (force && bufferSize > 0)) {
var bytes = bos.toByteArray();
bos.reset();
var request = new UploadPartRequest()
.withBucketName(bucketName)
.withKey(key(name))
.withUploadId(uploadId)
.withPartSize(bytes.length)
.withPartNumber(partNumber)
.withInputStream(new ByteArrayInputStream(bytes));
try {
var result = s3.uploadPart(request);
this.partNumber++;
this.partETags.add(result.getPartETag());
} catch (final AmazonServiceException e) {
failed = true;
throw new IOException(String.format("uploadPart failed for %s/%s", bucketName, key(name)), e);
}
}
}
}
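
    /**
     * A {@link Lock} backed by a COS legal hold. {@link #ensureValid} re-lists
     * the holds on the lock object to confirm ours is still present.
     */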
private static class COSLock extends Lock {
private final COSDirectory cosDir;
private final String lockName;
private COSLock(final COSDirectory cosDir, final String lockName) {
this.cosDir = Objects.requireNonNull(cosDir);
this.lockName = Objects.requireNonNull(lockName);
}
@Override
public void close() throws IOException {
try {
cosDir.s3.deleteLegalHold(cosDir.bucketName, cosDir.key(lockName), LEGAL_HOLD_ID);
} catch (final AmazonServiceException e) {
throw new LockReleaseFailedException(
String.format("Failed to release %s/%s", cosDir.bucketName, cosDir.key(lockName)), e);
}
}
@Override
public void ensureValid() throws IOException {
try {
final var holds = cosDir.s3.listLegalHolds(cosDir.bucketName, cosDir.key(lockName));
final var isValid =
holds.getLegalHolds().stream().anyMatch(h -> h.getId().equals(LEGAL_HOLD_ID));
if (!isValid) {
throw new IOException("Lock no longer held");
}
} catch (final AmazonServiceException e) {
throw new IOException(e);
}
}
}
}