| /** Copyright 2015 TappingStone, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package io.prediction.data.storage |
| |
| import java.lang.reflect.InvocationTargetException |
| |
| import grizzled.slf4j.Logging |
| import io.prediction.annotation.DeveloperApi |
| |
| import scala.concurrent.ExecutionContext.Implicits.global |
| import scala.language.existentials |
| import scala.reflect.runtime.universe._ |
| |
| /** :: DeveloperApi :: |
| * Any storage backend drivers will need to implement this trait with exactly |
| * '''StorageClient''' as the class name. PredictionIO storage layer will look |
| * for this class when it instantiates the actual backend for use by higher |
| * level storage access APIs. |
| * |
| * @group Storage System |
| */ |
| @DeveloperApi |
| trait BaseStorageClient { |
| /** Configuration of the '''StorageClient''' */ |
| val config: StorageClientConfig |
| |
| /** The actual client object. This could be a database connection or any kind |
| * of database access object. |
| */ |
| val client: AnyRef |
| |
| /** Set a prefix for storage class discovery. As an example, if this prefix |
| * is set as ''JDBC'', when the storage layer instantiates an implementation |
| * of [[Apps]], it will try to look for a class named ''JDBCApps''. |
| */ |
| val prefix: String = "" |
| } |
| |
| /** :: DeveloperApi :: |
| * A wrapper of storage client configuration that will be populated by |
| * PredictionIO automatically, and passed to the StorageClient during |
| * instantiation. |
| * |
| * @param parallel This is set to true by PredictionIO when the storage client |
| * is instantiated in a parallel data source. |
| * @param test This is set to true by PredictionIO when tests are being run. |
| * @param properties This is populated by PredictionIO automatically from |
| * environmental configuration variables. If you have these |
| * variables, |
| * - PIO_STORAGE_SOURCES_PGSQL_TYPE=jdbc |
| * - PIO_STORAGE_SOURCES_PGSQL_USERNAME=abc |
| * - PIO_STOARGE_SOURCES_PGSQL_PASSWORD=xyz |
| * |
| * this field will be filled as a map of string to string: |
| * - TYPE -> jdbc |
| * - USERNAME -> abc |
| * - PASSWORD -> xyz |
| * |
| * @group Storage System |
| */ |
| @DeveloperApi |
| case class StorageClientConfig( |
| parallel: Boolean = false, // parallelized access (RDD)? |
| test: Boolean = false, // test mode config |
| properties: Map[String, String] = Map()) |
| |
| /** :: DeveloperApi :: |
| * Thrown when a StorageClient runs into an exceptional condition |
| * |
| * @param message Exception error message |
| * @param cause The underlying exception that caused the exception |
| * @group Storage System |
| */ |
| @DeveloperApi |
| class StorageClientException(message: String, cause: Throwable) |
| extends RuntimeException(message, cause) |
| |
| @deprecated("Use StorageException", "0.9.2") |
| private[prediction] case class StorageError(message: String) |
| |
| /** :: DeveloperApi :: |
| * Thrown by data access objects when they run into exceptional conditions |
| * |
| * @param message Exception error message |
| * @param cause The underlying exception that caused the exception |
| * |
| * @group Storage System |
| */ |
| @DeveloperApi |
| class StorageException(message: String, cause: Throwable) |
| extends Exception(message, cause) { |
| |
| def this(message: String) = this(message, null) |
| } |
| |
| /** Backend-agnostic data storage layer with lazy initialization. Use this |
| * object when you need to interface with Event Store in your engine. |
| * |
| * @group Storage System |
| */ |
| object Storage extends Logging { |
| private case class ClientMeta( |
| sourceType: String, |
| client: BaseStorageClient, |
| config: StorageClientConfig) |
| |
| private case class DataObjectMeta(sourceName: String, namespace: String) |
| |
| private var errors = 0 |
| |
| private val sourcesPrefix = "PIO_STORAGE_SOURCES" |
| |
| private val sourceTypesRegex = """PIO_STORAGE_SOURCES_([^_]+)_TYPE""".r |
| |
| private val sourceKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k => |
| sourceTypesRegex findFirstIn k match { |
| case Some(sourceTypesRegex(sourceType)) => Seq(sourceType) |
| case None => Nil |
| } |
| } |
| |
| if (sourceKeys.size == 0) warn("There is no properly configured data source.") |
| |
| private val s2cm = scala.collection.mutable.Map[String, Option[ClientMeta]]() |
| |
| /** Reference to the app data repository. */ |
| private val EventDataRepository = "EVENTDATA" |
| private val ModelDataRepository = "MODELDATA" |
| private val MetaDataRepository = "METADATA" |
| |
| private val repositoriesPrefix = "PIO_STORAGE_REPOSITORIES" |
| |
| private val repositoryNamesRegex = |
| """PIO_STORAGE_REPOSITORIES_([^_]+)_NAME""".r |
| |
| private val repositoryKeys: Seq[String] = sys.env.keys.toSeq.flatMap { k => |
| repositoryNamesRegex findFirstIn k match { |
| case Some(repositoryNamesRegex(repositoryName)) => Seq(repositoryName) |
| case None => Nil |
| } |
| } |
| |
| if (repositoryKeys.size == 0) { |
| warn("There is no properly configured repository.") |
| } |
| |
| private val requiredRepositories = Seq(MetaDataRepository) |
| |
| requiredRepositories foreach { r => |
| if (!repositoryKeys.contains(r)) { |
| error(s"Required repository (${r}) configuration is missing.") |
| errors += 1 |
| } |
| } |
| private val repositoriesToDataObjectMeta: Map[String, DataObjectMeta] = |
| repositoryKeys.map(r => |
| try { |
| val keyedPath = repositoriesPrefixPath(r) |
| val name = sys.env(prefixPath(keyedPath, "NAME")) |
| val sourceName = sys.env(prefixPath(keyedPath, "SOURCE")) |
| if (sourceKeys.contains(sourceName)) { |
| r -> DataObjectMeta( |
| sourceName = sourceName, |
| namespace = name) |
| } else { |
| error(s"$sourceName is not a configured storage source.") |
| r -> DataObjectMeta("", "") |
| } |
| } catch { |
| case e: Throwable => |
| error(e.getMessage) |
| errors += 1 |
| r -> DataObjectMeta("", "") |
| } |
| ).toMap |
| |
| if (errors > 0) { |
| error(s"There were $errors configuration errors. Exiting.") |
| sys.exit(errors) |
| } |
| |
| // End of constructor and field definitions and begin method definitions |
| |
| private def prefixPath(prefix: String, body: String) = s"${prefix}_$body" |
| |
| private def sourcesPrefixPath(body: String) = prefixPath(sourcesPrefix, body) |
| |
| private def repositoriesPrefixPath(body: String) = |
| prefixPath(repositoriesPrefix, body) |
| |
| private def sourcesToClientMeta( |
| source: String, |
| parallel: Boolean, |
| test: Boolean): Option[ClientMeta] = { |
| val sourceName = if (parallel) s"parallel-$source" else source |
| s2cm.getOrElseUpdate(sourceName, updateS2CM(source, parallel, test)) |
| } |
| |
| private def getClient( |
| clientConfig: StorageClientConfig, |
| pkg: String): BaseStorageClient = { |
| val className = "io.prediction.data.storage." + pkg + ".StorageClient" |
| try { |
| Class.forName(className).getConstructors()(0).newInstance(clientConfig). |
| asInstanceOf[BaseStorageClient] |
| } catch { |
| case e: ClassNotFoundException => |
| val originalClassName = pkg + ".StorageClient" |
| Class.forName(originalClassName).getConstructors()(0). |
| newInstance(clientConfig).asInstanceOf[BaseStorageClient] |
| case e: java.lang.reflect.InvocationTargetException => |
| throw e.getCause |
| } |
| } |
| |
| /** Get the StorageClient config data from PIO Framework's environment variables */ |
| def getConfig(sourceName: String): Option[StorageClientConfig] = { |
| if (s2cm.contains(sourceName) && s2cm.get(sourceName).nonEmpty |
| && s2cm.get(sourceName).get.nonEmpty) { |
| Some(s2cm.get(sourceName).get.get.config) |
| } else None |
| } |
| |
| private def updateS2CM(k: String, parallel: Boolean, test: Boolean): |
| Option[ClientMeta] = { |
| try { |
| val keyedPath = sourcesPrefixPath(k) |
| val sourceType = sys.env(prefixPath(keyedPath, "TYPE")) |
| val props = sys.env.filter(t => t._1.startsWith(keyedPath)).map( |
| t => t._1.replace(s"${keyedPath}_", "") -> t._2) |
| val clientConfig = StorageClientConfig( |
| properties = props, |
| parallel = parallel, |
| test = test) |
| val client = getClient(clientConfig, sourceType) |
| Some(ClientMeta(sourceType, client, clientConfig)) |
| } catch { |
| case e: Throwable => |
| error(s"Error initializing storage client for source ${k}", e) |
| errors += 1 |
| None |
| } |
| } |
| |
| private[prediction] |
| def getDataObjectFromRepo[T](repo: String, test: Boolean = false) |
| (implicit tag: TypeTag[T]): T = { |
| val repoDOMeta = repositoriesToDataObjectMeta(repo) |
| val repoDOSourceName = repoDOMeta.sourceName |
| getDataObject[T](repoDOSourceName, repoDOMeta.namespace, test = test) |
| } |
| |
| private[prediction] |
| def getPDataObject[T](repo: String)(implicit tag: TypeTag[T]): T = { |
| val repoDOMeta = repositoriesToDataObjectMeta(repo) |
| val repoDOSourceName = repoDOMeta.sourceName |
| getPDataObject[T](repoDOSourceName, repoDOMeta.namespace) |
| } |
| |
| private[prediction] def getDataObject[T]( |
| sourceName: String, |
| namespace: String, |
| parallel: Boolean = false, |
| test: Boolean = false)(implicit tag: TypeTag[T]): T = { |
| val clientMeta = sourcesToClientMeta(sourceName, parallel, test) getOrElse { |
| throw new StorageClientException( |
| s"Data source $sourceName was not properly initialized.", null) |
| } |
| val sourceType = clientMeta.sourceType |
| val ctorArgs = dataObjectCtorArgs(clientMeta.client, namespace) |
| val classPrefix = clientMeta.client.prefix |
| val originalClassName = tag.tpe.toString.split('.') |
| val rawClassName = sourceType + "." + classPrefix + originalClassName.last |
| val className = "io.prediction.data.storage." + rawClassName |
| val clazz = try { |
| Class.forName(className) |
| } catch { |
| case e: ClassNotFoundException => |
| try { |
| Class.forName(rawClassName) |
| } catch { |
| case e: ClassNotFoundException => |
| throw new StorageClientException("No storage backend " + |
| "implementation can be found (tried both " + |
| s"$className and $rawClassName)", e) |
| } |
| } |
| val constructor = clazz.getConstructors()(0) |
| try { |
| constructor.newInstance(ctorArgs: _*). |
| asInstanceOf[T] |
| } catch { |
| case e: IllegalArgumentException => |
| error( |
| "Unable to instantiate data object with class '" + |
| constructor.getDeclaringClass.getName + " because its constructor" + |
| " does not have the right number of arguments." + |
| " Number of required constructor arguments: " + |
| ctorArgs.size + "." + |
| " Number of existing constructor arguments: " + |
| constructor.getParameterTypes.size + "." + |
| s" Storage source name: ${sourceName}." + |
| s" Exception message: ${e.getMessage}).", e) |
| errors += 1 |
| throw e |
| case e: java.lang.reflect.InvocationTargetException => |
| throw e.getCause |
| } |
| } |
| |
| private def getPDataObject[T]( |
| sourceName: String, |
| databaseName: String)(implicit tag: TypeTag[T]): T = |
| getDataObject[T](sourceName, databaseName, true) |
| |
| private def dataObjectCtorArgs( |
| client: BaseStorageClient, |
| namespace: String): Seq[AnyRef] = { |
| Seq(client.client, client.config, namespace) |
| } |
| |
| private[prediction] def verifyAllDataObjects(): Unit = { |
| info("Verifying Meta Data Backend (Source: " + |
| s"${repositoriesToDataObjectMeta(MetaDataRepository).sourceName})...") |
| getMetaDataEngineManifests() |
| getMetaDataEngineInstances() |
| getMetaDataEvaluationInstances() |
| getMetaDataApps() |
| getMetaDataAccessKeys() |
| info("Verifying Model Data Backend (Source: " + |
| s"${repositoriesToDataObjectMeta(ModelDataRepository).sourceName})...") |
| getModelDataModels() |
| info("Verifying Event Data Backend (Source: " + |
| s"${repositoriesToDataObjectMeta(EventDataRepository).sourceName})...") |
| val eventsDb = getLEvents(test = true) |
| info("Test writing to Event Store (App Id 0)...") |
| // use appId=0 for testing purpose |
| eventsDb.init(0) |
| eventsDb.insert(Event( |
| event = "test", |
| entityType = "test", |
| entityId = "test"), 0) |
| eventsDb.remove(0) |
| eventsDb.close() |
| } |
| |
| private[prediction] def getMetaDataEngineManifests(): EngineManifests = |
| getDataObjectFromRepo[EngineManifests](MetaDataRepository) |
| |
| private[prediction] def getMetaDataEngineInstances(): EngineInstances = |
| getDataObjectFromRepo[EngineInstances](MetaDataRepository) |
| |
| private[prediction] def getMetaDataEvaluationInstances(): EvaluationInstances = |
| getDataObjectFromRepo[EvaluationInstances](MetaDataRepository) |
| |
| private[prediction] def getMetaDataApps(): Apps = |
| getDataObjectFromRepo[Apps](MetaDataRepository) |
| |
| private[prediction] def getMetaDataAccessKeys(): AccessKeys = |
| getDataObjectFromRepo[AccessKeys](MetaDataRepository) |
| |
| private[prediction] def getMetaDataChannels(): Channels = |
| getDataObjectFromRepo[Channels](MetaDataRepository) |
| |
| private[prediction] def getModelDataModels(): Models = |
| getDataObjectFromRepo[Models](ModelDataRepository) |
| |
| /** Obtains a data access object that returns [[Event]] related local data |
| * structure. |
| */ |
| def getLEvents(test: Boolean = false): LEvents = |
| getDataObjectFromRepo[LEvents](EventDataRepository, test = test) |
| |
| /** Obtains a data access object that returns [[Event]] related RDD data |
| * structure. |
| */ |
| def getPEvents(): PEvents = |
| getPDataObject[PEvents](EventDataRepository) |
| |
| def config: Map[String, Map[String, Map[String, String]]] = Map( |
| "sources" -> s2cm.toMap.map { case (source, clientMeta) => |
| source -> clientMeta.map { cm => |
| Map( |
| "type" -> cm.sourceType, |
| "config" -> cm.config.properties.map(t => s"${t._1} -> ${t._2}").mkString(", ") |
| ) |
| }.getOrElse(Map.empty) |
| } |
| ) |
| } |