blob: 6f33e56886cefa9b5ea2743c768c5f40ae710bea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.metrics.adservice.db
import java.sql.{Connection, SQLException}
import org.apache.ambari.metrics.adservice.common.{ADServiceConfiguration, PhoenixQueryConstants}
import org.apache.hadoop.hbase.util.RetryCounterFactory
import org.apache.hadoop.metrics2.sink.timeline.query.{DefaultPhoenixDataSource, PhoenixConnectionProvider}
import java.util.concurrent.TimeUnit.SECONDS
object PhoenixAnomalyStoreAccessor {
private var datasource: PhoenixConnectionProvider = _
def initAnomalyMetricSchema(): Unit = {
val datasource: PhoenixConnectionProvider = new DefaultPhoenixDataSource(ADServiceConfiguration.getHBaseConf)
val retryCounterFactory = new RetryCounterFactory(10, SECONDS.toMillis(3).toInt)
val ttl = ADServiceConfiguration.getAnomalyDataTtl
try {
var conn = datasource.getConnectionRetryingOnException(retryCounterFactory)
var stmt = conn.createStatement
val methodParametersSql = String.format(PhoenixQueryConstants.CREATE_METHOD_PARAMETERS_TABLE,
PhoenixQueryConstants.METHOD_PARAMETERS_TABLE_NAME)
stmt.executeUpdate(methodParametersSql)
val pointInTimeAnomalySql = String.format(PhoenixQueryConstants.CREATE_PIT_ANOMALY_METRICS_TABLE_SQL,
PhoenixQueryConstants.PIT_ANOMALY_METRICS_TABLE_NAME,
ttl.asInstanceOf[Object])
stmt.executeUpdate(pointInTimeAnomalySql)
val trendAnomalySql = String.format(PhoenixQueryConstants.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL,
PhoenixQueryConstants.TREND_ANOMALY_METRICS_TABLE_NAME,
ttl.asInstanceOf[Object])
stmt.executeUpdate(trendAnomalySql)
val snapshotSql = String.format(PhoenixQueryConstants.CREATE_MODEL_SNAPSHOT_TABLE,
PhoenixQueryConstants.MODEL_SNAPSHOT)
stmt.executeUpdate(snapshotSql)
conn.commit()
} catch {
case e: SQLException => throw e
}
}
@throws[SQLException]
def getConnection: Connection = datasource.getConnection
}