blob: 6a8b00507ca4cdc2a4178629b627e29d2858b0ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.griffin.measure.context.streaming.checkpoint.offset
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.imps.CuratorFrameworkState
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.utils.ZKPaths
import org.apache.zookeeper.CreateMode
import scala.collection.JavaConverters._
import scala.util.matching.Regex
import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
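/**
 * Offset checkpoint that keeps streaming info in ZooKeeper through a Curator client.
 *
 * @param config checkpoint settings, looked up by the keys `hosts`, `namespace`, `mode`,
 *               `init.clear`, `close.clear` and `lock.path`
 * @param metricName metric name; appended to the configured namespace (or used alone when
 *                   that is empty) to form the Curator namespace
 */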
case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String)
    extends OffsetCheckpoint
    with OffsetOps {

  // Imported locally so that the catch blocks below let fatal errors
  // (OutOfMemoryError, InterruptedException, ...) propagate instead of swallowing them.
  import scala.util.control.NonFatal

  // Keys looked up in `config`.
  val Hosts: String = "hosts"
  val Namespace: String = "namespace"
  val Mode: String = "mode"
  val InitClear: String = "init.clear"
  val CloseClear: String = "close.clear"
  val LockPath: String = "lock.path"

  // Case-insensitive patterns recognised for the `mode` setting.
  val PersistentRegex: Regex = """^(?i)persist(ent)?$""".r
  val EphemeralRegex: Regex = """^(?i)ephemeral$""".r

  final val separator = ZKPaths.PATH_SEPARATOR

  /** ZooKeeper connect string; empty when `hosts` is not configured. */
  val hosts: String = config.getOrElse(Hosts, "").toString

  /** Namespace prefix; empty when `namespace` is not configured. */
  val namespace: String = config.getOrElse(Namespace, "").toString

  /**
   * Create mode for new nodes. Only a string matching `EphemeralRegex` selects
   * EPHEMERAL; a missing, non-string or unrecognised value falls back to PERSISTENT.
   */
  val mode: CreateMode = config.get(Mode) match {
    case Some(s: String) =>
      s match {
        case EphemeralRegex() => CreateMode.EPHEMERAL
        // PersistentRegex has one capture group, so a zero-arity pattern
        // `PersistentRegex()` would never match; `_*` accepts any group count.
        case PersistentRegex(_*) => CreateMode.PERSISTENT
        case _ => CreateMode.PERSISTENT
      }
    case _ => CreateMode.PERSISTENT
  }

  /** Whether `init()` clears the stored info; defaults to true. */
  val initClear: Boolean = config.get(InitClear) match {
    case Some(b: Boolean) => b
    case _ => true
  }

  /** Whether `close()` clears the stored info; defaults to false. */
  val closeClear: Boolean = config.get(CloseClear) match {
    case Some(b: Boolean) => b
    case _ => false
  }

  /** Node name used for locks; defaults to "lock". */
  val lockPath: String = config.getOrElse(LockPath, "lock").toString

  // Namespace handed to Curator: "<namespace>/<metricName>", or just the metric name.
  private val cacheNamespace: String =
    if (namespace.isEmpty) metricName else namespace + separator + metricName

  private val builder = CuratorFrameworkFactory
    .builder()
    .connectString(hosts)
    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
    .namespace(cacheNamespace)

  private val client: CuratorFramework = builder.build

  /**
   * Starts the client, removes the lock node, and clears the stored info
   * when `initClear` is set.
   */
  def init(): Unit = {
    client.start()
    info("start zk info cache")
    // The namespace is already bound to `client` by the builder above, so no
    // `usingNamespace` call is needed here (its returned facade was never used).
    info(s"init with namespace: $cacheNamespace")
    delete(lockPath :: Nil)
    if (initClear) {
      clear()
    }
  }

  /** @return true when the client state is STARTED */
  def available(): Boolean =
    client.getState == CuratorFrameworkState.STARTED

  /** Clears the stored info when `closeClear` is set, then closes the client. */
  def close(): Unit = {
    try {
      if (closeClear) {
        clear()
      }
    } finally {
      // Always release the client, even if clearing failed with a fatal error.
      info("close zk info cache")
      client.close()
    }
  }

  /** Writes each key/value pair, creating the node or updating its data. */
  def cache(kvs: Map[String, String]): Unit = {
    kvs.foreach { case (key, value) => createOrUpdate(path(key), value) }
  }

  /**
   * Reads the given keys.
   *
   * @return the keys that could be read, mapped to their values; keys whose
   *         read failed are omitted
   */
  def read(keys: Iterable[String]): Map[String, String] = {
    keys.flatMap { key =>
      read(path(key)).map(value => (key, value))
    }.toMap
  }

  /** Deletes the node of every given key, including its children. */
  def delete(keys: Iterable[String]): Unit = {
    keys.foreach(key => delete(path(key)))
  }

  /**
   * Deletes the two info subtrees rather than the whole namespace root.
   * `finalCacheInfoPath` and `infoPath` are defined outside this class
   * (presumably in OffsetOps).
   */
  def clear(): Unit = {
    delete(finalCacheInfoPath :: Nil)
    delete(infoPath :: Nil)
    info("clear info")
  }

  /** @return names of the child nodes under `p`, or Nil when listing fails */
  def listKeys(p: String): List[String] = {
    children(path(p))
  }

  /**
   * Builds a lock on a mutex node under the lock path.
   *
   * @param s sub-node name; when empty the lock path itself is used
   */
  def genLock(s: String): CheckpointLockInZK = {
    val lockNode = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
    CheckpointLockInZK(new InterProcessMutex(client, lockNode))
  }

  // Ensures the key starts with the path separator.
  private def path(k: String): String = {
    if (k.startsWith(separator)) k else separator + k
  }

  // Lists child node names; logs a warning and returns Nil on failure.
  private def children(zkPath: String): List[String] = {
    try {
      client.getChildren.forPath(zkPath).asScala.toList
    } catch {
      case NonFatal(e) =>
        warn(s"list $zkPath warn: ${e.getMessage}")
        Nil
    }
  }

  // Updates the node when it exists, otherwise creates it; true on success.
  private def createOrUpdate(zkPath: String, content: String): Boolean = {
    if (checkExists(zkPath)) {
      update(zkPath, content)
    } else {
      create(zkPath, content)
    }
  }

  // Creates the node (and missing parents) with the configured mode; true on success.
  private def create(zkPath: String, content: String): Boolean = {
    try {
      client
        .create()
        .creatingParentsIfNeeded()
        .withMode(mode)
        .forPath(zkPath, content.getBytes("utf-8"))
      true
    } catch {
      case NonFatal(e) =>
        error(s"create ( $zkPath -> $content ) error: ${e.getMessage}")
        false
    }
  }

  // Overwrites the node data; true on success.
  private def update(zkPath: String, content: String): Boolean = {
    try {
      client.setData().forPath(zkPath, content.getBytes("utf-8"))
      true
    } catch {
      case NonFatal(e) =>
        error(s"update ( $zkPath -> $content ) error: ${e.getMessage}")
        false
    }
  }

  // Reads the node data as a UTF-8 string; None (with a warning) on failure.
  private def read(zkPath: String): Option[String] = {
    try {
      Some(new String(client.getData.forPath(zkPath), "utf-8"))
    } catch {
      case NonFatal(e) =>
        warn(s"read $zkPath warn: ${e.getMessage}")
        None
    }
  }

  // Deletes the node and its children; failures are logged, not rethrown.
  private def delete(zkPath: String): Unit = {
    try {
      client.delete().guaranteed().deletingChildrenIfNeeded().forPath(zkPath)
    } catch {
      case NonFatal(e) => error(s"delete $zkPath error: ${e.getMessage}")
    }
  }

  // True when the node exists; a failed check is logged and treated as "absent".
  private def checkExists(zkPath: String): Boolean = {
    try {
      client.checkExists().forPath(zkPath) != null
    } catch {
      case NonFatal(e) =>
        warn(s"check exists $zkPath warn: ${e.getMessage}")
        false
    }
  }
}