blob: 565ec886910cae2928098d31a5ff2ba645538576 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system
import org.apache.samza.util.{Clock, Logging, SystemClock}
import org.apache.samza.SamzaException
import scala.collection.JavaConverters._
import scala.collection.mutable
/**
* Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
* 5 seconds), so that we can make many metadata requests in quick succession without
* hammering the actual systems. This is useful for example during task startup, when
* each task independently fetches the offsets for own partition.
*/
class StreamMetadataCache (
/** System implementations from which the actual metadata is loaded on cache miss */
systemAdmins: SystemAdmins,
/** Maximum age (in milliseconds) of a cache entry */
val cacheTTLms: Int = 5000,
/** Clock used for determining expiry (for mocking in tests) */
clock: Clock = SystemClock.instance) extends Logging {
private case class CacheEntry(metadata: SystemStreamMetadata, lastRefreshMs: Long)
private var cache = Map[SystemStream, CacheEntry]()
private val lock = new Object
/**
* Returns metadata about each of the given streams (such as first offset, newest
* offset, etc). If the metadata isn't in the cache, it is retrieved from the systems
* using the given SystemAdmins.
*
* @param streams Set of SystemStreams for which the metadata is requested
* @param partitionsMetadataOnly Flag to indicate that only partition count metadata should be fetched/refreshed
*/
def getStreamMetadata(
streams: Set[SystemStream],
partitionsMetadataOnly: Boolean = false): Map[SystemStream, SystemStreamMetadata] = {
val time = clock.currentTimeMillis
val cacheHits = streams.flatMap(stream => getFromCache(stream, time)).toMap
val cacheMisses = (streams -- cacheHits.keySet)
.groupBy[String](_.getSystem)
.flatMap {
case (systemName, systemStreams) =>
val systemAdmin = systemAdmins.getSystemAdmin(systemName)
val streamToMetadata = if (partitionsMetadataOnly) {
systemAdmin.getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms)
} else {
systemAdmin.getSystemStreamMetadata(systemStreams.map(_.getStream).asJava)
}
streamToMetadata.asScala.map {
case (streamName, metadata) => (new SystemStream(systemName, streamName) -> metadata)
}
}
val allResults = cacheHits ++ cacheMisses
val missing = streams.filter(stream => allResults.getOrElse(stream, null) == null)
if (!missing.isEmpty) {
throw new SamzaException("Cannot get metadata for unknown streams: " + missing.mkString(", "))
}
if (!partitionsMetadataOnly) {
cacheMisses.foreach { case (stream, metadata) => addToCache(stream, metadata, time) }
}
allResults
}
/**
* Returns the list of System Streams for this system.
* @param systemName
*/
def getAllSystemStreams(systemName: String): mutable.Set[SystemStream] = {
val systemAdmin = systemAdmins.getSystemAdmin(systemName)
systemAdmin.getAllSystemStreams().asScala
}
/**
* Returns metadata about the given streams. If the metadata isn't in the cache, it is retrieved from the systems
* using the given SystemAdmins.
*
* @param stream SystemStreams for which the metadata is requested
* @param partitionsMetadataOnly Flag to indicate that only partition count metadata should be fetched/refreshed
*/
def getSystemStreamMetadata(stream: SystemStream, partitionsMetadataOnly: Boolean): SystemStreamMetadata = {
getStreamMetadata(Set(stream), partitionsMetadataOnly).get(stream).orNull
}
private def getFromCache(stream: SystemStream, now: Long) = {
cache.get(stream) match {
case Some(CacheEntry(metadata, lastRefresh)) =>
if (now - lastRefresh > cacheTTLms) None else Some(stream -> metadata)
case None => None
}
}
private def addToCache(systemStream: SystemStream, metadata: SystemStreamMetadata, now: Long) {
lock synchronized {
cache += systemStream -> CacheEntry(metadata, now)
}
}
}