Add a metric to track the number of active threads of a container's thread pool (#1476)

Feature: Add a metric to track the number of active threads of a container's thread pool. This will be used to calculate the thread pool utilization of a job, which will be used as a signal for the autosizing controller to scale up or down the job thread pool size and number of containers.

Changes:
 - Add a metric to track the number of active threads of a container's thread pool.
 - Update the existing container thread pool size metric to reflect the pool size from the thread pool instead of emitting the static job.container.thread.pool.size
diff --git a/docs/learn/documentation/versioned/operations/monitoring.md b/docs/learn/documentation/versioned/operations/monitoring.md
index de2497c..0671047 100644
--- a/docs/learn/documentation/versioned/operations/monitoring.md
+++ b/docs/learn/documentation/versioned/operations/monitoring.md
@@ -394,6 +394,8 @@
 | | executor-work-factor | The work factor of the run loop. A work factor of 1 indicates full throughput, while a work factor of less than 1 will introduce delays into the execution to approximate the requested work factor. The work factor is set by the disk space monitor in accordance with the disk quota policy. Given the latest percentage of available disk quota, this policy returns the work factor that should be applied. |
 | | physical-memory-mb | The physical memory used by the Samza container process (native + on heap) (in MBs). |
 | | physical-memory-utilization | The ratio between the physical memory used by the Samza container process (native + on heap) and the total physical memory of the Samza container. |
+| | container-thread-pool-size | The current size of a Samza container's thread pool. It may or may not be the same as job.container.thread.pool.size, depending on the implementation. |
+| | container-active-threads | The approximate actively used threads in a Samza container's thread pool. |
 | | <TaskName\>-<StoreName\>-restore-time | Time taken to restore task stores (per task store). |
 
 
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3a4ba17..2e87f48 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -26,7 +26,7 @@
 import java.time.Duration
 import java.util
 import java.util.{Base64, Optional}
-import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, ScheduledExecutorService, ThreadPoolExecutor, TimeUnit}
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -476,7 +476,6 @@
 
     val threadPoolSize = jobConfig.getThreadPoolSize
     info("Got thread pool size: " + threadPoolSize)
-    samzaContainerMetrics.containerThreadPoolSize.set(threadPoolSize)
 
     val taskThreadPool = if (threadPoolSize > 0) {
       Executors.newFixedThreadPool(threadPoolSize,
@@ -619,16 +618,20 @@
 
     val containerMemoryMb : Int = new ClusterManagerConfig(config).getContainerMemoryMb
 
-    val memoryStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
-    memoryStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener {
+    val hostStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
+    hostStatisticsMonitor.registerListener(new SystemStatisticsMonitor.Listener {
       override def onUpdate(sample: SystemMemoryStatistics): Unit = {
         val physicalMemoryBytes : Long = sample.getPhysicalMemoryBytes
         val physicalMemoryMb : Float = physicalMemoryBytes / (1024.0F * 1024.0F)
         val memoryUtilization : Float = physicalMemoryMb.toFloat / containerMemoryMb
+        val containerThreadPoolSize : Long = taskThreadPool.asInstanceOf[ThreadPoolExecutor].getPoolSize
+        val containerActiveThreads : Long = taskThreadPool.asInstanceOf[ThreadPoolExecutor].getActiveCount
         logger.debug("Container physical memory utilization (mb): " + physicalMemoryMb)
         logger.debug("Container physical memory utilization: " + memoryUtilization)
         samzaContainerMetrics.physicalMemoryMb.set(physicalMemoryMb)
-        samzaContainerMetrics.physicalMemoryUtilization.set(memoryUtilization);
+        samzaContainerMetrics.physicalMemoryUtilization.set(memoryUtilization)
+        samzaContainerMetrics.containerThreadPoolSize.set(containerThreadPoolSize)
+        samzaContainerMetrics.containerActiveThreads.set(containerActiveThreads)
       }
     })
 
@@ -674,7 +677,7 @@
       reporters = reporters,
       jvm = jvm,
       diskSpaceMonitor = diskSpaceMonitor,
-      hostStatisticsMonitor = memoryStatisticsMonitor,
+      hostStatisticsMonitor = hostStatisticsMonitor,
       taskThreadPool = taskThreadPool,
       timerExecutor = timerExecutor,
       containerContext = containerContext,
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 46de166..33d288f 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -49,6 +49,7 @@
   val physicalMemoryMb = newGauge("physical-memory-mb", 0.0F)
   val physicalMemoryUtilization = newGauge("physical-memory-utilization", 0.0F)
   val containerThreadPoolSize = newGauge("container-thread-pool-size", 0L)
+  val containerActiveThreads = newGauge("container-active-threads", 0L)
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()