blob: b95bd72e8653fc7434298fac5d77aa25730d53eb [file] [log] [blame]
/** Copyright 2014 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.data.storage
import com.github.nscala_time.time.Imports._
import org.json4s._
import org.apache.hadoop.io._
import scala.collection.JavaConversions._
package object stock {
type DailyTuple = Tuple8[DateTime, Double, Double, Double, Double, Double, Double, Boolean]
}
/**
* ItemTrend object.
*
* For now, there is only one time series attached to an ItemTrend object. We
* expect to add more, e.g. active, volume, etc..
*
* @param id ID.
* @param appid App ID that this itemTrend belongs to.
* @param ct Creation time.
* @param basetime Base time for the time series
* @param prices A time series for price.
* @param actives A time series for activeness. Set to false if field is
* missing.
*/
case class ItemTrend(
id: String,
appid: Int,
ct: DateTime,
//priceRaw: Seq[(DateTime, Double)] = List[(DateTime, Double)](),
// Date, Open, High Low, Close, Volume, Adj.Close, Active
// The active field is true if data originally not exists.
daily: Seq[stock.DailyTuple] = Vector[stock.DailyTuple]())
/**
* Base trait for implementations that interact with itemTrendTrends in the
* backend app data store.
*/
trait ItemTrends {
/** Insert a new itemTrendTrend. */
def insert(itemTrendTrend: ItemTrend): Unit
/** Get an itemTrendTrend by ID. */
def get(appid: Int, id: String): Option[ItemTrend]
/** Check if itemTrendTrend by ID exists. */
def contains(appid: Int, id: String): Boolean
/** Find all itemTrendTrends by App ID. */
def getByAppid(appid: Int): Iterator[ItemTrend]
/** Get itemTrendTrends by IDs. */
def getByIds(appid: Int, ids: Seq[String]): Seq[ItemTrend]
/** Update an itemTrend. */
//def update(itemTrend: ItemTrend): Unit
/*
def appendPrice(appid: Int, id: String, t: DateTime, p: Double): Unit
def extendPrice(appid: Int, id: String, newPrice: Seq[(DateTime, Double)]): Unit
*/
/** Delete an itemTrend. */
def delete(appid: Int, id: String): Unit
/** Delete an itemTrend. */
def delete(itemTrend: ItemTrend): Unit
/** Delete all itemTrendTrends by App ID */
def deleteByAppid(appid: Int): Unit
/** count number of records by App ID*/
def countByAppid(appid: Int): Long
}
object ItemTrendSerializer {
def apply(mapWritable: MapWritable): ItemTrend = {
val seed = ItemTrend(id = "", appid = 0, ct = DateTime.now)
mapWritable.entrySet.foldLeft(seed) { case (itemtrend, field) => {
val key: String = field.getKey.asInstanceOf[Text].toString
val value: Writable = field.getValue
key match {
case "appid" => {
itemtrend.copy(appid = value.asInstanceOf[LongWritable].get.toInt)
}
case "id" => {
itemtrend.copy(id = value.asInstanceOf[Text].toString)
}
case "ct" => {
val ctStr: String = value.asInstanceOf[Text].toString
val ct: DateTime = Utils.stringToDateTime(ctStr)
itemtrend.copy(ct = ct)
}
case "daily" => {
val array: Array[Writable] = value.asInstanceOf[ArrayWritable].get
val daily = array.map { d => {
val m: Map[String, Writable] = d.asInstanceOf[MapWritable]
.entrySet
.map(kv => (kv.getKey.toString, kv.getValue))
.toMap
new stock.DailyTuple(
Utils.stringToDateTime(m("date").asInstanceOf[Text].toString),
m("open").asInstanceOf[DoubleWritable].get,
m("high").asInstanceOf[DoubleWritable].get,
m("low").asInstanceOf[DoubleWritable].get,
m("close").asInstanceOf[DoubleWritable].get,
m("volume").asInstanceOf[DoubleWritable].get,
m("adjclose").asInstanceOf[DoubleWritable].get,
m("active").asInstanceOf[BooleanWritable].get)
}}
itemtrend.copy(daily = daily)
}
}
}}
}
}
class ItemTrendSerializer extends CustomSerializer[ItemTrend](formats => (
{
case JObject(fields) =>
val seed = ItemTrend(
id = "",
appid = 0,
ct = DateTime.now)
fields.foldLeft(seed) { case (itemtrend, field) =>
field match {
case JField("id", JString(id)) => itemtrend.copy(id = id)
case JField("appid", JInt(appid)) => itemtrend.copy(
appid = appid.intValue)
case JField("ct", JString(ct)) => itemtrend.copy(
ct = Utils.stringToDateTime(ct))
case JField("daily", JArray(d)) => itemtrend.copy(daily =
d map { _ match {
case JObject(List(
JField("date", JString(date)),
JField("open", JDouble(open)),
JField("high", JDouble(high)),
JField("low", JDouble(low)),
JField("close", JDouble(close)),
JField("volume", JDouble(volume)),
JField("adjclose", JDouble(adjclose)),
JField("active", JBool(active)))) => new stock.DailyTuple(
Utils.stringToDateTime(date),
open,
high,
low,
close,
volume,
adjclose,
active)
}
}
)
}
}
},
{
case itemtrend: ItemTrend =>
JObject(
JField("id", JString(itemtrend.id)) ::
JField("appid", JInt(itemtrend.appid)) ::
JField("ct", JString(itemtrend.ct.toString)) ::
JField("daily", JArray(itemtrend.daily.map(d =>
JObject(List(
JField("date", JString(d._1.toString)),
JField("open", JDouble(d._2)),
JField("high", JDouble(d._3)),
JField("low", JDouble(d._4)),
JField("close", JDouble(d._5)),
JField("volume", JDouble(d._6)),
JField("adjclose", JDouble(d._7)),
JField("active", JBool(d._8))))).toList)) :: Nil)
}
))