| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.metrics |
| |
| import com.google.common.util.concurrent.ThreadFactoryBuilder |
| import com.sun.management.OperatingSystemMXBean |
| import com.sun.management.UnixOperatingSystemMXBean |
| import org.apache.samza.util.Logging |
| |
| import java.lang.Thread.State._ |
| import java.lang.management.ManagementFactory |
| import java.util.concurrent.Executors |
| import java.util.concurrent.TimeUnit |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection._ |
| |
| /** |
| * Straight up ripoff of Hadoop's metrics2 JvmMetrics class. |
| */ |
| class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with Runnable with Logging { |
| final val M = 1024 * 1024.0F |
| final val PCT = 100.0 |
| |
| val memoryMXBean = ManagementFactory.getMemoryMXBean() |
| val gcBeans = ManagementFactory.getGarbageCollectorMXBeans() |
| val threadMXBean = ManagementFactory.getThreadMXBean() |
| val osMXBean = ManagementFactory.getOperatingSystemMXBean() |
| var gcBeanCounters = Map[String, (Counter, Counter)]() |
| val executor = Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder().setNameFormat("Samza JvmMetrics Thread-%d").setDaemon(true).build()) |
| |
| // jvm metrics |
| val gMemNonHeapUsedM = newGauge("mem-non-heap-used-mb", 0.0F) |
| val gMemNonHeapCommittedM = newGauge("mem-non-heap-committed-mb", 0.0F) |
| val gMemNonHeapMaxM = newGauge("mem-non-heap-max-mb", 0.0F) |
| val gMemHeapUsedM = newGauge("mem-heap-used-mb", 0.0F) |
| val gMemHeapCommittedM = newGauge("mem-heap-committed-mb", 0.0F) |
| val gMemHeapMaxM = newGauge("mem-heap-max-mb", 0.0F) |
| val gThreadsNew = newGauge("threads-new", 0L) |
| val gThreadsRunnable = newGauge("threads-runnable", 0L) |
| val gThreadsBlocked = newGauge("threads-blocked", 0L) |
| val gThreadsWaiting = newGauge("threads-waiting", 0L) |
| val gThreadsTimedWaiting = newGauge("threads-timed-waiting", 0L) |
| val gThreadsTerminated = newGauge("threads-terminated", 0L) |
| val cGcCount = newCounter("gc-count") |
| val cGcTimeMillis = newCounter("gc-time-millis") |
| |
| // Conditional metrics. Only emitted if the Operating System supports it. |
| val gProcessCpuUsage = if (osMXBean.isInstanceOf[OperatingSystemMXBean]) newGauge("process-cpu-usage", 0.0) else null |
| val gSystemCpuUsage = if (osMXBean.isInstanceOf[OperatingSystemMXBean]) newGauge("system-cpu-usage", 0.0) else null |
| val gOpenFileDescriptorCount = if (osMXBean.isInstanceOf[UnixOperatingSystemMXBean]) newGauge("open-file-descriptor-count", 0.0) else null |
| |
| def start { |
| executor.scheduleWithFixedDelay(this, 0, 5, TimeUnit.SECONDS) |
| } |
| |
| def run { |
| debug("updating jvm metrics") |
| |
| updateMemoryUsage |
| updateGcUsage |
| updateThreadUsage |
| updateOperatingSystemMetrics |
| |
| debug("updated metrics to: [%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s]" format |
| (gMemNonHeapUsedM, gMemNonHeapCommittedM, gMemNonHeapMaxM, gMemHeapUsedM, gMemHeapCommittedM,gMemHeapMaxM, gThreadsNew, |
| gThreadsRunnable, gThreadsBlocked, gThreadsWaiting, gThreadsTimedWaiting, |
| gThreadsTerminated, cGcCount, cGcTimeMillis, gProcessCpuUsage, gSystemCpuUsage, gOpenFileDescriptorCount)) |
| } |
| |
| def stop = executor.shutdown |
| |
| private def updateMemoryUsage { |
| val memNonHeap = memoryMXBean.getNonHeapMemoryUsage() |
| val memHeap = memoryMXBean.getHeapMemoryUsage() |
| gMemNonHeapUsedM.set(memNonHeap.getUsed() / M) |
| gMemNonHeapCommittedM.set(memNonHeap.getCommitted() / M) |
| gMemNonHeapMaxM.set(memNonHeap.getMax / M) |
| gMemHeapUsedM.set(memHeap.getUsed() / M) |
| gMemHeapCommittedM.set(memHeap.getCommitted() / M) |
| gMemHeapMaxM.set(memHeap.getMax() / M) |
| } |
| |
| private def updateGcUsage { |
| var count = 0l |
| var timeMillis = 0l |
| |
| gcBeans.asScala.foreach(gcBean => { |
| val c = gcBean.getCollectionCount() |
| val t = gcBean.getCollectionTime() |
| val gcInfo = getGcInfo(gcBean.getName) |
| gcInfo._1.inc(c - gcInfo._1.getCount()) |
| gcInfo._2.inc(t - gcInfo._2.getCount()) |
| count += c |
| timeMillis += t |
| }) |
| |
| cGcCount.inc(count - cGcCount.getCount()) |
| cGcTimeMillis.inc(timeMillis - cGcTimeMillis.getCount()) |
| } |
| |
| private def getGcInfo(gcName: String): (Counter, Counter) = { |
| gcBeanCounters.get(gcName) match { |
| case Some(gcBeanCounterTuple) => gcBeanCounterTuple |
| case _ => { |
| val t = (newCounter("%s-gc-count" format gcName), newCounter("%s-gc-time-millis" format gcName)) |
| gcBeanCounters += (gcName -> t) |
| t |
| } |
| } |
| } |
| |
| private def updateThreadUsage { |
| var threadsNew = 0l |
| var threadsRunnable = 0l |
| var threadsBlocked = 0l |
| var threadsWaiting = 0l |
| var threadsTimedWaiting = 0l |
| var threadsTerminated = 0l |
| var threadIds = threadMXBean.getAllThreadIds |
| |
| threadMXBean.getThreadInfo(threadIds, 0).foreach(threadInfo => |
| Option(threadInfo) match { |
| case Some(threadInfo) => { |
| threadInfo.getThreadState match { |
| case NEW => threadsNew += 1 |
| case RUNNABLE => threadsRunnable += 1 |
| case BLOCKED => threadsBlocked += 1 |
| case WAITING => threadsWaiting += 1 |
| case TIMED_WAITING => threadsTimedWaiting += 1 |
| case TERMINATED => threadsTerminated += 1 |
| } |
| } |
| case _ => // race protection |
| }) |
| |
| gThreadsNew.set(threadsNew) |
| gThreadsRunnable.set(threadsRunnable) |
| gThreadsBlocked.set(threadsBlocked) |
| gThreadsWaiting.set(threadsWaiting) |
| gThreadsTimedWaiting.set(threadsTimedWaiting) |
| gThreadsTerminated.set(threadsTerminated) |
| } |
| |
| private def updateOperatingSystemMetrics { |
| if (osMXBean.isInstanceOf[OperatingSystemMXBean]) { |
| val operatingSystemMXBean = osMXBean.asInstanceOf[OperatingSystemMXBean] |
| gProcessCpuUsage.set(operatingSystemMXBean.getProcessCpuLoad * PCT) |
| gSystemCpuUsage.set(operatingSystemMXBean.getSystemCpuLoad * PCT) |
| |
| if (osMXBean.isInstanceOf[UnixOperatingSystemMXBean]) { |
| gOpenFileDescriptorCount.set(osMXBean.asInstanceOf[UnixOperatingSystemMXBean].getOpenFileDescriptorCount) |
| } |
| } |
| } |
| } |