GEODE-8784:Create matching Geode stats for all Redis INFO stats (#5853)

Adds the following new Geode redis stats:
- totalConnectionsReceived
- keyspaceHits
- keyspaceMisses

Co-authored-by: John Hutchison <hutchisonjo@vmware.com>
Co-authored-by: Jens Deppe <jdeppe@pivotal.io>
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/RedisStatsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/RedisStatsIntegrationTest.java
index b49863f..849f6b7 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/RedisStatsIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/RedisStatsIntegrationTest.java
@@ -34,6 +34,7 @@
 import org.apache.geode.internal.statistics.EnabledStatisticsClock;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.redis.GeodeRedisServerRule;
+import org.apache.geode.redis.internal.statistics.RedisStats;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 
 public class RedisStatsIntegrationTest {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
index 58ced58..b11c184 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
@@ -36,6 +36,8 @@
 import org.apache.geode.redis.internal.pubsub.PubSub;
 import org.apache.geode.redis.internal.pubsub.PubSubImpl;
 import org.apache.geode.redis.internal.pubsub.Subscriptions;
+import org.apache.geode.redis.internal.statistics.GeodeRedisStats;
+import org.apache.geode.redis.internal.statistics.RedisStats;
 
 /**
  * The GeodeRedisServer is a server that understands the Redis protocol. As commands are sent to the
@@ -120,9 +122,12 @@
     InternalDistributedSystem system = cache.getInternalDistributedSystem();
     StatisticsClock statisticsClock =
         StatisticsClockFactory.clock(true);
-    return new RedisStats(system.getStatisticsManager(), statisticsClock);
-  }
 
+    return new RedisStats(statisticsClock,
+        new GeodeRedisStats(system.getStatisticsManager(),
+            "redisStats",
+            statisticsClock));
+  }
 
   @VisibleForTesting
   RedisStats getStats() {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/PassiveExpirationManager.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/PassiveExpirationManager.java
index 666ab09..26de09e 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/PassiveExpirationManager.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/PassiveExpirationManager.java
@@ -33,6 +33,7 @@
 import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.executor.key.RedisKeyCommands;
 import org.apache.geode.redis.internal.executor.key.RedisKeyCommandsFunctionInvoker;
+import org.apache.geode.redis.internal.statistics.RedisStats;
 
 public class PassiveExpirationManager {
   private static final Logger logger = LogService.getLogger();
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisStats.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisStats.java
deleted file mode 100644
index 47e1396..0000000
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisStats.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.geode.redis.internal;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.geode.internal.statistics.StatisticsClockFactory.getTime;
-import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor;
-
-import java.util.ArrayList;
-import java.util.EnumMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.geode.StatisticDescriptor;
-import org.apache.geode.Statistics;
-import org.apache.geode.StatisticsFactory;
-import org.apache.geode.StatisticsType;
-import org.apache.geode.StatisticsTypeFactory;
-import org.apache.geode.annotations.Immutable;
-import org.apache.geode.internal.statistics.StatisticsClock;
-import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
-
-public class RedisStats {
-  @Immutable
-  private static final StatisticsType type;
-  @Immutable
-  private static final EnumMap<RedisCommandType, Integer> completedCommandStatIds =
-      new EnumMap<>(RedisCommandType.class);
-  @Immutable
-  private static final EnumMap<RedisCommandType, Integer> timeCommandStatIds =
-      new EnumMap<>(RedisCommandType.class);
-
-  private static final int clientId;
-  private static final int passiveExpirationChecksId;
-  private static final int passiveExpirationCheckTimeId;
-  private static final int passiveExpirationsId;
-  private static final int expirationsId;
-  private static final int expirationTimeId;
-  private final AtomicLong commandsProcessed = new AtomicLong();
-  private final AtomicLong totalNetworkBytesRead = new AtomicLong();
-  private final AtomicLong totalConnectionsReceived = new AtomicLong();
-  private final AtomicLong expirations = new AtomicLong();
-  private final AtomicLong keyspaceHits = new AtomicLong();
-  private final AtomicLong keyspaceMisses = new AtomicLong();
-  private final ScheduledExecutorService perSecondExecutor;
-  private volatile double networkKiloBytesReadDuringLastSecond;
-  private volatile long opsPerformedLastTick;
-  private double opsPerformedOverLastSecond;
-  private long previousNetworkBytesRead;
-  private final StatisticsClock clock;
-  private final Statistics stats;
-  private final long START_TIME_IN_NANOS;
-
-  static {
-    StatisticsTypeFactory f = StatisticsTypeFactoryImpl.singleton();
-    ArrayList<StatisticDescriptor> descriptorList = new ArrayList<>();
-    fillListWithCompletedCommandDescriptors(f, descriptorList);
-    fillListWithTimeCommandDescriptors(f, descriptorList);
-    descriptorList.add(f.createLongGauge("clients",
-        "Current number of clients connected to this redis server.", "clients"));
-    descriptorList.add(f.createLongCounter("passiveExpirationChecks",
-        "Total number of passive expiration checks that have completed. Checks include scanning and expiring.",
-        "checks"));
-    descriptorList.add(f.createLongCounter("passiveExpirationCheckTime",
-        "Total amount of time, in nanoseconds, spent in passive expiration checks on this server.",
-        "nanoseconds"));
-    descriptorList.add(f.createLongCounter("passiveExpirations",
-        "Total number of keys that have been passively expired on this server.", "expirations"));
-    descriptorList.add(f.createLongCounter("expirations",
-        "Total number of keys that have been expired, actively or passively, on this server.",
-        "expirations"));
-    descriptorList.add(f.createLongCounter("expirationTime",
-        "Total amount of time, in nanoseconds, spent expiring keys on this server.",
-        "nanoseconds"));
-    StatisticDescriptor[] descriptorArray =
-        descriptorList.toArray(new StatisticDescriptor[descriptorList.size()]);
-
-    type = f.createType("RedisStats", "Statistics for a GemFire Redis Server", descriptorArray);
-
-    fillCompletedIdMap();
-    fillTimeIdMap();
-    clientId = type.nameToId("clients");
-    passiveExpirationChecksId = type.nameToId("passiveExpirationChecks");
-    passiveExpirationCheckTimeId = type.nameToId("passiveExpirationCheckTime");
-    passiveExpirationsId = type.nameToId("passiveExpirations");
-    expirationsId = type.nameToId("expirations");
-    expirationTimeId = type.nameToId("expirationTime");
-  }
-
-  public RedisStats(StatisticsFactory factory, StatisticsClock clock) {
-    this(factory, "redisStats", clock);
-  }
-
-  public RedisStats(StatisticsFactory factory, String textId, StatisticsClock clock) {
-    stats = factory == null ? null : factory.createAtomicStatistics(type, textId);
-    this.clock = clock;
-    this.START_TIME_IN_NANOS = this.clock.getTime();
-    perSecondExecutor = startPerSecondUpdater();
-  }
-
-  private static void fillListWithCompletedCommandDescriptors(StatisticsTypeFactory f,
-      ArrayList<StatisticDescriptor> descriptorList) {
-    for (RedisCommandType command : RedisCommandType.values()) {
-      if (command.isUnimplemented()) {
-        continue;
-      }
-      String name = command.name().toLowerCase();
-      String statName = name + "Completed";
-      String statDescription = "Total number of redis '" + name
-          + "' operations that have completed execution on this server.";
-      String units = "operations";
-      descriptorList.add(f.createLongCounter(statName, statDescription, units));
-    }
-  }
-
-  private static void fillListWithTimeCommandDescriptors(StatisticsTypeFactory f,
-      ArrayList<StatisticDescriptor> descriptorList) {
-
-    for (RedisCommandType command : RedisCommandType.values()) {
-      if (command.isUnimplemented()) {
-        continue;
-      }
-
-      String name = command.name().toLowerCase();
-      String statName = name + "Time";
-      String statDescription =
-          "Total amount of time, in nanoseconds, spent executing redis '"
-              + name +
-              "' operations on this server.";
-
-      String units = "nanoseconds";
-      descriptorList.add(f.createLongCounter(statName, statDescription, units));
-    }
-  }
-
-  private static void fillCompletedIdMap() {
-    for (RedisCommandType command : RedisCommandType.values()) {
-      if (command.isUnimplemented()) {
-        continue;
-      }
-      String name = command.name().toLowerCase();
-      String statName = name + "Completed";
-      completedCommandStatIds.put(command, type.nameToId(statName));
-    }
-  }
-
-  private static void fillTimeIdMap() {
-    for (RedisCommandType command : RedisCommandType.values()) {
-      if (command.isUnimplemented()) {
-        continue;
-      }
-      String name = command.name().toLowerCase();
-      String statName = name + "Time";
-      timeCommandStatIds.put(command, type.nameToId(statName));
-    }
-  }
-
-  public double getNetworkKiloBytesReadOverLastSecond() {
-    return networkKiloBytesReadDuringLastSecond;
-  }
-
-  public double getOpsPerformedOverLastSecond() {
-    return opsPerformedOverLastSecond;
-  }
-
-  private long getCurrentTimeNanos() {
-    return clock.getTime();
-  }
-
-  public long startCommand(RedisCommandType command) {
-    return getTime();
-  }
-
-  public void endCommand(RedisCommandType command, long start) {
-    if (clock.isEnabled()) {
-      stats.incLong(timeCommandStatIds.get(command), getCurrentTimeNanos() - start);
-    }
-    stats.incLong(completedCommandStatIds.get(command), 1);
-  }
-
-  public void addClient() {
-    totalConnectionsReceived.incrementAndGet();
-    stats.incLong(clientId, 1);
-  }
-
-  public void removeClient() {
-    stats.incLong(clientId, -1);
-  }
-
-  public long getTotalConnectionsReceived() {
-    return totalConnectionsReceived.get();
-  }
-
-  public long getConnectedClients() {
-    return stats.getLong(clientId);
-  }
-
-  public void incCommandsProcessed() {
-    commandsProcessed.incrementAndGet();
-  }
-
-  public long getCommandsProcessed() {
-    return commandsProcessed.get();
-  }
-
-  public void incNetworkBytesRead(long bytesRead) {
-    totalNetworkBytesRead.addAndGet(bytesRead);
-  }
-
-  public long getTotalNetworkBytesRead() {
-    return totalNetworkBytesRead.get();
-  }
-
-  private long getUptimeInMilliseconds() {
-    long uptimeInNanos = getCurrentTimeNanos() - START_TIME_IN_NANOS;
-    return TimeUnit.NANOSECONDS.toMillis(uptimeInNanos);
-  }
-
-  public long getUptimeInSeconds() {
-    return TimeUnit.MILLISECONDS.toSeconds(getUptimeInMilliseconds());
-  }
-
-  public long getUptimeInDays() {
-    return TimeUnit.MILLISECONDS.toDays(getUptimeInMilliseconds());
-  }
-
-  public void incKeyspaceHits() {
-    keyspaceHits.incrementAndGet();
-  }
-
-  public long getKeyspaceHits() {
-    return keyspaceHits.get();
-  }
-
-  public void incKeyspaceMisses() {
-    keyspaceMisses.incrementAndGet();
-  }
-
-  public long getKeyspaceMisses() {
-    return keyspaceMisses.get();
-  }
-
-  public long startPassiveExpirationCheck() {
-    return getCurrentTimeNanos();
-  }
-
-  public void endPassiveExpirationCheck(long start, long expireCount) {
-    if (expireCount > 0) {
-      incPassiveExpirations(expireCount);
-    }
-    if (clock.isEnabled()) {
-      stats.incLong(passiveExpirationCheckTimeId, getCurrentTimeNanos() - start);
-    }
-    stats.incLong(passiveExpirationChecksId, 1);
-  }
-
-  public long startExpiration() {
-    return getCurrentTimeNanos();
-  }
-
-  public void endExpiration(long start) {
-    if (clock.isEnabled()) {
-      stats.incLong(expirationTimeId, getCurrentTimeNanos() - start);
-    }
-    stats.incLong(expirationsId, 1);
-    expirations.incrementAndGet();
-  }
-
-  public void incPassiveExpirations(long count) {
-    stats.incLong(passiveExpirationsId, count);
-  }
-
-  public void close() {
-    if (stats != null) {
-      stats.close();
-    }
-    stopPerSecondUpdater();
-  }
-
-  private ScheduledExecutorService startPerSecondUpdater() {
-    int INTERVAL = 1;
-
-    ScheduledExecutorService perSecondExecutor =
-        newSingleThreadScheduledExecutor("GemFireRedis-PerSecondUpdater-");
-
-    perSecondExecutor.scheduleWithFixedDelay(
-        () -> doPerSecondUpdates(),
-        INTERVAL,
-        INTERVAL,
-        SECONDS);
-
-    return perSecondExecutor;
-  }
-
-  private void stopPerSecondUpdater() {
-    perSecondExecutor.shutdownNow();
-  }
-
-  private void doPerSecondUpdates() {
-    updateNetworkKilobytesReadLastSecond();
-    updateOpsPerformedOverLastSecond();
-  }
-
-  private void updateNetworkKilobytesReadLastSecond() {
-    long totalNetworkBytesRead = getTotalNetworkBytesRead();
-    long deltaNetworkBytesRead = totalNetworkBytesRead - previousNetworkBytesRead;
-    networkKiloBytesReadDuringLastSecond = deltaNetworkBytesRead / 1000;
-    previousNetworkBytesRead = getTotalNetworkBytesRead();
-  }
-
-  private void updateOpsPerformedOverLastSecond() {
-    long totalOpsPerformed = getCommandsProcessed();
-    long opsPerformedThisTick = totalOpsPerformed - opsPerformedLastTick;
-    opsPerformedOverLastSecond = opsPerformedThisTick;
-    opsPerformedLastTick = getCommandsProcessed();
-  }
-}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/CommandHelper.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/CommandHelper.java
index a2ad2bc..3c40582 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/CommandHelper.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/CommandHelper.java
@@ -25,8 +25,8 @@
 
 import org.apache.geode.cache.Region;
 import org.apache.geode.redis.internal.RedisConstants;
-import org.apache.geode.redis.internal.RedisStats;
 import org.apache.geode.redis.internal.executor.StripedExecutor;
+import org.apache.geode.redis.internal.statistics.RedisStats;
 
 /**
  * Provides methods to help implement command execution.
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
index 3cb1b59..751d3b9 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
@@ -25,7 +25,6 @@
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.redis.internal.RedisCommandType;
-import org.apache.geode.redis.internal.RedisStats;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.CommandHelper;
 import org.apache.geode.redis.internal.data.RedisData;
@@ -34,6 +33,7 @@
 import org.apache.geode.redis.internal.data.RedisSetCommandsFunctionExecutor;
 import org.apache.geode.redis.internal.data.RedisStringCommandsFunctionExecutor;
 import org.apache.geode.redis.internal.executor.string.SetOptions;
+import org.apache.geode.redis.internal.statistics.RedisStats;
 
 public class CommandFunction extends SingleResultRedisFunction {
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
index e8c85bd..13eb8fd 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
@@ -28,13 +28,13 @@
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.execute.InternalFunction;
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
-import org.apache.geode.redis.internal.RedisStats;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.CommandHelper;
 import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.data.RedisKeyCommandsFunctionExecutor;
 import org.apache.geode.redis.internal.executor.SingleResultCollector;
 import org.apache.geode.redis.internal.executor.StripedExecutor;
+import org.apache.geode.redis.internal.statistics.RedisStats;
 
 @SuppressWarnings("unchecked")
 public class RenameFunction implements InternalFunction {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/server/InfoExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/server/InfoExecutor.java
index bba1b22..ebe13da 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/server/InfoExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/server/InfoExecutor.java
@@ -19,12 +19,12 @@
 import java.util.List;
 
 import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.redis.internal.RedisStats;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.statistics.RedisStats;
 
 public class InfoExecutor extends AbstractExecutor {
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ByteToCommandDecoder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ByteToCommandDecoder.java
index f57efe0..4139daa 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ByteToCommandDecoder.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ByteToCommandDecoder.java
@@ -22,7 +22,7 @@
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
 
-import org.apache.geode.redis.internal.RedisStats;
+import org.apache.geode.redis.internal.statistics.RedisStats;
 
 /**
  * This is the first part of the channel pipeline for Netty. Here incoming bytes are read and a
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 7006c2c..de3ba83 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -47,12 +47,12 @@
 import org.apache.geode.redis.internal.ParameterRequirements.RedisParametersMismatchException;
 import org.apache.geode.redis.internal.RedisCommandType;
 import org.apache.geode.redis.internal.RedisConstants;
-import org.apache.geode.redis.internal.RedisStats;
 import org.apache.geode.redis.internal.RegionProvider;
 import org.apache.geode.redis.internal.data.RedisDataTypeMismatchException;
 import org.apache.geode.redis.internal.executor.CommandFunction;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.pubsub.PubSub;
+import org.apache.geode.redis.internal.statistics.RedisStats;
 
 /**
  * This class extends {@link ChannelInboundHandlerAdapter} from Netty and it is the last part of the
@@ -306,7 +306,7 @@
         }
       }
 
-      final long start = redisStats.startCommand(command.getCommandType());
+      final long start = redisStats.startCommand();
       try {
         writeToChannel(command.execute(this));
       } finally {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
index 19e5d9e..3a1d903 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
@@ -58,9 +58,9 @@
 import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.management.ManagementException;
-import org.apache.geode.redis.internal.RedisStats;
 import org.apache.geode.redis.internal.RegionProvider;
 import org.apache.geode.redis.internal.pubsub.PubSub;
+import org.apache.geode.redis.internal.statistics.RedisStats;
 
 public class NettyRedisServer {
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/statistics/GeodeRedisStats.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/statistics/GeodeRedisStats.java
new file mode 100644
index 0000000..f0f1f0c
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/statistics/GeodeRedisStats.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal.statistics;
+
+import java.util.ArrayList;
+import java.util.EnumMap;
+
+import org.apache.geode.StatisticDescriptor;
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.StatisticsType;
+import org.apache.geode.StatisticsTypeFactory;
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
+import org.apache.geode.redis.internal.RedisCommandType;
+
+public class GeodeRedisStats {
+  @Immutable
+  private static final StatisticsType type;
+  @Immutable
+  private static final EnumMap<RedisCommandType, Integer> completedCommandStatIds =
+      new EnumMap<>(RedisCommandType.class);
+  @Immutable
+  private static final EnumMap<RedisCommandType, Integer> timeCommandStatIds =
+      new EnumMap<>(RedisCommandType.class);
+
+  private static final int currentlyConnectedClients;
+  private static final int passiveExpirationChecksId;
+  private static final int passiveExpirationCheckTimeId;
+  private static final int passiveExpirationsId;
+  private static final int expirationsId;
+  private static final int expirationTimeId;
+  private static final int totalConnectionsReceived;
+  private static final int commandsProcessed;
+  private static final int keyspaceHits;
+  private static final int keyspaceMisses;
+  private static final int totalNetworkBytesRead;
+  private final Statistics stats;
+  private final StatisticsClock clock;
+
+  public GeodeRedisStats(StatisticsFactory factory, String textId,
+      StatisticsClock clock) {
+    this.clock = clock;
+    stats = factory == null ? null : factory.createAtomicStatistics(type, textId);
+  }
+
+  static {
+    StatisticsTypeFactory statisticsTypeFactory = StatisticsTypeFactoryImpl.singleton();
+    ArrayList<StatisticDescriptor> descriptorList = new ArrayList<>();
+
+    fillListWithCompletedCommandDescriptors(statisticsTypeFactory, descriptorList);
+    fillListWithTimeCommandDescriptors(statisticsTypeFactory, descriptorList);
+    fillListWithCommandDescriptors(statisticsTypeFactory, descriptorList);
+
+    StatisticDescriptor[] descriptorArray =
+        descriptorList.toArray(new StatisticDescriptor[descriptorList.size()]);
+
+    type = statisticsTypeFactory
+        .createType("RedisStats",
+            "Statistics for a GemFire Redis Server",
+            descriptorArray);
+
+    fillCompletedIdMap(type);
+    fillTimeIdMap(type);
+
+    currentlyConnectedClients = type.nameToId("connectedClients");
+    passiveExpirationChecksId = type.nameToId("passiveExpirationChecks");
+    passiveExpirationCheckTimeId = type.nameToId("passiveExpirationCheckTime");
+    passiveExpirationsId = type.nameToId("passiveExpirations");
+    expirationsId = type.nameToId("expirations");
+    expirationTimeId = type.nameToId("expirationTime");
+    totalConnectionsReceived = type.nameToId("totalConnectionsReceived");
+    commandsProcessed = type.nameToId("commandsProcessed");
+    totalNetworkBytesRead = type.nameToId("totalNetworkBytesRead");
+    keyspaceHits = type.nameToId("keyspaceHits");
+    keyspaceMisses = type.nameToId("keyspaceMisses");
+  }
+
+  private long getCurrentTimeNanos() {
+    return clock.getTime();
+  }
+
+  public void endCommand(RedisCommandType command, long start) {
+    if (clock.isEnabled()) {
+      stats.incLong(timeCommandStatIds.get(command), getCurrentTimeNanos() - start);
+    }
+    stats.incLong(completedCommandStatIds.get(command), 1);
+  }
+
+  public void addClient() {
+    stats.incLong(currentlyConnectedClients, 1);
+    stats.incLong(totalConnectionsReceived, 1);
+  }
+
+  public void removeClient() {
+    stats.incLong(currentlyConnectedClients, -1);
+  }
+
+  public void endPassiveExpirationCheck(long start, long expireCount) {
+    if (expireCount > 0) {
+      incPassiveExpirations(expireCount);
+    }
+    if (clock.isEnabled()) {
+      stats.incLong(passiveExpirationCheckTimeId, getCurrentTimeNanos() - start);
+    }
+    stats.incLong(passiveExpirationChecksId, 1);
+  }
+
+  private void incPassiveExpirations(long count) {
+    stats.incLong(passiveExpirationsId, count);
+  }
+
+  public void endExpiration(long start) {
+    if (clock.isEnabled()) {
+      stats.incLong(expirationTimeId, getCurrentTimeNanos() - start);
+    }
+    stats.incLong(expirationsId, 1);
+  }
+
+  public void incrementCommandsProcessed() {
+    stats.incLong(commandsProcessed, 1);
+  }
+
+  public void incrementTotalNetworkBytesRead(long bytes) {
+    stats.incLong(totalNetworkBytesRead, bytes);
+  }
+
+  public void incrementKeyspaceHits() {
+    stats.incLong(keyspaceHits, 1);
+  }
+
+  public void incrementKeyspaceMisses() {
+    stats.incLong(keyspaceMisses, 1);
+  }
+
+  public void close() {
+    if (stats != null) {
+      stats.close();
+    }
+  }
+
+  private static void fillListWithTimeCommandDescriptors(StatisticsTypeFactory factory,
+      ArrayList<StatisticDescriptor> descriptorList) {
+
+    for (RedisCommandType command : RedisCommandType.values()) {
+
+      if (command.isUnimplemented()) {
+        continue;
+      }
+
+      String name = command.name().toLowerCase();
+      String statName = name + "Time";
+      String statDescription =
+          "Total amount of time, in nanoseconds, spent executing redis '"
+              + name + "' operations on this server.";
+      String units = "nanoseconds";
+
+      descriptorList.add(factory.createLongCounter(statName, statDescription, units));
+    }
+  }
+
+  private static void fillListWithCompletedCommandDescriptors(StatisticsTypeFactory factory,
+      ArrayList<StatisticDescriptor> descriptorList) {
+    for (RedisCommandType command : RedisCommandType.values()) {
+
+      if (command.isUnimplemented()) {
+        continue;
+      }
+
+      String name = command.name().toLowerCase();
+      String statName = name + "Completed";
+      String statDescription = "Total number of redis '" + name
+          + "' operations that have completed execution on this server.";
+      String units = "operations";
+
+      descriptorList.add(factory.createLongCounter(statName, statDescription, units));
+    }
+  }
+
+  private static void fillCompletedIdMap(StatisticsType type) {
+    for (RedisCommandType command : RedisCommandType.values()) {
+
+      if (command.isUnimplemented()) {
+        continue;
+      }
+
+      String name = command.name().toLowerCase();
+      String statName = name + "Completed";
+
+      completedCommandStatIds.put(command, type.nameToId(statName));
+    }
+  }
+
+  private static void fillListWithCommandDescriptors(StatisticsTypeFactory statisticsTypeFactory,
+      ArrayList<StatisticDescriptor> descriptorList) {
+
+    descriptorList.add(statisticsTypeFactory.createLongGauge("connectedClients",
+        "Current client connections to this redis server.",
+        "clients"));
+
+    descriptorList.add(statisticsTypeFactory.createLongCounter("commandsProcessed",
+        "Total number of commands processed by this redis server.",
+        "commands"));
+
+    descriptorList.add(statisticsTypeFactory.createLongCounter("keyspaceHits",
+        "Total number of successful key lookups on this redis server"
+            + " from cache on this redis server.",
+        "hits"));
+
+    descriptorList.add(statisticsTypeFactory.createLongCounter("keyspaceMisses",
+        "Total number of keys requested but not found on this redis server.",
+        "misses"));
+
+    descriptorList.add(statisticsTypeFactory.createLongCounter("totalNetworkBytesRead",
+        "Total number of bytes read by this redis server.",
+        "bytes"));
+
+    descriptorList.add(statisticsTypeFactory.createLongCounter("totalConnectionsReceived",
+        "Total number of client connections received by this redis server since startup.",
+        "connections"));
+
+    descriptorList.add(statisticsTypeFactory.createLongCounter("passiveExpirationChecks",
+        "Total number of passive expiration checks that have"
+            + " completed. Checks include scanning and expiring.",
+        "checks"));
+
+    descriptorList.add(statisticsTypeFactory.createLongCounter("passiveExpirationCheckTime",
+        "Total amount of time, in nanoseconds, spent in passive "
+            + "expiration checks on this server.",
+        "nanoseconds"));
+
+    descriptorList.add(statisticsTypeFactory.createLongCounter("passiveExpirations",
+        "Total number of keys that have been passively expired on this server.",
+        "expirations"));
+
+    descriptorList.add(statisticsTypeFactory.createLongCounter("expirations",
+        "Total number of keys that have been expired, actively or passively, on this server.",
+        "expirations"));
+
+    descriptorList.add(statisticsTypeFactory.createLongCounter("expirationTime",
+        "Total amount of time, in nanoseconds, spent expiring keys on this server.",
+        "nanoseconds"));
+  }
+
+  private static void fillTimeIdMap(StatisticsType type) {
+    for (RedisCommandType command : RedisCommandType.values()) {
+
+      if (command.isUnimplemented()) {
+        continue;
+      }
+
+      String name = command.name().toLowerCase();
+      String statName = name + "Time";
+      timeCommandStatIds.put(command, type.nameToId(statName));
+    }
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java
new file mode 100644
index 0000000..2a535f6
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/statistics/RedisStats.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal.statistics;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.statistics.StatisticsClockFactory.getTime;
+import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.geode.internal.statistics.StatisticsClock;
+import org.apache.geode.redis.internal.RedisCommandType;
+
+public class RedisStats {
+
+  private final AtomicLong commandsProcessed = new AtomicLong();
+  private final AtomicLong totalNetworkBytesRead = new AtomicLong();
+  private final AtomicLong totalConnectionsReceived = new AtomicLong();
+  private final AtomicLong currentlyConnectedClients = new AtomicLong();
+  private final AtomicLong expirations = new AtomicLong();
+  private final AtomicLong keyspaceHits = new AtomicLong();
+  private final AtomicLong keyspaceMisses = new AtomicLong();
+  private final ScheduledExecutorService perSecondExecutor;
+  private volatile double networkKiloBytesReadOverLastSecond;
+  private volatile long opsPerformedLastTick;
+  private double opsPerformedOverLastSecond;
+  private long previousNetworkBytesRead;
+  private final StatisticsClock clock;
+  private final GeodeRedisStats geodeRedisStats;
+  private final long START_TIME_IN_NANOS;
+
+  public RedisStats(StatisticsClock clock,
+      GeodeRedisStats geodeRedisStats) {
+
+    this.clock = clock;
+    this.geodeRedisStats = geodeRedisStats;
+    perSecondExecutor = startPerSecondUpdater();
+    START_TIME_IN_NANOS = clock.getTime();
+  }
+
+
+  public void incCommandsProcessed() {
+    commandsProcessed.incrementAndGet();
+    geodeRedisStats.incrementCommandsProcessed();
+  }
+
+  public long getCommandsProcessed() {
+    return commandsProcessed.get();
+  }
+
+  public void addClient() {
+    totalConnectionsReceived.incrementAndGet();
+    currentlyConnectedClients.incrementAndGet();
+    geodeRedisStats.addClient();
+  }
+
+  public void removeClient() {
+    currentlyConnectedClients.decrementAndGet();
+    geodeRedisStats.removeClient();
+  }
+
+  public long getTotalConnectionsReceived() {
+    return totalConnectionsReceived.get();
+  }
+
+  public long getConnectedClients() {
+    return currentlyConnectedClients.get();
+  }
+
+  public double getNetworkKiloBytesReadOverLastSecond() {
+    return networkKiloBytesReadOverLastSecond;
+  }
+
+  public double getOpsPerformedOverLastSecond() {
+    return opsPerformedOverLastSecond;
+  }
+
+  public long startCommand() {
+    return getTime();
+  }
+
+  public void endCommand(RedisCommandType command, long start) {
+    geodeRedisStats.endCommand(command, start);
+  }
+
+  public void incNetworkBytesRead(long bytesRead) {
+    totalNetworkBytesRead.addAndGet(bytesRead);
+    geodeRedisStats.incrementTotalNetworkBytesRead(bytesRead);
+  }
+
+  public long getTotalNetworkBytesRead() {
+    return totalNetworkBytesRead.get();
+  }
+
+  private long getCurrentTimeNanos() {
+    return clock.getTime();
+  }
+
+  public long getUptimeInMilliseconds() {
+    long uptimeInNanos = getCurrentTimeNanos() - START_TIME_IN_NANOS;
+    return TimeUnit.NANOSECONDS.toMillis(uptimeInNanos);
+  }
+
+  public long getUptimeInSeconds() {
+    return TimeUnit.MILLISECONDS.toSeconds(getUptimeInMilliseconds());
+  }
+
+  public long getUptimeInDays() {
+    return TimeUnit.MILLISECONDS.toDays(getUptimeInMilliseconds());
+  }
+
+  public void incKeyspaceHits() {
+    keyspaceHits.incrementAndGet();
+    geodeRedisStats.incrementKeyspaceHits();
+  }
+
+  public long getKeyspaceHits() {
+    return keyspaceHits.get();
+  }
+
+  public void incKeyspaceMisses() {
+    keyspaceMisses.incrementAndGet();
+    geodeRedisStats.incrementKeyspaceMisses();
+  }
+
+  public long getKeyspaceMisses() {
+    return keyspaceMisses.get();
+  }
+
+  public long startPassiveExpirationCheck() {
+    return getCurrentTimeNanos();
+  }
+
+  public void endPassiveExpirationCheck(long start, long expireCount) {
+    geodeRedisStats.endPassiveExpirationCheck(start, expireCount);
+  }
+
+  public long startExpiration() {
+    return getCurrentTimeNanos();
+  }
+
+  public void endExpiration(long start) {
+    geodeRedisStats.endExpiration(start);
+    expirations.incrementAndGet();
+  }
+
+  public void close() {
+    geodeRedisStats.close();
+    stopPerSecondUpdater();
+  }
+
+  private ScheduledExecutorService startPerSecondUpdater() {
+    int INTERVAL = 1;
+
+    ScheduledExecutorService perSecondExecutor =
+        newSingleThreadScheduledExecutor("GemFireRedis-PerSecondUpdater-");
+
+    perSecondExecutor.scheduleWithFixedDelay(
+        () -> doPerSecondUpdates(),
+        INTERVAL,
+        INTERVAL,
+        SECONDS);
+
+    return perSecondExecutor;
+  }
+
+  private void stopPerSecondUpdater() {
+    perSecondExecutor.shutdownNow();
+  }
+
+  private void doPerSecondUpdates() {
+    updateNetworkKilobytesReadLastSecond();
+    updateOpsPerformedOverLastSecond();
+  }
+
+  private void updateNetworkKilobytesReadLastSecond() {
+    long totalNetworkBytesRead = getTotalNetworkBytesRead();
+    long deltaNetworkBytesRead = totalNetworkBytesRead - previousNetworkBytesRead;
+    networkKiloBytesReadOverLastSecond = deltaNetworkBytesRead / 1000;
+    previousNetworkBytesRead = getTotalNetworkBytesRead();
+  }
+
+  private void updateOpsPerformedOverLastSecond() {
+    long totalOpsPerformed = getCommandsProcessed();
+    long opsPerformedThisTick = totalOpsPerformed - opsPerformedLastTick;
+    opsPerformedOverLastSecond = opsPerformedThisTick;
+    opsPerformedLastTick = getCommandsProcessed();
+  }
+}