EXPERIMENTAL: COS-backed Lucene Directory implementation.
TODO: read caching will be essential for acceptable performance; implement it properly before this leaves the experimental stage.
diff --git a/Makefile b/Makefile
index 1dd26ca..a14bdcb 100644
--- a/Makefile
+++ b/Makefile
@@ -475,10 +475,7 @@
@rm -f src/couch/priv/couch_js/config.h
@rm -f dev/*.beam dev/devnode.* dev/pbkdf2.pyc log/crash.log
@rm -f src/couch_dist/certs/out
-ifeq ($(with_nouveau), 1)
- @cd nouveau && ./gradlew clean
-endif
-
+ @$(MAKE) nouveau-clean
.PHONY: distclean
# target: distclean - Remove build and release artifacts
@@ -551,6 +548,12 @@
@cd nouveau && ./gradlew build -x test
endif
+.PHONY: nouveau-clean
+nouveau-clean:
+ifeq ($(with_nouveau), 1)
+ @cd nouveau && ./gradlew clean
+endif
+
.PHONY: nouveau-test
nouveau-test: nouveau-test-gradle nouveau-test-elixir
diff --git a/nouveau/build.gradle b/nouveau/build.gradle
index 38a92fa..232e6f5 100644
--- a/nouveau/build.gradle
+++ b/nouveau/build.gradle
@@ -35,15 +35,15 @@
implementation group: 'org.apache.lucene', name: 'lucene-facet', version: luceneVersion
implementation group: 'org.apache.lucene', name: 'lucene-misc', version: luceneVersion
+ implementation group: 'com.ibm.cos', name: 'ibm-cos-java-sdk', version: '2.13.1'
+
def swaggerVersion = '2.2.8'
implementation group: 'io.swagger.core.v3', name: 'swagger-jaxrs2-jakarta', version: swaggerVersion
implementation group: 'io.swagger.core.v3', name: 'swagger-jaxrs2-servlet-initializer-v2', version: swaggerVersion
testImplementation platform('org.junit:junit-bom:5.8.2')
- testImplementation 'org.junit.jupiter:junit-jupiter-api'
- testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
+    testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation 'org.assertj:assertj-core'
- testImplementation 'org.mockito:mockito-core'
}
group = 'org.apache.couchdb.nouveau'
diff --git a/nouveau/gradle/wrapper/gradle-wrapper.properties b/nouveau/gradle/wrapper/gradle-wrapper.properties
index 37aef8d..c30b486 100644
--- a/nouveau/gradle/wrapper/gradle-wrapper.properties
+++ b/nouveau/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/COSDirectory.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/COSDirectory.java
new file mode 100644
index 0000000..ffb8ce2
--- /dev/null
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/lucene9/COSDirectory.java
@@ -0,0 +1,463 @@
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.couchdb.nouveau.lucene9;
+
+import com.ibm.cloud.objectstorage.AmazonServiceException;
+import com.ibm.cloud.objectstorage.services.s3.AmazonS3;
+import com.ibm.cloud.objectstorage.services.s3.model.AbortMultipartUploadRequest;
+import com.ibm.cloud.objectstorage.services.s3.model.CompleteMultipartUploadRequest;
+import com.ibm.cloud.objectstorage.services.s3.model.GetObjectRequest;
+import com.ibm.cloud.objectstorage.services.s3.model.InitiateMultipartUploadRequest;
+import com.ibm.cloud.objectstorage.services.s3.model.ListMultipartUploadsRequest;
+import com.ibm.cloud.objectstorage.services.s3.model.ListObjectsV2Request;
+import com.ibm.cloud.objectstorage.services.s3.model.ListObjectsV2Result;
+import com.ibm.cloud.objectstorage.services.s3.model.MultipartUploadListing;
+import com.ibm.cloud.objectstorage.services.s3.model.PartETag;
+import com.ibm.cloud.objectstorage.services.s3.model.UploadPartRequest;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import org.apache.lucene.store.BaseDirectory;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+import org.apache.lucene.store.LockFactory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.LockReleaseFailedException;
+
+public final class COSDirectory extends BaseDirectory {
+
+ private static final LockFactory COS_LOCK_FACTORY = new COSLockFactory();
+
+ private static final int READ_AHEAD_BUFFER_SIZE = 128 * 1024;
+
+ private static final int WRITE_BUFFER_SIZE = 1024 * 1024;
+
+ private static final String LEGAL_HOLD_ID = "COSDirectoryLock";
+
+ private final AmazonS3 s3;
+
+ private final String bucketName;
+
+ private final String prefix;
+
+ public COSDirectory(final AmazonS3 s3, final String bucketName, final String prefix) throws IOException {
+ this(s3, bucketName, prefix, COS_LOCK_FACTORY);
+ }
+
+ public COSDirectory(final AmazonS3 s3, final String bucketName, final String prefix, LockFactory lockFactory)
+ throws IOException {
+ super(lockFactory);
+ this.s3 = Objects.requireNonNull(s3);
+ this.bucketName = Objects.requireNonNull(bucketName);
+ this.prefix = Objects.requireNonNull(prefix);
+ }
+
+ @Override
+ public String[] listAll() throws IOException {
+ final int strip = prefix.length() + 1;
+ try {
+ final List<String> result = new LinkedList<String>();
+ String continuationToken = "";
+ ListObjectsV2Result listObjectsResult;
+ do {
+ var request = new ListObjectsV2Request()
+ .withBucketName(bucketName)
+ .withPrefix(prefix)
+ .withContinuationToken(continuationToken);
+ listObjectsResult = s3.listObjectsV2(request);
+ for (var s : listObjectsResult.getObjectSummaries()) {
+ result.add(s.getKey().substring(strip));
+ }
+                continuationToken = listObjectsResult.getNextContinuationToken();
+ } while (listObjectsResult.isTruncated());
+ return result.toArray(String[]::new);
+ } catch (final AmazonServiceException e) {
+ throw new IOException(String.format("listAll for %s/%s failed", bucketName, prefix), e);
+ }
+ }
+
+ @Override
+ public void deleteFile(String name) throws IOException {
+ try {
+ s3.deleteObject(bucketName, key(name));
+ } catch (final AmazonServiceException e) {
+ throw new IOException(String.format("deleteFile for %s/%s failed", bucketName, key(name)), e);
+ }
+ }
+
+ @Override
+ public long fileLength(String name) throws IOException {
+ return getObjectLength(key(name));
+ }
+
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws IOException {
+ return new COSIndexOutput(name);
+ }
+
+ @Override
+ public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
+ final var name = String.format("%s-%s-%s", prefix, UUID.randomUUID(), suffix);
+ return createOutput(name, context);
+ }
+
+ @Override
+ public void sync(Collection<String> names) throws IOException {
+ // no-op
+ }
+
+ @Override
+ public void syncMetaData() throws IOException {
+ // no-op
+ }
+
+ @Override
+ public void rename(String source, String dest) throws IOException {
+ try {
+ s3.copyObject(bucketName, key(source), bucketName, key(dest));
+ s3.deleteObject(bucketName, key(source));
+ } catch (final AmazonServiceException e) {
+ throw new IOException(
+ String.format("Rename of %s/%s to %s/%s failed", bucketName, key(source), bucketName, key(dest)),
+ e);
+ }
+ }
+
+ @Override
+ public IndexInput openInput(String name, IOContext context) throws IOException {
+ final var length = getObjectLength(key(name));
+ return new COSIndexInput(name, 0, length);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+
+ @Override
+ public Set<String> getPendingDeletions() throws IOException {
+ return Collections.emptySet();
+ }
+
+ public void cleanupMultipartUploads() throws IOException {
+ var request = new ListMultipartUploadsRequest(bucketName);
+ try {
+ MultipartUploadListing result;
+ do {
+ result = s3.listMultipartUploads(request);
+ for (var part : result.getMultipartUploads()) {
+ if (part.getKey().startsWith(prefix)) {
+ s3.abortMultipartUpload(
+ new AbortMultipartUploadRequest(bucketName, part.getKey(), part.getUploadId()));
+ }
+ }
+            request.setKeyMarker(result.getNextKeyMarker());
+            request.setUploadIdMarker(result.getNextUploadIdMarker());
+ } while (result.isTruncated());
+ } catch (final AmazonServiceException e) {
+ throw new IOException(String.format("Failed to cleanup multipart uploads in %s/%s", bucketName, prefix), e);
+ }
+ }
+
+ private String key(final String name) {
+ return String.format("%s/%s", prefix, name);
+ }
+
+ private class COSIndexInput extends IndexInput {
+
+ private final String name;
+
+ private final long offset;
+
+ private final long length;
+
+ private long pos;
+
+ private byte[] buffer;
+
+ private long bufferOffset;
+
+ private COSIndexInput(final String name, final long offset, final long length) {
+ super(String.format("cos://%s/%s/%s", bucketName, prefix, name));
+ this.name = Objects.requireNonNull(name);
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no-op
+ }
+
+ @Override
+ public COSIndexInput clone() {
+ final COSIndexInput result = new COSIndexInput(name, offset, length);
+ result.pos = pos;
+ return result;
+ }
+
+ @Override
+ public long getFilePointer() {
+ return pos;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ if (pos > length) {
+ throw new EOFException();
+ }
+ this.pos = pos;
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
+            return new COSIndexInput(name, this.offset + offset, length);
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ readAhead(1);
+ this.pos++;
+ return buffer[(int) (this.offset + this.pos - 1 - bufferOffset)];
+ }
+
+ @Override
+ public void readBytes(final byte[] b, final int offset, final int len) throws IOException {
+ readAhead(len);
+ System.arraycopy(buffer, (int) (this.offset + this.pos - bufferOffset), b, offset, len);
+ this.pos += len;
+ }
+
+ private void readAhead(final int min) throws IOException {
+ // Ensure we have buffered at least offset + pos to offset + pos + min from COS.
+ // To save roundtrips read ahead a full BUFFER_SIZE unless min is even larger.
+ if (buffer == null
+ || !(this.offset + this.pos >= this.bufferOffset
+ && this.offset + this.pos + min <= this.bufferOffset + this.buffer.length)) {
+ final var request = new GetObjectRequest(bucketName, key(name));
+ request.setRange(
+ this.offset + this.pos, this.offset + this.pos + Math.max(min, READ_AHEAD_BUFFER_SIZE));
+ try {
+ final var object = s3.getObject(request);
+ try (var in = object.getObjectContent()) {
+ this.buffer = in.readAllBytes();
+ this.bufferOffset = this.offset + this.pos;
+ }
+ } catch (final AmazonServiceException e) {
+ throw new IOException(String.format("readByte failed for %s/%s", bucketName, key(name)), e);
+ }
+ }
+ }
+ }
+
+ private long getObjectLength(final String key) throws IOException {
+ try {
+ return s3.getObjectMetadata(bucketName, key).getContentLength();
+ } catch (final AmazonServiceException e) {
+ throw new IOException(String.format("getObjectLength failed for %s/%s", bucketName, key), e);
+ }
+ }
+
+ private static class COSLockFactory extends LockFactory {
+
+ @Override
+ public Lock obtainLock(Directory dir, String lockName) throws IOException {
+ if (!(dir instanceof COSDirectory)) {
+ throw new IllegalArgumentException(dir + " not supported by this lock factory");
+ }
+ final var cosDir = (COSDirectory) dir;
+ try {
+ cosDir.s3.addLegalHold(cosDir.bucketName, cosDir.key(lockName), LEGAL_HOLD_ID);
+ } catch (final AmazonServiceException e) {
+ throw new LockObtainFailedException(
+ String.format("Failed to lock %s/%s", cosDir.bucketName, cosDir.key(lockName)), e);
+ }
+ cosDir.cleanupMultipartUploads();
+ return new COSLock(cosDir, lockName);
+ }
+ }
+
+ private class COSIndexOutput extends IndexOutput {
+
+ private final String name;
+
+ private final String uploadId;
+
+ private final List<PartETag> partETags;
+
+ private final CRC32 crc;
+
+ private final ByteArrayOutputStream bos;
+
+ private final CheckedOutputStream cos;
+
+ private int partNumber;
+
+ private long bytesWritten;
+
+ private boolean failed;
+
+ private COSIndexOutput(final String name) throws IOException {
+ super(String.format("cos://%s/%s/%s", bucketName, prefix, name), name);
+
+ final var request = new InitiateMultipartUploadRequest(bucketName, key(name));
+ try {
+ final var result = s3.initiateMultipartUpload(request);
+ this.uploadId = result.getUploadId();
+ } catch (final AmazonServiceException e) {
+ throw new IOException(
+ String.format("InitiateMultipartUpload failed for %s/%s", bucketName, key(name)), e);
+ }
+
+ this.name = Objects.requireNonNull(name);
+ this.partETags = new LinkedList<PartETag>();
+ this.crc = new CRC32();
+ this.bos = new ByteArrayOutputStream(WRITE_BUFFER_SIZE);
+ this.cos = new CheckedOutputStream(bos, crc);
+ this.partNumber = 1;
+ }
+
+ @Override
+ public void close() throws IOException {
+ cos.close();
+
+ if (failed) {
+ try {
+ s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key(name), uploadId));
+ } catch (final AmazonServiceException e) {
+ throw new IOException(
+ String.format("abortMultipartUpload failed for %s/%s", bucketName, key(name)), e);
+ }
+ } else {
+ flush(true);
+ final var request = new CompleteMultipartUploadRequest(bucketName, key(name), uploadId, partETags);
+ try {
+ s3.completeMultipartUpload(request);
+ } catch (final AmazonServiceException e) {
+ throw new IOException(
+ String.format("completeMultipartUpload failed for %s/%s", bucketName, key(name)), e);
+ }
+ }
+ }
+
+ @Override
+ public long getFilePointer() {
+ return bytesWritten;
+ }
+
+ @Override
+ public long getChecksum() throws IOException {
+ return crc.getValue();
+ }
+
+ @Override
+ public void writeByte(final byte b) throws IOException {
+ assert bos.size() < WRITE_BUFFER_SIZE : "buffer should never be full at start of method";
+ cos.write(b);
+ bytesWritten++;
+ flush(false);
+ }
+
+ @Override
+ public void writeBytes(final byte[] b, int offset, int length) throws IOException {
+ assert bos.size() < WRITE_BUFFER_SIZE : "buffer should never be full at start of method";
+
+ while (length > 0) {
+ var l = (int) Math.min(WRITE_BUFFER_SIZE - bos.size(), length);
+ cos.write(b, offset, l);
+ bytesWritten += l;
+ offset += l;
+ length -= l;
+ flush(false);
+ }
+ }
+
+ private void flush(final boolean force) throws IOException {
+ var bufferSize = bos.size();
+ if (bufferSize == WRITE_BUFFER_SIZE || (force && bufferSize > 0)) {
+ var bytes = bos.toByteArray();
+ bos.reset();
+ var request = new UploadPartRequest()
+ .withBucketName(bucketName)
+ .withKey(key(name))
+ .withUploadId(uploadId)
+ .withPartSize(bytes.length)
+ .withPartNumber(partNumber)
+ .withInputStream(new ByteArrayInputStream(bytes));
+ try {
+ var result = s3.uploadPart(request);
+ this.partNumber++;
+ this.partETags.add(result.getPartETag());
+ } catch (final AmazonServiceException e) {
+ failed = true;
+ throw new IOException(String.format("uploadPart failed for %s/%s", bucketName, key(name)), e);
+ }
+ }
+ }
+ }
+
+ private static class COSLock extends Lock {
+
+ private final COSDirectory cosDir;
+
+ private final String lockName;
+
+ private COSLock(final COSDirectory cosDir, final String lockName) {
+ this.cosDir = Objects.requireNonNull(cosDir);
+ this.lockName = Objects.requireNonNull(lockName);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ cosDir.s3.deleteLegalHold(cosDir.bucketName, cosDir.key(lockName), LEGAL_HOLD_ID);
+ } catch (final AmazonServiceException e) {
+ throw new LockReleaseFailedException(
+ String.format("Failed to release %s/%s", cosDir.bucketName, cosDir.key(lockName)), e);
+ }
+ }
+
+ @Override
+ public void ensureValid() throws IOException {
+ try {
+ final var holds = cosDir.s3.listLegalHolds(cosDir.bucketName, cosDir.key(lockName));
+ final var isValid =
+ holds.getLegalHolds().stream().anyMatch(h -> h.getId().equals(LEGAL_HOLD_ID));
+ if (!isValid) {
+ throw new IOException("Lock no longer held");
+ }
+ } catch (final AmazonServiceException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}
diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/COSDirectoryTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/COSDirectoryTest.java
new file mode 100644
index 0000000..e2e2000
--- /dev/null
+++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/COSDirectoryTest.java
@@ -0,0 +1,66 @@
+package org.apache.couchdb.nouveau.lucene9;
+
+import com.ibm.cloud.objectstorage.ClientConfiguration;
+import com.ibm.cloud.objectstorage.client.builder.AwsClientBuilder.EndpointConfiguration;
+import com.ibm.cloud.objectstorage.services.s3.AmazonS3;
+import com.ibm.cloud.objectstorage.services.s3.AmazonS3ClientBuilder;
+import java.io.IOException;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DoublePoint;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.NoLockFactory;
+import org.junit.jupiter.api.Test;
+
+public class COSDirectoryTest {
+
+ @Test
+ public void testBasics() throws Exception {
+ AmazonS3 s3 = createClient("s3.eu-gb.cloud-object-storage.appdomain.cloud", "eu-gb");
+
+ var cosDir = new COSDirectory(s3, "rnewson-cosdir-demo", "index1", NoLockFactory.INSTANCE);
+ cleanup(cosDir);
+ try {
+ var conf = new IndexWriterConfig(new StandardAnalyzer());
+ conf.setUseCompoundFile(false);
+
+ try (var writer = new IndexWriter(cosDir, conf)) {
+ var doc = new Document();
+ doc.add(new TextField("foo", "hello there", Store.YES));
+ doc.add(new StoredField("bar", "bar"));
+ doc.add(new DoublePoint("baz", 12.0));
+ writer.addDocument(doc);
+ writer.commit();
+ }
+ } finally {
+ cleanup(cosDir);
+ }
+ }
+
+ private static AmazonS3 createClient(String endpointUrl, String location) {
+ ClientConfiguration clientConfig = new ClientConfiguration()
+ .withRequestTimeout(5000)
+ .withMaxErrorRetry(0)
+ .withMaxConnections(1);
+
+ return AmazonS3ClientBuilder.standard()
+ .withEndpointConfiguration(new EndpointConfiguration(endpointUrl, location))
+ .withPathStyleAccessEnabled(true)
+ .withClientConfiguration(clientConfig)
+ .build();
+ }
+
+ private void cleanup(final Directory dir) throws IOException {
+ for (String name : dir.listAll()) {
+ dir.deleteFile(name);
+ }
+ if (dir instanceof COSDirectory) {
+ ((COSDirectory) dir).cleanupMultipartUploads();
+ }
+ }
+}