| /** |
| * |
| * Copyright (C) 2009 Cloud Conscious, LLC. <info@cloudconscious.com> |
| * |
| * ==================================================================== |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * ==================================================================== |
| */ |
| package org.jclouds.blobstore.integration.internal; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static org.testng.Assert.assertEquals; |
| |
| import java.io.IOException; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Map.Entry; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.jclouds.blobstore.BlobStoreContext; |
| import org.jclouds.blobstore.attr.ConsistencyModels; |
| import org.jclouds.blobstore.domain.Blob; |
| import org.jclouds.blobstore.domain.StorageMetadata; |
| import org.jclouds.blobstore.domain.StorageType; |
| import org.jclouds.blobstore.util.internal.BlobStoreUtilsImpl; |
| import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule; |
| import org.testng.ITestContext; |
| import org.testng.annotations.AfterClass; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.BeforeSuite; |
| |
| import com.google.common.base.Function; |
| import com.google.common.base.Predicate; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.Sets; |
| import com.google.inject.Module; |
| |
| public class BaseBlobStoreIntegrationTest { |
| protected static final String LOCAL_ENCODING = System.getProperty("file.encoding"); |
| protected static final String XML_STRING_FORMAT = "<apples><apple name=\"%s\"></apple> </apples>"; |
| protected static final String TEST_STRING = String.format(XML_STRING_FORMAT, "apple"); |
| |
| protected Map<String, String> fiveStrings = ImmutableMap.of("one", String.format( |
| XML_STRING_FORMAT, "apple"), "two", String.format(XML_STRING_FORMAT, "bear"), "three", |
| String.format(XML_STRING_FORMAT, "candy"), "four", String.format(XML_STRING_FORMAT, |
| "dogma"), "five", String.format(XML_STRING_FORMAT, "emma")); |
| |
| protected Map<String, String> fiveStringsUnderPath = ImmutableMap.of("path/1", String.format( |
| XML_STRING_FORMAT, "apple"), "path/2", String.format(XML_STRING_FORMAT, "bear"), |
| "path/3", String.format(XML_STRING_FORMAT, "candy"), "path/4", String.format( |
| XML_STRING_FORMAT, "dogma"), "path/5", String |
| .format(XML_STRING_FORMAT, "emma")); |
| |
| public static long INCONSISTENCY_WINDOW = 10000; |
| protected static volatile AtomicInteger containerIndex = new AtomicInteger(0); |
| |
| protected volatile BlobStoreContext context; |
| protected static volatile int containerCount = 10; |
| public static final String CONTAINER_PREFIX = System.getProperty("user.name") + "-blobstore"; |
| /** |
| * two test groups integration and live. |
| */ |
| private volatile static BlockingQueue<String> containerJsr330 = new ArrayBlockingQueue<String>( |
| containerCount); |
| |
| /** |
| * There are a lot of retries here mainly from experience running inside amazon EC2. |
| */ |
| @BeforeSuite |
| public void setUpResourcesForAllThreads(ITestContext testContext) throws Exception { |
| createContainersSharedByAllThreads(getCloudResources(testContext), testContext); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private BlobStoreContext getCloudResources(ITestContext testContext) |
| throws ClassNotFoundException, InstantiationException, IllegalAccessException, |
| Exception { |
| String initializerClass = checkNotNull(System.getProperty("jclouds.test.initializer"), |
| "jclouds.test.initializer"); |
| Class<BaseTestInitializer> clazz = (Class<BaseTestInitializer>) Class |
| .forName(initializerClass); |
| BaseTestInitializer initializer = clazz.newInstance(); |
| return initializer.init(createHttpModule(), testContext); |
| } |
| |
| protected ExecutorService exec; |
| |
| /** |
| * we are doing this at a class level, as the context.getBlobStore() object is going to be shared |
| * for all methods in the class. We don't want to do this for group, as some test classes may |
| * want to have a different implementation of context.getBlobStore(). For example, one class may |
| * want non-blocking i/o and another class google appengine. |
| */ |
| @BeforeClass(groups = { "integration", "live" }) |
| public void setUpResourcesOnThisThread(ITestContext testContext) throws Exception { |
| context = getCloudResources(testContext); |
| exec = Executors.newCachedThreadPool(); |
| } |
| |
| @AfterClass(groups = { "integration", "live" }) |
| protected void tearDownClient() throws Exception { |
| if (exec != null) { |
| exec.shutdown(); |
| exec.awaitTermination(60, TimeUnit.SECONDS); |
| } |
| context.close(); |
| } |
| |
| private static volatile boolean initialized = false; |
| |
| protected void createContainersSharedByAllThreads(BlobStoreContext context, |
| ITestContext testContext) throws Exception { |
| while (!initialized) { |
| synchronized (BaseBlobStoreIntegrationTest.class) { |
| if (!initialized) { |
| deleteEverything(context); |
| for (; containerIndex.get() < containerCount; containerIndex.incrementAndGet()) { |
| String containerName = CONTAINER_PREFIX + containerIndex; |
| if (blackListContainers.contains(containerName)) { |
| containerCount++; |
| } else { |
| try { |
| createContainerAndEnsureEmpty(context, containerName); |
| containerJsr330.put(containerName); |
| } catch (Throwable e) { |
| e.printStackTrace(); |
| // throw away the container and try again with the next index |
| deleteContainerOrWarnIfUnable(context, containerName); |
| containerCount++; |
| } |
| } |
| } |
| testContext.setAttribute("containerJsr330", containerJsr330); |
| System.err.printf("*** containers to test: %s%n", containerJsr330); |
| // careful not to keep too many files open |
| context.close(); |
| initialized = true; |
| } |
| } |
| } |
| } |
| |
| private static void deleteContainerOrWarnIfUnable(BlobStoreContext context, String containerName) { |
| try { |
| context.getBlobStore().deleteContainer(containerName); |
| } catch (Throwable ex) { |
| System.err.printf("unable to delete container %s, ignoring...%n", containerName); |
| ex.printStackTrace(); |
| blackListContainers.add(containerName); |
| } |
| } |
| |
| private static final Set<String> blackListContainers = Sets.newHashSet(); |
| |
| /** |
| * Tries to delete all containers, runs up to two times |
| */ |
| protected static void deleteEverything(final BlobStoreContext context) throws Exception { |
| try { |
| for (int i = 0; i < 2; i++) { |
| Iterable<? extends StorageMetadata> testContainers = Iterables.filter(context |
| .getBlobStore().list(), new Predicate<StorageMetadata>() { |
| public boolean apply(StorageMetadata input) { |
| return (input.getType() == StorageType.CONTAINER || input.getType() == StorageType.FOLDER) |
| && input.getName().startsWith(CONTAINER_PREFIX.toLowerCase()); |
| } |
| }); |
| for (StorageMetadata container : testContainers) { |
| deleteContainerOrWarnIfUnable(context, container.getName()); |
| } |
| } // try twice |
| } catch (CancellationException e) { |
| throw e; |
| } |
| } |
| |
| /** |
| * two test groups integration and live. |
| */ |
| |
| public static boolean SANITY_CHECK_RETURNED_BUCKET_NAME = false; |
| |
| /** |
| * Due to eventual consistency, container commands may not return correctly immediately. Hence, |
| * we will try up to the inconsistency window to see if the assertion completes. |
| */ |
| protected static void assertConsistencyAware(BlobStoreContext context, Runnable assertion) |
| throws InterruptedException { |
| if (context.getConsistencyModel() == ConsistencyModels.STRICT) { |
| assertion.run(); |
| return; |
| } else { |
| |
| AssertionError error = null; |
| for (int i = 0; i < 30; i++) { |
| try { |
| assertion.run(); |
| return; |
| } catch (AssertionError e) { |
| error = e; |
| } |
| Thread.sleep(INCONSISTENCY_WINDOW / 30); |
| } |
| if (error != null) |
| throw error; |
| } |
| } |
| |
| protected void assertConsistencyAware(Runnable assertion) throws InterruptedException { |
| assertConsistencyAware(context, assertion); |
| } |
| |
| protected static void createContainerAndEnsureEmpty(BlobStoreContext context, |
| final String containerName) throws InterruptedException { |
| context.getBlobStore().createContainerInLocation("default", containerName); |
| if (context.getConsistencyModel() == ConsistencyModels.EVENTUAL) |
| Thread.sleep(1000); |
| context.getBlobStore().clearContainer(containerName); |
| } |
| |
| protected void createContainerAndEnsureEmpty(String containerName) throws InterruptedException { |
| createContainerAndEnsureEmpty(context, containerName); |
| } |
| |
| protected String addBlobToContainer(String sourceContainer, String key) { |
| Blob sourceObject = context.getBlobStore().newBlob(key); |
| sourceObject.getMetadata().setContentType("text/xml"); |
| sourceObject.setPayload(TEST_STRING); |
| return addBlobToContainer(sourceContainer, sourceObject); |
| } |
| |
| protected void add5BlobsUnderPathAnd5UnderRootToContainer(String sourceContainer) { |
| for (Entry<String, String> entry : Iterables.concat(fiveStrings.entrySet(), |
| fiveStringsUnderPath.entrySet())) { |
| Blob sourceObject = context.getBlobStore().newBlob(entry.getKey()); |
| sourceObject.getMetadata().setContentType("text/xml"); |
| sourceObject.setPayload(entry.getValue()); |
| addBlobToContainer(sourceContainer, sourceObject); |
| } |
| } |
| |
| protected String addBlobToContainer(String sourceContainer, Blob object) { |
| return context.getBlobStore().putBlob(sourceContainer, object); |
| } |
| |
| protected Blob validateContent(String sourceContainer, String key) throws InterruptedException { |
| assertConsistencyAwareContainerSize(sourceContainer, 1); |
| Blob newObject = context.getBlobStore().getBlob(sourceContainer, key); |
| assert newObject != null; |
| try { |
| assertEquals(BlobStoreUtilsImpl.getContentAsStringOrNullAndClose(newObject), TEST_STRING); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| return newObject; |
| } |
| |
| protected void assertConsistencyAwareContainerSize(final String containerName, final int count) |
| throws InterruptedException { |
| assertConsistencyAware(new Runnable() { |
| public void run() { |
| try { |
| assert context.getBlobStore().countBlobs(containerName) == count : String.format( |
| "expected only %d values in %s: %s", count, containerName, Sets |
| .newHashSet(Iterables.transform(context.getBlobStore().list( |
| containerName), new Function<StorageMetadata, String>() { |
| |
| public String apply(StorageMetadata from) { |
| return from.getName(); |
| } |
| |
| }))); |
| } catch (Exception e) { |
| Throwables.propagateIfPossible(e); |
| } |
| } |
| }); |
| } |
| |
| public String getContainerName() throws InterruptedException { |
| String containerName = containerJsr330.poll(30, TimeUnit.SECONDS); |
| assert containerName != null : "unable to get a container for the test"; |
| createContainerAndEnsureEmpty(containerName); |
| return containerName; |
| } |
| |
| /** |
| * requestor will create a container using the name returned from this. This method will take |
| * care not to exceed the maximum containers permitted by a service by deleting an existing |
| * container first. |
| * |
| * @throws InterruptedException |
| */ |
| public String getScratchContainerName() throws InterruptedException { |
| return allocateNewContainerName(getContainerName()); |
| } |
| |
| public void returnContainer(final String containerName) { |
| if (containerName != null) { |
| containerJsr330.add(containerName); |
| /* |
| * Ensure that any returned container name actually exists on the server. Return of a |
| * non-existent container introduces subtle testing bugs, where later unrelated tests will |
| * fail. |
| * |
| * NOTE: This sanity check should only be run for Stub-based Integration testing -- it will |
| * *substantially* slow down tests on a real server over a network. |
| */ |
| if (SANITY_CHECK_RETURNED_BUCKET_NAME) { |
| if (!Iterables.any(context.getBlobStore().list(), new Predicate<StorageMetadata>() { |
| public boolean apply(StorageMetadata md) { |
| return containerName.equals(md.getName()); |
| } |
| })) { |
| throw new IllegalStateException( |
| "Test returned the name of a non-existent container: " + containerName); |
| } |
| } |
| } |
| } |
| |
| /** |
| * abandon old container name instead of waiting for the container to be created. |
| * |
| * @throws InterruptedException |
| */ |
| public void destroyContainer(String scratchContainer) throws InterruptedException { |
| if (scratchContainer != null) { |
| recycleContainerAndAddToPool(scratchContainer); |
| } |
| } |
| |
| protected void recycleContainerAndAddToPool(String scratchContainer) throws InterruptedException { |
| String newScratchContainer = recycleContainer(scratchContainer); |
| returnContainer(newScratchContainer); |
| } |
| |
| protected String recycleContainer(final String container) throws InterruptedException { |
| String newScratchContainer = allocateNewContainerName(container); |
| createContainerAndEnsureEmpty(newScratchContainer); |
| return newScratchContainer; |
| } |
| |
| private String allocateNewContainerName(final String container) { |
| exec.submit(new Runnable() { |
| public void run() { |
| deleteContainerOrWarnIfUnable(context, container); |
| } |
| }); |
| String newScratchContainer = container + containerIndex.incrementAndGet(); |
| System.err.printf("*** allocated new container %s...%n", container); |
| return newScratchContainer; |
| } |
| |
| protected Module createHttpModule() { |
| return new JavaUrlHttpCommandExecutorServiceModule(); |
| } |
| |
| } |