| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.griffin.measure.context.streaming.checkpoint.offset |
| |
| import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} |
| import org.apache.curator.framework.imps.CuratorFrameworkState |
| import org.apache.curator.framework.recipes.locks.InterProcessMutex |
| import org.apache.curator.retry.ExponentialBackoffRetry |
| import org.apache.curator.utils.ZKPaths |
| import org.apache.zookeeper.CreateMode |
| import scala.collection.JavaConverters._ |
| import scala.util.matching.Regex |
| |
| import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK |
| |
| /** |
| * leverage zookeeper for info cache |
| * @param config |
| * @param metricName |
| */ |
| case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) |
| extends OffsetCheckpoint |
| with OffsetOps { |
| |
| val Hosts = "hosts" |
| val Namespace = "namespace" |
| val Mode = "mode" |
| val InitClear = "init.clear" |
| val CloseClear = "close.clear" |
| val LockPath = "lock.path" |
| |
| val PersistentRegex: Regex = """^(?i)persist(ent)?$""".r |
| val EphemeralRegex: Regex = """^(?i)ephemeral$""".r |
| |
| final val separator = ZKPaths.PATH_SEPARATOR |
| |
| val hosts: String = config.getOrElse(Hosts, "").toString |
| val namespace: String = config.getOrElse(Namespace, "").toString |
| val mode: CreateMode = config.get(Mode) match { |
| case Some(s: String) => |
| s match { |
| case PersistentRegex() => CreateMode.PERSISTENT |
| case EphemeralRegex() => CreateMode.EPHEMERAL |
| case _ => CreateMode.PERSISTENT |
| } |
| case _ => CreateMode.PERSISTENT |
| } |
| val initClear: Boolean = config.get(InitClear) match { |
| case Some(b: Boolean) => b |
| case _ => true |
| } |
| val closeClear: Boolean = config.get(CloseClear) match { |
| case Some(b: Boolean) => b |
| case _ => false |
| } |
| val lockPath: String = config.getOrElse(LockPath, "lock").toString |
| |
| private val cacheNamespace: String = |
| if (namespace.isEmpty) metricName else namespace + separator + metricName |
| |
| private val builder = CuratorFrameworkFactory |
| .builder() |
| .connectString(hosts) |
| .retryPolicy(new ExponentialBackoffRetry(1000, 3)) |
| .namespace(cacheNamespace) |
| private val client: CuratorFramework = builder.build |
| |
| def init(): Unit = { |
| client.start() |
| info("start zk info cache") |
| client.usingNamespace(cacheNamespace) |
| info(s"init with namespace: $cacheNamespace") |
| delete(lockPath :: Nil) |
| if (initClear) { |
| clear() |
| } |
| } |
| |
| def available(): Boolean = { |
| client.getState match { |
| case CuratorFrameworkState.STARTED => true |
| case _ => false |
| } |
| } |
| |
| def close(): Unit = { |
| if (closeClear) { |
| clear() |
| } |
| info("close zk info cache") |
| client.close() |
| } |
| |
| def cache(kvs: Map[String, String]): Unit = { |
| kvs.foreach(kv => createOrUpdate(path(kv._1), kv._2)) |
| } |
| |
| def read(keys: Iterable[String]): Map[String, String] = { |
| keys.flatMap { key => |
| read(path(key)) match { |
| case Some(v) => Some((key, v)) |
| case _ => None |
| } |
| }.toMap |
| } |
| |
| def delete(keys: Iterable[String]): Unit = { |
| keys.foreach { key => |
| delete(path(key)) |
| } |
| } |
| |
| def clear(): Unit = { |
| // delete("/") |
| delete(finalCacheInfoPath :: Nil) |
| delete(infoPath :: Nil) |
| info("clear info") |
| } |
| |
| def listKeys(p: String): List[String] = { |
| children(path(p)) |
| } |
| |
| def genLock(s: String): CheckpointLockInZK = { |
| val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s |
| CheckpointLockInZK(new InterProcessMutex(client, lpt)) |
| } |
| |
| private def path(k: String): String = { |
| if (k.startsWith(separator)) k else separator + k |
| } |
| |
| private def children(path: String): List[String] = { |
| try { |
| client.getChildren.forPath(path).asScala.toList |
| } catch { |
| case e: Throwable => |
| warn(s"list $path warn: ${e.getMessage}") |
| Nil |
| } |
| } |
| |
| private def createOrUpdate(path: String, content: String): Boolean = { |
| if (checkExists(path)) { |
| update(path, content) |
| } else { |
| create(path, content) |
| } |
| } |
| |
| private def create(path: String, content: String): Boolean = { |
| try { |
| client |
| .create() |
| .creatingParentsIfNeeded() |
| .withMode(mode) |
| .forPath(path, content.getBytes("utf-8")) |
| true |
| } catch { |
| case e: Throwable => |
| error(s"create ( $path -> $content ) error: ${e.getMessage}") |
| false |
| } |
| } |
| |
| private def update(path: String, content: String): Boolean = { |
| try { |
| client.setData().forPath(path, content.getBytes("utf-8")) |
| true |
| } catch { |
| case e: Throwable => |
| error(s"update ( $path -> $content ) error: ${e.getMessage}") |
| false |
| } |
| } |
| |
| private def read(path: String): Option[String] = { |
| try { |
| Some(new String(client.getData.forPath(path), "utf-8")) |
| } catch { |
| case e: Throwable => |
| warn(s"read $path warn: ${e.getMessage}") |
| None |
| } |
| } |
| |
| private def delete(path: String): Unit = { |
| try { |
| client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path) |
| } catch { |
| case e: Throwable => error(s"delete $path error: ${e.getMessage}") |
| } |
| } |
| |
| private def checkExists(path: String): Boolean = { |
| try { |
| client.checkExists().forPath(path) != null |
| } catch { |
| case e: Throwable => |
| warn(s"check exists $path warn: ${e.getMessage}") |
| false |
| } |
| } |
| |
| } |