blob: abec851422614c53aed00e5d45257cf07090924f [file] [log] [blame]
/** Copyright 2015 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.data.storage
import grizzled.slf4j.Logging
import scala.collection.JavaConversions._
import scala.language.existentials
import scala.reflect.runtime.universe._
import scala.concurrent.ExecutionContext.Implicits.global
@deprecated("Use StorageException", "0.9.2")
private[prediction] case class StorageError(val message: String)
private[prediction] class StorageException(message: String, cause: Throwable)
extends Exception(message, cause) {
def this(message: String) = this(message, null)
}
/**
* Backend-agnostic data storage layer with lazy initialization and connection
* pooling. Use this object when you need to interface with Event Store in your
* engine.
*/
object Storage extends Logging {
private var errors = 0
private def prefixPath(prefix: String, body: String) = s"${prefix}_${body}"
private val sourcesPrefix = "PIO_STORAGE_SOURCES"
private def sourcesPrefixPath(body: String) =
prefixPath(sourcesPrefix, body)
private val sourceTypesRegex = """PIO_STORAGE_SOURCES_([^_]+)_TYPE""".r
private val sourceKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k =>
sourceTypesRegex findFirstIn k match {
case Some(sourceTypesRegex(sourceType)) => Seq(sourceType)
case None => Nil
}
}
if (sourceKeys.size == 0) warn("There is no properly configured data source.")
private case class ClientMeta(sourceType: String, client: BaseStorageClient)
private case class DataObjectMeta(sourceName: String, databaseName: String)
private val s2cm = scala.collection.mutable.Map[String, Option[ClientMeta]]()
private def updateS2CM(k: String, parallel: Boolean, test: Boolean):
Option[ClientMeta] = {
try {
val keyedPath = sourcesPrefixPath(k)
val sourceType = sys.env(prefixPath(keyedPath, "TYPE"))
// hosts and ports are to be deprecated
val hosts = sys.env(prefixPath(keyedPath, "HOSTS")).split(',')
val ports = sys.env(prefixPath(keyedPath, "PORTS")).split(',').
map(_.toInt)
val props = sys.env.filter(t => t._1.startsWith(keyedPath))
val clientConfig = StorageClientConfig(
hosts = hosts,
ports = ports,
properties = props,
parallel = parallel,
test = test)
val client = getClient(clientConfig, sourceType)
Some(ClientMeta(sourceType, client))
} catch {
case e: Throwable =>
error(s"Error initializing storage client for source ${k}")
error(e.getMessage)
errors += 1
None
}
}
private def sourcesToClientMeta(
source: String,
parallel: Boolean,
test: Boolean): Option[ClientMeta] = {
val sourceName = if (parallel) s"parallel-${source}" else source
s2cm.getOrElseUpdate(sourceName, updateS2CM(source, parallel, test))
}
/** Reference to the app data repository. */
private val EventDataRepository = "EVENTDATA"
private val ModelDataRepository = "MODELDATA"
private val MetaDataRepository = "METADATA"
private val repositoriesPrefix = "PIO_STORAGE_REPOSITORIES"
private def repositoriesPrefixPath(body: String) =
prefixPath(repositoriesPrefix, body)
private val repositoryNamesRegex =
"""PIO_STORAGE_REPOSITORIES_([^_]+)_NAME""".r
private val repositoryKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k =>
repositoryNamesRegex findFirstIn k match {
case Some(repositoryNamesRegex(repositoryName)) => Seq(repositoryName)
case None => Nil
}
}
if (repositoryKeys.size == 0) {
warn("There is no properly configured repository.")
}
private val requiredRepositories = Seq(MetaDataRepository)
requiredRepositories foreach { r =>
if (!repositoryKeys.contains(r)) {
error(s"Required repository (${r}) configuration is missing.")
errors += 1
}
}
private val repositoriesToDataObjectMeta: Map[String, DataObjectMeta] =
repositoryKeys.map(r =>
try {
val keyedPath = repositoriesPrefixPath(r)
val name = sys.env(prefixPath(keyedPath, "NAME"))
val sourceName = sys.env(prefixPath(keyedPath, "SOURCE"))
if (sourceKeys.contains(sourceName)) {
r -> DataObjectMeta(
sourceName = sourceName,
databaseName = name)
} else {
error(s"$sourceName is not a configured storage source.")
r -> DataObjectMeta("", "")
}
} catch {
case e: Throwable =>
error(e.getMessage)
errors += 1
r -> DataObjectMeta("", "")
}
).toMap
private def getClient(
clientConfig: StorageClientConfig,
pkg: String): BaseStorageClient = {
val className = "io.prediction.data.storage." + pkg + ".StorageClient"
try {
Class.forName(className).getConstructors()(0).newInstance(clientConfig).
asInstanceOf[BaseStorageClient]
} catch {
case e: ClassNotFoundException =>
val originalClassName = pkg + ".StorageClient"
Class.forName(originalClassName).getConstructors()(0).
newInstance(clientConfig).asInstanceOf[BaseStorageClient]
case e: java.lang.reflect.InvocationTargetException =>
throw e.getCause
}
}
private[prediction]
def getDataObject[T](repo: String, test: Boolean = false)
(implicit tag: TypeTag[T]): T = {
val repoDOMeta = repositoriesToDataObjectMeta(repo)
val repoDOSourceName = repoDOMeta.sourceName
getDataObject[T](repoDOSourceName, repoDOMeta.databaseName, test = test)
}
private[prediction]
def getPDataObject[T](repo: String)(implicit tag: TypeTag[T]): T = {
val repoDOMeta = repositoriesToDataObjectMeta(repo)
val repoDOSourceName = repoDOMeta.sourceName
getPDataObject[T](repoDOSourceName, repoDOMeta.databaseName)
}
private[prediction] def getDataObject[T](
sourceName: String,
databaseName: String,
parallel: Boolean = false,
test: Boolean = false)(implicit tag: TypeTag[T]): T = {
val clientMeta = sourcesToClientMeta(sourceName, parallel, test) getOrElse {
throw new StorageClientException(
s"Data source $sourceName was not properly initialized.")
}
val sourceType = clientMeta.sourceType
val ctorArgs = dataObjectCtorArgs(clientMeta.client, databaseName)
val classPrefix = clientMeta.client.prefix
val originalClassName = tag.tpe.toString.split('.')
val rawClassName = sourceType + "." + classPrefix + originalClassName.last
val className = "io.prediction.data.storage." + rawClassName
val clazz = try {
Class.forName(className)
} catch {
case e: ClassNotFoundException => Class.forName(rawClassName)
}
val constructor = clazz.getConstructors()(0)
try {
constructor.newInstance(ctorArgs: _*).
asInstanceOf[T]
} catch {
case e: IllegalArgumentException =>
error(
"Unable to instantiate data object with class '" +
constructor.getDeclaringClass.getName + " because its constructor" +
" does not have the right number of arguments." +
" Number of required constructor arguments: " +
ctorArgs.size + "." +
" Number of existing constructor arguments: " +
constructor.getParameterTypes.size + "." +
s" Storage source name: ${sourceName}." +
s" Exception message: ${e.getMessage}).")
errors += 1
throw e
case e: java.lang.reflect.InvocationTargetException =>
throw e.getCause
}
}
private def getPDataObject[T](
sourceName: String,
databaseName: String)(implicit tag: TypeTag[T]): T =
getDataObject[T](sourceName, databaseName, true)
private def dataObjectCtorArgs(
client: BaseStorageClient,
dbName: String): Seq[AnyRef] = {
Seq(client.client, dbName)
}
private[prediction] def verifyAllDataObjects(): Unit = {
println(" Verifying Meta Data Backend")
getMetaDataEngineManifests()
getMetaDataEngineInstances()
getMetaDataEvaluationInstances()
getMetaDataApps()
getMetaDataAccessKeys()
println(" Verifying Model Data Backend")
getModelDataModels()
println(" Verifying Event Data Backend")
val eventsDb = getLEvents(test = true)
println(" Test write Event Store (App Id 0)")
// use appId=0 for testing purpose
eventsDb.init(0)
eventsDb.insert(Event(
event="test",
entityType="test",
entityId="test"), 0)
eventsDb.remove(0)
eventsDb.close()
}
private[prediction] def getMetaDataEngineManifests(): EngineManifests =
getDataObject[EngineManifests](MetaDataRepository)
private[prediction] def getMetaDataEngineInstances(): EngineInstances =
getDataObject[EngineInstances](MetaDataRepository)
private[prediction] def getMetaDataEvaluationInstances(): EvaluationInstances =
getDataObject[EvaluationInstances](MetaDataRepository)
private[prediction] def getMetaDataApps(): Apps =
getDataObject[Apps](MetaDataRepository)
private[prediction] def getMetaDataAccessKeys(): AccessKeys =
getDataObject[AccessKeys](MetaDataRepository)
private[prediction] def getMetaDataChannels(): Channels =
getDataObject[Channels](MetaDataRepository)
private[prediction] def getModelDataModels(): Models =
getDataObject[Models](ModelDataRepository)
/** Obtains a data access object that returns [[Event]] related local data
* structure.
*/
def getLEvents(test: Boolean = false): LEvents =
getDataObject[LEvents](EventDataRepository, test = test)
/** Obtains a data access object that returns [[Event]] related RDD data
* structure.
*/
def getPEvents(): PEvents =
getPDataObject[PEvents](EventDataRepository)
if (errors > 0) {
error(s"There were $errors configuration errors. Exiting.")
sys.exit(errors)
}
}
private[prediction] trait BaseStorageClient {
val config: StorageClientConfig
val client: AnyRef
val prefix: String = ""
}
private[prediction] case class StorageClientConfig(
hosts: Seq[String] = Seq(), // deprecated
ports: Seq[Int] = Seq(), // deprecated
parallel: Boolean = false, // parallelized access (RDD)?
test: Boolean = false, // test mode config
properties: Map[String, String] = Map())
private[prediction] class StorageClientException(msg: String)
extends RuntimeException(msg)