// blob: 2f9a0baf59f7aa0781ace01174573511187c9d9c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.metrics.reporter
import org.apache.samza.SamzaException
import org.apache.samza.config._
import org.apache.samza.metrics.{MetricsRegistryMap, MetricsReporter, MetricsReporterFactory}
import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, Serde, SerdeFactory}
import org.apache.samza.system.{SystemFactory, SystemProducer, SystemStream}
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{Logging, ReflectionUtil, StreamUtil, Util}
/**
 * Factory that assembles a [[MetricsSnapshotReporter]] from job configuration.
 *
 * Every ingredient of the reporter (producer, target stream, serde, blacklist, reporting
 * interval, job name and job id) is looked up by its own protected method.
 */
class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging {

  /**
   * Builds the producer for the system that the reporter's metrics stream belongs to.
   *
   * @param reporterName name of the reporter whose stream configuration is consulted
   * @param config       job configuration
   * @param registry     metrics registry handed to the system factory when creating the producer
   * @return the producer obtained from the configured system factory
   * @throws SamzaException if no system factory is configured for the stream's system
   */
  protected def getProducer(reporterName: String, config: Config, registry: MetricsRegistryMap): SystemProducer = {
    val sysConfig = new SystemConfig(config)
    val targetSystem = getSystemStream(reporterName, config).getSystem
    val factoryClassName = JavaOptionals.toRichOptional(sysConfig.getSystemFactory(targetSystem)).toOption match {
      case Some(className) => className
      case None =>
        throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config.".format(targetSystem))
    }
    val factory = ReflectionUtil.getObj(factoryClassName, classOf[SystemFactory])
    info("Got system factory %s.".format(factory))
    val systemProducer = factory.getProducer(targetSystem, config, registry)
    info("Got producer %s.".format(systemProducer))
    systemProducer
  }

  /**
   * Resolves the system stream that metrics snapshots are written to.
   *
   * @param reporterName name of the reporter whose stream configuration is consulted
   * @param config       job configuration
   * @return the system stream parsed from the configured stream name
   * @throws SamzaException if no metrics stream is configured for the reporter
   */
  protected def getSystemStream(reporterName: String, config: Config): SystemStream = {
    val configuredStream =
      JavaOptionals.toRichOptional(new MetricsConfig(config).getMetricsSnapshotReporterStream(reporterName)).toOption
    val streamName = configuredStream match {
      case Some(name) => name
      case None => throw new SamzaException("No metrics stream defined in config.")
    }
    val resolvedStream = StreamUtil.getSystemStreamFromNames(streamName)
    info("Got system stream %s.".format(resolvedStream))
    resolvedStream
  }

  /**
   * Picks the serde used for metrics snapshots.
   *
   * The stream-level serde name wins over the system-level one. When neither is configured a
   * new [[MetricsSnapshotSerdeV2]] is returned. When a serde name is configured but no serde
   * factory class is registered for it, the result is `null`.
   *
   * NOTE(review): presumably the reporter treats a `null` serde as "send unserialized" -
   * confirm against MetricsSnapshotReporter.
   *
   * @param reporterName name of the reporter whose stream configuration is consulted
   * @param config       job configuration
   * @return the selected serde, possibly `null` (see above)
   */
  protected def getSerde(reporterName: String, config: Config): Serde[MetricsSnapshot] = {
    val streamCfg = new StreamConfig(config)
    val systemCfg = new SystemConfig(config)
    val target = getSystemStream(reporterName, config)
    val streamLevelName = streamCfg.getStreamMsgSerde(target)
    val systemLevelName = systemCfg.getSystemMsgSerde(target.getSystem)
    // Stream-level name first, then system-level name, otherwise null.
    val fallbackName = systemLevelName.orElse(null)
    val chosenName = streamLevelName.orElse(fallbackName)
    val serializerCfg = new SerializerConfig(config)
    val snapshotSerde: Serde[MetricsSnapshot] =
      if (chosenName == null) {
        new MetricsSnapshotSerdeV2
      } else {
        JavaOptionals.toRichOptional(serializerCfg.getSerdeFactoryClass(chosenName)).toOption
          .map(factoryClass =>
            ReflectionUtil.getObj(factoryClass, classOf[SerdeFactory[MetricsSnapshot]]).getSerde(chosenName, config))
          .orNull
      }
    info("Got serde %s.".format(snapshotSerde))
    snapshotSerde
  }

  /**
   * Reads the reporter's blacklist setting.
   *
   * @param reporterName name of the reporter whose configuration is consulted
   * @param config       job configuration
   * @return the configured blacklist, or `None` when it is not set
   */
  protected def getBlacklist(reporterName: String, config: Config): Option[String] = {
    val configured =
      JavaOptionals.toRichOptional(new MetricsConfig(config).getMetricsSnapshotReporterBlacklist(reporterName)).toOption
    info("Got blacklist as: %s".format(configured))
    configured
  }

  /**
   * Reads the reporter's reporting interval.
   *
   * @param reporterName name of the reporter whose configuration is consulted
   * @param config       job configuration
   * @return the interval value as provided by [[MetricsConfig]]
   */
  protected def getReportingInterval(reporterName: String, config: Config): Int = {
    val interval = new MetricsConfig(config).getMetricsSnapshotReporterInterval(reporterName)
    info("Got reporting interval: %d".format(interval))
    interval
  }

  /**
   * Returns the job id as provided by [[JobConfig]].
   *
   * @param config job configuration
   */
  protected def getJobId(config: Config): String =
    new JobConfig(config).getJobId

  /**
   * Returns the configured job name.
   *
   * @param config job configuration
   * @throws SamzaException if the job name is not defined in the configuration
   */
  protected def getJobName(config: Config): String =
    JavaOptionals.toRichOptional(new JobConfig(config).getName).toOption match {
      case Some(name) => name
      case None => throw new SamzaException("Job name must be defined in config.")
    }

  /**
   * Creates a [[MetricsSnapshotReporter]] wired with a fresh registry, and registers that
   * registry with the reporter under this factory's simple class name.
   *
   * @param reporterName  name of the reporter whose configuration is consulted
   * @param containerName container name passed through to the reporter
   * @param config        job configuration
   * @return the newly created reporter
   */
  def getMetricsReporter(reporterName: String, containerName: String, config: Config): MetricsReporter = {
    info("Creating new metrics snapshot reporter.")
    val producerRegistry = new MetricsRegistryMap
    val targetStream = getSystemStream(reporterName, config)
    val systemProducer = getProducer(reporterName, config, producerRegistry)
    val intervalValue = getReportingInterval(reporterName, config)
    val nameOfJob = getJobName(config)
    val idOfJob = getJobId(config)
    val snapshotSerde = getSerde(reporterName, config)
    val metricBlacklist = getBlacklist(reporterName, config)
    val taskVersion = Util.getTaskClassVersion(config)
    val samzaVersion = Util.getSamzaVersion
    val hostName = Util.getLocalHost.getHostName
    val snapshotReporter = new MetricsSnapshotReporter(
      systemProducer,
      targetStream,
      intervalValue,
      nameOfJob,
      idOfJob,
      containerName,
      taskVersion,
      samzaVersion,
      hostName,
      snapshotSerde,
      metricBlacklist)
    snapshotReporter.register(this.getClass.getSimpleName, producerRegistry)
    snapshotReporter
  }
}