blob: 7c4a0b05272e08f009741d9980f7ddb8bb3f4c7f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.metrics.function.executions;
import static java.io.File.pathSeparatorChar;
import static java.lang.Boolean.TRUE;
import static java.lang.String.valueOf;
import static java.util.stream.Collectors.toList;
import static org.apache.geode.test.compiler.ClassBuilder.writeJarFromClasses;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.metrics.MetricsPublishingService;
import org.apache.geode.metrics.SimpleMetricsPublishingService;
import org.apache.geode.rules.ServiceJarRule;
import org.apache.geode.test.junit.rules.gfsh.GfshRule;
/**
* Acceptance tests for function executions timer on a loner server with no locator
*/
public class FunctionExecutionsTimerLonerTest {
private int serverPort;
private int jmxRmiPort;
private Path serverFolder;
private Path serviceJarPath;
private Path functionHelpersJarPath;
private ClientCache clientCache;
private Pool serverPool;
private String connectCommand;
private String startServerCommandWithStatsEnabled;
private String startServerCommandWithStatsDisabled;
private String startServerCommandWithTimeStatsDisabled;
private String stopServerCommand;
@Rule
public GfshRule gfshRule = new GfshRule();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public ServiceJarRule serviceJarRule = new ServiceJarRule();
@Before
public void setUp() throws IOException {
int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
serverPort = ports[0];
jmxRmiPort = ports[1];
serverFolder = temporaryFolder.newFolder("server").toPath().toAbsolutePath();
serviceJarPath = serviceJarRule.createJarFor("services.jar",
MetricsPublishingService.class, SimpleMetricsPublishingService.class);
functionHelpersJarPath =
temporaryFolder.getRoot().toPath().resolve("function-helpers.jar").toAbsolutePath();
writeJarFromClasses(functionHelpersJarPath.toFile(), FunctionToTimeWithResult.class,
GetFunctionExecutionTimerValues.class, ExecutionsTimerValues.class, ThreadSleep.class);
startServerCommandWithStatsEnabled = startServerCommand(false, true);
startServerCommandWithStatsDisabled = startServerCommand(true, true);
startServerCommandWithTimeStatsDisabled = startServerCommand(false, false);
stopServerCommand = "stop server --dir=" + serverFolder;
connectCommand = "connect --jmx-manager=localhost[" + jmxRmiPort + "]";
}
@After
public void tearDown() {
stopServer();
}
@Test
public void timersNotRegisteredIfFunctionDeployedButNotExecuted() {
startServerWithStatsEnabled();
deployFunction(FunctionToTimeWithResult.class);
assertThat(getExecutionsTimerValuesFor(FunctionToTimeWithResult.ID))
.as("Function executions timers on server")
.isEmpty();
}
@Test
public void timersRegisteredIfFunctionDeployedAndThenExecuted() {
startServerWithStatsEnabled();
deployFunction(FunctionToTimeWithResult.class);
Duration functionDuration = Duration.ofSeconds(1);
executeFunctionById(FunctionToTimeWithResult.ID, functionDuration);
assertThat(getExecutionsTimerValuesFor(FunctionToTimeWithResult.ID))
.as("Function executions timers on server")
.hasSize(2);
}
@Test
public void timersRegisteredIfStatsDisabled() {
startServerWithStatsDisabled();
executeFunctionThatSucceeds(new FunctionToTimeWithResult(), Duration.ofMillis(1));
assertThat(getExecutionsTimerValuesFor(FunctionToTimeWithResult.ID))
.as("Function executions timers on server")
.hasSize(2);
}
@Test
public void timersUnregisteredIfServerRestarts() {
startServerWithStatsEnabled();
executeFunctionThatSucceeds(new FunctionToTimeWithResult(), Duration.ofMillis(1));
restartServer();
assertThat(getExecutionsTimerValuesFor(FunctionToTimeWithResult.ID))
.as("Function executions timers on server")
.isEmpty();
}
@Test
public void successTimerRecordsCountAndTotalTimeIfFunctionSucceeds() {
startServerWithStatsEnabled();
FunctionToTimeWithResult function = new FunctionToTimeWithResult();
Duration functionDuration = Duration.ofSeconds(1);
executeFunctionThatSucceeds(function, functionDuration);
ExecutionsTimerValues successTimerValues = getSuccessTimerValues(function.getId());
assertThat(successTimerValues.count)
.as("Successful function executions count")
.isEqualTo(1);
assertThat(successTimerValues.totalTime)
.as("Successful function executions total time")
.isGreaterThan(functionDuration.toNanos());
}
@Test
public void successTimerRecordsCountAndTotalTimeIfTimeStatsDisabled() {
startServerWithTimeStatsDisabled();
FunctionToTimeWithResult function = new FunctionToTimeWithResult();
Duration functionDuration = Duration.ofSeconds(1);
executeFunctionThatSucceeds(function, functionDuration);
ExecutionsTimerValues successTimerValues = getSuccessTimerValues(function.getId());
assertThat(successTimerValues.count)
.as("Successful function executions count")
.isEqualTo(1);
assertThat(successTimerValues.totalTime)
.as("Successful function executions total time")
.isGreaterThan(functionDuration.toNanos());
}
@Test
public void failureTimerRecordsCountAndTotalTimeIfFunctionThrows() {
startServerWithStatsEnabled();
FunctionToTimeWithResult function = new FunctionToTimeWithResult();
Duration functionDuration = Duration.ofSeconds(1);
executeFunctionThatThrows(function, functionDuration);
ExecutionsTimerValues failureTimerValues = getFailureTimerValues(function.getId());
assertThat(failureTimerValues.count)
.as("Failed function executions count")
.isEqualTo(1);
assertThat(failureTimerValues.totalTime)
.as("Failed function executions total time")
.isGreaterThan(functionDuration.toNanos());
}
private String startServerCommand(boolean statsDisabled, boolean enableTimeStatistics) {
return String.join(" ",
"start server",
"--name=server",
"--dir=" + serverFolder,
"--server-port=" + serverPort,
"--classpath=" + serviceJarPath + pathSeparatorChar + functionHelpersJarPath,
"--enable-time-statistics=" + enableTimeStatistics,
"--http-service-port=0",
"--J=-Dgemfire.enable-cluster-config=true",
"--J=-Dgemfire.jmx-manager=true",
"--J=-Dgemfire.jmx-manager-start=true",
"--J=-Dgemfire.jmx-manager-port=" + jmxRmiPort,
"--J=-Dgemfire.statsDisabled=" + statsDisabled);
}
private void createClientAndPool() {
clientCache = new ClientCacheFactory()
.addPoolServer("localhost", serverPort)
.create();
serverPool = PoolManager.createFactory()
.addServer("localhost", serverPort)
.create("server-pool");
}
private void closeClientAndPool() {
if (serverPool != null) {
serverPool.destroy();
}
if (clientCache != null) {
clientCache.close();
}
}
private void stopServer() {
closeClientAndPool();
gfshRule.execute(stopServerCommand);
}
private void startServerWithStatsEnabled() {
startServer(startServerCommandWithStatsEnabled);
}
private void startServerWithStatsDisabled() {
startServer(startServerCommandWithStatsDisabled);
}
private void startServerWithTimeStatsDisabled() {
startServer(startServerCommandWithTimeStatsDisabled);
}
private void startServer(String command) {
gfshRule.execute(command);
createClientAndPool();
}
private void restartServer() {
stopServer();
startServerWithStatsEnabled();
}
@SuppressWarnings("SameParameterValue")
private <T> void deployFunction(Class<? extends Function<T>> functionClass) {
Path functionJarPath = temporaryFolder.getRoot().toPath()
.resolve(functionClass.getSimpleName() + ".jar").toAbsolutePath();
Throwable thrown =
catchThrowable(() -> writeJarFromClasses(functionJarPath.toFile(), functionClass));
assertThat(thrown)
.as("Exception from writing function JAR")
.isNull();
String deployFunctionCommand = "deploy --jar=" + functionJarPath;
gfshRule.execute(connectCommand, deployFunctionCommand);
}
private void executeFunctionThatSucceeds(Function<? super String[]> function, Duration duration) {
Throwable thrown = catchThrowable(() -> executeFunction(function, duration, true));
assertThat(thrown)
.as("Exception from function expected to succeed")
.isNull();
}
private void executeFunctionThatThrows(Function<? super String[]> function, Duration duration) {
Throwable thrown = catchThrowable(() -> executeFunction(function, duration, false));
assertThat(thrown)
.withFailMessage("Expected function to throw but it did not")
.isNotNull();
}
private void executeFunction(Function<? super String[]> function, Duration duration,
boolean successful) {
@SuppressWarnings("unchecked")
Execution<String[], Object, List<Object>> execution =
(Execution<String[], Object, List<Object>>) FunctionService.onServer(serverPool);
execution
.setArguments(new String[] {valueOf(duration.toMillis()), valueOf(successful)})
.execute(function)
.getResult();
}
@SuppressWarnings("SameParameterValue")
private void executeFunctionById(String functionId, Duration duration) {
@SuppressWarnings("unchecked")
Execution<String[], Object, List<Object>> execution =
(Execution<String[], Object, List<Object>>) FunctionService.onServer(serverPool);
Throwable thrown = catchThrowable(() -> execution
.setArguments(new String[] {valueOf(duration.toMillis()), TRUE.toString()})
.execute(functionId)
.getResult());
assertThat(thrown)
.as("Exception from function expected to succeed")
.isNull();
}
private ExecutionsTimerValues getSuccessTimerValues(String functionId) {
return getExecutionsTimerValues(functionId, true);
}
private ExecutionsTimerValues getFailureTimerValues(String functionId) {
return getExecutionsTimerValues(functionId, false);
}
private ExecutionsTimerValues getExecutionsTimerValues(String functionId,
boolean succeededTagValue) {
List<ExecutionsTimerValues> executionsTimerValues = getExecutionsTimerValuesFor(functionId)
.stream()
.filter(v -> v.succeeded == succeededTagValue)
.collect(toList());
assertThat(executionsTimerValues)
.hasSize(1);
return executionsTimerValues.get(0);
}
private List<ExecutionsTimerValues> getExecutionsTimerValuesFor(String functionId) {
@SuppressWarnings("unchecked")
Execution<Void, List<ExecutionsTimerValues>, List<List<ExecutionsTimerValues>>> functionExecution =
(Execution<Void, List<ExecutionsTimerValues>, List<List<ExecutionsTimerValues>>>) FunctionService
.onServer(serverPool);
List<List<ExecutionsTimerValues>> results = functionExecution
.execute(new GetFunctionExecutionTimerValues())
.getResult();
assertThat(results)
.hasSize(1);
return results.get(0).stream()
.filter(v -> v.functionId.equals(functionId))
.collect(toList());
}
}