blob: dc2f5cc91d6be1043664c20389fc4ee5c29619cf [file] [log] [blame]
/** Copyright 2014 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prediction.data.storage.elasticsearch
import io.prediction.data.storage.Event
import io.prediction.data.storage.StorageError
import io.prediction.data.storage.Events
import io.prediction.data.storage.EventJson4sSupport
import grizzled.slf4j.Logging
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.client.Client
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.node.NodeBuilder.nodeBuilder
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.ConnectTransportException
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse
import org.elasticsearch.action.ActionListener
import org.elasticsearch.index.query.FilterBuilders
import org.elasticsearch.index.query.QueryBuilders
import org.json4s.DefaultFormats
import org.json4s.native.Serialization.{ read, write }
//import org.json4s.ext.JodaTimeSerializers
import scala.util.Try
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext
class ESEvents(client: Client, index: String) extends Events with Logging {
implicit val formats = DefaultFormats + new EventJson4sSupport.DBSerializer
//implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
val typeName = "events"
override
def futureInsert(event: Event)(implicit ec: ExecutionContext):
Future[Either[StorageError, String]] = {
val response = Promise[IndexResponse]
client.prepareIndex(index, typeName)
.setSource(write(event))
.execute(new ESActionListener(response))
response.future
.map(r => Right(r.getId()))
.recover {
case e: Exception => Left(StorageError(e.toString))
}
}
override
def futureGet(eventId: String)(implicit ec: ExecutionContext):
Future[Either[StorageError, Option[Event]]] = {
val response = Promise[GetResponse]
client.prepareGet(index, typeName, eventId)
.execute(new ESActionListener(response))
response.future
.map { r =>
if (r.isExists)
Right(Some(read[Event](r.getSourceAsString)))
else
Right(None)
}.recover {
case e: Exception => Left(StorageError(e.toString))
}
}
override
def futureDelete(eventId: String)(implicit ec: ExecutionContext):
Future[Either[StorageError, Boolean]] = {
val response = Promise[DeleteResponse]
client.prepareDelete(index, typeName, eventId)
.execute(new ESActionListener(response))
response.future
.map(r => Right(r.isFound()))
.recover {
case e: Exception => Left(StorageError(e.toString))
}
}
override
def futureGetByAppId(appId: Int)(implicit ec: ExecutionContext):
Future[Either[StorageError, Iterator[Event]]] = {
val response = Promise[SearchResponse]
client.prepareSearch(index).setTypes(typeName)
.setPostFilter(FilterBuilders.termFilter("appId", appId))
.execute(new ESActionListener(response))
response.future
.map{ r =>
val dataIt = r.getHits().hits()
.map(h => read[Event](h.getSourceAsString)).toIterator
Right(dataIt)
}.recover {
case e: Exception => Left(StorageError(e.toString))
}
}
override
def futureDeleteByAppId(appId: Int)(implicit ec: ExecutionContext):
Future[Either[StorageError, Unit]] = {
val response = Promise[DeleteByQueryResponse]
client.prepareDeleteByQuery(index).setTypes(typeName)
.setQuery(QueryBuilders.termQuery("appId", appId))
.execute(new ESActionListener(response))
response.future
.map { r =>
val indexResponse = r.getIndex(index)
val numFailures = indexResponse.getFailedShards()
if (numFailures != 0)
Left(StorageError(s"Failed to delete ${numFailures} shards."))
else
Right(())
}.recover {
case e: Exception => Left(StorageError(e.toString))
}
}
}
class ESActionListener[T](val p: Promise[T]) extends ActionListener[T]{
override def onResponse(r: T) = {
p.success(r)
}
override def onFailure(e: Throwable) = {
p.failure(e)
}
}