blob: 49ef272844679b9bb606d745a97e0260571e07c8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.metrics.adservice.leveldb
import java.io.File
import javax.inject.Inject
import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration
import org.apache.ambari.metrics.adservice.db.MetadataDatasource
import org.iq80.leveldb.{DB, Options, WriteOptions}
import org.iq80.leveldb.impl.Iq80DBFactory
import com.google.inject.Singleton
@Singleton
class LevelDBDataSource() extends MetadataDatasource {
private var db: DB = _
@volatile var isInitialized: Boolean = false
var appConfig: AnomalyDetectionAppConfig = _
@Inject
def this(appConfig: AnomalyDetectionAppConfig) = {
this
this.appConfig = appConfig
}
override def initialize(): Unit = {
if (isInitialized) return
val configuration: MetricDefinitionDBConfiguration = appConfig.getMetricDefinitionDBConfiguration
db = createDB(new LevelDbConfig {
override val createIfMissing: Boolean = true
override val verifyChecksums: Boolean = configuration.getVerifyChecksums
override val paranoidChecks: Boolean = configuration.getPerformParanoidChecks
override val path: String = configuration.getDbDirPath
})
isInitialized = true
}
private def createDB(levelDbConfig: LevelDbConfig): DB = {
import levelDbConfig._
val options = new Options()
.createIfMissing(createIfMissing)
.paranoidChecks(paranoidChecks) // raise an error as soon as it detects an internal corruption
.verifyChecksums(verifyChecksums) // force checksum verification of all data that is read from the file system on behalf of a particular read
Iq80DBFactory.factory.open(new File(path), options)
}
override def close(): Unit = {
db.close()
}
/**
* This function obtains the associated value to a key, if there exists one.
*
* @param key
* @return the value associated with the passed key.
*/
override def get(key: Key): Option[Value] = Option(db.get(key))
/**
* This function obtains all the values
*
* @return the list of values
*/
def getAll: List[Value] = {
val values = scala.collection.mutable.MutableList.empty[Value]
val iterator = db.iterator()
iterator.seekToFirst()
while (iterator.hasNext) {
val entry: java.util.Map.Entry[Key, Value] = iterator.next()
values.+=(entry.getValue)
}
values.toList
}
/**
* This function updates the DataSource by deleting, updating and inserting new (key-value) pairs.
*
* @param toRemove which includes all the keys to be removed from the DataSource.
* @param toUpsert which includes all the (key-value) pairs to be inserted into the DataSource.
* If a key is already in the DataSource its value will be updated.
*/
override def update(toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): Unit = {
val batch = db.createWriteBatch()
toRemove.foreach { key => batch.delete(key) }
toUpsert.foreach { item => batch.put(item._1, item._2) }
db.write(batch, new WriteOptions())
}
override def put(key: Key, value: Value): Unit = {
db.put(key, value)
}
override def delete(key: Key): Unit = {
db.delete(key)
}
}
trait LevelDbConfig {
val createIfMissing: Boolean
val paranoidChecks: Boolean
val verifyChecksums: Boolean
val path: String
}