blob: a6b032fc83103a47cfd09a02e8d11890d8514ca3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.gcs;
import com.google.api.gax.paging.Page;
import com.google.cloud.Policy;
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.CopyWriter;
import com.google.cloud.storage.HmacKey;
import com.google.cloud.storage.PostPolicyV4;
import com.google.cloud.storage.ServiceAccount;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Provides a thread-safe wrapper around any {@link Storage} implementations which may not offer thread safety.
*
* The {@link Storage} implementation provided by google-cloud-nio for in-memory testing is thread-unsafe. This causes
* a problem for backup-restore tests, which have threads reading and writing cloud-store data from the test threads,
* overseer nodes, as well as any nodes hosting individual cores. This class takes a heavyhanded approach to solving
* this by adding synchronization across all instance methods. This allows google-cloud-nio's in-memory fake to be used
* from multiple threads simultaneously as our tests require.
*/
public class ConcurrentDelegatingStorage implements Storage {
private final Storage delegate;
public ConcurrentDelegatingStorage(Storage delegate) {
this.delegate = delegate;
}
@Override
public synchronized Bucket create(BucketInfo bucketInfo, BucketTargetOption... options) { return delegate.create(bucketInfo, options); }
@Override
public synchronized Blob create(BlobInfo blobInfo, BlobTargetOption... options) { return delegate.create(blobInfo, options); }
@Override
public synchronized Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... options) { return delegate.create(blobInfo, content, options); }
@Override
public synchronized Blob create(BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options) { return delegate.create(blobInfo, content, offset, length, options); }
@Override
public synchronized Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) { return delegate.create(blobInfo, content, options); }
@Override
public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... blobWriteOptions) throws IOException { return delegate.createFrom(blobInfo, path, blobWriteOptions); }
@Override
public Blob createFrom(BlobInfo blobInfo, Path path, int i, BlobWriteOption... blobWriteOptions) throws IOException { return delegate.createFrom(blobInfo, path, i, blobWriteOptions); }
@Override
public Blob createFrom(BlobInfo blobInfo, InputStream inputStream, BlobWriteOption... blobWriteOptions) throws IOException { return delegate.createFrom(blobInfo, inputStream, blobWriteOptions); }
@Override
public Blob createFrom(BlobInfo blobInfo, InputStream inputStream, int i, BlobWriteOption... blobWriteOptions) throws IOException { return delegate.createFrom(blobInfo, inputStream, i, blobWriteOptions); }
@Override
public synchronized Bucket get(String bucket, BucketGetOption... options) { return delegate.get(bucket, options); }
@Override
public synchronized Bucket lockRetentionPolicy(BucketInfo bucket, BucketTargetOption... options) { return delegate.lockRetentionPolicy(bucket, options); }
@Override
public synchronized Blob get(String bucket, String blob, BlobGetOption... options) { return delegate.get(bucket, blob, options); }
@Override
public synchronized Blob get(BlobId blob, BlobGetOption... options) { return delegate.get(blob, options); }
@Override
public synchronized Blob get(BlobId blob) { return delegate.get(blob); }
@Override
public synchronized Page<Bucket> list(BucketListOption... options) { return delegate.list(options); }
@Override
public synchronized Page<Blob> list(String bucket, BlobListOption... options) { return delegate.list(bucket, options); }
@Override
public synchronized Bucket update(BucketInfo bucketInfo, BucketTargetOption... options) { return delegate.update(bucketInfo, options); }
@Override
public synchronized Blob update(BlobInfo blobInfo, BlobTargetOption... options) { return delegate.update(blobInfo, options); }
@Override
public synchronized Blob update(BlobInfo blobInfo) { return delegate.update(blobInfo); }
@Override
public synchronized boolean delete(String bucket, BucketSourceOption... options) { return delegate.delete(bucket, options); }
@Override
public synchronized boolean delete(String bucket, String blob, BlobSourceOption... options) { return delegate.delete(bucket, blob, options); }
@Override
public synchronized boolean delete(BlobId blob, BlobSourceOption... options) { return delegate.delete(blob, options); }
@Override
public synchronized boolean delete(BlobId blob) { return delegate.delete(blob); }
@Override
public synchronized Blob compose(ComposeRequest composeRequest) { return delegate.compose(composeRequest); }
@Override
public synchronized CopyWriter copy(CopyRequest copyRequest) { return delegate.copy(copyRequest); }
@Override
public synchronized byte[] readAllBytes(String bucket, String blob, BlobSourceOption... options) { return delegate.readAllBytes(bucket, blob, options); }
@Override
public synchronized byte[] readAllBytes(BlobId blob, BlobSourceOption... options) { return delegate.readAllBytes(blob, options); }
@Override
public synchronized StorageBatch batch() { return delegate.batch(); }
@Override
public synchronized ReadChannel reader(String bucket, String blob, BlobSourceOption... options) { return new ConcurrentReadChannel(this, delegate.reader(bucket, blob, options)); }
@Override
public synchronized ReadChannel reader(BlobId blob, BlobSourceOption... options) { return new ConcurrentReadChannel(this, delegate.reader(blob, options)); }
@Override
public synchronized WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { return new ConcurrentWriteChannel(this, delegate.writer(blobInfo, options)); }
@Override
public synchronized WriteChannel writer(URL signedURL) { return new ConcurrentWriteChannel(this, delegate.writer(signedURL)); }
@Override
public synchronized URL signUrl(BlobInfo blobInfo, long duration, TimeUnit unit, SignUrlOption... options) { return delegate.signUrl(blobInfo, duration, unit, options); }
@Override
public PostPolicyV4 generateSignedPostPolicyV4(BlobInfo blobInfo, long l, TimeUnit timeUnit, PostPolicyV4.PostFieldsV4 postFieldsV4, PostPolicyV4.PostConditionsV4 postConditionsV4, PostPolicyV4Option... postPolicyV4Options) { return delegate.generateSignedPostPolicyV4(blobInfo, l, timeUnit, postFieldsV4, postConditionsV4, postPolicyV4Options); }
@Override
public PostPolicyV4 generateSignedPostPolicyV4(BlobInfo blobInfo, long l, TimeUnit timeUnit, PostPolicyV4.PostFieldsV4 postFieldsV4, PostPolicyV4Option... postPolicyV4Options) { return delegate.generateSignedPostPolicyV4(blobInfo, l, timeUnit, postFieldsV4, postPolicyV4Options); }
@Override
public PostPolicyV4 generateSignedPostPolicyV4(BlobInfo blobInfo, long l, TimeUnit timeUnit, PostPolicyV4.PostConditionsV4 postConditionsV4, PostPolicyV4Option... postPolicyV4Options) { return delegate.generateSignedPostPolicyV4(blobInfo, l, timeUnit, postConditionsV4, postPolicyV4Options); }
@Override
public PostPolicyV4 generateSignedPostPolicyV4(BlobInfo blobInfo, long l, TimeUnit timeUnit, PostPolicyV4Option... postPolicyV4Options) { return delegate.generateSignedPostPolicyV4(blobInfo, l, timeUnit, postPolicyV4Options); }
@Override
public synchronized List<Blob> get(BlobId... blobIds) { return delegate.get(blobIds); }
@Override
public synchronized List<Blob> get(Iterable<BlobId> blobIds) { return delegate.get(blobIds); }
@Override
public synchronized List<Blob> update(BlobInfo... blobInfos) { return delegate.update(blobInfos); }
@Override
public synchronized List<Blob> update(Iterable<BlobInfo> blobInfos) { return delegate.update(blobInfos); }
@Override
public synchronized List<Boolean> delete(BlobId... blobIds) { return delegate.delete(blobIds); }
@Override
public synchronized List<Boolean> delete(Iterable<BlobId> blobIds) { return delegate.delete(blobIds); }
@Override
public synchronized Acl getAcl(String bucket, Acl.Entity entity, BucketSourceOption... options) { return delegate.getAcl(bucket, entity, options); }
@Override
public synchronized Acl getAcl(String bucket, Acl.Entity entity) { return delegate.getAcl(bucket, entity); }
@Override
public synchronized boolean deleteAcl(String bucket, Acl.Entity entity, BucketSourceOption... options) { return deleteAcl(bucket, entity, options); }
@Override
public synchronized boolean deleteAcl(String bucket, Acl.Entity entity) { return delegate.deleteAcl(bucket, entity); }
@Override
public synchronized Acl createAcl(String bucket, Acl acl, BucketSourceOption... options) { return delegate.createAcl(bucket, acl, options); }
@Override
public synchronized Acl createAcl(String bucket, Acl acl) { return delegate.createAcl(bucket, acl); }
@Override
public synchronized Acl updateAcl(String bucket, Acl acl, BucketSourceOption... options) { return delegate.updateAcl(bucket, acl, options); }
@Override
public synchronized Acl updateAcl(String bucket, Acl acl) { return delegate.updateAcl(bucket, acl); }
@Override
public synchronized List<Acl> listAcls(String bucket, BucketSourceOption... options) { return delegate.listAcls(bucket, options); }
@Override
public synchronized List<Acl> listAcls(String bucket) { return delegate.listAcls(bucket); }
@Override
public synchronized Acl getDefaultAcl(String bucket, Acl.Entity entity) { return delegate.getDefaultAcl(bucket, entity); }
@Override
public synchronized boolean deleteDefaultAcl(String bucket, Acl.Entity entity) { return delegate.deleteDefaultAcl(bucket, entity); }
@Override
public synchronized Acl createDefaultAcl(String bucket, Acl acl) { return delegate.createDefaultAcl(bucket, acl); }
@Override
public synchronized Acl updateDefaultAcl(String bucket, Acl acl) { return delegate.updateDefaultAcl(bucket, acl); }
@Override
public synchronized List<Acl> listDefaultAcls(String bucket) { return delegate.listDefaultAcls(bucket); }
@Override
public synchronized Acl getAcl(BlobId blob, Acl.Entity entity) { return delegate.getAcl(blob, entity); }
@Override
public synchronized boolean deleteAcl(BlobId blob, Acl.Entity entity) { return delegate.deleteAcl(blob, entity); }
@Override
public synchronized Acl createAcl(BlobId blob, Acl acl) { return delegate.createAcl(blob, acl); }
@Override
public synchronized Acl updateAcl(BlobId blob, Acl acl) { return delegate.updateAcl(blob, acl); }
@Override
public synchronized List<Acl> listAcls(BlobId blob) { return delegate.listAcls(blob); }
@Override
public HmacKey createHmacKey(ServiceAccount serviceAccount, CreateHmacKeyOption... createHmacKeyOptions) { return delegate.createHmacKey(serviceAccount, createHmacKeyOptions); }
@Override
public Page<HmacKey.HmacKeyMetadata> listHmacKeys(ListHmacKeysOption... listHmacKeysOptions) { return delegate.listHmacKeys(listHmacKeysOptions); }
@Override
public HmacKey.HmacKeyMetadata getHmacKey(String s, GetHmacKeyOption... getHmacKeyOptions) { return delegate.getHmacKey(s, getHmacKeyOptions); }
@Override
public void deleteHmacKey(HmacKey.HmacKeyMetadata hmacKeyMetadata, DeleteHmacKeyOption... deleteHmacKeyOptions) { delegate.deleteHmacKey(hmacKeyMetadata, deleteHmacKeyOptions); }
@Override
public HmacKey.HmacKeyMetadata updateHmacKeyState(HmacKey.HmacKeyMetadata hmacKeyMetadata, HmacKey.HmacKeyState hmacKeyState, UpdateHmacKeyOption... updateHmacKeyOptions) { return delegate.updateHmacKeyState(hmacKeyMetadata, hmacKeyState, updateHmacKeyOptions); }
@Override
public synchronized Policy getIamPolicy(String bucket, BucketSourceOption... options) {
return delegate.getIamPolicy(bucket, options);
}
@Override
public synchronized Policy setIamPolicy(String bucket, Policy policy, BucketSourceOption... options) {
return delegate.setIamPolicy(bucket, policy, options);
}
@Override
public synchronized List<Boolean> testIamPermissions(String bucket, List<String> permissions, BucketSourceOption... options) {
return delegate.testIamPermissions(bucket, permissions, options);
}
@Override
public synchronized ServiceAccount getServiceAccount(String projectId) {
return delegate.getServiceAccount(projectId);
}
@Override
public synchronized StorageOptions getOptions() {
return delegate.getOptions();
}
public static class ConcurrentReadChannel implements ReadChannel {
private final ConcurrentDelegatingStorage storage;
private final ReadChannel delegate;
public ConcurrentReadChannel(ConcurrentDelegatingStorage storage, ReadChannel delegate) {
this.storage = storage;
this.delegate = delegate;
}
@Override
public boolean isOpen() {
synchronized (storage) {
return delegate.isOpen();
}
}
@Override
public void close() {
synchronized (storage) {
delegate.close();
}
}
@Override
public void seek(long position) throws IOException {
synchronized (storage) {
delegate.seek(position);
}
}
@Override
public void setChunkSize(int chunkSize) {
synchronized (storage) {
delegate.setChunkSize(chunkSize);
}
}
@Override
public RestorableState<ReadChannel> capture() {
synchronized (storage) {
return delegate.capture();
}
}
@Override
public int read(ByteBuffer dst) throws IOException {
synchronized (storage) {
return delegate.read(dst);
}
}
}
public static class ConcurrentWriteChannel implements WriteChannel {
private final ConcurrentDelegatingStorage storage;
private final WriteChannel delegate;
public ConcurrentWriteChannel(ConcurrentDelegatingStorage storage, WriteChannel delegate) {
this.storage = storage;
this.delegate = delegate;
}
@Override
public void setChunkSize(int chunkSize) {
synchronized (storage) {
delegate.setChunkSize(chunkSize);
}
}
@Override
public RestorableState<WriteChannel> capture() {
synchronized (storage) {
return delegate.capture();
}
}
@Override
public int write(ByteBuffer src) throws IOException {
synchronized (storage) {
return delegate.write(src);
}
}
@Override
public boolean isOpen() {
synchronized (storage) {
return delegate.isOpen();
}
}
@Override
public void close() throws IOException {
synchronized (storage) {
delegate.close();
}
}
}
}