| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.redis.internal.executor.server; |
| |
| import static org.apache.geode.test.awaitility.GeodeAwaitility.await; |
| import static org.assertj.core.api.Assertions.assertThat; |
| |
| import java.time.Duration; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import com.google.common.util.concurrent.AtomicDouble; |
| import org.assertj.core.data.Offset; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import redis.clients.jedis.Jedis; |
| |
| import org.apache.geode.internal.statistics.EnabledStatisticsClock; |
| import org.apache.geode.internal.statistics.StatisticsClock; |
| import org.apache.geode.test.awaitility.GeodeAwaitility; |
| import org.apache.geode.test.dunit.rules.RedisPortSupplier; |
| |
| public abstract class AbstractRedisInfoStatsIntegrationTest implements RedisPortSupplier { |
| |
| private static final int TIMEOUT = (int) GeodeAwaitility.getTimeout().toMillis(); |
| private static final String EXISTING_HASH_KEY = "Existing_Hash"; |
| private static final String EXISTING_STRING_KEY = "Existing_String"; |
| private static final String EXISTING_SET_KEY_1 = "Existing_Set_1"; |
| private static final String EXISTING_SET_KEY_2 = "Existing_Set_2"; |
| |
| private Jedis jedis; |
| private static long START_TIME; |
| private static StatisticsClock statisticsClock; |
| |
| private long preTestConnectionsReceived = 0; |
| private long preTestConnectedClients = 0; |
| |
| private static final String COMMANDS_PROCESSED = "total_commands_processed"; |
| private static final String TOTAL_CONNECTIONS_RECEIVED = "total_connections_received"; |
| private static final String CONNECTED_CLIENTS = "connected_clients"; |
| private static final String OPS_PERFORMED_OVER_LAST_SECOND = "instantaneous_ops_per_sec"; |
| private static final String TOTAL_NETWORK_BYTES_READ = "total_net_input_bytes"; |
| private static final String NETWORK_KB_READ_OVER_LAST_SECOND = "instantaneous_input_kbps"; |
| private static final String UPTIME_IN_DAYS = "uptime_in_days"; |
| private static final String UPTIME_IN_SECONDS = "uptime_in_seconds"; |
| |
| private static final AtomicInteger numInfoCalled = new AtomicInteger(0); |
| |
| |
| // ------------------- Setup -------------------------- // |
| @BeforeClass |
| public static void beforeClass() { |
| statisticsClock = new EnabledStatisticsClock(); |
| START_TIME = statisticsClock.getTime(); |
| } |
| |
| @Before |
| public void before() { |
| jedis = new Jedis("localhost", getPort(), TIMEOUT); |
| numInfoCalled.set(0); |
| |
| long preSetupCommandsProcessed = Long.valueOf(getInfo(jedis).get(COMMANDS_PROCESSED)); |
| |
| jedis.set(EXISTING_STRING_KEY, "A_Value"); |
| jedis.hset(EXISTING_HASH_KEY, "Field1", "Value1"); |
| jedis.sadd(EXISTING_SET_KEY_1, "m1", "m2", "m3"); |
| jedis.sadd(EXISTING_SET_KEY_2, "m4", "m5", "m6"); |
| |
| // the info command increments command processed so we need to account for that. |
| // the +1 is needed because info returns the number of commands processed before that call to |
| // info |
| await().atMost(Duration.ofSeconds(2)) |
| .untilAsserted(() -> assertThat( |
| Long.valueOf(getInfo(jedis).get(COMMANDS_PROCESSED)) - numInfoCalled.get() + 1) |
| .isEqualTo(preSetupCommandsProcessed + 4)); |
| |
| preTestConnectionsReceived = Long.valueOf(getInfo(jedis).get(TOTAL_CONNECTIONS_RECEIVED)); |
| preTestConnectedClients = Long.valueOf(getInfo(jedis).get(CONNECTED_CLIENTS)); |
| numInfoCalled.set(0); |
| } |
| |
| @After |
| public void after() { |
| jedis.flushAll(); |
| await().atMost(Duration.ofSeconds(5)) |
| .untilAsserted( |
| () -> assertThat(Long.valueOf(getInfo(jedis).get(CONNECTED_CLIENTS))).isEqualTo(1)); |
| jedis.close(); |
| } |
| |
| // ------------------- Stats Section -------------------------- // |
| |
| // note: see AbstractHitsMissesIntegrationTest for testing of hits/misses |
| |
| @Test |
| public void commandsProcessed_shouldIncrement_givenSuccessfulCommand() { |
| long initialCommandsProcessed = Long.valueOf(getInfo(jedis).get(COMMANDS_PROCESSED)); |
| jedis.ttl("key"); |
| |
| validateCommandsProcessed(jedis, initialCommandsProcessed, 1); |
| } |
| |
| @Test |
| public void opsPerformedOverLastSecond_ShouldUpdate_givenOperationsOccurring() { |
| int NUMBER_SECONDS_TO_RUN = 3; |
| |
| AtomicInteger numberOfCommandsExecuted = new AtomicInteger(); |
| AtomicDouble actualCommandsProcessedOverLastSecond = new AtomicDouble(); |
| |
| await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { |
| jedis.set("key", "value"); |
| numberOfCommandsExecuted.getAndIncrement(); |
| actualCommandsProcessedOverLastSecond.set( |
| Double.valueOf(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))); |
| |
| return true; |
| }); |
| |
| long expected = (numberOfCommandsExecuted.get() + numInfoCalled.get()) / NUMBER_SECONDS_TO_RUN; |
| |
| assertThat(actualCommandsProcessedOverLastSecond.get()) |
| .isCloseTo(expected, |
| Offset.offset(getTenPercentOf(actualCommandsProcessedOverLastSecond.get()))); |
| |
| // if time passes w/o operations |
| await().during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS).until(() -> true); |
| |
| assertThat(Double.valueOf(getInfo(jedis).get(OPS_PERFORMED_OVER_LAST_SECOND))).isEqualTo(0D); |
| } |
| |
| @Test |
| public void networkBytesRead_shouldIncrementBySizeOfCommandSent() { |
| long initialNetworkBytesRead = Long.valueOf(getInfo(jedis).get(TOTAL_NETWORK_BYTES_READ)); |
| String infoCommandString = "*3\r\n$3\r\ninfo\r\n"; |
| String respCommandString = "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"; |
| |
| jedis.set("key", "value"); |
| |
| validateNetworkBytesRead(jedis, initialNetworkBytesRead, |
| respCommandString.length() + infoCommandString.length()); |
| } |
| |
| @Test |
| public void networkKiloBytesReadOverLastSecond_shouldBeCloseToBytesReadOverLastSecond() { |
| |
| double REASONABLE_SOUNDING_OFFSET = .8; |
| int NUMBER_SECONDS_TO_RUN = 2; |
| String RESP_COMMAND_STRING = "*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"; |
| int BYTES_SENT_PER_COMMAND = RESP_COMMAND_STRING.length(); |
| AtomicInteger totalBytesSent = new AtomicInteger(); |
| AtomicReference<Double> actual_kbs = new AtomicReference<>((double) 0); |
| |
| await().during(Duration.ofSeconds(NUMBER_SECONDS_TO_RUN)).until(() -> { |
| jedis.set("key", "value"); |
| totalBytesSent.addAndGet(BYTES_SENT_PER_COMMAND); |
| actual_kbs.set(Double.valueOf(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))); |
| return true; |
| }); |
| |
| double expectedBytesReceived = totalBytesSent.get() / NUMBER_SECONDS_TO_RUN; |
| double expected_kbs = expectedBytesReceived / 1000; |
| |
| assertThat(actual_kbs.get()).isCloseTo(expected_kbs, Offset.offset(REASONABLE_SOUNDING_OFFSET)); |
| |
| // if time passes w/o operations |
| await() |
| .during(NUMBER_SECONDS_TO_RUN, TimeUnit.SECONDS) |
| .until(() -> true); |
| |
| assertThat(Double.valueOf(getInfo(jedis).get(NETWORK_KB_READ_OVER_LAST_SECOND))).isEqualTo(0); |
| |
| } |
| |
| // todo test rejected connections |
| @Test |
| public void should_UpdateRejectedConnections() { |
| Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); |
| Jedis jedis3 = new Jedis("localhost", getPort(), TIMEOUT); |
| |
| jedis2.ping(); |
| jedis3.ping(); |
| |
| validateConnectedClients(jedis, preTestConnectedClients, 2); |
| |
| jedis2.close(); |
| jedis3.close(); |
| |
| validateConnectedClients(jedis, preTestConnectedClients, 0); |
| } |
| |
| // ------------------- Clients Section -------------------------- // |
| |
| @Test |
| public void connectedClients_incrAndDecrWhenClientConnectsAndDisconnects() { |
| Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); |
| jedis2.ping(); |
| |
| validateConnectedClients(jedis, preTestConnectedClients, 1); |
| |
| jedis2.close(); |
| |
| validateConnectedClients(jedis, preTestConnectedClients, 0); |
| } |
| |
| @Test |
| public void totalConnectionsReceivedStat_shouldIncrement_whenNewConnectionOccurs() { |
| Jedis jedis2 = new Jedis("localhost", getPort(), TIMEOUT); |
| jedis2.ping(); |
| |
| validateConnectionsReceived(jedis, preTestConnectionsReceived, 1); |
| |
| jedis2.close(); |
| |
| validateConnectedClients(jedis, preTestConnectedClients, 0); |
| } |
| |
| // ------------------- Server Section -------------------------- // |
| |
| @Test |
| public void upTimeInDays_shouldBeEqualToTimeSinceStartInDays() { |
| long startTimeInNanos = getStartTime(); |
| long currentTimeInNanos = getCurrentTime(); |
| |
| long expectedNanos = currentTimeInNanos - startTimeInNanos; |
| long expectedDays = TimeUnit.NANOSECONDS.toDays(expectedNanos); |
| |
| assertThat(Long.valueOf(getInfo(jedis).get(UPTIME_IN_DAYS))).isEqualTo(expectedDays); |
| } |
| |
| @Test |
| public void uptimeInSeconds_shouldReturnTimeSinceStartInSeconds() { |
| long serverUptimeAtStartOfTestInNanos = getCurrentTime(); |
| long statsUpTimeAtStartOfTest = Long.valueOf(getInfo(jedis).get(UPTIME_IN_SECONDS)); |
| |
| await().during(Duration.ofSeconds(3)).until(() -> true); |
| |
| long expectedNanos = getCurrentTime() - serverUptimeAtStartOfTestInNanos; |
| long expectedSeconds = TimeUnit.NANOSECONDS.toSeconds(expectedNanos); |
| |
| assertThat(Long.valueOf(getInfo(jedis).get(UPTIME_IN_SECONDS)) - statsUpTimeAtStartOfTest) |
| .isCloseTo(expectedSeconds, Offset.offset(1l)); |
| } |
| |
| // ------------------- Helper Methods ----------------------------- // |
| public long getStartTime() { |
| return START_TIME; |
| } |
| |
| public long getCurrentTime() { |
| return this.statisticsClock.getTime(); |
| } |
| |
| private double getTenPercentOf(Double value) { |
| return Math.ceil(value * .1); |
| } |
| |
| /** |
| * Convert the values returned by the INFO command into a basic param:value map. |
| */ |
| static synchronized Map<String, String> getInfo(Jedis jedis) { |
| Map<String, String> results = new HashMap<>(); |
| String rawInfo = jedis.info(); |
| numInfoCalled.incrementAndGet(); |
| |
| for (String line : rawInfo.split("\r\n")) { |
| int colonIndex = line.indexOf(":"); |
| if (colonIndex > 0) { |
| String key = line.substring(0, colonIndex); |
| String value = line.substring(colonIndex + 1); |
| results.put(key, value); |
| } |
| } |
| |
| return results; |
| } |
| |
| private void validateNetworkBytesRead(Jedis jedis, long initialNetworkBytesRead, |
| int responseLength) { |
| await().atMost(Duration.ofSeconds(2)).untilAsserted( |
| () -> assertThat(Long.valueOf(getInfo(jedis).get(TOTAL_NETWORK_BYTES_READ))) |
| .isEqualTo(initialNetworkBytesRead + responseLength)); |
| } |
| |
| private void validateCommandsProcessed(Jedis jedis, long initialCommandsProcessed, int diff) { |
| await().atMost(Duration.ofSeconds(2)).untilAsserted( |
| () -> assertThat( |
| Long.valueOf(getInfo(jedis).get(COMMANDS_PROCESSED)) - numInfoCalled.get() + 1) |
| .isEqualTo(initialCommandsProcessed + diff)); |
| } |
| |
| private void validateConnectedClients(Jedis jedis, long initialConnectedClients, int diff) { |
| await().atMost(Duration.ofSeconds(2)).untilAsserted( |
| () -> assertThat(Long.valueOf(getInfo(jedis).get(CONNECTED_CLIENTS))) |
| .isEqualTo(initialConnectedClients + diff)); |
| } |
| |
| private void validateConnectionsReceived(Jedis jedis, long initialConnectionsReceived, int diff) { |
| assertThat(Long.valueOf(getInfo(jedis).get(TOTAL_CONNECTIONS_RECEIVED))) |
| .isEqualTo(initialConnectionsReceived + diff); |
| } |
| } |