[PIO-203] Fixes pio status warnings in ES storage (#507)
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 15f223f..eef83e4 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -42,12 +42,9 @@
private val estype = "accesskeys"
private val internalIndex = index + "_" + estype
- ESUtils.createIndex(client, internalIndex,
- ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
- ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("key" -> ("type" -> "keyword")) ~
("events" -> ("type" -> "keyword"))))
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index cb17af8..26621cf 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -40,16 +40,12 @@
extends Apps with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "apps"
+ private val seq = new ESSequences(client, config, index)
private val internalIndex = index + "_" + estype
- private val seq = new ESSequences(client, config, internalIndex)
-
- ESUtils.createIndex(client, internalIndex,
- ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
- ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("id" -> ("type" -> "keyword")) ~
("name" -> ("type" -> "keyword"))))
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index 63b108f..ac248de 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -39,15 +39,12 @@
extends Channels with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "channels"
- private val seq = new ESSequences(client, config, internalIndex)
+ private val seq = new ESSequences(client, config, index)
private val internalIndex = index + "_" + estype
- ESUtils.createIndex(client, internalIndex,
- ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
- ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("name" -> ("type" -> "keyword"))))
ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index 02f7b98..96f8a67 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -40,13 +40,11 @@
extends EngineInstances with Logging {
implicit val formats = DefaultFormats + new EngineInstanceSerializer
private val estype = "engine_instances"
-
- ESUtils.createIndex(client, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ private val internalIndex = index + "_" + estype
+
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("status" -> ("type" -> "keyword")) ~
("startTime" -> ("type" -> "date")) ~
@@ -61,7 +59,7 @@
("algorithmsParams" -> ("type" -> "keyword")) ~
("servingParams" -> ("type" -> "keyword"))
))
- ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+ ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
def insert(i: EngineInstance): String = {
val id = i.id match {
@@ -86,7 +84,7 @@
val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
val response = client.performRequest(
"POST",
- s"/$index/$estype/",
+ s"/$internalIndex/$estype/",
Map("refresh" -> "true").asJava,
entity)
val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -95,12 +93,12 @@
case "created" =>
Some((jsonResponse \ "_id").extract[String])
case _ =>
- error(s"[$result] Failed to create $index/$estype")
+ error(s"[$result] Failed to create $internalIndex/$estype")
None
}
} catch {
case e: IOException =>
- error(s"Failed to create $index/$estype", e)
+ error(s"Failed to create $internalIndex/$estype", e)
None
}
}
@@ -109,7 +107,7 @@
try {
val response = client.performRequest(
"GET",
- s"/$index/$estype/$id",
+ s"/$internalIndex/$estype/$id",
Map.empty[String, String].asJava)
val jsonResponse = parse(EntityUtils.toString(response.getEntity))
(jsonResponse \ "found").extract[Boolean] match {
@@ -123,11 +121,11 @@
e.getResponse.getStatusLine.getStatusCode match {
case 404 => None
case _ =>
- error(s"Failed to access to /$index/$estype/$id", e)
+ error(s"Failed to access to /$internalIndex/$estype/$id", e)
None
}
case e: IOException =>
- error(s"Failed to access to /$index/$estype/$id", e)
+ error(s"Failed to access to /$internalIndex/$estype/$id", e)
None
}
}
@@ -137,10 +135,10 @@
val json =
("query" ->
("match_all" -> List.empty))
- ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
+ ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json)))
} catch {
case e: IOException =>
- error("Failed to access to /$index/$estype/_search", e)
+ error(s"Failed to access to /$internalIndex/$estype/_search", e)
Nil
}
}
@@ -165,10 +163,10 @@
("sort" -> List(
("startTime" ->
("order" -> "desc"))))
- ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
+ ESUtils.getAll[EngineInstance](client, internalIndex, estype, compact(render(json)))
} catch {
case e: IOException =>
- error(s"Failed to access to /$index/$estype/_search", e)
+ error(s"Failed to access to /$internalIndex/$estype/_search", e)
Nil
}
}
@@ -188,7 +186,7 @@
val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
val response = client.performRequest(
"POST",
- s"/$index/$estype/$id",
+ s"/$internalIndex/$estype/$id",
Map("refresh" -> "true").asJava,
entity)
val jsonResponse = parse(EntityUtils.toString(response.getEntity))
@@ -197,11 +195,11 @@
case "created" =>
case "updated" =>
case _ =>
- error(s"[$result] Failed to update $index/$estype/$id")
+ error(s"[$result] Failed to update $internalIndex/$estype/$id")
}
} catch {
case e: IOException =>
- error(s"Failed to update $index/$estype/$id", e)
+ error(s"Failed to update $internalIndex/$estype/$id", e)
}
}
@@ -209,18 +207,18 @@
try {
val response = client.performRequest(
"DELETE",
- s"/$index/$estype/$id",
+ s"/$internalIndex/$estype/$id",
Map("refresh" -> "true").asJava)
val json = parse(EntityUtils.toString(response.getEntity))
val result = (json \ "result").extract[String]
result match {
case "deleted" =>
case _ =>
- error(s"[$result] Failed to update $index/$estype/$id")
+ error(s"[$result] Failed to update $internalIndex/$estype/$id")
}
} catch {
case e: IOException =>
- error(s"Failed to update $index/$estype/$id", e)
+ error(s"Failed to update $internalIndex/$estype/$id", e)
}
}
}
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 03b851d..0025950 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -28,7 +28,6 @@
import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
import org.apache.predictionio.data.storage.EvaluationInstances
import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.StorageClientException
import org.elasticsearch.client.{ResponseException, RestClient}
import org.json4s._
import org.json4s.JsonDSL._
@@ -41,15 +40,12 @@
extends EvaluationInstances with Logging {
implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
private val estype = "evaluation_instances"
- private val seq = new ESSequences(client, config, internalIndex)
+ private val seq = new ESSequences(client, config, index)
private val internalIndex = index + "_" + estype
- ESUtils.createIndex(client, internalIndex,
- ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
- ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("status" -> ("type" -> "keyword")) ~
("startTime" -> ("type" -> "date")) ~
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index f275ec9..708d3d3 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -53,12 +53,9 @@
override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
val estype = getEsType(appId, channelId)
val index = baseIndex + "_" + estype
- ESUtils.createIndex(client, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ ESUtils.createIndex(client, index)
val json =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("name" -> ("type" -> "keyword")) ~
("eventId" -> ("type" -> "keyword")) ~
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index d43ecc6..ade0f40 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -21,7 +21,6 @@
import scala.collection.JavaConverters._
-import org.apache.http.Header
import org.apache.http.entity.ContentType
import org.apache.http.nio.entity.NStringEntity
import org.apache.http.util.EntityUtils
@@ -40,12 +39,9 @@
private val estype = "sequences"
private val internalIndex = index + "_" + estype
- ESUtils.createIndex(client, internalIndex,
- ESUtils.getNumberOfShards(config, internalIndex.toUpperCase),
- ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase))
+ ESUtils.createIndex(client, internalIndex)
val mappingJson =
(estype ->
- ("_all" -> ("enabled" -> false)) ~
("properties" ->
("n" -> ("enabled" -> false))))
ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson)))
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index cd9aa53..93d5d94 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -21,7 +21,6 @@
import scala.collection.JavaConverters._
import org.apache.http.entity.ContentType
-import org.apache.http.entity.StringEntity
import org.apache.http.nio.entity.NStringEntity
import org.elasticsearch.client.RestClient
import org.json4s._
@@ -165,23 +164,16 @@
def createIndex(
client: RestClient,
- index: String,
- numberOfShards: Option[Int],
- numberOfReplicas: Option[Int]): Unit = {
+ index: String): Unit = {
client.performRequest(
"HEAD",
s"/$index",
Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
case 404 =>
- val json = ("settings" ->
- ("number_of_shards" -> numberOfShards) ~
- ("number_of_replicas" -> numberOfReplicas))
- val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
client.performRequest(
"PUT",
s"/$index",
- Map.empty[String, String].asJava,
- entity)
+ Map.empty[String, String].asJava)
case 200 =>
case _ =>
throw new IllegalStateException(s"/$index is invalid.")
@@ -269,14 +261,6 @@
(hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
}
- def getNumberOfShards(config: StorageClientConfig, index: String): Option[Int] = {
- config.properties.get(s"${index}_NUM_OF_SHARDS").map(_.toInt)
- }
-
- def getNumberOfReplicas(config: StorageClientConfig, index: String): Option[Int] = {
- config.properties.get(s"${index}_NUM_OF_REPLICAS").map(_.toInt)
- }
-
def getEventDataRefresh(config: StorageClientConfig): String = {
config.properties.getOrElse("EVENTDATA_REFRESH", "true")
}