/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.kubernetes

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.model._

import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions._
/**
 * Tracking info cache pool for Flink on Kubernetes mode.
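 *
 * A minimal usage sketch (how a [[TrackId]] is built is elided here, since it
 * depends on the tracking source):
 * {{{
 *   val trackController = new FlinkTrackController()
 *   val trackId: TrackId = ???
 *   trackController.trackIds.set(trackId)
 *   assert(trackController.isInTracking(trackId))
 *   trackController.unTracking(trackId)
 *   trackController.close()
 * }}}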
*/
class FlinkTrackController extends Logger with AutoCloseable {
// cache for tracking identifiers
lazy val trackIds: TrackIdCache = TrackIdCache.build()
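  // cache for identifiers of jobs that are being canceled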
lazy val canceling: TrackIdCache = TrackIdCache.build()
  // cache for flink job-manager rest urls
lazy val endpoints: EndpointCache = EndpointCache.build()
// cache for tracking flink job status
lazy val jobStatuses: JobStatusCache = JobStatusCache.build()
// cache for tracking kubernetes events with Deployment kind
lazy val k8sDeploymentEvents: K8sDeploymentEventCache = K8sDeploymentEventCache.build()
  // cache for the latest metrics of each flink cluster (such as a session cluster or an application cluster)
lazy val flinkMetrics: MetricCache = MetricCache.build()
override def close(): Unit = {
jobStatuses.cleanUp()
k8sDeploymentEvents.cleanUp()
trackIds.cleanUp()
}
/**
* collect all tracking identifiers
*/
def collectAllTrackIds(): Set[TrackId] = trackIds.getAll()
/**
   * determines whether the specified TrackId is in tracking
*/
def isInTracking(trackId: TrackId): Boolean = {
    if (!trackId.isLegal) false else trackIds.get(trackId) != null
}
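  /**
   * remove the specified TrackId from all tracking caches, and delete the related ingress
   */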
def unTracking(trackId: TrackId): Unit = {
if (trackId.isLegal) {
trackIds.invalidate(trackId)
canceling.invalidate(trackId)
jobStatuses.invalidate(trackId)
flinkMetrics.invalidate(ClusterKey.of(trackId))
IngressController.deleteIngress(trackId.clusterId, trackId.namespace)
}
}
/**
   * collect all active tracking ids
*/
private[kubernetes] def collectTracks(): Set[TrackId] = collectAllTrackIds().filter(_.isActive)
/**
   * collect the aggregated flink metrics of all clusters in tracking
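   *
   * example (returns [[FlinkMetricCV.empty]] when nothing is in tracking):
   * {{{
   *   val total: FlinkMetricCV = trackController.collectAccMetric()
   * }}}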
*/
  def collectAccMetric(): FlinkMetricCV = {
    // get the metrics of all clusters in tracking
    collectTracks() match {
      case k if k.isEmpty => FlinkMetricCV.empty
      case k =>
        flinkMetrics.getAll(k.map(ClusterKey.of)) match {
          case m if m.isEmpty => FlinkMetricCV.empty
          case m =>
            // aggregate the metrics of all clusters
            m.values.fold(FlinkMetricCV.empty)((x, y) => x + y)
        }
    }
  }
/**
   * get flink job-manager rest url from cache, which will be refreshed automatically when it is empty.
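   *
   * example (assuming a `trackController` instance and a `clusterKey` built elsewhere):
   * {{{
   *   val restUrl: Option[String] = trackController.getClusterRestUrl(clusterKey)
   * }}}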
*/
def getClusterRestUrl(clusterKey: ClusterKey): Option[String] = {
Option(endpoints.get(clusterKey)).filter(_.nonEmpty).orElse(refreshClusterRestUrl(clusterKey))
}
/**
* refresh flink job-manager rest url from remote flink cluster, and cache it.
*/
def refreshClusterRestUrl(clusterKey: ClusterKey): Option[String] = {
val restUrl = KubernetesRetriever.retrieveFlinkRestUrl(clusterKey)
if (restUrl.nonEmpty) {
endpoints.put(clusterKey, restUrl.get)
}
restUrl
}
}
//----cache----
case class CacheKey(key: java.lang.Long) extends Serializable
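
/**
 * cache of tracking identifiers, keyed by flink application id
 */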
class TrackIdCache {
private[this] lazy val cache: Cache[CacheKey, TrackId] = Caffeine.newBuilder.build()
def update(k: TrackId): Unit = {
val key = CacheKey(k.appId)
cache.invalidate(key)
cache.put(key, k)
}
def set(k: TrackId): Unit = cache.put(CacheKey(k.appId), k)
def invalidate(k: TrackId): Unit = cache.invalidate(CacheKey(k.appId))
def get(k: TrackId): TrackId = cache.getIfPresent(CacheKey(k.appId))
def has(k: TrackId): Boolean = get(k) != null
def getAll(): Set[TrackId] = cache.asMap().values().toSet
def cleanUp(): Unit = cache.cleanUp()
}
object TrackIdCache {
def build(): TrackIdCache = {
new TrackIdCache()
}
}
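
/**
 * cache of flink job status, keyed by flink application id
 */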
class JobStatusCache {
private[this] lazy val cache: Cache[CacheKey, JobStatusCV] = Caffeine.newBuilder.build()
def putAll(kvs: Map[TrackId, JobStatusCV]): Unit = cache.putAll(kvs.map(t => (CacheKey(t._1.appId), t._2)))
def put(k: TrackId, v: JobStatusCV): Unit = cache.put(CacheKey(k.appId), v)
def asMap(): Map[CacheKey, JobStatusCV] = cache.asMap().toMap
  def getAsMap(trackIds: Set[TrackId]): Map[CacheKey, JobStatusCV] = cache.getAllPresent(trackIds.map(t => CacheKey(t.appId))).toMap
def get(k: TrackId): JobStatusCV = cache.getIfPresent(CacheKey(k.appId))
def invalidate(k: TrackId): Unit = cache.invalidate(CacheKey(k.appId))
def cleanUp(): Unit = cache.cleanUp()
}
object JobStatusCache {
def build(): JobStatusCache = new JobStatusCache()
}
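
/**
 * cache of flink job-manager rest urls; entries expire 24 hours after write
 */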
class EndpointCache {
private[this] lazy val cache: Cache[ClusterKey, String] = Caffeine.newBuilder().expireAfterWrite(24, TimeUnit.HOURS).build()
def invalidate(k: ClusterKey): Unit = cache.invalidate(k)
def put(k: ClusterKey, v: String): Unit = cache.put(k, v)
def get(key: ClusterKey): String = cache.getIfPresent(key)
}
object EndpointCache {
def build(): EndpointCache = new EndpointCache()
}
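
/**
 * cache of kubernetes deployment events, keyed by [[K8sEventKey]]
 */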
class K8sDeploymentEventCache {
  val cache: Cache[K8sEventKey, K8sDeploymentEventCV] = Caffeine.newBuilder.build()

  def put(k: K8sEventKey, v: K8sDeploymentEventCV): Unit = cache.put(k, v)
  def get(k: K8sEventKey): K8sDeploymentEventCV = cache.getIfPresent(k)
  def asMap(): Map[K8sEventKey, K8sDeploymentEventCV] = cache.asMap().toMap
  def cleanUp(): Unit = cache.cleanUp()
}
object K8sDeploymentEventCache {
def build(): K8sDeploymentEventCache = new K8sDeploymentEventCache()
}
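
/**
 * cache of the latest metrics of each flink cluster, keyed by [[ClusterKey]]
 */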
class MetricCache {
private[this] lazy val cache: Cache[ClusterKey, FlinkMetricCV] = Caffeine.newBuilder().build()
def put(k: ClusterKey, v: FlinkMetricCV): Unit = cache.put(k, v)
def asMap(): Map[ClusterKey, FlinkMetricCV] = cache.asMap().toMap
def getAll(k: Set[ClusterKey]): Map[ClusterKey, FlinkMetricCV] = cache.getAllPresent(k).toMap
def get(key: ClusterKey): FlinkMetricCV = cache.getIfPresent(key)
def invalidate(key: ClusterKey): Unit = cache.invalidate(key)
}
object MetricCache {
def build(): MetricCache = new MetricCache()
}