blob: d61f70b3cc6fdb49ee50fa4a1926dfd60d1f1390 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.s2graph.counter.helper
import com.typesafe.config.Config
import org.apache
import org.apache.s2graph.core.S2Graph
import org.apache.s2graph.core.schema.Label
import org.apache.s2graph.counter
import org.apache.s2graph.counter.config.S2CounterConfig
import org.apache.s2graph.counter.core.{RankingCounter, ExactCounter}
import org.apache.s2graph.counter.core.v1.{RankingStorageRedis, ExactStorageAsyncHBase}
import org.apache.s2graph.counter.core.v2.{RankingStorageGraph, ExactStorageGraph, GraphOperation}
import org.apache.s2graph.counter.models.{Counter, CounterModel}
import play.api.libs.json.Json
import scala.util.Try
/**
 * Administrative entry point for counters: sets up the graph-side service and
 * bucket label, and creates / deletes counter policies together with the
 * storage that backs them.
 *
 * Constructing an instance eagerly builds the graph client, the management
 * facade and one exact/ranking counter per supported version from `config`.
 *
 * @param config configuration handed to every collaborator built here
 */
class CounterAdmin(config: Config) {
  val s2config = new S2CounterConfig(config)
  val counterModel = new CounterModel(config)
  val graphOp = new GraphOperation(config)
  val s2graph = new S2Graph(config)(scala.concurrent.ExecutionContext.global)
  val storageManagement = new org.apache.s2graph.core.Management(s2graph)

  // Exact counters keyed by policy version (v1: async HBase storage, v2: graph storage).
  private val exactCounterMap = Map(
    counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)),
    counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config))
  )

  // Ranking counters keyed by policy version (v1: Redis storage, v2: graph storage).
  private val rankingCounterMap = Map(
    counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)),
    counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config))
  )

  // First segment of the generated HBase table name, keyed by policy version.
  private val tablePrefixMap = Map(
    counter.VERSION_1 -> "s2counter",
    counter.VERSION_2 -> "s2counter_v2"
  )

  /**
   * Creates the `s2counter` service and, when it does not exist yet, the
   * `s2counter_topK_bucket` label.
   */
  def setupCounterOnGraph(): Unit = {
    // create s2counter service
    val serviceName = "s2counter"
    val zkQuorum = s2config.HBASE_ZOOKEEPER_QUORUM
    val serviceTable = s"$serviceName-${config.getString("phase")}"
    // NOTE(review): the trailing `1, None, "gz"` arguments are passed through as-is;
    // confirm their meaning against Management.createService.
    storageManagement.createService(serviceName, zkQuorum, serviceTable, 1, None, "gz")

    // create bucket label, looked up with the cache bypassed
    val bucketLabel = "s2counter_topK_bucket"
    val labelMissing = Label.findByName(bucketLabel, useCache = false).isEmpty
    if (labelMissing) {
      graphOp.createLabel(Json.parse(bucketLabelSchema(bucketLabel)))
    }
  }

  /**
   * Renders the JSON definition used to create the bucket label.
   *
   * @param label name written into the `label` field of the definition
   * @return the JSON document as a string
   */
  private def bucketLabelSchema(label: String): String =
    s"""
       |{
       | "label": "$label",
       | "srcServiceName": "s2counter",
       | "srcColumnName": "dimension",
       | "srcColumnType": "string",
       | "tgtServiceName": "s2counter",
       | "tgtColumnName": "bucket",
       | "tgtColumnType": "string",
       | "indices": [
       | {"name": "time", "propNames": ["time_unit", "date_time"]}
       | ],
       | "props": [
       | {"name": "time_unit", "dataType": "string", "defaultValue": ""},
       | {"name": "date_time", "dataType": "long", "defaultValue": 0}
       | ],
       | "hTableName": "s2counter_60",
       | "hTableTTL": 5184000
       |}
       |""".stripMargin

  /**
   * Registers a new counter policy.
   *
   * The policy is first stamped with its generated HBase table name, then its
   * storage is prepared, and finally it is persisted through the counter model.
   *
   * @param policy counter definition to create
   */
  def createCounter(policy: Counter): Unit = {
    val tableName = makeHTableName(policy)
    val storedPolicy = policy.copy(hbaseTable = Some(tableName))
    prepareStorage(storedPolicy)
    counterModel.createServiceAction(storedPolicy)
  }

  /**
   * Removes the counter registered for `service` / `action`.
   *
   * @param service service name of the counter
   * @param action  action name of the counter
   * @return `None` when no such policy is found (cache bypassed); otherwise the
   *         outcome of destroying its storage and deleting the policy
   */
  def deleteCounter(service: String, action: String): Option[Try[Unit]] =
    counterModel.findByServiceAction(service, action, useCache = false).map { found =>
      Try {
        // NOTE(review): the exact counter is destroyed unconditionally here, whereas
        // prepareStorage only prepares it when no rate action is set — confirm intended.
        exactCounter(found).destroy(found)
        if (found.useRank) {
          rankingCounter(found).destroy(found)
        }
        counterModel.deleteServiceAction(found)
      }
    }

  /**
   * Prepares the storage a policy needs, using the policy's own version.
   *
   * @param policy counter definition whose storage should be prepared
   */
  def prepareStorage(policy: Counter): Unit = {
    // if defined rate action, do not use exact counter
    val needsExactCounter = policy.rateActionId.isEmpty
    if (needsExactCounter) {
      exactCounter(policy).prepare(policy)
    }
    if (policy.useRank) {
      rankingCounter(policy).prepare(policy)
    }
  }

  /**
   * Prepares storage for an explicitly given version instead of `policy.version`.
   *
   * @param policy  counter definition whose storage should be prepared
   * @param version version that replaces the policy's own version for this call
   */
  def prepareStorage(policy: Counter, version: Byte): Unit = {
    val versioned = policy.copy(version = version)
    prepareStorage(versioned)
  }

  /** Exact counter for `version`; the lookup fails for a version that is not registered. */
  def exactCounter(version: Byte): ExactCounter = exactCounterMap(version)

  /** Exact counter matching the version of `policy`. */
  def exactCounter(policy: Counter): ExactCounter = exactCounter(policy.version)

  /** Ranking counter for `version`; the lookup fails for a version that is not registered. */
  def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version)

  /** Ranking counter matching the version of `policy`. */
  def rankingCounter(policy: Counter): RankingCounter = rankingCounter(policy.version)

  /**
   * Builds the HBase table name for a policy by joining, with `_`, the version
   * prefix, the service, the ttl and — only when defined — the daily ttl.
   *
   * @param policy counter definition to derive the name from
   * @return the generated table name
   */
  def makeHTableName(policy: Counter): String = {
    val fixedParts = Seq(tablePrefixMap(policy.version), policy.service, policy.ttl)
    (fixedParts ++ policy.dailyTtl).mkString("_")
  }
}