blob: 84a487afac3364b9062815a5b8eb843cb2562e85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics.reporter
import java.lang.management.ManagementFactory
import java.util
import org.apache.samza.util.Logging
import javax.management.MBeanServer
import javax.management.ObjectName
import org.apache.samza.config.Config
import org.apache.samza.metrics._
import scala.collection.JavaConverters._
import org.apache.samza.metrics.JmxUtil._
class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
var sources = Map[ReadableMetricsRegistry, String]()
var listeners = Map[ReadableMetricsRegistry, ReadableMetricsRegistryListener]()
def start() {
for ((registry, listener) <- listeners) {
// First, add a listener for all new metrics that are added.
registry.listen(listener)
// Second, add all existing metrics.
registry.getGroups.asScala.foreach(group => {
registry.getGroup(group).asScala.foreach {
case (name, metric) =>
metric.visit(new MetricsVisitor {
def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))))
def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry))))
def timer(timer: Timer) = registerBean(new JmxTimer(timer, getObjectName(group, name, sources(registry))))
})
}
})
}
}
def register(source: String, registry: ReadableMetricsRegistry) {
if (!listeners.contains(registry)) {
sources += registry -> source
listeners += registry -> new ReadableMetricsRegistryListener {
def onCounter(group: String, counter: Counter) {
registerBean(new JmxCounter(counter, getObjectName(group, counter.getName, source)))
}
def onGauge(group: String, gauge: Gauge[_]) {
registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, gauge.getName, source)))
}
def onTimer(group: String, timer: Timer) {
registerBean(new JmxTimer(timer, getObjectName(group, timer.getName, source)))
}
}
} else {
warn("Trying to re-register a registry for source %s. Ignoring." format source)
}
}
def stop() {
for ((registry, listener) <- listeners) {
registry.unlisten(listener)
}
}
def registerBean(bean: MetricMBean) {
if (!server.isRegistered(bean.objectName)) {
debug("Registering MBean for %s." format bean.objectName)
server.registerMBean(bean, bean.objectName)
}
}
}
trait MetricMBean {
def objectName(): ObjectName
}
abstract class AbstractBean(val on: ObjectName) extends MetricMBean {
override def objectName = on
}
trait JmxGaugeMBean extends MetricMBean {
def getValue(): Object
}
class JmxGauge(g: org.apache.samza.metrics.Gauge[Object], on: ObjectName) extends JmxGaugeMBean {
def getValue = g.getValue
def objectName = on
}
trait JmxCounterMBean extends MetricMBean {
def getCount(): Long
}
class JmxCounter(c: org.apache.samza.metrics.Counter, on: ObjectName) extends JmxCounterMBean {
def getCount() = c.getCount()
def objectName = on
}
trait JmxTimerMBean extends MetricMBean {
def getAverageTime(): Double
}
class JmxTimer(t: org.apache.samza.metrics.Timer, on: ObjectName) extends JmxTimerMBean {
def getAverageTime() = t.getSnapshot().getAverage()
def objectName = on
}
class JmxReporterFactory extends MetricsReporterFactory with Logging {
def getMetricsReporter(name: String, containerName: String, config: Config) = {
info("Creating JMX reporter with name %s." format name)
new JmxReporter(ManagementFactory.getPlatformMBeanServer)
}
}