/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.blob.cloud.s3;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import javax.jcr.RepositoryException;
import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.jackrabbit.core.data.DataRecord;
import org.apache.jackrabbit.core.data.DataStore;
import org.apache.jackrabbit.core.data.DataStoreException;
import org.apache.jackrabbit.oak.plugins.blob.datastore.AbstractDataStoreTest;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.ConfigurableDataRecordAccessProvider;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordAccessProvider;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordDownloadOptions;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUpload;
import org.apache.jackrabbit.oak.plugins.blob.datastore.directaccess.DataRecordUploadException;
import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.getFixtures;
import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.getS3Config;
import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.getS3DataStore;
import static org.apache.jackrabbit.oak.blob.cloud.s3.S3DataStoreUtils.isS3Configured;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
/**
 * Test {@link S3DataStore} with S3Backend and the local cache enabled.
 * Requires AWS configuration, either as a config file passed via the 'config' system
 * property (e.g. -Dconfig=/opt/cq/aws.properties) or as individual system properties
 * prefixed with 'ds.'. See {@link S3DataStoreUtils} for details. A sample aws.properties
 * is located at src/test/resources/aws.properties.
 */
@RunWith(Parameterized.class)
public class TestS3Ds extends AbstractDataStoreTest {
protected static final Logger LOG = LoggerFactory.getLogger(TestS3Ds.class);
protected static final long ONE_KB = 1024;
protected static final long ONE_MB = ONE_KB * ONE_KB;
protected static final long ONE_HUNDRED_MB = ONE_MB * 100;
protected static final long ONE_GB = ONE_HUNDRED_MB * 10;
private static Date overallStartTime = getBackdatedDate();
private Date thisTestStartTime = null;
protected Properties props;
protected String bucket;
@Parameterized.Parameter
public String s3Class;
@Parameterized.Parameters(name = "{index}: ({0})")
public static List<String> fixtures() {
return getFixtures();
}
public static Date getBackdatedDate() {
// Use a backdated date to accommodate time drift when deleting created resources.
return DateUtils.addMinutes(new Date(), -1);
}
@BeforeClass
public static void assumptions() {
assumeTrue(isS3Configured());
}
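// Every bucket created across all parameterized runs; re-checked in the @AfterClass cleanup.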
private static List<String> createdBucketNames = Lists.newArrayList();
@Override
@Before
public void setUp() throws Exception {
props = getS3Config();
thisTestStartTime = getBackdatedDate();
bucket = randomGen.nextInt(9999) + "-" +
randomGen.nextInt(9999) + "-s3ds-unittest-autogenerated";
createdBucketNames.add(bucket);
props.setProperty(S3Constants.S3_BUCKET, bucket);
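// 'secret' seeds the reference key the datastore uses for secure binary references.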
props.setProperty("secret", "123456");
props.setProperty(S3Constants.PRESIGNED_HTTP_DOWNLOAD_URI_EXPIRY_SECONDS,"60");
props.setProperty(S3Constants.PRESIGNED_HTTP_UPLOAD_URI_EXPIRY_SECONDS, "60");
props.setProperty(S3Constants.PRESIGNED_URI_ENABLE_ACCELERATION, "false");
props.setProperty(S3Constants.PRESIGNED_HTTP_DOWNLOAD_URI_CACHE_MAX_SIZE, "60");
props.setProperty(S3Constants.S3_ENCRYPTION, S3Constants.S3_ENCRYPTION_NONE);
super.setUp();
}
@Test
public void testInitiateDirectUploadUnlimitedURIs() throws DataRecordUploadException,
RepositoryException {
ConfigurableDataRecordAccessProvider ds
= (ConfigurableDataRecordAccessProvider) createDataStore();
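// The expected URI counts follow from the backend's minimum part size (50 GB / 5000
// parts = 10 MB) and from S3's hard limit of 10,000 parts per multipart upload,
// which is why 200 GB still yields 10,000 URIs.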
long uploadSize = ONE_GB * 50;
int expectedNumURIs = 5000;
DataRecordUpload upload = ds.initiateDataRecordUpload(uploadSize, -1);
Assert.assertEquals(expectedNumURIs, upload.getUploadURIs().size());
uploadSize = ONE_GB * 100;
expectedNumURIs = 10000;
upload = ds.initiateDataRecordUpload(uploadSize, -1);
Assert.assertEquals(expectedNumURIs, upload.getUploadURIs().size());
uploadSize = ONE_GB * 200;
upload = ds.initiateDataRecordUpload(uploadSize, -1);
Assert.assertEquals(expectedNumURIs, upload.getUploadURIs().size());
}
@Test
public void testGetDownloadURI() throws IOException, RepositoryException {
DataStore ds = createDataStore();
byte[] data = new byte[dataLength];
randomGen.nextBytes(data);
DataRecord record = doSynchronousAddRecord(ds, new ByteArrayInputStream(data));
URI uri = ((DataRecordAccessProvider) ds).getDownloadURI(record.getIdentifier(),
DataRecordDownloadOptions.DEFAULT);
Assert.assertNotNull("uri is null", uri);
// Download content from the URI directly and check
HttpEntity entity = httpGet(uri);
assertStream(new ByteArrayInputStream(data), entity.getContent());
// Download with DataStore API and check
DataRecord getrec = ds.getRecord(record.getIdentifier());
Assert.assertNotNull(getrec);
Assert.assertEquals(data.length, getrec.getLength());
assertRecord(data, getrec);
}
@Test
public void testDataMigration() {
try {
String encryption = props.getProperty(S3Constants.S3_ENCRYPTION);
// manually close the datastore created in setUp and remove encryption
ds.close();
props.remove(S3Constants.S3_ENCRYPTION);
ds = createDataStore();
byte[] data = new byte[dataLength];
randomGen.nextBytes(data);
DataRecord rec = ds.addRecord(new ByteArrayInputStream(data));
Assert.assertEquals(data.length, rec.getLength());
assertRecord(data, rec);
ds.close();
// turn encryption back on and recreate the datastore instance
props.setProperty(S3Constants.S3_ENCRYPTION, encryption);
props.setProperty(S3Constants.S3_RENAME_KEYS, "true");
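// S3_RENAME_KEYS asks the backend to migrate any existing keys to the current key format on startup.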
ds = createDataStore();
Assert.assertNotNull(ds);
rec = ds.getRecord(rec.getIdentifier());
Assert.assertNotNull(rec);
Assert.assertEquals(data.length, rec.getLength());
assertRecord(data, rec);
randomGen.nextBytes(data);
rec = ds.addRecord(new ByteArrayInputStream(data));
DataRecord rec1 = ds.getRecord(rec.getIdentifier());
Assert.assertEquals(rec.getLength(), rec1.getLength());
assertRecord(data, rec);
ds.close();
} catch (Exception e) {
LOG.error("error:", e);
fail(e.getMessage());
}
}
@Test
public void testInitiateCompleteUpload() throws IOException, RepositoryException, IllegalArgumentException, DataRecordUploadException {
S3DataStore ds = (S3DataStore) createDataStore();
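// Use a longer (5 minute) URI expiry so the direct upload has time to complete.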
ds.setDirectUploadURIExpirySeconds(60*5);
ds.setDirectDownloadURIExpirySeconds(60*5);
ds.setDirectDownloadURICacheSize(60*5);
DataRecordUpload uploadContext = ds.initiateDataRecordUpload(ONE_GB, 1);
assertNotNull(uploadContext);
String uploadToken = uploadContext.getUploadToken();
byte[] data = new byte[dataLength];
randomGen.nextBytes(data);
// Upload directly using the URI and check
CloseableHttpResponse response = httpPut(uploadContext, new ByteArrayInputStream(data), data.length);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
DataRecord uploadedRecord = ds.completeDataRecordUpload(uploadToken);
assertNotNull(uploadedRecord);
Assert.assertEquals(data.length, uploadedRecord.getLength());
assertRecord(data, uploadedRecord);
// Retrieve through the DataStore API and check
DataRecord getrec = ds.getRecord(uploadedRecord.getIdentifier());
Assert.assertNotNull(getrec);
Assert.assertEquals(data.length, getrec.getLength());
assertRecord(data, getrec);
}
public CloseableHttpResponse httpPut(@Nullable DataRecordUpload uploadContext, InputStream inputstream, long length) throws IOException {
// @Nullable plus assertNotNull() keeps IDEs from warning about a possible NPE in test methods
assertNotNull(uploadContext);
URI puturl = uploadContext.getUploadURIs().iterator().next();
HttpPut putreq = new HttpPut(puturl);
String keyId = null;
String encryptionType = props.getProperty(S3Constants.S3_ENCRYPTION);
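// A presigned PUT to an SSE-KMS encrypted bucket must carry the matching encryption
// headers, otherwise S3 rejects the request as not matching the signed URI.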
if (encryptionType.equals(S3Constants.S3_ENCRYPTION_SSE_KMS)) {
keyId = props.getProperty(S3Constants.S3_SSE_KMS_KEYID);
putreq.addHeader(new BasicHeader(Headers.SERVER_SIDE_ENCRYPTION,
SSEAlgorithm.KMS.getAlgorithm()));
if(keyId != null) {
putreq.addHeader(new BasicHeader(Headers.SERVER_SIDE_ENCRYPTION_AWS_KMS_KEYID,
keyId));
}
}
putreq.setEntity(new InputStreamEntity(inputstream, length));
CloseableHttpClient httpclient = HttpClients.createDefault();
return httpclient.execute(putreq);
}
private HttpEntity httpGet(URI uri) throws IOException {
HttpGet getreq = new HttpGet(uri);
CloseableHttpClient httpclient = HttpClients.createDefault();
CloseableHttpResponse res = httpclient.execute(getreq);
Assert.assertEquals(200, res.getStatusLine().getStatusCode());
return res.getEntity();
}
protected DataRecord doSynchronousAddRecord(DataStore ds, InputStream in) throws DataStoreException {
return ((S3DataStore)ds).addRecord(in, new BlobOptions().setUpload(BlobOptions.UploadType.SYNCHRONOUS));
}
private static void assertStream(InputStream expected, InputStream actual) throws IOException {
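// Compare the two streams byte by byte until both reach EOF (-1).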
while (true) {
int expectedByte = expected.read();
int actualByte = actual.read();
Assert.assertEquals(expectedByte, actualByte);
if (expectedByte == -1) {
break;
}
}
}
@Override
@After
public void tearDown() {
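// Cleanup failures are deliberately swallowed so they don't mask the test result.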
try {
super.tearDown();
}
catch (Exception ignore) { }
try {
S3DataStoreUtils.deleteBucket(bucket, thisTestStartTime);
}
catch (Exception ignore) { }
}
@AfterClass
public static void verifyAllBucketsDeleted() {
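// Safety net: delete any buckets a test run left behind.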
for (String bucket : createdBucketNames) {
try {
S3DataStoreUtils.deleteBucket(bucket, overallStartTime);
}
catch (Exception ignore) { }
}
}
protected DataStore createDataStore() throws RepositoryException {
DataStore s3ds = null;
try {
s3ds = getS3DataStore(s3Class, props, dataStoreDir);
} catch (Exception e) {
LOG.error("Error creating data store", e);
}
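// Brief pause to let the freshly created bucket settle, since S3 bucket creation propagates asynchronously.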
sleep(1000);
return s3ds;
}
/**----------Not supported-----------**/
@Override
public void testUpdateLastModifiedOnAccess() {
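// Not supported by S3DataStore; overridden as a no-op to disable the inherited test.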
}
@Override
public void testDeleteAllOlderThan() {
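// Not supported by S3DataStore; overridden as a no-op to disable the inherited test.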
}
}