blob: 06c1641fd5cee43cfc0f3c8a4609b9ce5335722a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.predictionio.tools.commands
import org.apache.predictionio.data.storage
import org.apache.predictionio.data.storage.Channel
import org.apache.predictionio.tools.EitherLogging
import org.apache.predictionio.tools.ReturnTypes._
/** An app's meta-data record bundled with the access keys looked up for it.
  *
  * @param app  the stored app record
  * @param keys the access keys associated with `app`
  */
sealed case class AppDescription(
app: storage.App,
keys: Seq[storage.AccessKey])
object App extends EitherLogging {
/** Creates a new app, initializes its Event Store and registers an access key for it.
  *
  * Nothing is created when an app named `name` already exists, or when `id` is given and
  * already maps to an app. If the Event Store cannot be initialized, the freshly inserted app
  * meta-data is deleted again; should that revert fail as well, the returned message explains
  * how to remove the unusable app manually.
  *
  * The event client is closed on every path, including early failures and exceptions.
  *
  * @param name        name of the new app
  * @param id          app ID to request; 0 is passed to the meta-data store when absent
  * @param description optional description stored with the app
  * @param accessKey   key string handed to the access-key store for the new key
  * @return `Right` with the created app and its access key, or `Left` with the error message
  */
def create(
    name: String,
    id: Option[Int] = None,
    description: Option[String] = None,
    accessKey: String = "") : Expected[AppDescription] = {
  val apps = storage.Storage.getMetaDataApps()
  // Get the event client first so an inaccessible client fails before anything is changed.
  val events = storage.Storage.getLEvents()

  try {
    // A name clash is checked first; the requested ID is only looked up when the name is free.
    val conflict: Option[String] =
      apps.getByName(name).map { _ =>
        s"App ${name} already exists. Aborting."
      }.orElse {
        id.flatMap { requestedId =>
          apps.get(requestedId).map { existing =>
            s"App ID ${requestedId} already exists and maps to the app '${existing.name}'. " +
              "Aborting."
          }
        }
      }

    conflict match {
      case Some(message) =>
        error(message)
        Left(message)

      case None =>
        val newApp = storage.App(
          id = id.getOrElse(0),
          name = name,
          description = description)
        apps.insert(newApp) match {
          case Some(appId) =>
            if (events.init(appId)) {
              info(s"Initialized Event Store for this app ID: ${appId}.")
              val accessKeys = storage.Storage.getMetaDataAccessKeys
              val newKey = storage.AccessKey(
                key = accessKey,
                appid = appId,
                events = Nil)
              accessKeys.insert(newKey) match {
                case Some(k) =>
                  // The store returns the key that was actually saved; report that one.
                  Right(AppDescription(
                    app = newApp.copy(id = appId),
                    keys = Seq(newKey.copy(key = k))))
                case None =>
                  logAndFail(s"Unable to create new access key.")
              }
            } else {
              val initError = s"Unable to initialize Event Store for this app ID: ${appId}."
              // Roll back the meta-data insert; if that fails too, tell the user how to clean up.
              val revertError =
                try {
                  apps.delete(appId)
                  ""
                } catch {
                  case e: Exception =>
                    s"""
                       |Failed to revert back the App meta-data change.
                       |The app ${name} CANNOT be used!
                       |Please run 'pio app delete ${name}' to delete this app!""".stripMargin
                }
              logAndFail(initError + revertError)
            }

          case None =>
            logAndFail(s"Unable to create new app.")
        }
    }
  } finally {
    // Previously the client was only closed after a successful insert, leaking it otherwise.
    events.close()
  }
}
/** Lists every stored app, ordered by name, each paired with its access keys.
  *
  * @return one [[AppDescription]] per app, sorted by app name
  */
def list: Seq[AppDescription] = {
  val sortedApps = storage.Storage.getMetaDataApps.getAll().sortBy(app => app.name)
  val keyStore = storage.Storage.getMetaDataAccessKeys
  for (app <- sortedApps) yield AppDescription(app, keyStore.getByAppid(app.id))
}
/** Looks up an app by name together with its access keys and channels.
  *
  * @param appName name of the app to look up
  * @return `Right` with the app description and its channels, or the `Left` produced by
  *         `logAndFail` when no app has that name
  */
def show(appName: String): Expected[(AppDescription, Seq[Channel])] = {
  val appStore = storage.Storage.getMetaDataApps
  val keyStore = storage.Storage.getMetaDataAccessKeys
  val channelStore = storage.Storage.getMetaDataChannels

  appStore.getByName(appName) match {
    case Some(found) =>
      // Keys are fetched before channels, matching the lookup order callers have always seen.
      val description = AppDescription(app = found, keys = keyStore.getByAppid(found.id))
      val appChannels = channelStore.getByAppid(found.id)
      Right((description, appChannels))
    case None =>
      logAndFail(s"App ${appName} does not exist. Aborting.")
  }
}
/** Deletes an app together with its channels, event data and access keys.
  *
  * The stages run in this order, and a failing stage aborts everything after it:
  *  1. each channel's Event Store and meta-data (all channels are attempted and their
  *     failures are reported together, joined by newlines),
  *  2. the app's own Event Store,
  *  3. the app's access keys (stops at the first key that cannot be removed),
  *  4. the app meta-data.
  *
  * A failure in stages 2-4 is returned as `Left`; earlier versions logged "Aborting." but
  * carried on and reported success. An Event Store removal that returns `false` is treated as
  * a failure, like the other removal calls in this object.
  *
  * @param name name of the app to delete
  * @return `Success` when everything was removed, otherwise `Left` with the error message(s)
  */
def delete(name: String): MaybeError = {
  val events = storage.Storage.getLEvents()

  // Logs a failure together with its cause and turns it into an error result.
  def failWithCause(msg: String, cause: Exception): MaybeError = {
    error(msg, cause)
    Left(msg)
  }

  // Drops one channel's Event Store and then its meta-data; yields the message on failure.
  def dropChannel(appId: Int, ch: Channel): Option[String] =
    if (events.remove(appId, Some(ch.id))) {
      info(s"Removed Event Store of the channel ID: ${ch.id}")
      try {
        storage.Storage.getMetaDataChannels.delete(ch.id)
        info(s"Deleted channel ${ch.name}")
        None
      } catch {
        case e: Exception =>
          val errStr = s"Error deleting channel ${ch.name}."
          error(errStr, e)
          Some(errStr)
      }
    } else {
      val errStr = s"Error removing Event Store of the channel ID: ${ch.id}."
      error(errStr)
      Some(errStr)
    }

  // Drops the app's own (channel-less) Event Store.
  def dropAppEvents(appId: Int): MaybeError =
    try {
      if (events.remove(appId)) {
        info(s"Removed Event Store for this app ID: ${appId}")
        Success
      } else {
        logAndFail(s"Error removing Event Store for this app. Aborting.")
      }
    } catch {
      case e: Exception =>
        failWithCause(s"Error removing Event Store for this app. Aborting.", e)
    }

  // Deletes a single access key.
  def dropAccessKey(key: String): MaybeError =
    try {
      storage.Storage.getMetaDataAccessKeys.delete(key)
      info(s"Removed access key ${key}")
      Success
    } catch {
      case e: Exception =>
        failWithCause(s"Error removing access key ${key}. Aborting.", e)
    }

  // Deletes access keys one by one; once a deletion fails the remaining keys are left alone.
  def dropAccessKeys(keys: Seq[storage.AccessKey]): MaybeError =
    keys.foldLeft[MaybeError](Success) { (status, key) =>
      status.right.flatMap(_ => dropAccessKey(key.key))
    }

  // Deletes the app meta-data record itself.
  def dropAppMetaData(app: storage.App): MaybeError =
    try {
      storage.Storage.getMetaDataApps.delete(app.id)
      info(s"Deleted app ${app.name}.")
      Success
    } catch {
      case e: Exception =>
        failWithCause(s"Error deleting app ${app.name}. Aborting.", e)
    }

  // Runs all deletion stages for an app that is known to exist.
  def purge(appDesc: AppDescription, channels: Seq[Channel]): MaybeError = {
    val channelErrors = channels.flatMap(ch => dropChannel(appDesc.app.id, ch))
    if (channelErrors.nonEmpty) {
      // Keep the app and its keys in place so the deletion can be retried.
      Left(channelErrors.mkString("\n"))
    } else {
      val eventsDropped = dropAppEvents(appDesc.app.id)
      val keysDropped = eventsDropped.right.flatMap(_ => dropAccessKeys(appDesc.keys))
      val appDropped = keysDropped.right.flatMap(_ => dropAppMetaData(appDesc.app))
      appDropped.right.flatMap(_ => logAndSucceed("Done."))
    }
  }

  try {
    show(name).right.flatMap { case (appDesc, channels) => purge(appDesc, channels) }
  } finally {
    events.close()
  }
}
/** Deletes event data of an app and re-creates the emptied Event Store(s).
  *
  * For every selected store the data is removed and the store is initialized again; the
  * initialization is attempted even when the removal reported a failure. Nothing is removed
  * when the app or the requested channel does not exist.
  *
  * @param name    name of the app whose data is deleted
  * @param channel name of the single channel to clear; when absent (and `all` is false) the
  *                app's default store is cleared
  * @param all     when true, clears every channel of the app plus the default store and
  *                ignores `channel`
  * @return `Right(Ok())` if every removal and re-initialization succeeded, otherwise `Left`
  *         with the error messages joined by newlines
  */
def dataDelete(
    name: String,
    channel: Option[String] = None,
    all: Boolean = false): MaybeError = {
  val events = storage.Storage.getLEvents()

  // Works out which stores to clear; `None` stands for the app's default (channel-less) store.
  def selectStores(channels: Seq[Channel]): Expected[Seq[Option[Int]]] =
    if (all) {
      val everyStore: Seq[Option[Int]] =
        channels.map(ch => Some(ch.id)) :+ None // remove default channel too
      Right(everyStore)
    } else {
      channel match {
        case None =>
          Right(Seq(None)) // for default channel
        case Some(chName) =>
          channels.find(ch => ch.name == chName) match {
            case Some(ch) =>
              Right(Seq(Some(ch.id)))
            case None =>
              logAndFail(s"""Unable to delete data for channel.
                            |Channel ${chName} doesn't exist.""".stripMargin)
          }
      }
    }

  // Removes one store and re-creates it empty, returning the messages of the steps that failed.
  def resetStore(appId: Int, chId: Option[Int]): Seq[String] = {
    val target = chId match {
      case Some(id) => s"the channel ID: ${id}"
      case None => s"the app ID: ${appId}"
    }

    val removeError: Option[String] =
      if (events.remove(appId, chId)) {
        info(s"Removed Event Store for ${target}")
        None
      } else {
        val msg = s"Error removing Event Store for ${target}."
        error(msg)
        Some(msg)
      }

    // re-create table
    val initError: Option[String] =
      if (events.init(appId, chId)) {
        info(s"Initialized Event Store for ${target}")
        None
      } else {
        val msg = s"Unable to initialize Event Store for ${target}."
        error(msg)
        Some(msg)
      }

    Seq(removeError, initError).flatten
  }

  try {
    show(name).right.flatMap { case (appDesc, channels) =>
      val outcome: MaybeError = selectStores(channels).right.flatMap { chIds =>
        chIds
          .flatMap(chId => resetStore(appDesc.app.id, chId))
          .reduceOption(_ + "\n" + _)
          .toLeft(Ok())
      }
      outcome
    }
  } finally {
    events.close()
  }
}
/** Creates a channel for an existing app and initializes the channel's Event Store.
  *
  * The request is rejected when the app does not exist, when the app already has a channel
  * with that name, or when `storage.Channel.isValidName` refuses the name. If the Event Store
  * cannot be initialized, the inserted channel meta-data is deleted again.
  *
  * @param appName    name of the app that receives the channel
  * @param newChannel name of the channel to create
  * @return `Right` with the created channel (carrying its stored ID), or `Left` with the
  *         error message
  */
def channelNew(appName: String, newChannel: String): Expected[Channel] = {
  val events = storage.Storage.getLEvents()
  val chanStorage = storage.Storage.getMetaDataChannels

  // Validates the name, stores the channel meta-data and initializes the Event Store.
  def register(appId: Int, existing: Seq[Channel]): Expected[Channel] = {
    val nameTaken = existing.exists(_.name == newChannel)
    if (nameTaken) {
      logAndFail(s"""Channel ${newChannel} already exists.
                    |Unable to create new channel.""".stripMargin)
    } else if (!storage.Channel.isValidName(newChannel)) {
      logAndFail(s"""Unable to create new channel.
                    |The channel name ${newChannel} is invalid.
                    |${storage.Channel.nameConstraint}""".stripMargin)
    } else {
      val channel = Channel(
        id = 0,
        appid = appId,
        name = newChannel)
      chanStorage.insert(channel) match {
        case None =>
          logAndFail(s"""Unable to create new channel.
                        |Failed to update Channel meta-data.""".stripMargin)

        case Some(chanId) =>
          info("Updated Channel meta-data.")
          // Set up the event storage that belongs to the new channel.
          if (events.init(appId, Some(chanId))) {
            info(s"Initialized Event Store for the channel: ${newChannel}.")
            info("Created new channel:")
            info(s" Channel Name: ${newChannel}")
            info(s" Channel ID: ${chanId}")
            info(s" App ID: ${appId}")
            Right(channel.copy(id = chanId))
          } else {
            val initFailure = s"""Unable to create new channel.
                                 |Failed to initialize Event Store.""".stripMargin
            error(initFailure)
            // Undo the meta-data insert so no half-created channel is left behind.
            try {
              chanStorage.delete(chanId)
              Left(initFailure)
            } catch {
              case e: Exception =>
                val revertFailure = (s"""
                  |Failed to revert back the Channel meta-data change.
                  |The channel ${newChannel} CANNOT be used!
                  |Please run 'pio app channel-delete ${appName} ${newChannel}'""" +
                  " to delete this channel!").stripMargin
                logAndFail(initFailure + revertFailure)
            }
          }
      }
    }
  }

  try {
    show(appName).right.flatMap { case (appDesc, channels) =>
      register(appDesc.app.id, channels)
    }
  } finally {
    events.close()
  }
}
/** Deletes one channel of an app: first its Event Store, then its meta-data.
  *
  * The channel meta-data is only deleted after the Event Store removal reported success.
  *
  * @param appName       name of the app owning the channel
  * @param deleteChannel name of the channel to delete
  * @return the result of `logAndSucceed` when the channel was deleted, otherwise `Left` with
  *         the error message
  */
def channelDelete(appName: String, deleteChannel: String): MaybeError = {
  val chanStorage = storage.Storage.getMetaDataChannels
  val events = storage.Storage.getLEvents()

  // Finds the channel by name among the app's channels and removes it.
  def removeNamed(app: storage.App, channels: Seq[Channel]): MaybeError =
    channels.find(_.name == deleteChannel) match {
      case Some(target) =>
        if (!events.remove(app.id, Some(target.id))) {
          logAndFail(s"""Unable to delete channel.
                        |Error removing Event Store for this channel.""".stripMargin)
        } else {
          info(s"Removed Event Store for this channel: ${deleteChannel}")
          try {
            chanStorage.delete(target.id)
            logAndSucceed(s"Deleted channel: ${deleteChannel}.")
          } catch {
            case e: Exception =>
              // The events are gone but the meta-data remains; ask the user to retry.
              logAndFail((s"""Unable to delete channel.
                |Failed to update Channel meta-data.
                |The channel ${deleteChannel} CANNOT be used!
                |Please run 'pio app channel-delete ${app.name} ${deleteChannel}'""" +
                " to delete this channel again!").stripMargin)
          }
        }

      case None =>
        logAndFail(s"""Unable to delete channel
                      |Channel ${deleteChannel} doesn't exists.""".stripMargin)
    }

  try {
    show(appName).right.flatMap { case (appDesc, channels) =>
      removeNamed(appDesc.app, channels)
    }
  } finally {
    events.close()
  }
}
}