blob: 0387c324d91a748244f437b3ae999a01485bf1e8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.management.internal.rest;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.junit.assertions.ClusterManagementRealizationResultAssert.assertManagementResult;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.classloader.ClassPathLoader;
import org.apache.geode.management.api.ClusterManagementService;
import org.apache.geode.management.cluster.client.ClusterManagementServiceBuilder;
import org.apache.geode.management.configuration.Deployment;
import org.apache.geode.test.compiler.ClassBuilder;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.rules.MemberStarterRule;
import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
public class DeploymentManagementRedployDUnitTest {
private static final String VERSION1 = "Version1";
private static final String VERSION2 = "Version2";
private static final String JAR_NAME_A = "DeployCommandRedeployDUnitTestA.jar";
private static final String FUNCTION_A = "DeployCommandRedeployDUnitFunctionA";
private File jarAVersion1;
private File jarAVersion2;
private static final String JAR_NAME_B = "DeployCommandRedeployDUnitTestB.jar";
private static final String FUNCTION_B = "DeployCommandRedeployDUnitFunctionB";
private static final String PACKAGE_B = "jddunit.function";
private static final String FULLY_QUALIFIED_FUNCTION_B = PACKAGE_B + "." + FUNCTION_B;
private File jarBVersion1;
private File jarBVersion2;
private MemberVM locator;
private MemberVM server;
@Rule
public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
@Rule
public ClusterStartupRule lsRule = new ClusterStartupRule();
private ClusterManagementService client;
private Deployment deployment;
@Before
public void setup() throws Exception {
jarAVersion1 = createJarWithFunctionA(VERSION1);
jarAVersion2 = createJarWithFunctionA(VERSION2);
jarBVersion1 = createJarWithFunctionB(VERSION1);
jarBVersion2 = createJarWithFunctionB(VERSION2);
locator = lsRule.startLocatorVM(0, MemberStarterRule::withHttpService);
server = lsRule.startServerVM(1, locator.getPort());
client = new ClusterManagementServiceBuilder()
.setPort(locator.getHttpPort())
.build();
deployment = new Deployment();
}
@Test
public void redeployJarsWithNewVersionsOfFunctions() {
deployment.setFile(jarAVersion1);
assertManagementResult(client.create(deployment)).isSuccessful();
server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION1));
deployment.setFile(jarBVersion1);
assertManagementResult(client.create(deployment)).isSuccessful();
server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
server.invoke(() -> assertThatCanLoad(JAR_NAME_B, FULLY_QUALIFIED_FUNCTION_B));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION1));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_B, VERSION1));
deployment.setFile(jarBVersion2);
assertManagementResult(client.create(deployment)).isSuccessful();
server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
server.invoke(() -> assertThatCanLoad(JAR_NAME_B, FULLY_QUALIFIED_FUNCTION_B));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION1));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_B, VERSION2));
deployment.setFile(jarAVersion2);
assertManagementResult(client.create(deployment)).isSuccessful();
server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
server.invoke(() -> assertThatCanLoad(JAR_NAME_B, FULLY_QUALIFIED_FUNCTION_B));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_B, VERSION2));
}
@Test
public void redeployJarsWithNewVersionsOfFunctionsAndMultipleLocators() throws IOException {
Properties props = new Properties();
props.setProperty("locators", "localhost[" + locator.getPort() + "]");
MemberVM locator2 = lsRule.startLocatorVM(2, props);
deployment.setFile(jarAVersion1);
assertManagementResult(client.create(deployment)).isSuccessful();
server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION1));
deployment.setFile(jarAVersion2);
assertManagementResult(client.create(deployment)).isSuccessful();
server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2));
server.stop(false);
Path locator1Jar =
locator.getWorkingDir().toPath().resolve("cluster_config/cluster/" + JAR_NAME_A);
Path locator2Jar =
locator2.getWorkingDir().toPath().resolve("cluster_config/cluster/" + JAR_NAME_A);
await().pollDelay(1, TimeUnit.SECONDS)
.until(() -> FileUtils.contentEquals(locator1Jar.toFile(), jarAVersion2) &&
FileUtils.contentEquals(locator2Jar.toFile(), jarAVersion2));
lsRule.startServerVM(1, locator.getPort());
server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2));
}
private static LoopingFunctionExecutor executor;
@Test
public void hotDeployShouldNotResultInAnyFailedFunctionExecutions() {
deployment.setFile(jarAVersion1);
assertManagementResult(client.create(deployment)).isSuccessful();
server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION1));
server.invoke(() -> {
executor = new LoopingFunctionExecutor();
executor.startExecuting(FUNCTION_A);
executor.waitForExecutions(100);
});
deployment.setFile(jarAVersion2);
assertManagementResult(client.create(deployment)).isSuccessful();
server.invoke(() -> assertThatCanLoad(JAR_NAME_A, FUNCTION_A));
server.invoke(() -> assertThatFunctionHasVersion(FUNCTION_A, VERSION2));
server.invoke(() -> {
executor.waitForExecutions(100);
executor.stopExecutionAndThrowAnyException();
});
}
// Note that jar A is a Declarable Function, while jar B is only a Function.
// Also, the function for jar A resides in the default package, whereas jar B specifies a package.
// This ensures that this test has identical coverage to some tests that it replaced.
private File createJarWithFunctionA(String version) throws Exception {
URL classTemplateUrl = DeploymentManagementRedployDUnitTest.class
.getResource("DeployCommandRedeployDUnitTest_FunctionATemplate");
assertThat(classTemplateUrl).isNotNull();
String classContents = FileUtils.readFileToString(new File(classTemplateUrl.toURI()), "UTF-8");
classContents = classContents.replaceAll("FUNCTION_A", FUNCTION_A);
classContents = classContents.replaceAll("VERSION", version);
File jar = new File(temporaryFolder.newFolder(JAR_NAME_A + version), JAR_NAME_A);
ClassBuilder functionClassBuilder = new ClassBuilder();
functionClassBuilder.writeJarFromContent(FUNCTION_A, classContents, jar);
return jar;
}
private File createJarWithFunctionB(String version) throws Exception {
URL classTemplateUrl = DeploymentManagementRedployDUnitTest.class
.getResource("DeployCommandRedeployDUnitTest_FunctionBTemplate");
assertThat(classTemplateUrl).isNotNull();
String classContents = FileUtils.readFileToString(new File(classTemplateUrl.toURI()), "UTF-8");
classContents = classContents.replaceAll("PACKAGE_B", PACKAGE_B);
classContents = classContents.replaceAll("FUNCTION_B", FUNCTION_B);
classContents = classContents.replaceAll("VERSION", version);
File jar = new File(temporaryFolder.newFolder(JAR_NAME_B + version), JAR_NAME_B);
ClassBuilder functionClassBuilder = new ClassBuilder();
functionClassBuilder.writeJarFromContent("jddunit/function/" + FUNCTION_B, classContents, jar);
return jar;
}
private static void assertThatFunctionHasVersion(String functionId, String version) {
@SuppressWarnings("deprecation")
GemFireCacheImpl gemFireCache = GemFireCacheImpl.getInstance();
DistributedSystem distributedSystem = gemFireCache.getDistributedSystem();
@SuppressWarnings("unchecked")
Execution<Void, String, List<String>> execution =
FunctionService.onMember(distributedSystem.getDistributedMember());
List<String> result = execution.execute(functionId).getResult();
assertThat(result.get(0)).isEqualTo(version);
}
private static void assertThatCanLoad(String jarName, String className)
throws ClassNotFoundException {
assertThat(ClassPathLoader.getLatest().getJarDeploymentService()
.getDeployed(FilenameUtils.getBaseName(jarName)).isSuccessful()).isTrue();
assertThat(ClassPathLoader.getLatest().forName(className)).isNotNull();
}
private static class LoopingFunctionExecutor implements Serializable {
private final AtomicInteger COUNT_OF_EXECUTIONS = new AtomicInteger();
private final AtomicReference<Exception> EXCEPTION = new AtomicReference<>();
private final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
public void startExecuting(String functionId) {
ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
EXECUTOR_SERVICE.submit(() -> {
@SuppressWarnings("deprecation")
GemFireCacheImpl gemFireCache = GemFireCacheImpl.getInstance();
DistributedSystem distributedSystem = gemFireCache.getDistributedSystem();
while (!Thread.currentThread().isInterrupted()) {
try {
COUNT_OF_EXECUTIONS.incrementAndGet();
FunctionService.onMember(distributedSystem.getDistributedMember()).execute(functionId)
.getResult();
} catch (Exception e) {
EXCEPTION.set(e);
}
}
});
}
public void waitForExecutions(int numberOfExecutions) {
int initialCount = COUNT_OF_EXECUTIONS.get();
int countToWaitFor = initialCount + numberOfExecutions;
Callable<Boolean> doneWaiting = () -> COUNT_OF_EXECUTIONS.get() >= countToWaitFor;
await().until(doneWaiting);
}
public void stopExecutionAndThrowAnyException() throws Exception {
EXECUTOR_SERVICE.shutdownNow();
Exception e = EXCEPTION.get();
if (e != null) {
throw e;
}
}
}
}