- rename mysqls package to schema.
- remove GlobalIndex.
- add toBytes, fromBytes on Schema.
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index bfb5a96..ab8aec8 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -29,7 +29,7 @@
import org.apache.hadoop.hbase.regionserver.BloomType
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
import org.apache.s2graph.loader.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
index 6918ce4..21ce920 100644
--- a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
+++ b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
@@ -27,7 +27,7 @@
import org.apache.hadoop.util.ToolRunner
import org.apache.s2graph.core.{Management, PostProcess}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
import org.apache.s2graph.core.storage.CanSKeyValue
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.types.HBaseType
diff --git a/project/Common.scala b/project/Common.scala
index 02a64bf..96109d3 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -31,6 +31,8 @@
val elastic4sVersion = "6.1.1"
+ val KafkaVersion = "0.10.2.1"
+
/** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */
val loggingRuntime = Seq(
"log4j" % "log4j" % "1.2.17",
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 110d5e5..cc70e97 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -34,7 +34,8 @@
"org.apache.hbase" % "hbase-server" % hbaseVersion excludeLogging() exclude("com.google.protobuf", "protobuf*"),
"org.apache.hbase" % "hbase-hadoop-compat" % hbaseVersion excludeLogging(),
"org.apache.hbase" % "hbase-hadoop2-compat" % hbaseVersion excludeLogging(),
- "org.apache.kafka" % "kafka-clients" % "0.8.2.0" excludeLogging() exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
+ "org.apache.kafka" % "kafka-clients" % Common.KafkaVersion excludeLogging() exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
+ "org.apache.kafka" %% "kafka" % Common.KafkaVersion excludeLogging() exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
"commons-pool" % "commons-pool" % "1.6",
"org.scalikejdbc" %% "scalikejdbc" % "2.1.4",
"com.h2database" % "h2" % "1.4.192",
@@ -53,7 +54,8 @@
"org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0",
"com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion excludeLogging(),
"com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(),
- "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging()
+ "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(),
+ "org.scala-lang.modules" %% "scala-pickling" % "0.10.1"
)
libraryDependencies := {
diff --git a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
index 00c277b..c8339c8 100644
--- a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
+++ b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
@@ -21,24 +21,15 @@
import org.apache.s2graph.core.EdgeId;
-import org.apache.s2graph.core.QueryParam;
import org.apache.s2graph.core.S2Graph;
-import org.apache.s2graph.core.index.IndexProvider;
-import org.apache.s2graph.core.index.IndexProvider$;
-import org.apache.s2graph.core.mysqls.Label;
import org.apache.s2graph.core.types.VertexId;
-import org.apache.s2graph.core.utils.logger;
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.HasContainerHolder;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Profiling;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.NoOpBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.IdentityStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
import org.apache.tinkerpop.gremlin.structure.Element;
-import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
similarity index 100%
rename from s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
rename to s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/setup.sql b/s2core/src/main/resources/org/apache/s2graph/core/schema/setup.sql
similarity index 100%
rename from s2core/src/main/resources/org/apache/s2graph/core/mysqls/setup.sql
rename to s2core/src/main/resources/org/apache/s2graph/core/schema/setup.sql
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index eb5b1da..24a4eb9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -67,6 +67,9 @@
}
object ExceptionHandler {
+ val mainBrokerKey = "kafka.metadata.broker.list"
+ val subBrokerKey = "kafka_sub.metadata.broker.list"
+
type Key = String
type Val = String
@@ -93,21 +96,39 @@
case class KafkaMessage(msg: ProducerRecord[Key, Val])
- private def toKafkaProp(config: Config) = {
- val props = new Properties()
+ def toKafkaProducer(config: Config): Option[KafkaProducer[String, String]] = {
+ val brokerKey = "kafka.producer.brokers"
+ if (config.hasPath(brokerKey)) {
+ val brokers = config.getString("kafka.producer.brokers")
+ Option(new KafkaProducer[String, String](toKafkaProp(brokers)))
+ } else {
+ None
+ }
+ }
+ def toKafkaProp(config: Config): Properties = {
/* all default configuration for new producer */
val brokers =
if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
else "localhost"
+ toKafkaProp(brokers)
+ }
+
+ /*
+ * http://kafka.apache.org/082/documentation.html#producerconfigs
+ * if we change our kafka version, make sure right configuration is set.
+ */
+ def toKafkaProp(brokers: String): Properties = {
+ val props = new Properties()
+
props.put("bootstrap.servers", brokers)
props.put("acks", "1")
props.put("buffer.memory", "33554432")
props.put("compression.type", "snappy")
props.put("retries", "0")
props.put("batch.size", "16384")
- props.put("linger.ms", "0")
+ props.put("linger.ms", "100")
props.put("max.request.size", "1048576")
props.put("receive.buffer.bytes", "32768")
props.put("send.buffer.bytes", "131072")
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
index 0478413..a73f5c2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
@@ -22,7 +22,7 @@
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.JSONParser.{fromJsonToProperties, toInnerVal}
import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
import org.apache.tinkerpop.gremlin.structure.T
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
index b92e47b..81b3c13 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core
import org.apache.s2graph.core.GraphExceptions.IllegalDataTypeException
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core.rest.TemplateHelper
import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs, VertexId}
import org.apache.s2graph.core.utils.logger
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 6a12f0a..d026e5b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -24,7 +24,7 @@
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types.HBaseType._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.JSONParser._
@@ -95,7 +95,7 @@
props: Seq[Prop],
schemaVersion: String = DEFAULT_VERSION) = {
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
val serviceOpt = Service.findByName(serviceName, useCache = false)
serviceOpt match {
case None => throw new RuntimeException(s"create service $serviceName has not been created.")
@@ -113,7 +113,7 @@
}
def deleteColumn(serviceName: String, columnName: String, schemaVersion: String = DEFAULT_VERSION) = {
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
val service = Service.findByName(serviceName, useCache = false).getOrElse(throw new RuntimeException("Service not Found"))
val serviceColumns = ServiceColumn.find(service.id.get, columnName, useCache = false)
val columnNames = serviceColumns.map { serviceColumn =>
@@ -130,7 +130,7 @@
}
def deleteLabel(labelName: String): Try[Label] = {
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
val label = Label.findByName(labelName, useCache = false).getOrElse(throw GraphExceptions.LabelNotExistException(labelName))
Label.deleteAll(label)
label
@@ -138,7 +138,7 @@
}
def markDeletedLabel(labelName: String) = {
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
Label.findByName(labelName, useCache = false).foreach { label =>
// rename & delete_at column filled with current time
Label.markDeleted(label)
@@ -148,7 +148,7 @@
}
def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = {
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found"))
val labelMetaMap = label.metaPropsInvMap
@@ -162,7 +162,7 @@
}
def addProp(labelStr: String, prop: Prop) = {
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
val labelOpt = Label.findByName(labelStr)
val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found"))
@@ -171,7 +171,7 @@
}
def addProps(labelStr: String, props: Seq[Prop]) = {
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
val labelOpt = Label.findByName(labelStr)
val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found"))
@@ -251,7 +251,7 @@
* update label name.
*/
def updateLabelName(oldLabelName: String, newLabelName: String) = {
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
for {
old <- Label.findByName(oldLabelName, useCache = false)
} {
@@ -269,7 +269,7 @@
* swap label names.
*/
def swapLabelNames(leftLabel: String, rightLabel: String) = {
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
val tempLabel = "_" + leftLabel + "_"
Label.updateName(leftLabel, tempLabel)
Label.updateName(rightLabel, leftLabel)
@@ -338,7 +338,7 @@
preSplitSize: Int, hTableTTL: Option[Int],
compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = {
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, useCache = false)
val config = toConfig(Map(
ZookeeperQuorum -> service.cluster,
@@ -366,7 +366,7 @@
props: Seq[Prop],
schemaVersion: String = DEFAULT_VERSION): ServiceColumn = {
- val serviceColumnTry = Model withTx { implicit session =>
+ val serviceColumnTry = Schema withTx { implicit session =>
val serviceOpt = Service.findByName(serviceName, useCache = false)
serviceOpt match {
case None => throw new RuntimeException(s"create service $serviceName has not been created.")
@@ -432,7 +432,7 @@
if (hTableName.isEmpty && hTableTTL.isDefined) throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
val labelOpt = Label.findByName(label, useCache = false)
- Model withTx { implicit session =>
+ Schema withTx { implicit session =>
if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.")
/* create all models */
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 8e4be5b..f7a54a9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -23,7 +23,7 @@
import com.google.protobuf.ByteString
import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 6d24c3f..21aca12 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -23,7 +23,7 @@
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.DuplicatePolicy.DuplicatePolicy
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.parsers.{Where, WhereParser}
import org.apache.s2graph.core.rest.TemplateHelper
import org.apache.s2graph.core.storage.serde.StorageSerializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 37ddf06..be57017 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
import org.apache.s2graph.core.utils.logger
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 58b1ce1..10752da 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -22,7 +22,7 @@
import org.apache.s2graph.core.S2Edge.{Props, State}
import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.io.Conversions._
import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph, Property, T, Vertex}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
index 85321d3..a8c92df 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core
import org.apache.s2graph.core.S2Edge.State
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.types.{InnerValLike, TargetVertexId, VertexId}
import org.apache.tinkerpop.gremlin.structure.Property
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index 2321ac8..413c1e9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -22,7 +22,7 @@
import java.util.function.BiConsumer
import org.apache.s2graph.core.S2Edge.{Props, State}
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.types._
import org.apache.tinkerpop.gremlin.structure
import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph, Property, T, Vertex}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
index 1e0a95b..8c609a0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
@@ -22,7 +22,7 @@
import java.util.function.BiConsumer
import org.apache.s2graph.core.S2Edge.Props
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
import org.apache.tinkerpop.gremlin.structure.Property
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 7f19cb4..7816a63 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -27,7 +27,7 @@
import org.apache.commons.configuration.{BaseConfiguration, Configuration}
import org.apache.s2graph.core.index.IndexProvider
import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.storage.rocks.RocksStorage
import org.apache.s2graph.core.storage.{MutateResponse, Storage}
@@ -181,8 +181,8 @@
config.getString("s2graph.storage.backend")
}.getOrElse("hbase")
- Model.apply(config)
- Model.loadCache()
+ Schema.apply(config)
+ Schema.loadCache()
override val management = new Management(this)
@@ -258,7 +258,7 @@
override def shutdown(modelDataDelete: Boolean = false): Unit =
if (running.compareAndSet(true, false)) {
flushStorage()
- Model.shutdown(modelDataDelete)
+ Schema.shutdown(modelDataDelete)
defaultStorage.shutdown()
localLongId.set(0l)
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
index 64108db..cce05af 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
@@ -22,7 +22,7 @@
import org.apache.commons.configuration.BaseConfiguration
import org.apache.s2graph.core.Management.JsonModel.Prop
import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
-import org.apache.s2graph.core.mysqls.{ColumnMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, ServiceColumn}
import org.apache.s2graph.core.types.HBaseType
import org.apache.tinkerpop.gremlin.structure.T
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index f639e84..cbd31cc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -31,7 +31,7 @@
import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
import org.apache.s2graph.core.index.IndexProvider
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn}
import org.apache.s2graph.core.storage.{MutateResponse, Storage}
import org.apache.s2graph.core.types.{InnerValLike, VertexId}
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index 50b94de..874924c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core.types.{CanInnerValLike, InnerValLikeWithTs, VertexId}
import org.apache.tinkerpop.gremlin.structure.Graph.Features
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
index 954bab0..2de8e92 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
@@ -22,7 +22,7 @@
import java.util.function.BiConsumer
import org.apache.s2graph.core.S2Vertex.Props
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types._
import org.apache.tinkerpop.gremlin.structure.Vertex
import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala
index 50d6526..ffd16e9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala
@@ -23,7 +23,7 @@
import java.util.function.BiConsumer
import org.apache.s2graph.core.S2Vertex.Props
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
import org.apache.s2graph.core.types.VertexId
class S2VertexBuilder(vertex: S2VertexLike) {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index 4608ce7..7ece8c2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -22,7 +22,7 @@
import java.util.function.{BiConsumer, Consumer}
import org.apache.s2graph.core.S2Vertex.Props
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Label, Service, ServiceColumn}
import org.apache.s2graph.core.types.{InnerValLike, VertexId}
import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, T, Vertex, VertexProperty}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
index e0abfba..01842d7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
@@ -21,7 +21,7 @@
import java.util
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
import org.apache.s2graph.core.types.{CanInnerValLike, InnerValLike}
import org.apache.tinkerpop.gremlin.structure.{Property, VertexProperty}
import play.api.libs.json.Json
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
index bdb3c00..c46dc9c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
import org.apache.s2graph.core.types.InnerValLike
object S2VertexPropertyHelper {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index c40078e..0dc2aa2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -22,7 +22,7 @@
import java.util
import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, VertexId}
import org.apache.s2graph.core.utils.{Extensions, logger}
import org.apache.tinkerpop.gremlin.structure.Edge
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
index e67d529..be1ad6e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
@@ -25,13 +25,10 @@
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.HttpClient
import com.typesafe.config.Config
-import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait.Predicate
import org.apache.s2graph.core.io.Conversions
-import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types.VertexId
import org.apache.s2graph.core._
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import org.apache.tinkerpop.gremlin.structure.Property
import play.api.libs.json.{Json, Reads}
import scala.collection.JavaConverters._
@@ -40,14 +37,10 @@
import scala.util.Try
class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends IndexProvider {
-
- import GlobalIndex._
import IndexProvider._
import scala.collection.mutable
- implicit val executor = ec
-
val esClientUri = Try(config.getString("es.index.provider.client.uri")).getOrElse("localhost")
val client = HttpClient(ElasticsearchClientUri(esClientUri, 9200))
@@ -106,10 +99,10 @@
override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = {
val bulkRequests = vertices.flatMap { vertex =>
- toFields(vertex, forceToIndex).toSeq.map { fields =>
- update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields)
+ toFields(vertex, forceToIndex).toSeq.map { fields =>
+ update(vertex.id.toString()).in(new IndexAndType(VertexIndexName, TypeName)).docAsUpsert(fields)
+ }
}
- }
if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true))
else {
@@ -135,7 +128,7 @@
override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = {
val bulkRequests = edges.flatMap { edge =>
toFields(edge, forceToIndex).toSeq.map { fields =>
- update(edge.edgeId.toString()).in(new IndexAndType(GlobalIndex.EdgeIndexName, GlobalIndex.TypeName)).docAsUpsert(fields)
+ update(edge.edgeId.toString()).in(new IndexAndType(EdgeIndexName, TypeName)).docAsUpsert(fields)
}
}
@@ -185,7 +178,7 @@
queryString,
0,
1000,
- GlobalIndex.EdgeIndexName,
+ EdgeIndexName,
field,
Conversions.s2EdgeIdReads)(e => EdgeId.isValid(e).isDefined)
}
@@ -200,7 +193,7 @@
fetchInner[VertexId](queryString,
0,
1000,
- GlobalIndex.VertexIndexName,
+ VertexIndexName,
field,
Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined)
}
@@ -215,7 +208,7 @@
queryString,
vertexQueryParam.offset,
vertexQueryParam.limit,
- GlobalIndex.VertexIndexName,
+ VertexIndexName,
field,
Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined)
case None => Future.successful(empty)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index f573ff6..0220ff8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -23,7 +23,7 @@
import com.typesafe.config.Config
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types.VertexId
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
import org.apache.tinkerpop.gremlin.process.traversal.util.{AndP, OrP}
@@ -34,8 +34,23 @@
import scala.util.Try
object IndexProvider {
- import GlobalIndex._
+
+ //TODO: Fix Me
+ val hitsPerPage = 100000
val IdField = "id"
+ val vidField = "_vid_"
+ val eidField = "_eid_"
+ val labelField = "_label_"
+ val serviceField = "_service_"
+ val serviceColumnField = "_serviceColumn_"
+ val EdgeType = "edge"
+ val VertexType = "vertex"
+ val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField)
+
+ // val IndexName = "global_indices"
+ val VertexIndexName = "global_vertex_index"
+ val EdgeIndexName = "global_edge_index"
+ val TypeName = "test"
def apply(config: Config)(implicit ec: ExecutionContext): IndexProvider = {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
index c417022..4a3d044 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
@@ -31,7 +31,6 @@
import org.apache.lucene.store.{BaseDirectory, RAMDirectory, SimpleFSDirectory}
import org.apache.lucene.search.TopScoreDocCollector
import org.apache.s2graph.core.io.Conversions
-import org.apache.s2graph.core.mysqls.GlobalIndex
import org.apache.s2graph.core.types.VertexId
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike, VertexQueryParam}
@@ -42,8 +41,6 @@
class LuceneIndexProvider(config: Config) extends IndexProvider {
-
- import GlobalIndex._
import IndexProvider._
import scala.collection.JavaConverters._
@@ -140,7 +137,7 @@
Future.successful(mutateEdges(edges, forceToIndex))
override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] = {
- val writer = getOrElseCreateIndexWriter(GlobalIndex.VertexIndexName)
+ val writer = getOrElseCreateIndexWriter(VertexIndexName)
vertices.foreach { vertex =>
toDocument(vertex, forceToIndex).foreach { doc =>
@@ -160,7 +157,7 @@
Future.successful(mutateVertices(vertices, forceToIndex))
override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Seq[Boolean] = {
- val writer = getOrElseCreateIndexWriter(GlobalIndex.EdgeIndexName)
+ val writer = getOrElseCreateIndexWriter(EdgeIndexName)
edges.foreach { edge =>
toDocument(edge, forceToIndex).foreach { doc =>
@@ -208,7 +205,7 @@
try {
val q = new QueryParser(field, analyzer).parse(queryString)
- fetchInner[VertexId](q, 0, 100, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads)
+ fetchInner[VertexId](q, 0, 100, VertexIndexName, vidField, Conversions.s2VertexIdReads)
} catch {
case ex: ParseException =>
logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
@@ -222,7 +219,7 @@
try {
val q = new QueryParser(field, analyzer).parse(queryString)
- fetchInner[EdgeId](q, 0, 100, GlobalIndex.EdgeIndexName, field, Conversions.s2EdgeIdReads)
+ fetchInner[EdgeId](q, 0, 100, EdgeIndexName, field, Conversions.s2EdgeIdReads)
} catch {
case ex: ParseException =>
logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
@@ -239,7 +236,7 @@
val field = vidField
try {
val q = new QueryParser(field, analyzer).parse(queryString)
- fetchInner[VertexId](q, vertexQueryParam.offset, vertexQueryParam.limit, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads)
+ fetchInner[VertexId](q, vertexQueryParam.offset, vertexQueryParam.limit, VertexIndexName, vidField, Conversions.s2VertexIdReads)
} catch {
case ex: ParseException =>
logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
index 83159e2..15f1231 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.io
import org.apache.s2graph.core.{EdgeId, JSONParser, S2VertexPropertyId}
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Service, ServiceColumn}
import org.apache.s2graph.core.types.{HBaseType, InnerValLike, VertexId}
import play.api.libs.json._
import play.api.libs.json.Reads._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
deleted file mode 100644
index e71bbce..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import scalikejdbc._
-
-import scala.util.Try
-
-object Bucket extends Model[Bucket] {
-
- val rangeDelimiter = "~"
- val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.")
-
- def apply(rs: WrappedResultSet): Bucket = {
- Bucket(rs.intOpt("id"),
- rs.int("experiment_id"),
- rs.string("modular"),
- rs.string("http_verb"),
- rs.string("api_path"),
- rs.string("request_body"),
- rs.int("timeout"),
- rs.string("impression_id"),
- rs.boolean("is_graph_query"),
- rs.boolean("is_empty"))
- }
-
- def finds(experimentId: Int)(implicit session: DBSession = AutoSession): List[Bucket] = {
- val cacheKey = "experimentId=" + experimentId
- withCaches(cacheKey) {
- sql"""select * from buckets where experiment_id = $experimentId"""
- .map { rs => Bucket(rs) }.list().apply()
- }
- }
-
- def toRange(str: String): Option[(Int, Int)] = {
- val range = str.split(rangeDelimiter)
- if (range.length == 2) Option((range.head.toInt, range.last.toInt))
- else None
- }
-
- def findByImpressionId(impressionId: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Bucket] = {
- val cacheKey = "impressionId=" + impressionId
- val sql = sql"""select * from buckets where impression_id=$impressionId"""
- .map { rs => Bucket(rs)}
- if (useCache) {
- withCache(cacheKey) {
- sql.single().apply()
- }
- } else {
- sql.single().apply()
- }
- }
-
- def insert(experiment: Experiment, modular: String, httpVerb: String, apiPath: String,
- requestBody: String, timeout: Int, impressionId: String,
- isGraphQuery: Boolean, isEmpty: Boolean)
- (implicit session: DBSession = AutoSession): Try[Bucket] = {
- Try {
- sql"""
- INSERT INTO buckets(experiment_id, modular, http_verb, api_path, request_body, timeout, impression_id,
- is_graph_query, is_empty)
- VALUES (${experiment.id.get}, $modular, $httpVerb, $apiPath, $requestBody, $timeout, $impressionId,
- $isGraphQuery, $isEmpty)
- """
- .updateAndReturnGeneratedKey().apply()
- }.map { newId =>
- Bucket(Some(newId.toInt), experiment.id.get, modular, httpVerb, apiPath, requestBody, timeout, impressionId,
- isGraphQuery, isEmpty)
- }
- }
-}
-
-case class Bucket(id: Option[Int],
- experimentId: Int,
- modular: String,
- httpVerb: String, apiPath: String,
- requestBody: String, timeout: Int, impressionId: String,
- isGraphQuery: Boolean = true,
- isEmpty: Boolean = false) {
-
- import Bucket._
-
- lazy val rangeOpt = toRange(modular)
-}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
deleted file mode 100644
index 501a964..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import scalikejdbc.{AutoSession, DBSession, WrappedResultSet}
-import scalikejdbc._
-
-object GlobalIndex extends Model[GlobalIndex] {
- val vidField = "_vid_"
- val eidField = "_eid_"
- val labelField = "_label_"
- val serviceField = "_service_"
- val serviceColumnField = "_serviceColumn_"
- val EdgeType = "edge"
- val VertexType = "vertex"
- val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField)
-
-// val IndexName = "global_indices"
- val VertexIndexName = "global_vertex_index"
- val EdgeIndexName = "global_edge_index"
- val TypeName = "test"
-
- def apply(rs: WrappedResultSet): GlobalIndex = {
- GlobalIndex(rs.intOpt("id"),
- rs.string("element_type"),
- rs.string("prop_names").split(",").sorted,
- rs.string("index_name"))
- }
-
- def findBy(elementType: String, indexName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[GlobalIndex] = {
- val cacheKey = s"elementType=$elementType:indexName=$indexName"
- lazy val sql = sql"""select * from global_indices where element_type = ${elementType} and index_name = $indexName""".map { rs => GlobalIndex(rs) }.single.apply()
-
- if (useCache) withCache(cacheKey){sql}
- else sql
- }
-
- def insert(elementType: String, indexName: String, propNames: Seq[String])(implicit session: DBSession = AutoSession): Long = {
- val allPropNames = (hiddenIndexFields.toSeq ++ propNames).sorted
- sql"""insert into global_indices(element_type, prop_names, index_name)
- values($elementType, ${allPropNames.mkString(",")}, $indexName)"""
- .updateAndReturnGeneratedKey.apply()
- }
-
- def findAll(elementType: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[GlobalIndex] = {
- lazy val ls = sql"""select * from global_indices where element_type = $elementType""".map { rs => GlobalIndex(rs) }.list.apply
- if (useCache) {
- listCache.withCache(s"findAll:elementType=$elementType") {
- putsToCache(ls.map { globalIndex =>
- val cacheKey = s"elementType=${globalIndex.elementType}:indexName=${globalIndex.indexName}"
- cacheKey -> globalIndex
- })
- ls
- }
- } else {
- ls
- }
- }
-
- def findGlobalIndex(elementType: String, hasContainers: java.util.List[HasContainer])(implicit session: DBSession = AutoSession): Option[GlobalIndex] = {
- import scala.collection.JavaConversions._
- val indices = findAll(elementType, useCache = true)
- val keys = hasContainers.map(_.getKey)
-
- val sorted = indices.map { index =>
- val matched = keys.filter(index.propNamesSet)
- index -> matched.length
- }.filter(_._2 > 0).sortBy(_._2 * -1)
-
- sorted.headOption.map(_._1)
- }
-
-}
-
-case class GlobalIndex(id: Option[Int],
- elementType: String,
- propNames: Seq[String],
- indexName: String) {
- val backendIndexName = indexName + "_" + elementType
- val backendIndexNameWithType = backendIndexName + "/test1"
- lazy val propNamesSet = propNames.toSet
-}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala
deleted file mode 100644
index 38e1761..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import scalikejdbc.{AutoSession, DBSession, WrappedResultSet, _}
-
-object ServiceColumnIndex extends Model[ServiceColumnIndex] {
- val dbTableName = "service_column_indices"
- val DefaultName = "_PK"
- val DefaultSeq = 1.toByte
- val MaxOrderSeq = 7
-
- def apply(rs: WrappedResultSet): ServiceColumnIndex = {
- ServiceColumnIndex(rs.intOpt("id"), rs.int("service_id"), rs.int("service_column_id"),
- rs.string("name"),
- rs.byte("seq"), rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match {
- case metaSeqsList => metaSeqsList
- },
- rs.stringOpt("options")
- )
- }
-
- def findById(id: Int)(implicit session: DBSession = AutoSession) = {
- val cacheKey = "id=" + id
- lazy val sql = sql"""select * from $dbTableName where id = ${id}"""
- withCache(cacheKey) {
- sql.map { rs => ServiceColumnIndex(rs) }.single.apply
- }.get
- }
-
- def findBySeqs(serviceId: Int, serviceColumnId: Int, seqs: List[Byte])(implicit session: DBSession = AutoSession): Option[ServiceColumnIndex] = {
- val cacheKey = "serviceId=" + serviceId + ":serviceColumnId=" + serviceColumnId + ":seqs=" + seqs.mkString(",")
- lazy val sql =
- sql"""
- select * from $dbTableName where service_id = $serviceId and service_column_id = $serviceColumnId and meta_seqs = ${seqs.mkString(",")}
- """
- withCache(cacheKey) {
- sql.map { rs => ServiceColumnIndex(rs) }.single.apply
- }
- }
-
- def findBySeq(serviceId: Int,
- serviceColumnId: Int,
- seq: Byte,
- useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
- val cacheKey = "serviceId=" + serviceId + ":serviceColumnId=" + serviceColumnId + ":seq=" + seq
- lazy val sql =
- sql"""
- select * from $dbTableName where service_id = $serviceId and service_column_id = $serviceColumnId and seq = ${seq}
- """
- if (useCache) {
- withCache(cacheKey)(sql.map { rs => ServiceColumnIndex(rs) }.single.apply)
- } else {
- sql.map { rs => ServiceColumnIndex(rs) }.single.apply
- }
- }
-
-
- def findAll(serviceId: Int, serviceColumnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
- val cacheKey = s"serviceId=$serviceId:serviceColumnId=$serviceColumnId"
- lazy val sql =
- sql"""
- select * from $dbTableName where service_id = ${serviceId} and seq > 0 order by seq ASC
- """
- if (useCache) {
- withCaches(cacheKey)(
- sql.map { rs => ServiceColumnIndex(rs) }.list.apply
- )
- } else {
- sql.map { rs => LabelIndex(rs) }.list.apply
- }
- }
-
- def insert(serviceId: Int,
- serviceColumnId: Int,
- indexName: String,
- seq: Byte, metaSeqs: List[Byte], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
- sql"""
- insert into $dbTableName(service_id, service_column_id, name, seq, meta_seqs, options)
- values (${serviceId}, ${serviceColumnId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${options})
- """
- .updateAndReturnGeneratedKey.apply()
- }
-
- def findOrInsert(serviceId: Int,
- serviceColumnId: Int,
- indexName: String,
- metaSeqs: List[Byte],
- options: Option[String])(implicit session: DBSession = AutoSession): ServiceColumnIndex = {
- findBySeqs(serviceId, serviceColumnId, metaSeqs) match {
- case Some(s) => s
- case None =>
- val orders = findAll(serviceId, serviceColumnId, false)
- val seq = (orders.size + 1).toByte
- assert(seq <= MaxOrderSeq)
- val createdId = insert(serviceId, serviceColumnId, indexName, seq, metaSeqs, options)
- val cacheKeys = toCacheKeys(createdId.toInt, serviceId, serviceColumnId, seq, metaSeqs)
-
- cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
- }
- findBySeq(serviceId, serviceColumnId, seq).get
- }
- }
-
- def toCacheKeys(id: Int, serviceId: Int, serviceColumnId: Int, seq: Byte, seqs: Seq[Byte]): Seq[String] = {
- Seq(s"id=$id",
- s"serviceId=$serviceId:serviceColumnId=$serviceColumnId:seq=$seq",
- s"serviceId=$serviceId:serviceColumnId=$serviceColumnId:seqs=$seqs",
- s"serviceId=$serviceId:serviceColumnId=$serviceColumnId")
- }
-
- def delete(id: Int)(implicit session: DBSession = AutoSession) = {
- val me = findById(id)
- val seqs = me.metaSeqs.mkString(",")
- val (serviceId, serviceColumnId, seq) = (me.serviceId, me.serviceColumnId, me.seq)
- lazy val sql = sql"""delete from $dbTableName where id = ${id}"""
-
- sql.execute.apply()
-
- val cacheKeys = toCacheKeys(id, serviceId, serviceColumnId, seq, me.metaSeqs)
-
- cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
- }
- }
-
-// def findAll()(implicit session: DBSession = AutoSession) = {
-// val ls = sql"""select * from $dbTableName""".map { rs => ServiceColumnIndex(rs) }.list.apply
-// val singles = ls.flatMap { x =>
-// val cacheKeys = toCacheKeys(x.id.get, x.serviceId, x.serviceColumnId, x.seq, x.metaSeqs).dropRight(1)
-// cacheKeys.map { cacheKey =>
-// cacheKey -> x
-// }
-// }
-// val multies = ls.groupBy(x => (x.serviceId, x.serviceColumnId)).map { case ((serviceId, serviceColumnId), ls) =>
-// val cacheKey = s"serviceId=$serviceId:serviceColumnId=$serviceColumnId"
-// cacheKey -> ls
-// }.toList
-//
-// putsToCache(singles)
-// putsToCaches(multies)
-//
-// }
-}
-
-case class ServiceColumnIndex(id: Option[Int],
- serviceId: Int,
- serviceColumnId: Int,
- name: String,
- seq: Byte,
- metaSeqs: Seq[Byte],
- options: Option[String]) {
-
-}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index 00ef233..75e9657 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.parsers
import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException}
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.types.InnerValLike
import org.apache.s2graph.core.{S2EdgeLike}
import org.apache.s2graph.core.JSONParser._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 6b1ce75..f59abc0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -26,7 +26,7 @@
import org.apache.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException}
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.parsers.{Where, WhereParser}
import org.apache.s2graph.core.types._
import play.api.libs.json._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 460c627..c768d81 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -24,7 +24,7 @@
import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import org.apache.s2graph.core.schema.{Bucket, Experiment, Service}
import org.apache.s2graph.core.utils.logger
import play.api.libs.json._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala
new file mode 100644
index 0000000..c88f854
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import play.api.libs.json.{JsValue, Json}
+import scalikejdbc._
+
+import scala.util.Try
+
+object Bucket extends SQLSyntaxSupport[Bucket] {
+ import Schema._
+ val className = Bucket.getClass.getSimpleName
+
+ val rangeDelimiter = "~"
+ val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.")
+ val InActiveModulars = Set("0~0")
+
+ def valueOf(rs: WrappedResultSet): Bucket = {
+ Bucket(rs.intOpt("id"),
+ rs.int("experiment_id"),
+ rs.string("modular"),
+ rs.string("http_verb"),
+ rs.string("api_path"),
+ rs.string("request_body"),
+ rs.int("timeout"),
+ rs.string("impression_id"),
+ rs.boolean("is_graph_query"),
+ rs.boolean("is_empty"))
+ }
+
+ def finds(experimentId: Int)(implicit session: DBSession = AutoSession): List[Bucket] = {
+ val cacheKey = className + "experimentId=" + experimentId
+
+ withCaches(cacheKey, broadcast = false) {
+ sql"""select * from buckets where experiment_id = $experimentId"""
+ .map { rs => Bucket.valueOf(rs) }.list().apply()
+ }
+ }
+
+ def toRange(str: String): Option[(Int, Int)] = {
+ val range = str.split(rangeDelimiter)
+ if (range.length == 2) Option((range.head.toInt, range.last.toInt))
+ else None
+ }
+
+ def findByImpressionId(impressionId: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Bucket] = {
+ val cacheKey = className + "impressionId=" + impressionId
+
+ lazy val sql = sql"""select * from buckets where impression_id=$impressionId"""
+ .map { rs => Bucket.valueOf(rs)}.single().apply()
+
+ if (useCache) withCache(cacheKey)(sql)
+ else sql
+
+ }
+
+ def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Bucket = {
+ val cacheKey = className + "id=" + id
+ lazy val sql = sql"""select * from buckets where id = $id""".map { rs => Bucket.valueOf(rs)}.single().apply()
+ if (useCache) withCache(cacheKey, false) { sql }.get
+ else sql.get
+ }
+
+ def update(id: Int,
+ experimentId: Int,
+ modular: String,
+ httpVerb: String,
+ apiPath: String,
+ requestBody: String,
+ timeout: Int,
+ impressionId: String,
+ isGraphQuery: Boolean,
+ isEmpty: Boolean)(implicit session: DBSession = AutoSession): Try[Bucket] = {
+ Try {
+ sql"""
+ UPDATE buckets set experiment_id = $experimentId, modular = $modular, http_verb = $httpVerb, api_path = $apiPath,
+ request_body = $requestBody, timeout = $timeout, impression_id = $impressionId,
+ is_graph_query = $isGraphQuery, is_empty = $isEmpty WHERE id = $id
+ """
+ .update().apply()
+ }.map { cnt =>
+ findById(id)
+ }
+ }
+
+ def insert(experimentId: Int, modular: String, httpVerb: String, apiPath: String,
+ requestBody: String, timeout: Int, impressionId: String,
+ isGraphQuery: Boolean, isEmpty: Boolean)
+ (implicit session: DBSession = AutoSession): Try[Bucket] = {
+ Try {
+ sql"""
+ INSERT INTO buckets(experiment_id, modular, http_verb, api_path, request_body, timeout, impression_id,
+ is_graph_query, is_empty)
+ VALUES (${experimentId}, $modular, $httpVerb, $apiPath, $requestBody, $timeout, $impressionId,
+ $isGraphQuery, $isEmpty)
+ """
+ .updateAndReturnGeneratedKey().apply()
+ }.map { newId =>
+ Bucket(Some(newId.toInt), experimentId, modular, httpVerb, apiPath, requestBody, timeout, impressionId,
+ isGraphQuery, isEmpty)
+ }
+ }
+}
+
+case class Bucket(id: Option[Int],
+ experimentId: Int,
+ modular: String,
+ httpVerb: String, apiPath: String,
+ requestBody: String, timeout: Int, impressionId: String,
+ isGraphQuery: Boolean = true,
+ isEmpty: Boolean = false) {
+
+ import Bucket._
+
+ lazy val rangeOpt = toRange(modular)
+
+ def toJson(): JsValue =
+ Json.obj("id" -> id, "experimentId" -> experimentId, "modular" -> modular, "httpVerb" -> httpVerb,
+ "requestBody" -> requestBody, "isGraphQuery" -> isGraphQuery, "isEmpty" -> isEmpty)
+
+}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
similarity index 81%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
index 51f4a93..e850541 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
@@ -17,16 +17,19 @@
* under the License.
*/
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
import play.api.libs.json.Json
import scalikejdbc._
import scala.util.Try
-object ColumnMeta extends Model[ColumnMeta] {
+object ColumnMeta extends SQLSyntaxSupport[ColumnMeta] {
+ import Schema._
+ val className = ColumnMeta.getClass.getSimpleName
- val timeStampSeq = -1.toByte
+ val timeStampSeq = 0.toByte
+ val countSeq = -1.toByte
val lastModifiedAtColumnSeq = 0.toByte
val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long", "-1")
val maxValue = Byte.MaxValue
@@ -44,8 +47,7 @@
}
def findById(id: Int)(implicit session: DBSession = AutoSession) = {
- // val cacheKey = s"id=$id"
- val cacheKey = "id=" + id
+ val cacheKey = className + "id=" + id
withCache(cacheKey) {
sql"""select * from column_metas where id = ${id}""".map { rs => ColumnMeta.valueOf(rs) }.single.apply
}.get
@@ -53,7 +55,7 @@
def findAllByColumn(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
// val cacheKey = s"columnId=$columnId"
- val cacheKey = "columnId=" + columnId
+ val cacheKey = className + "columnId=" + columnId
if (useCache) {
withCaches(cacheKey)( sql"""select *from column_metas where column_id = ${columnId} order by seq ASC"""
.map { rs => ColumnMeta.valueOf(rs) }.list.apply())
@@ -65,14 +67,12 @@
def findByName(columnId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
// val cacheKey = s"columnId=$columnId:name=$name"
- val cacheKey = "columnId=" + columnId + ":name=" + name
- if (useCache) {
- withCache(cacheKey)( sql"""select * from column_metas where column_id = ${columnId} and name = ${name}"""
- .map { rs => ColumnMeta.valueOf(rs) }.single.apply())
- } else {
- sql"""select * from column_metas where column_id = ${columnId} and name = ${name}"""
- .map { rs => ColumnMeta.valueOf(rs) }.single.apply()
- }
+ val cacheKey = className + "columnId=" + columnId + ":name=" + name
+ lazy val sql = sql"""select * from column_metas where column_id = ${columnId} and name = ${name}"""
+ .map { rs => ColumnMeta.valueOf(rs) }.single.apply()
+
+ if (useCache) withCache(cacheKey)(sql)
+ else sql
}
def insert(columnId: Int, name: String, dataType: String, defaultValue: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = {
@@ -95,13 +95,13 @@
case Some(c) => c
case None =>
insert(columnId, name, dataType, defaultValue, storeInGlobalIndex)
- expireCache(s"columnId=$columnId:name=$name")
+ expireCache(className + s"columnId=$columnId:name=$name")
findByName(columnId, name).get
}
}
def findByIdAndSeq(columnId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
- val cacheKey = "columnId=" + columnId + ":seq=" + seq
+ val cacheKey = className + "columnId=" + columnId + ":seq=" + seq
lazy val columnMetaOpt = sql"""
select * from column_metas where column_id = ${columnId} and seq = ${seq}
""".map { rs => ColumnMeta.valueOf(rs) }.single.apply()
@@ -114,30 +114,26 @@
val columnMeta = findById(id)
val (columnId, name) = (columnMeta.columnId, columnMeta.name)
sql"""delete from column_metas where id = ${id}""".execute.apply()
- val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", s"columnId=$columnId")
+ val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", s"colunmId=$columnId")
cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
+ expireCache(className + key)
+ expireCaches(className + key)
}
}
def findAll()(implicit session: DBSession = AutoSession) = {
val ls = sql"""select * from column_metas""".map { rs => ColumnMeta.valueOf(rs) }.list().apply()
- putsToCache(ls.map { x =>
- val cacheKey = s"id=${x.id.get}"
- (cacheKey -> x)
+ putsToCacheOption(ls.flatMap { x =>
+ Seq(
+ s"id=${x.id.get}",
+ s"columnId=${x.columnId}:name=${x.name}",
+ s"columnId=${x.columnId}:seq=${x.seq}"
+ ).map(cacheKey => (className + cacheKey, x))
})
- putsToCache(ls.map { x =>
- val cacheKey = s"columnId=${x.columnId}:name=${x.name}"
- (cacheKey -> x)
- })
- putsToCache(ls.map { x =>
- val cacheKey = s"columnId=${x.columnId}:seq=${x.seq}"
- (cacheKey -> x)
- })
+
putsToCaches(ls.groupBy(x => x.columnId).map { case (columnId, ls) =>
- val cacheKey = s"columnId=${columnId}"
+ val cacheKey = className + s"columnId=${columnId}"
(cacheKey -> ls)
}.toList)
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
similarity index 88%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
index 0b16449..860bf70 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
@@ -17,14 +17,17 @@
* under the License.
*/
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
import org.apache.s2graph.core.GraphUtil
import scalikejdbc._
import scala.util.{Try, Random}
-object Experiment extends Model[Experiment] {
+object Experiment extends SQLSyntaxSupport[Experiment] {
+ import Schema._
+ val className = Experiment.getClass.getSimpleName
+
val ImpressionKey = "S2-Impression-Id"
val ImpressionId = "Impression-Id"
@@ -38,24 +41,24 @@
}
def finds(serviceId: Int)(implicit session: DBSession = AutoSession): List[Experiment] = {
- val cacheKey = "serviceId=" + serviceId
- withCaches(cacheKey) {
+ val cacheKey = className + "serviceId=" + serviceId
+ withCaches(cacheKey, false) {
sql"""select * from experiments where service_id = ${serviceId}"""
.map { rs => Experiment(rs) }.list().apply()
}
}
def findBy(serviceId: Int, name: String)(implicit session: DBSession = AutoSession): Option[Experiment] = {
- val cacheKey = "serviceId=" + serviceId + ":name=" + name
- withCache(cacheKey) {
+ val cacheKey = className + "serviceId=" + serviceId + ":name=" + name
+ withCache(cacheKey, false) {
sql"""select * from experiments where service_id = ${serviceId} and name = ${name}"""
.map { rs => Experiment(rs) }.single.apply
}
}
def findById(id: Int)(implicit session: DBSession = AutoSession): Option[Experiment] = {
- val cacheKey = "id=" + id
- withCache(cacheKey)(
+ val cacheKey = className + "id=" + id
+ withCache(cacheKey, false)(
sql"""select * from experiments where id = ${id}"""
.map { rs => Experiment(rs) }.single.apply
)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
similarity index 93%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
index c128163..7fb1183 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
import java.util.Calendar
@@ -31,7 +31,9 @@
import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
import scalikejdbc._
-object Label extends Model[Label] {
+object Label extends SQLSyntaxSupport[Label] {
+ import Schema._
+ val className = Label.getClass.getSimpleName
val maxHBaseTableNames = 2
@@ -51,9 +53,8 @@
Label.delete(id.get)
}
-
def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
- val cacheKey = "label=" + labelName
+ val cacheKey = className + "label=" + labelName
lazy val labelOpt =
sql"""
select *
@@ -98,7 +99,7 @@
}
def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): Option[Label] = {
- val cacheKey = "id=" + id
+ val cacheKey = className + "id=" + id
withCache(cacheKey)(
sql"""
select *
@@ -109,7 +110,7 @@
}
def findById(id: Int)(implicit session: DBSession = AutoSession): Label = {
- val cacheKey = "id=" + id
+ val cacheKey = className + "id=" + id
withCache(cacheKey)(
sql"""
select *
@@ -120,7 +121,7 @@
}
def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
- val cacheKey = "tgtColumnId=" + columnId
+ val cacheKey = className + "tgtColumnId=" + columnId
val col = ServiceColumn.findById(columnId)
withCaches(cacheKey)(
sql"""
@@ -133,7 +134,7 @@
}
def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
- val cacheKey = "srcColumnId=" + columnId
+ val cacheKey = className + "srcColumnId=" + columnId
val col = ServiceColumn.findById(columnId)
withCaches(cacheKey)(
sql"""
@@ -146,14 +147,14 @@
}
def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
- val cacheKey = "srcServiceId=" + serviceId
+ val cacheKey = className + "srcServiceId=" + serviceId
withCaches(cacheKey)(
sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
)
}
def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
- val cacheKey = "tgtServiceId=" + serviceId
+ val cacheKey = className + "tgtServiceId=" + serviceId
withCaches(cacheKey)(
sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
)
@@ -189,9 +190,9 @@
val tgtServiceId = tgtService.id.get
val serviceId = service.id.get
- /* insert serviceColumn */
- val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType))
- val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType))
+ /** insert serviceColumn */
+ val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion)
+ val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion)
if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")
@@ -221,7 +222,7 @@
val cacheKeys = List(s"id=$createdId", s"label=$labelName")
val ret = findByName(labelName, useCache = false).get
- putsToCache(cacheKeys.map(k => k -> ret))
+ putsToCacheOption(cacheKeys.map(k => className + k -> ret))
ret
}
}
@@ -231,17 +232,12 @@
def findAll()(implicit session: DBSession = AutoSession) = {
val ls = sql"""select * from labels where deleted_at is null""".map { rs => Label(rs) }.list().apply()
-
- putsToCache(ls.map { x =>
- val cacheKey = s"id=${x.id.get}"
- (cacheKey -> x)
+ putsToCacheOption(ls.flatMap { x =>
+ Seq(
+ s"id=${x.id.get}",
+ s"label=${x.label}"
+ ).map(cacheKey => (className + cacheKey, x))
})
-
- putsToCache(ls.map { x =>
- val cacheKey = s"label=${x.label}"
- (cacheKey -> x)
- })
-
ls
}
@@ -255,10 +251,10 @@
val cnt = sql"""update labels set hbase_table_name = $newHTableName where label = $labelName""".update().apply()
val label = Label.findByName(labelName, useCache = false).get
- val cacheKeys = List(s"id=${label.id}", s"label=${label.label}")
+ val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}")
cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
+ expireCache(className + key)
+ expireCaches(className + key)
}
cnt
}
@@ -269,8 +265,8 @@
val cnt = sql"""delete from labels where id = ${label.id.get}""".update().apply()
val cacheKeys = List(s"id=$id", s"label=${label.label}")
cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
+ expireCache(className + key)
+ expireCaches(className + key)
}
cnt
}
@@ -282,10 +278,10 @@
val now = Calendar.getInstance().getTime
val newName = s"deleted_${now.getTime}_"+ label.label
val cnt = sql"""update labels set label = ${newName}, deleted_at = ${now} where id = ${label.id.get}""".update.apply()
- val cacheKeys = List(s"id=${label.id}", s"label=${oldName}")
+ val cacheKeys = List(s"id=${label.id.get}", s"label=${oldName}")
cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
+ expireCache(className + key)
+ expireCaches(className + key)
}
cnt
}
@@ -387,17 +383,16 @@
Set.empty[String]
}
- lazy val extraOptions = Model.extraOptions(options)
+ lazy val extraOptions = Schema.extraOptions(options)
lazy val durability = extraOptions.get("durability").map(_.as[Boolean]).getOrElse(true)
lazy val storageConfigOpt: Option[Config] = toStorageConfig
def toStorageConfig: Option[Config] = {
- Model.toStorageConfig(extraOptions)
+ Schema.toStorageConfig(extraOptions)
}
-
def srcColumnWithDir(dir: Int) = {
// GraphUtil.directions("out"
if (dir == 0) srcColumn else tgtColumn
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
similarity index 79%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
index 1da0e55..bb8425f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
@@ -17,15 +17,18 @@
* under the License.
*/
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.mysqls.LabelIndex.LabelIndexMutateOption
+import org.apache.s2graph.core.schema.LabelIndex.LabelIndexMutateOption
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsObject, JsString, Json}
import scalikejdbc._
-object LabelIndex extends Model[LabelIndex] {
+object LabelIndex extends SQLSyntaxSupport[LabelIndex] {
+ import Schema._
+ val className = LabelIndex.getClass.getSimpleName
+
val DefaultName = "_PK"
val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
val DefaultSeq = 1.toByte
@@ -64,14 +67,14 @@
}
def findById(id: Int)(implicit session: DBSession = AutoSession) = {
- val cacheKey = "id=" + id
+ val cacheKey = className + "id=" + id
withCache(cacheKey) {
sql"""select * from label_indices where id = ${id}""".map { rs => LabelIndex(rs) }.single.apply
}.get
}
def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
- val cacheKey = "labelId=" + labelId
+ val cacheKey = className + "labelId=" + labelId
if (useCache) {
withCaches(cacheKey)( sql"""
select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
@@ -105,8 +108,8 @@
s"labelId=$labelId:seqs=$metaSeqs:dir=$direction", s"labelId=$labelId:seq=$seq", s"id=$createdId")
cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
+ expireCache(className + key)
+ expireCaches(className + key)
}
findByLabelIdAndSeq(labelId, seq).get
@@ -114,7 +117,7 @@
}
def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
- val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction
+ val cacheKey = className + "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction
withCache(cacheKey) {
sql"""
select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and dir = ${direction}
@@ -124,7 +127,7 @@
def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
// val cacheKey = s"labelId=$labelId:seq=$seq"
- val cacheKey = "labelId=" + labelId + ":seq=" + seq
+ val cacheKey = className + "labelId=" + labelId + ":seq=" + seq
if (useCache) {
withCache(cacheKey)( sql"""
select * from label_indices where label_id = ${labelId} and seq = ${seq}
@@ -144,69 +147,29 @@
val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}")
cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
+ expireCache(className + key)
+ expireCaches(className + key)
}
}
def findAll()(implicit session: DBSession = AutoSession) = {
val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) }.list.apply
- putsToCache(ls.map { x =>
- var cacheKey = s"id=${x.id.get}"
- (cacheKey -> x)
+ putsToCacheOption(ls.flatMap { x =>
+ Seq(
+ s"id=${x.id.get}",
+ s"labelId=${x.labelId}:seq=${x.seq}",
+ s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
+ ).map(cacheKey => (className + cacheKey, x))
})
- putsToCache(ls.map { x =>
- var cacheKey = s"labelId=${x.labelId}:seq=${x.seq}}"
- (cacheKey -> x)
- })
- putsToCache(ls.map { x =>
- var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
- (cacheKey -> x)
- })
+
putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
val cacheKey = s"labelId=${labelId}"
- (cacheKey -> ls)
+ (className + cacheKey -> ls)
}.toList)
}
}
-/**
-mgmt.buildIndex('nameAndAge',Vertex.class)
-.addKey(name,Mapping.TEXT.getParameter())
-.addKey(age,Mapping.TEXT.getParameter())
-.buildMixedIndex("search")
-v: {name: abc} - E1: {age: 20}, E2, E3....
-Management.createServiceColumn(
- serviceName = serviceName, columnName = "person", columnType = "integer",
- props = Seq(
- Prop("name", "-", "string"),
- Prop("age", "0", "integer"),
- Prop("location", "-", "string")
- )
-)
-
-management.createLabel(
- label = "bought",
- srcServiceName = serviceName, srcColumnName = "person", srcColumnType = "integer",
- tgtServiceName = serviceName, tgtColumnName = "product", tgtColumnType = "integer", idDirected = true,
- serviceName = serviceName,
- indices = Seq(
- Index("PK", Seq("amount", "created_at"), IndexType("mixed", propsMapping: Map[String, String]),
-{"in": {}, "out": {}})
- ),
- props = Seq(
- Prop("amount", "0.0", "double"),
- Prop("created_at", "2000-01-01", "string")
- ),
- consistencyLevel = "strong"
-)
-
-mgmt.buildIndex('PK', Edge.class)
- .addKey(amount, Double)
- .buildCompositeIndex
-
-*/
case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
dir: Option[Int], options: Option[String]) {
// both
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
similarity index 91%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
index 3f54f49..f25be5f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
/**
* Created by shon on 6/3/15.
@@ -30,8 +30,9 @@
import scala.util.Try
-object LabelMeta extends Model[LabelMeta] {
-
+object LabelMeta extends SQLSyntaxSupport[LabelMeta] {
+ import Schema._
+ val className = LabelMeta.getClass.getSimpleName
/** dummy sequences */
val fromSeq = (-4).toByte
@@ -93,7 +94,7 @@
def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq // || seq == fromHashSeq
def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = {
- val cacheKey = "id=" + id
+ val cacheKey = className + "id=" + id
withCache(cacheKey) {
sql"""select * from label_metas where id = ${id}""".map { rs => LabelMeta(rs) }.single.apply
@@ -101,7 +102,7 @@
}
def findAllByLabelId(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[LabelMeta] = {
- val cacheKey = "labelId=" + labelId
+ val cacheKey = className + "labelId=" + labelId
lazy val labelMetas = sql"""select *
from label_metas
where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply()
@@ -116,7 +117,7 @@
case from.name => Some(from)
case to.name => Some(to)
case _ =>
- val cacheKey = "labelId=" + labelId + ":name=" + name
+ val cacheKey = className + "labelId=" + labelId + ":name=" + name
lazy val labelMeta = sql"""
select *
from label_metas where label_id = ${labelId} and name = ${name}"""
@@ -132,8 +133,8 @@
val seq = ls.size + 1
if (seq < maxValue) {
- sql"""insert into label_metas(label_id, name, seq, default_value, data_type, store_in_global_index)
- select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}, ${storeInGlobalIndex}""".updateAndReturnGeneratedKey.apply()
+ sql"""insert into label_metas(label_id, name, seq, default_value, data_type)
+ select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}""".updateAndReturnGeneratedKey.apply()
} else {
throw MaxPropSizeReachedException("max property size reached")
}
@@ -151,8 +152,8 @@
insert(labelId, name, defaultValue, dataType, storeInGlobalIndex)
val cacheKey = "labelId=" + labelId + ":name=" + name
val cacheKeys = "labelId=" + labelId
- expireCache(cacheKey)
- expireCaches(cacheKeys)
+ expireCache(className + cacheKey)
+ expireCaches(className + cacheKeys)
findByName(labelId, name, useCache = false).get
}
}
@@ -163,24 +164,19 @@
sql"""delete from label_metas where id = ${id}""".execute.apply()
val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:name=$name")
cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
+ expireCache(className + key)
+ expireCaches(className + key)
}
}
def findAll()(implicit session: DBSession = AutoSession) = {
val ls = sql"""select * from label_metas""".map { rs => LabelMeta(rs) }.list.apply
- putsToCache(ls.map { x =>
- val cacheKey = s"id=${x.id.get}"
- cacheKey -> x
- })
- putsToCache(ls.map { x =>
- val cacheKey = s"labelId=${x.labelId}:name=${x.name}"
- cacheKey -> x
- })
- putsToCache(ls.map { x =>
- val cacheKey = s"labelId=${x.labelId}:seq=${x.seq}"
- cacheKey -> x
+ putsToCacheOption(ls.flatMap { x =>
+ Seq(
+ s"id=${x.id.get}",
+ s"labelId=${x.labelId}:name=${x.name}",
+ s"labelId=${x.labelId}:seq=${x.seq}"
+ ).map(cacheKey => (className + cacheKey, x))
})
putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
similarity index 79%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
index e21072e..5b9e0ae 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong
@@ -33,19 +33,21 @@
import scala.language.{higherKinds, implicitConversions}
import scala.util.{Failure, Success, Try}
-object Model {
+object Schema {
var maxSize = 10000
var ttl = 60
+ var safeUpdateCache: SafeUpdateCache = _
+
val numOfThread = Runtime.getRuntime.availableProcessors()
val threadPool = Executors.newFixedThreadPool(numOfThread)
val ec = ExecutionContext.fromExecutor(threadPool)
- val useUTF8Encoding = "?useUnicode=true&characterEncoding=utf8"
private val ModelReferenceCount = new AtomicLong(0L)
def apply(config: Config) = {
maxSize = config.getInt("cache.max.size")
ttl = config.getInt("cache.ttl.seconds")
+
Class.forName(config.getString("db.default.driver"))
val settings = ConnectionPoolSettings(
@@ -61,7 +63,7 @@
settings)
checkSchema()
-
+ safeUpdateCache = new SafeUpdateCache(maxSize, ttl)(ec)
ModelReferenceCount.incrementAndGet()
}
@@ -140,7 +142,8 @@
throw new IllegalStateException(s"Failed to list models", e)
}
}
- clearCache()
+// clearCache()
+ safeUpdateCache.shutdown()
ConnectionPool.closeAll()
}
@@ -153,14 +156,14 @@
ColumnMeta.findAll()
}
- def clearCache() = {
- Service.expireAll()
- ServiceColumn.expireAll()
- Label.expireAll()
- LabelMeta.expireAll()
- LabelIndex.expireAll()
- ColumnMeta.expireAll()
- }
+// def clearCache() = {
+// Service.expireAll()
+// ServiceColumn.expireAll()
+// Label.expireAll()
+// LabelMeta.expireAll()
+// LabelIndex.expireAll()
+// ColumnMeta.expireAll()
+// }
def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
case None => Map.empty
@@ -181,6 +184,7 @@
val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, value) =>
key -> JSONParser.jsValueToAny(value).getOrElse(throw new RuntimeException("!!"))
}
+
ConfigFactory.parseMap(configMap.asJava)
}
} catch {
@@ -189,43 +193,33 @@
None
}
}
-}
-trait Model[V] extends SQLSyntaxSupport[V] {
+ private def toMultiKey(key: String): String = key + ".__m__"
- import Model._
+ def withCache[T <: AnyRef](key: String, broadcast: Boolean = true)(op: => T) = safeUpdateCache.withCache(key, broadcast)(op)
- implicit val ec: ExecutionContext = Model.ec
+ def withCaches[T <: AnyRef](key: String, broadcast: Boolean = true)(op: => T) = safeUpdateCache.withCache(toMultiKey(key), broadcast)(op)
- val cName = this.getClass.getSimpleName()
- logger.info(s"LocalCache[$cName]: TTL[$ttl], MaxSize[$maxSize]")
+ def expireCache(key: String) = safeUpdateCache.invalidate(key)
- val optionCache = new SafeUpdateCache[Option[V]](cName, maxSize, ttl)
- val listCache = new SafeUpdateCache[List[V]](cName, maxSize, ttl)
+ def expireCaches(key: String) = safeUpdateCache.invalidate(toMultiKey(key))
- val withCache = optionCache.withCache _
-
- val withCaches = listCache.withCache _
-
- val expireCache = optionCache.invalidate _
-
- val expireCaches = listCache.invalidate _
-
- def expireAll() = {
- listCache.invalidateAll()
- optionCache.invalidateAll()
+ def putsToCacheOption[T <: AnyRef](kvs: List[(String, T)]) = kvs.foreach {
+ case (key, value) => safeUpdateCache.put(key, Option(value))
}
- def putsToCache(kvs: List[(String, V)]) = kvs.foreach {
- case (key, value) => optionCache.put(key, Option(value))
+ def putsToCaches[T <: AnyRef](kvs: List[(String, T)]) = kvs.foreach {
+ case (key, values) => safeUpdateCache.put(toMultiKey(key), values)
}
- def putsToCaches(kvs: List[(String, List[V])]) = kvs.foreach {
- case (key, values) => listCache.put(key, values)
+ def getCacheSize(): Int = safeUpdateCache.asMap().size()
+
+ def getAllCacheData[T <: AnyRef](): (List[(String, T)], List[(String, List[T])]) = {
+ (Nil, Nil)
}
- def getAllCacheData() : (List[(String, Option[_])], List[(String, List[_])]) = {
- (optionCache.getAllData(), listCache.getAllData())
- }
+ def toBytes(): Array[Byte] = safeUpdateCache.toBytes()
+
+ def fromBytes(safeUpdateCache: SafeUpdateCache, bytes: Array[Byte]): Unit = SafeUpdateCache.fromBytes(safeUpdateCache, bytes)
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
similarity index 82%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
index 5b4f494..611a746 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
import java.util.UUID
@@ -26,24 +26,32 @@
import play.api.libs.json.Json
import scalikejdbc._
-object Service extends Model[Service] {
+object Service extends SQLSyntaxSupport[Service] {
+ import Schema._
+ val className = Service.getClass.getSimpleName
+
def valueOf(rs: WrappedResultSet): Service = {
- Service(rs.intOpt("id"), rs.string("service_name"), rs.string("access_token"),
- rs.string("cluster"), rs.string("hbase_table_name"), rs.int("pre_split_size"), rs.intOpt("hbase_table_ttl"))
+ Service(rs.intOpt("id"),
+ rs.string("service_name"),
+ rs.string("access_token"),
+ rs.string("cluster"),
+ rs.string("hbase_table_name"),
+ rs.int("pre_split_size"),
+ rs.intOpt("hbase_table_ttl"))
}
def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = {
- val cacheKey = s"accessToken=$accessToken"
+ val cacheKey = className + s"accessToken=$accessToken"
withCache(cacheKey)( sql"""select * from services where access_token = ${accessToken}""".map { rs => Service.valueOf(rs) }.single.apply)
}
def findById(id: Int)(implicit session: DBSession = AutoSession): Service = {
- val cacheKey = "id=" + id
+ val cacheKey = className + "id=" + id
withCache(cacheKey)( sql"""select * from services where id = ${id}""".map { rs => Service.valueOf(rs) }.single.apply).get
}
def findByName(serviceName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Service] = {
- val cacheKey = "serviceName=" + serviceName
+ val cacheKey = className + "serviceName=" + serviceName
lazy val serviceOpt = sql"""
select * from services where service_name = ${serviceName}
""".map { rs => Service.valueOf(rs) }.single.apply()
@@ -67,8 +75,8 @@
sql"""delete from service_columns where id = ${id}""".execute.apply()
val cacheKeys = List(s"id=$id", s"serviceName=$serviceName")
cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
+ expireCache(className + key)
+ expireCaches(className + key)
}
}
@@ -79,21 +87,18 @@
case None =>
insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
val cacheKey = "serviceName=" + serviceName
- expireCache(cacheKey)
+ expireCache(className + cacheKey)
findByName(serviceName).get
}
}
def findAll()(implicit session: DBSession = AutoSession) = {
val ls = sql"""select * from services""".map { rs => Service.valueOf(rs) }.list.apply
- putsToCache(ls.map { x =>
- val cacheKey = s"id=${x.id.get}"
- (cacheKey -> x)
- })
-
- putsToCache(ls.map { x =>
- val cacheKey = s"serviceName=${x.serviceName}"
- (cacheKey -> x)
+ putsToCacheOption(ls.flatMap { x =>
+ Seq(
+ s"id=${x.id.get}",
+ s"serviceName=${x.serviceName}"
+ ).map(cacheKey => (className + cacheKey, x))
})
ls
@@ -121,8 +126,8 @@
Json.parse("{}")
}
- lazy val extraOptions = Model.extraOptions(options)
+ lazy val extraOptions = Schema.extraOptions(options)
lazy val storageConfigOpt: Option[Config] = toStorageConfig
def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache)
- def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions)
+ def toStorageConfig: Option[Config] = Schema.toStorageConfig(extraOptions)
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
similarity index 86%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
index e8bec06..cc1698a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
import org.apache.s2graph.core.JSONParser
import org.apache.s2graph.core.JSONParser._
@@ -25,7 +25,10 @@
import play.api.libs.json.Json
import scalikejdbc._
-object ServiceColumn extends Model[ServiceColumn] {
+object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
+ import Schema._
+ val className = ServiceColumn.getClass.getSimpleName
+
val Default = ServiceColumn(Option(0), -1, "default", "string", "v4")
def valueOf(rs: WrappedResultSet): ServiceColumn = {
@@ -42,18 +45,14 @@
}
def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
- val cacheKey = "id=" + id
-
- if (useCache) {
- withCache(cacheKey)(sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply).get
- } else {
- sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply.get
- }
+ val cacheKey = className + "id=" + id
+ lazy val sql = sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply
+ if (useCache) withCache(cacheKey)(sql).get
+ else sql.get
}
def find(serviceId: Int, columnName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = {
-// val cacheKey = s"serviceId=$serviceId:columnName=$columnName"
- val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
+ val cacheKey = className + "serviceId=" + serviceId + ":columnName=" + columnName
if (useCache) {
withCache(cacheKey) {
sql"""
@@ -76,8 +75,8 @@
sql"""delete from service_columns where id = ${id}""".execute.apply()
val cacheKeys = List(s"id=$id", s"serviceId=$serviceId:columnName=$columnName")
cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
+ expireCache(className + key)
+ expireCaches(className + key)
}
}
def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
@@ -87,20 +86,19 @@
insert(serviceId, columnName, columnType, schemaVersion)
// val cacheKey = s"serviceId=$serviceId:columnName=$columnName"
val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
- expireCache(cacheKey)
+ expireCache(className + cacheKey)
find(serviceId, columnName).get
}
}
def findAll()(implicit session: DBSession = AutoSession) = {
val ls = sql"""select * from service_columns""".map { rs => ServiceColumn.valueOf(rs) }.list.apply
- putsToCache(ls.map { x =>
- var cacheKey = s"id=${x.id.get}"
- (cacheKey -> x)
+ putsToCacheOption(ls.flatMap { x =>
+ Seq(
+ s"id=${x.id.get}",
+ s"serviceId=${x.serviceId}:columnName=${x.columnName}"
+ ).map(cacheKey => (className + cacheKey, x))
})
- putsToCache(ls.map { x =>
- var cacheKey = s"serviceId=${x.serviceId}:columnName=${x.columnName}"
- (cacheKey -> x)
- })
+
ls
}
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 2d74a7c..2b02bdd 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -22,7 +22,7 @@
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.TraversalHelper._
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core.parsers.WhereParser
import org.apache.s2graph.core.utils.logger
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index b1fdd7c..0526042 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -26,7 +26,7 @@
import com.typesafe.config.Config
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.storage.serde._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
index e575c5b..5db02cc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
@@ -24,7 +24,7 @@
import com.typesafe.config.Config
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange}
import org.apache.s2graph.core.storage.serde.StorageSerializable
import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, StorageSerDe}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
index 9871b36..0748efb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core.storage.serde
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core._
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.utils.logger
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala
index dc7690b..884ca11 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.storage.serde
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Label, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.storage.CanSKeyValue
import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala
index 219d097..20e676a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.storage.serde
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{ColumnMeta, LabelMeta}
+import org.apache.s2graph.core.schema.{ColumnMeta, LabelMeta}
import org.apache.s2graph.core.storage.SKeyValue
import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 945f246..44ae52f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -21,7 +21,7 @@
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.storage.CanSKeyValue
import org.apache.s2graph.core.storage.serde.Deserializable
import org.apache.s2graph.core.storage.serde.StorageDeserializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
index 28982dc..6e467b6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.storage.serde.indexedge.tall
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core.types.VertexId
import org.apache.s2graph.core.{GraphUtil, IndexEdge}
import org.apache.s2graph.core.storage.serde.StorageSerializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index e533b4b..1051c6e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -21,7 +21,7 @@
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.storage.serde.StorageDeserializable._
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.storage.serde.Deserializable
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index 34e9a6e..91127f9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.storage.serde.indexedge.wide
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
import org.apache.s2graph.core.types.VertexId
import org.apache.s2graph.core.{GraphUtil, IndexEdge}
import org.apache.s2graph.core.storage.serde.StorageSerializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index f5c10a7..580acd7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.storage.serde.snapshotedge.tall
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn}
import org.apache.s2graph.core.storage.serde.StorageDeserializable._
import org.apache.s2graph.core.storage.CanSKeyValue
import org.apache.s2graph.core.types._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 02b2977..7937de8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -21,7 +21,7 @@
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.{S2Edge, SnapshotEdge}
-import org.apache.s2graph.core.mysqls.LabelIndex
+import org.apache.s2graph.core.schema.LabelIndex
import org.apache.s2graph.core.storage.serde._
import org.apache.s2graph.core.storage.serde.StorageSerializable._
import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index 7dec6d9..e0243cc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.storage.serde.snapshotedge.wide
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.storage.serde.StorageDeserializable._
import org.apache.s2graph.core.storage.CanSKeyValue
import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceVertexId, TargetVertexId}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index 44d4a2a..c5daa99 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -21,7 +21,7 @@
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.{S2Edge, SnapshotEdge}
-import org.apache.s2graph.core.mysqls.LabelIndex
+import org.apache.s2graph.core.schema.LabelIndex
import org.apache.s2graph.core.storage.serde.Serializable
import org.apache.s2graph.core.storage.serde.StorageSerializable._
import org.apache.s2graph.core.types.VertexId
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
index 2a7cd6a..a1f5705 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core.storage.serde.vertex.tall
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
import org.apache.s2graph.core.storage.CanSKeyValue
import org.apache.s2graph.core.storage.serde.Deserializable
import org.apache.s2graph.core.storage.serde.StorageDeserializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
index d1d4d7d..f27cc5c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core.storage.serde.vertex.wide
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
import org.apache.s2graph.core.storage.CanSKeyValue
import org.apache.s2graph.core.storage.serde.Deserializable
import org.apache.s2graph.core.storage.serde.StorageDeserializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
index 77951c0..f88509f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.types
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
object HBaseType {
val VERSION4 = "v4"
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
index b1de0c4..1ad56b3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
@@ -21,7 +21,7 @@
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.{GraphUtil, S2Vertex}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types.HBaseType._
import org.apache.s2graph.core.io.Conversions._
import play.api.libs.json.Json
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
index b402c0f..36c1a6d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
@@ -19,12 +19,15 @@
package org.apache.s2graph.core.utils
+import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory
import play.api.libs.json.JsValue
import scala.language.{higherKinds, implicitConversions}
+import scala.util.{Random, Try}
object logger {
+ val conf = ConfigFactory.load()
trait Loggable[T] {
def toLogMessage(msg: T): String
@@ -53,12 +56,17 @@
private val metricLogger = LoggerFactory.getLogger("metrics")
private val queryLogger = LoggerFactory.getLogger("query")
private val malformedLogger = LoggerFactory.getLogger("malformed")
+ private val syncLogger = LoggerFactory.getLogger("meta_sync")
def metric[T: Loggable](msg: => T) = metricLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
+ def syncInfo[T: Loggable](msg: => T) = syncLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
+
def info[T: Loggable](msg: => T) = logger.info(implicitly[Loggable[T]].toLogMessage(msg))
- def debug[T: Loggable](msg: => T) = logger.debug(implicitly[Loggable[T]].toLogMessage(msg))
+ def warn[T: Loggable](msg: => T) = logger.warn(implicitly[Loggable[T]].toLogMessage(msg))
+
+ def debug[T: Loggable](msg: => T) = if (logger.isDebugEnabled) logger.debug(implicitly[Loggable[T]].toLogMessage(msg))
def error[T: Loggable](msg: => T, exception: => Throwable) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index a98104c..cc9bd4b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -19,60 +19,123 @@
package org.apache.s2graph.core.utils
+import java.io._
import java.util.concurrent.atomic.AtomicBoolean
import com.google.common.cache.CacheBuilder
-
+import com.google.common.hash.Hashing
import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
object SafeUpdateCache {
case class CacheKey(key: String)
-}
+ def serialise(value: AnyRef): Try[Array[Byte]] = {
+ import scala.pickling.Defaults._
+ import scala.pickling.binary._
-class SafeUpdateCache[T](prefix: String, maxSize: Int, ttl: Int)(implicit executionContext: ExecutionContext) {
+ val result = Try(value.pickle.value)
+ result.failed.foreach { e =>
+ logger.syncInfo(s"[Serialise failed]: ${value}, ${e}")
+ }
- import SafeUpdateCache._
-
- implicit class StringOps(key: String) {
- def toCacheKey = new CacheKey(prefix + ":" + key)
+ result
}
- def toTs() = (System.currentTimeMillis() / 1000).toInt
+ def deserialise(bytes: Array[Byte]): Try[AnyRef] = {
+ import scala.pickling.Defaults._
+ import scala.pickling.binary._
- private val cache = CacheBuilder.newBuilder().maximumSize(maxSize).build[CacheKey, (T, Int, AtomicBoolean)]()
+ Try(BinaryPickle(bytes).unpickle[AnyRef])
+ }
- def put(key: String, value: T) = cache.put(key.toCacheKey, (value, toTs, new AtomicBoolean(false)))
+ def fromBytes(cache: SafeUpdateCache, bytes: Array[Byte]): Unit = {
+ import org.apache.hadoop.io.WritableUtils
- def invalidate(key: String) = cache.invalidate(key.toCacheKey)
+ val bais = new ByteArrayInputStream(bytes)
+ val input = new DataInputStream(bais)
- def withCache(key: String)(op: => T): T = {
- val cacheKey = key.toCacheKey
+ try {
+ val size = WritableUtils.readVInt(input)
+ (1 to size).foreach { ith =>
+ val cacheKey = WritableUtils.readVLong(input)
+ val value = deserialise(WritableUtils.readCompressedByteArray(input))
+ value.foreach { dsv =>
+ cache.putInner(cacheKey, dsv)
+ }
+ }
+ } finally {
+ bais.close()
+ input.close()
+ }
+ }
+}
+
+class SafeUpdateCache(maxSize: Int,
+ ttl: Int)
+ (implicit executionContext: ExecutionContext) {
+
+ import java.lang.{Long => JLong}
+ import SafeUpdateCache._
+
+ private val cache = CacheBuilder.newBuilder().maximumSize(maxSize)
+ .build[JLong, (AnyRef, Int, AtomicBoolean)]()
+
+ private def toCacheKey(key: String): Long = {
+ Hashing.murmur3_128().hashBytes(key.getBytes("UTF-8")).asLong()
+ }
+
+ private def toTs() = (System.currentTimeMillis() / 1000).toInt
+
+ def put(key: String, value: AnyRef, broadcast: Boolean = false): Unit = {
+ val cacheKey = toCacheKey(key)
+ cache.put(cacheKey, (value, toTs, new AtomicBoolean(false)))
+ }
+
+ def putInner(cacheKey: Long, value: AnyRef): Unit = {
+ cache.put(cacheKey, (value, toTs, new AtomicBoolean(false)))
+ }
+
+ def invalidate(key: String, broadcast: Boolean = true) = {
+ val cacheKey = toCacheKey(key)
+ cache.invalidate(cacheKey)
+ }
+
+ def invalidateInner(cacheKey: Long): Unit = {
+ cache.invalidate(cacheKey)
+ }
+
+ def withCache[T <: AnyRef](key: String, broadcast: Boolean)(op: => T): T = {
+ val cacheKey = toCacheKey(key)
val cachedValWithTs = cache.getIfPresent(cacheKey)
if (cachedValWithTs == null) {
+ val value = op
// fetch and update cache.
- val newValue = op
- cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false)))
- newValue
+ put(key, value, broadcast)
+ value
} else {
- val (cachedVal, updatedAt, isUpdating) = cachedValWithTs
+ val (_cachedVal, updatedAt, isUpdating) = cachedValWithTs
+ val cachedVal = _cachedVal.asInstanceOf[T]
+
if (toTs() < updatedAt + ttl) cachedVal // in cache TTL
else {
val running = isUpdating.getAndSet(true)
+
if (running) cachedVal
else {
- Future(op)(executionContext) onComplete {
+ val value = op
+ Future(value)(executionContext) onComplete {
case Failure(ex) =>
- cache.put(cacheKey, (cachedVal, toTs(), new AtomicBoolean(false))) // keep old value
- logger.error(s"withCache update failed: $cacheKey")
+ put(key, cachedVal, false)
+ logger.error(s"withCache update failed: $cacheKey", ex)
case Success(newValue) =>
- cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false))) // update new value
+ put(key, newValue, broadcast = (broadcast && newValue != cachedVal))
logger.info(s"withCache update success: $cacheKey")
}
+
cachedVal
}
}
@@ -81,10 +144,35 @@
def invalidateAll() = cache.invalidateAll()
- def getAllData() : List[(String, T)] = {
- cache.asMap().map { case (key, value) =>
- (key.key.substring(prefix.size + 1), value._1)
- }.toList
+ def asMap() = cache.asMap()
+
+ def get(key: String) = cache.asMap().get(toCacheKey(key))
+
+ def toBytes(): Array[Byte] = {
+ import org.apache.hadoop.io.WritableUtils
+ val baos = new ByteArrayOutputStream()
+ val output = new DataOutputStream(baos)
+
+ try {
+ val m = cache.asMap()
+ val size = m.size()
+
+ WritableUtils.writeVInt(output, size)
+ for (key <- m.keys) {
+ val (value, _, _) = m.get(key)
+ WritableUtils.writeVLong(output, key)
+ serialise(value).foreach { sv =>
+ WritableUtils.writeCompressedByteArray(output, sv)
+ }
+ }
+ output.flush()
+ baos.toByteArray
+ } finally {
+ baos.close()
+ output.close()
+ }
}
+
+ def shutdown() = {}
}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala
new file mode 100644
index 0000000..fb5dbe6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala
@@ -0,0 +1,181 @@
+//package org.apache.s2graph.core.utils
+//
+//import java.util.Properties
+//
+//import io.reactivex._
+//import io.reactivex.schedulers.Schedulers
+//import org.apache.kafka.clients.consumer.KafkaConsumer
+//import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
+//import org.apache.s2graph.core.ExceptionHandler
+//
+//
+//trait Sync[VALUE] {
+// type KV = (String, VALUE)
+//
+// val EmptyValue: VALUE
+// val SyncId: String
+// val SkipSelfBroadcast: Boolean
+//
+// val delim = "___" // ! do not change
+//
+// // public
+// val emitter = io.reactivex.processors.PublishProcessor.create[KV]()
+//
+// def send(key: String): Unit = send(key, EmptyValue)
+//
+// def send(key: String, value: VALUE): Unit = sendImpl(encodeKey(key), value)
+//
+// def shutdown(): Unit = {
+// emitter.onComplete()
+// }
+//
+// protected def emit(kv: KV): Unit = {
+// val (encodedKey, value) = kv
+// val (sid, key) = decodeKey(encodedKey)
+//
+// if (SkipSelfBroadcast && sid == SyncId) {
+// logger.syncInfo(s"[Message ignore] sent by self: ${SyncId}")
+// // pass
+// } else {
+// emitter.onNext(key -> value)
+// logger.syncInfo(s"[Message emit success]: selfId: ${SyncId}, from: ${sid}")
+// }
+// }
+//
+// protected def sendImpl(key: String, value: VALUE): Unit
+//
+// private def encodeKey(key: String): String = s"${SyncId}${delim}${key}"
+//
+// private def decodeKey(key: String): (String, String) = {
+// val Array(cid, decodedKey) = key.split(delim)
+// cid -> decodedKey
+// }
+//}
+//
+//class MemorySync(syncId: String) extends Sync[Array[Byte]] {
+//
+// override val EmptyValue: Array[Byte] = Array.empty[Byte]
+//
+// override val SyncId: String = syncId
+//
+// override val SkipSelfBroadcast: Boolean = false
+//
+// override protected def sendImpl(key: String, value: Array[Byte]): Unit = emit(key -> value)
+//}
+//
+//object KafkaSync {
+//
+// import java.util.Properties
+//
+// val keySerializer = "org.apache.kafka.common.serialization.StringSerializer"
+// val valueSerializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
+// val keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"
+// val valueDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+//
+// val maxMessageSize = "15728640" // 15 MB
+//
+// def producerConfig(brokers: String): Properties = {
+// val props = ExceptionHandler.toKafkaProp(brokers)
+//
+// props.setProperty("key.serializer", keySerializer)
+// props.setProperty("value.serializer", valueSerializer)
+// props.setProperty("message.max.bytes", maxMessageSize)
+// props.setProperty("max.request.size", maxMessageSize)
+//
+//
+// props
+// }
+//
+// def consumerConfig(brokers: String, groupId: String): Properties = {
+// val props = new Properties()
+//
+// props.put("bootstrap.servers", brokers)
+// props.put("group.id", groupId)
+// props.put("enable.auto.commit", "false")
+// props.put("key.deserializer", keyDeserializer)
+// props.put("value.deserializer", valueDeserializer)
+// props.put("max.partition.fetch.bytes", maxMessageSize)
+// props.put("fetch.message.max.bytes", maxMessageSize)
+//
+// props
+// }
+//}
+//
+//class KafkaSync(topic: String,
+// syncId: String,
+// producerConfig: Properties,
+// consumerConfig: Properties,
+// skipSelfBroadcast: Boolean = true,
+// seekTo: String = "end"
+// ) extends Sync[Array[Byte]] {
+//
+// type VALUE = Array[Byte]
+//
+// val consumerTimeout: Int = 1000
+//
+// override val EmptyValue = Array.empty[Byte]
+//
+// override val SyncId = syncId
+//
+// override val SkipSelfBroadcast: Boolean = skipSelfBroadcast
+//
+// lazy val producer = new KafkaProducer[String, VALUE](producerConfig)
+//
+// lazy val consumer = {
+// import scala.collection.JavaConverters._
+//
+// val kc = new KafkaConsumer[String, VALUE](consumerConfig)
+// kc.subscribe(Seq(topic).asJava)
+// kc.poll(consumerTimeout * 10) // Just for meta info sync
+//
+// if (seekTo == "end") {
+// kc.seekToEnd(kc.assignment())
+// } else if (seekTo == "beginning") {
+// kc.seekToBeginning(kc.assignment())
+// } else {
+// // pass
+// }
+//
+// kc
+// }
+//
+// // Emit event from kafka consumer: Flowable is just for while loop, not for Observabl
+// Flowable.create(new FlowableOnSubscribe[KV] {
+// import scala.collection.JavaConverters._
+//
+// override def subscribe(e: FlowableEmitter[KV]) = {
+// while (true) {
+// val ls = consumer.poll(consumerTimeout).asScala.map(record => record.key() -> record.value())
+// ls.foreach(emit)
+//
+// if (ls.nonEmpty) {
+// consumer.commitSync()
+// logger.syncInfo(s"[Kafka consume success]: message size: ${ls.size}]")
+// }
+// }
+//
+// e.onComplete()
+// }
+// }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.single()).subscribe()
+//
+// override protected def sendImpl(key: String, value: VALUE): Unit = {
+// val record = new ProducerRecord[String, VALUE](topic, key, value)
+//
+// producer.send(record, new Callback {
+// override def onCompletion(metadata: RecordMetadata, exception: Exception) = {
+// if (exception != null) {
+// logger.syncInfo(s"[Kafka produce failed]: from: key: ${key} e: ${exception}")
+// } else {
+// logger.syncInfo(s"[Kafka produce success]: from: ${SyncId} ${metadata}")
+// }
+// }
+// })
+// }
+//
+// override def shutdown(): Unit = {
+// super.shutdown()
+//
+// producer.close()
+// consumer.close()
+// }
+//}
diff --git a/s2core/src/test/resources/reference.conf b/s2core/src/test/resources/reference.conf
new file mode 100644
index 0000000..d7fe546
--- /dev/null
+++ b/s2core/src/test/resources/reference.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# APP PHASE
+phase = test
+
+host = localhost
+
+# Hbase
+hbase.table.compression.algorithm="gz"
+hbase.zookeeper.quorum=${host}
+
+# Asynchbase
+hbase.client.retries.number=100
+hbase.rpcs.buffered_flush_interval=100
+hbase.rpc.timeout=0
+
+# local retry number
+max.retry.number=100
+max.back.off=50
+
+# Future cache.
+future.cache.max.size=100000
+future.cache.expire.after.write=10000
+future.cache.expire.after.access=5000
+future.cache.metric.interval=60000
+
+# Local Cache
+cache.ttl.seconds=60
+cache.max.size=100000
+
+# DB
+s2graph.models.table.name = "models-dev"
+
+db.default.driver = "org.h2.Driver"
+db.default.url = "jdbc:h2:file:./var/metastore;MODE=MYSQL"
+db.default.user = "sa"
+db.default.password = "sa"
+
+
+akka {
+ loggers = ["akka.event.slf4j.Slf4jLogger"]
+ loglevel = "DEBUG"
+}
+
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
index 4ed7905..1baa89d 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core.Integrate
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsObject, Json}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
index c7d9f59..0177543 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.Integrate
import com.typesafe.config._
-import org.apache.s2graph.core.mysqls.{GlobalIndex, Label, Model}
+import org.apache.s2graph.core.schema.{GlobalIndex, Label, Schema}
import org.apache.s2graph.core.rest.{RequestParser, RestHandler}
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.core._
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala
index b2c1113..c2ca367 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core
import org.apache.s2graph.core.Integrate.IntegrateCommon
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
import play.api.libs.json.Json
class ManagementTest extends IntegrateCommon {
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
index 2b439bc..05096e1 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core
import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{ServiceColumn, LabelMeta}
+import org.apache.s2graph.core.schema.{ServiceColumn, LabelMeta}
import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
import org.apache.s2graph.core.utils.logger
import org.scalatest.FunSuite
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
index 3f6cd2a..ea43a76 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import org.apache.s2graph.core.schema.{LabelIndex, LabelMeta}
import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLikeWithTs, LabelWithDirection}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index 6ac77e4..488b294 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -21,7 +21,7 @@
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, Service, ServiceColumn}
import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
import scalikejdbc.AutoSession
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
index 03ea50a..87e09bc 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
@@ -21,7 +21,7 @@
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.mysqls.ServiceColumn
+import org.apache.s2graph.core.schema.ServiceColumn
import org.apache.s2graph.core.types.{HBaseType, InnerVal, SourceVertexId}
import scala.collection.mutable
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
index a5349dc..4a87cb2 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
@@ -23,7 +23,7 @@
import org.apache.s2graph.core._
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
import org.apache.tinkerpop.gremlin.process.traversal.{Order, P}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper
import org.apache.tinkerpop.gremlin.structure.T
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/io/ConversionTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/io/ConversionTest.scala
index bc53e2f..d44bbe7 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/io/ConversionTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/io/ConversionTest.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.io
//import org.apache.s2graph.core.{EdgeId, S2VertexPropertyId}
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Service, ServiceColumn}
import org.apache.s2graph.core.types.{InnerVal, VertexId}
import org.apache.s2graph.core.utils.logger
import org.scalatest.{FunSuite, Matchers}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
index 0920a89..342d9c6 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.parsers
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelMeta}
+import org.apache.s2graph.core.schema.{ServiceColumn, Label, LabelMeta}
import org.apache.s2graph.core.rest.TemplateHelper
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala
similarity index 69%
rename from s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala
rename to s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala
index 87a84ae..1843a57 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala
@@ -17,13 +17,13 @@
* under the License.
*/
-package org.apache.s2graph.core.models
+package org.apache.s2graph.core.schema
import org.apache.s2graph.core.TestCommonWithModels
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.utils.SafeUpdateCache
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-class ModelTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll {
+class SchemaTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll {
override def beforeAll(): Unit = {
initTests()
}
@@ -56,4 +56,22 @@
val tgtColumn = labelOpt.get.tgtService
println(tgtColumn)
}
+
+ test("serialize/deserialize Schema.") {
+ import scala.collection.JavaConverters._
+ val originalMap = Schema.safeUpdateCache.asMap().asScala
+ val newCache = new SafeUpdateCache(Schema.maxSize, Schema.ttl)(scala.concurrent.ExecutionContext.Implicits.global)
+ Schema.fromBytes(newCache, Schema.toBytes())
+ val newMap = newCache.asMap().asScala
+
+ originalMap.size shouldBe newMap.size
+ originalMap.keySet shouldBe newMap.keySet
+
+ originalMap.keySet.foreach { key =>
+ val (originalVal, _, _) = originalMap(key)
+ val (newVal, _, _) = newMap(key)
+
+ originalVal shouldBe newVal
+ }
+ }
}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala
index f1331e3..05dfbf3 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core.storage
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageSerDe
import org.apache.s2graph.core.storage.serde.{StorageDeserializable, StorageSerializable}
import org.apache.s2graph.core.{S2Vertex, S2VertexLike, TestCommonWithModels}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
index 4f38e18..23ad143 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core.storage.hbase
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.TestCommonWithModels
import org.scalatest.{FunSuite, Matchers}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala
index 4aecb33..b871dd6 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.storage.rocks
import org.apache.s2graph.core.TestCommonWithModels
-import org.apache.s2graph.core.mysqls.{Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{Service, ServiceColumn}
import org.apache.tinkerpop.gremlin.structure.T
import org.scalatest.{FunSuite, Matchers}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
index f52f86a..0467f7d 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
@@ -24,7 +24,7 @@
import org.apache.s2graph.core.Management.JsonModel.Prop
import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types.{HBaseType, InnerVal, VertexId}
import org.apache.s2graph.core.utils.logger
import org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
index 45060cc..04268d6 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
@@ -24,7 +24,7 @@
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.Management.JsonModel.Prop
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{GlobalIndex, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{GlobalIndex, Service, ServiceColumn}
import org.apache.s2graph.core.tinkerpop.S2GraphProvider
import org.apache.s2graph.core.utils.logger
import org.apache.tinkerpop.gremlin.process.traversal.{Compare, P, Scope}
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
index caddf8d..2f599e0 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
@@ -23,7 +23,7 @@
import akka.stream.ActorMaterializer
import com.typesafe.config.Config
import org.apache.http.HttpStatus
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
import org.apache.s2graph.counter.config.S2CounterConfig
import org.apache.s2graph.counter.core
import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
index b9ca9a4..1439118 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
@@ -24,7 +24,7 @@
import com.typesafe.config.Config
import org.apache.commons.httpclient.HttpStatus
import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
import org.apache.s2graph.counter.config.S2CounterConfig
import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap
import org.apache.s2graph.counter.core.v2.ExactStorageGraph._
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
index 17ecc87..d61f70b 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
@@ -22,7 +22,7 @@
import com.typesafe.config.Config
import org.apache
import org.apache.s2graph.core.S2Graph
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
import org.apache.s2graph.counter
import org.apache.s2graph.counter.config.S2CounterConfig
import org.apache.s2graph.counter.core.{RankingCounter, ExactCounter}
diff --git a/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
index 2ab4823..aba035a 100644
--- a/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
+++ b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.counter.core
import com.typesafe.config.ConfigFactory
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
import org.apache.s2graph.core.{Management, S2Graph, S2Graph$}
import org.apache.s2graph.counter.config.S2CounterConfig
import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
diff --git a/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSchemaSpec.scala
similarity index 97%
rename from s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala
rename to s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSchemaSpec.scala
index be79e23..2de7248 100644
--- a/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala
+++ b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSchemaSpec.scala
@@ -22,7 +22,7 @@
import com.typesafe.config.ConfigFactory
import org.specs2.mutable.Specification
-class CounterModelSpec extends Specification {
+class CounterSchemaSpec extends Specification {
val config = ConfigFactory.load()
DBModel.initialize(config)
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
index f941224..4d9f5f5 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
@@ -22,7 +22,7 @@
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.apache.commons.httpclient.HttpStatus
-import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import org.apache.s2graph.core.schema.{Bucket, Experiment, Service}
import org.apache.s2graph.counter.loader.config.StreamingConfig
import org.apache.s2graph.counter.models.Counter
import org.apache.s2graph.counter.util.{RetryAsync, CollectionCache, CollectionCacheConfig}
diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
index 35575fb..8d58692 100644
--- a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
+++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.counter.loader.core
import com.typesafe.config.ConfigFactory
-import org.apache.s2graph.core.mysqls.{Label, Service}
+import org.apache.s2graph.core.schema.{Label, Service}
import org.apache.s2graph.core.types.HBaseType
import org.apache.s2graph.core.{S2Graph, Management}
import org.apache.s2graph.counter.models.DBModel
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index c2597d6..ddb21b2 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -44,6 +44,7 @@
object GraphQLServer {
+ val className = Schema.getClass.getName
val logger = LoggerFactory.getLogger(this.getClass)
// Init s2graph
@@ -56,7 +57,7 @@
val s2graph = new S2Graph(config)
val schemaCacheTTL = Try(config.getInt("schemaCacheTTL")).getOrElse(-1)
val s2Repository = new GraphRepository(s2graph)
- val schemaCache = new SafeUpdateCache[Schema[GraphRepository, Any]]("schema", maxSize = 1, ttl = schemaCacheTTL)
+ val schemaCache = new SafeUpdateCache(1, ttl = schemaCacheTTL)
def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
@@ -97,7 +98,8 @@
}
private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
- val s2schema = schemaCache.withCache("s2Schema")(createNewSchema())
+ val cacheKey = className + "s2Schema"
+ val s2schema = schemaCache.withCache(cacheKey, broadcast = false)(createNewSchema())
import GraphRepository._
val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher)
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/bind/Unmarshaller.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/bind/Unmarshaller.scala
index 807fe48..61c58e8 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/bind/Unmarshaller.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/bind/Unmarshaller.scala
@@ -20,8 +20,8 @@
package org.apache.s2graph.graphql.bind
import org.apache.s2graph.core.Management.JsonModel._
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
-import org.apache.s2graph.core.{QueryParam, S2EdgeLike, S2VertexLike}
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.{S2EdgeLike, S2VertexLike}
import org.apache.s2graph.graphql.repository.GraphRepository
import org.apache.s2graph.graphql.types.S2Type._
import sangria.marshalling._
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
index 3bfa556..90037cf 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
@@ -21,7 +21,7 @@
import org.apache.s2graph.core.Management.JsonModel._
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.storage.MutateResponse
import org.apache.s2graph.core.types._
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
index cb6238f..478517f 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
@@ -20,7 +20,8 @@
package org.apache.s2graph.graphql.types
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.index.IndexProvider
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.graphql.bind.AstHelper
import org.apache.s2graph.graphql.repository.GraphRepository
import org.apache.s2graph.graphql.types.S2Type.EdgeQueryParam
@@ -70,7 +71,7 @@
}
def serviceColumnOnService(column: ServiceColumn, c: Context[GraphRepository, Any]): VertexQueryParam = {
- val prefix = s"${GlobalIndex.serviceField}:${column.service.serviceName} AND ${GlobalIndex.serviceColumnField}:${column.columnName}"
+ val prefix = s"${IndexProvider.serviceField}:${column.service.serviceName} AND ${IndexProvider.serviceColumnField}:${column.columnName}"
val ids = c.argOpt[Any]("id").toSeq ++ c.argOpt[List[Any]]("ids").toList.flatten
val offset = c.arg[Int]("offset")
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
index 0312abf..1475280 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.graphql.types
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.graphql.repository.GraphRepository
import sangria.schema._
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
index c10d85e..49e6b82 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
@@ -21,7 +21,7 @@
import org.apache.s2graph.core.Management.JsonModel._
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.graphql
import org.apache.s2graph.graphql.repository.GraphRepository
import sangria.schema._
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/StaticType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/StaticType.scala
index aca832c..83168f4 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/StaticType.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/StaticType.scala
@@ -21,7 +21,7 @@
import org.apache.s2graph.core.Management.JsonModel._
import org.apache.s2graph.core.{JSONParser, S2EdgeLike, S2VertexLike}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.storage.MutateResponse
import org.apache.s2graph.graphql.repository.GraphRepository
import sangria.macros.derive._
diff --git a/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala b/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala
index acd6f66..72412f5 100644
--- a/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala
+++ b/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala
@@ -22,7 +22,7 @@
import com.typesafe.config.Config
import org.apache.s2graph
import org.apache.s2graph.core.Management.JsonModel.Prop
-import org.apache.s2graph.core.mysqls.{Label, Model, Service}
+import org.apache.s2graph.core.schema.{Label, Schema, Service}
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.{Management, S2Graph}
import org.apache.s2graph.graphql
@@ -77,7 +77,7 @@
}
class EmptyGraph(config: Config) extends TestGraph {
- Model.apply(config)
+ Schema.apply(config)
lazy val graph = new S2Graph(config)(scala.concurrent.ExecutionContext.Implicits.global)
lazy val management = new Management(graph)
@@ -92,7 +92,7 @@
override def repository: GraphRepository = s2Repository
override def open(): Unit = {
- Model.shutdown(true)
+ Schema.shutdown(true)
}
}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
index 561c676..27747f6 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
@@ -22,7 +22,7 @@
import org.apache.s2graph.core._
import org.apache.s2graph.core.storage.SKeyValue
import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
import play.api.libs.json.Json
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index b0b4aed..975714a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -21,6 +21,7 @@
import com.typesafe.config.Config
import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.storage.SKeyValue
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 3fa8cea..e8a3e86 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -23,9 +23,9 @@
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
-import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
import org.apache.s2graph.core.{Management, S2Graph}
+import org.apache.s2graph.core.types.HBaseType
import org.apache.s2graph.s2jobs.loader.GraphFileOptions
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
index 126f193..6615cec 100644
--- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -34,7 +34,7 @@
import io.netty.handler.logging.{LogLevel, LoggingHandler}
import io.netty.util.CharsetUtil
import org.apache.s2graph.core.GraphExceptions.{BadQueryException}
-import org.apache.s2graph.core.mysqls.Experiment
+import org.apache.s2graph.core.schema.Experiment
import org.apache.s2graph.core.rest.RestHandler
import org.apache.s2graph.core.rest.RestHandler.{CanLookup, HandlerResult}
import org.apache.s2graph.core.utils.Extensions._
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
index 23bda0a..00702cf 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.rest.play.controllers
import org.apache.s2graph.core.Management
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.utils.logger
import play.api.libs.functional.syntax._
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
index b3ac89d..c751250 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
@@ -22,7 +22,7 @@
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.s2graph.core.ExceptionHandler
import org.apache.s2graph.core.ExceptionHandler.KafkaMessage
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
import org.apache.s2graph.counter
import org.apache.s2graph.counter.config.S2CounterConfig
import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 69878f8..310b098 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -22,7 +22,7 @@
import com.fasterxml.jackson.databind.JsonMappingException
import org.apache.s2graph.core.ExceptionHandler.KafkaMessage
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.utils.logger
import org.apache.s2graph.core.storage.{IncrementResponse, MutateResponse}