- rename mysqls package to schema.
- remove GlobalIndex.
- add toBytes, fromBytes on Schema.
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index bfb5a96..ab8aec8 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -29,7 +29,7 @@
 import org.apache.hadoop.hbase.regionserver.BloomType
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageManagement
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
 import org.apache.s2graph.loader.spark.{FamilyHFileWriteOptions, HBaseContext, KeyFamilyQualifier}
diff --git a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
index 6918ce4..21ce920 100644
--- a/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
+++ b/loader/src/test/scala/org/apache/s2graph/loader/subscriber/TransferToHFileTest.scala
@@ -27,7 +27,7 @@
 import org.apache.hadoop.util.ToolRunner
 import org.apache.s2graph.core.{Management, PostProcess}
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.types.HBaseType
diff --git a/project/Common.scala b/project/Common.scala
index 02a64bf..96109d3 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -31,6 +31,8 @@
 
   val elastic4sVersion = "6.1.1"
 
+  val KafkaVersion = "0.10.2.1"
+
   /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */
   val loggingRuntime = Seq(
     "log4j" % "log4j" % "1.2.17",
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 110d5e5..cc70e97 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -34,7 +34,8 @@
   "org.apache.hbase" % "hbase-server" % hbaseVersion excludeLogging() exclude("com.google.protobuf", "protobuf*"),
   "org.apache.hbase" % "hbase-hadoop-compat" % hbaseVersion excludeLogging(),
   "org.apache.hbase" % "hbase-hadoop2-compat" % hbaseVersion excludeLogging(),
-  "org.apache.kafka" % "kafka-clients" % "0.8.2.0" excludeLogging() exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
+  "org.apache.kafka" % "kafka-clients" % Common.KafkaVersion excludeLogging() exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
+  "org.apache.kafka" %% "kafka" % Common.KafkaVersion excludeLogging() exclude("com.sun.jdmk", "j*") exclude("com.sun.jmx", "j*") exclude("javax.jms", "j*"),
   "commons-pool" % "commons-pool" % "1.6",
   "org.scalikejdbc" %% "scalikejdbc" % "2.1.4",
   "com.h2database" % "h2" % "1.4.192",
@@ -53,7 +54,8 @@
   "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0",
   "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion excludeLogging(),
   "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(),
-  "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging()
+  "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(),
+  "org.scala-lang.modules" %% "scala-pickling" % "0.10.1"
 )
 
 libraryDependencies := {
diff --git a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
index 00c277b..c8339c8 100644
--- a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
+++ b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
@@ -21,24 +21,15 @@
 
 
 import org.apache.s2graph.core.EdgeId;
-import org.apache.s2graph.core.QueryParam;
 import org.apache.s2graph.core.S2Graph;
-import org.apache.s2graph.core.index.IndexProvider;
-import org.apache.s2graph.core.index.IndexProvider$;
-import org.apache.s2graph.core.mysqls.Label;
 import org.apache.s2graph.core.types.VertexId;
-import org.apache.s2graph.core.utils.logger;
-import org.apache.tinkerpop.gremlin.process.traversal.Order;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.HasContainerHolder;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Profiling;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.NoOpBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.IdentityStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
 import org.apache.tinkerpop.gremlin.structure.Element;
-import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.T;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
similarity index 100%
rename from s2core/src/main/resources/org/apache/s2graph/core/mysqls/schema.sql
rename to s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/mysqls/setup.sql b/s2core/src/main/resources/org/apache/s2graph/core/schema/setup.sql
similarity index 100%
rename from s2core/src/main/resources/org/apache/s2graph/core/mysqls/setup.sql
rename to s2core/src/main/resources/org/apache/s2graph/core/schema/setup.sql
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
index eb5b1da..24a4eb9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala
@@ -67,6 +67,9 @@
 }
 
 object ExceptionHandler {
+  val mainBrokerKey = "kafka.metadata.broker.list"
+  val subBrokerKey = "kafka_sub.metadata.broker.list"
+
   type Key = String
   type Val = String
 
@@ -93,21 +96,39 @@
 
   case class KafkaMessage(msg: ProducerRecord[Key, Val])
 
-  private def toKafkaProp(config: Config) = {
-    val props = new Properties()
+  def toKafkaProducer(config: Config): Option[KafkaProducer[String, String]] = {
+    val brokerKey = "kafka.producer.brokers"
+    if (config.hasPath(brokerKey)) {
+      val brokers = config.getString("kafka.producer.brokers")
+      Option(new KafkaProducer[String, String](toKafkaProp(brokers)))
+    } else {
+      None
+    }
+  }
 
+  def toKafkaProp(config: Config): Properties = {
     /* all default configuration for new producer */
     val brokers =
       if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
       else "localhost"
 
+    toKafkaProp(brokers)
+  }
+
+  /*
+   * http://kafka.apache.org/0102/documentation.html#producerconfigs
+   * If we change our Kafka version, make sure the right configuration is set.
+   */
+  def toKafkaProp(brokers: String): Properties = {
+    val props = new Properties()
+
     props.put("bootstrap.servers", brokers)
     props.put("acks", "1")
     props.put("buffer.memory", "33554432")
     props.put("compression.type", "snappy")
     props.put("retries", "0")
     props.put("batch.size", "16384")
-    props.put("linger.ms", "0")
+    props.put("linger.ms", "100")
     props.put("max.request.size", "1048576")
     props.put("receive.buffer.bytes", "32768")
     props.put("send.buffer.bytes", "131072")
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
index 0478413..a73f5c2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala
@@ -22,7 +22,7 @@
 import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.JSONParser.{fromJsonToProperties, toInnerVal}
 import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.logger
 import org.apache.tinkerpop.gremlin.structure.T
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
index b92e47b..81b3c13 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 import org.apache.s2graph.core.GraphExceptions.IllegalDataTypeException
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.rest.TemplateHelper
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs, VertexId}
 import org.apache.s2graph.core.utils.logger
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 6a12f0a..d026e5b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -24,7 +24,7 @@
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException}
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types.HBaseType._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.JSONParser._
@@ -95,7 +95,7 @@
                           props: Seq[Prop],
                           schemaVersion: String = DEFAULT_VERSION) = {
 
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val serviceOpt = Service.findByName(serviceName, useCache = false)
       serviceOpt match {
         case None => throw new RuntimeException(s"create service $serviceName has not been created.")
@@ -113,7 +113,7 @@
   }
 
   def deleteColumn(serviceName: String, columnName: String, schemaVersion: String = DEFAULT_VERSION) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val service = Service.findByName(serviceName, useCache = false).getOrElse(throw new RuntimeException("Service not Found"))
       val serviceColumns = ServiceColumn.find(service.id.get, columnName, useCache = false)
       val columnNames = serviceColumns.map { serviceColumn =>
@@ -130,7 +130,7 @@
   }
 
   def deleteLabel(labelName: String): Try[Label] = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val label = Label.findByName(labelName, useCache = false).getOrElse(throw GraphExceptions.LabelNotExistException(labelName))
       Label.deleteAll(label)
       label
@@ -138,7 +138,7 @@
   }
 
   def markDeletedLabel(labelName: String) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       Label.findByName(labelName, useCache = false).foreach { label =>
         // rename & delete_at column filled with current time
         Label.markDeleted(label)
@@ -148,7 +148,7 @@
   }
 
   def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found"))
       val labelMetaMap = label.metaPropsInvMap
 
@@ -162,7 +162,7 @@
   }
 
   def addProp(labelStr: String, prop: Prop) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val labelOpt = Label.findByName(labelStr)
       val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found"))
 
@@ -171,7 +171,7 @@
   }
 
   def addProps(labelStr: String, props: Seq[Prop]) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val labelOpt = Label.findByName(labelStr)
       val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found"))
 
@@ -251,7 +251,7 @@
    * update label name.
    */
   def updateLabelName(oldLabelName: String, newLabelName: String) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       for {
         old <- Label.findByName(oldLabelName, useCache = false)
       } {
@@ -269,7 +269,7 @@
    * swap label names.
    */
   def swapLabelNames(leftLabel: String, rightLabel: String) = {
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val tempLabel = "_" + leftLabel + "_"
       Label.updateName(leftLabel, tempLabel)
       Label.updateName(rightLabel, leftLabel)
@@ -338,7 +338,7 @@
                     preSplitSize: Int, hTableTTL: Option[Int],
                     compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = {
 
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, useCache = false)
       val config = toConfig(Map(
         ZookeeperQuorum -> service.cluster,
@@ -366,7 +366,7 @@
                           props: Seq[Prop],
                           schemaVersion: String = DEFAULT_VERSION): ServiceColumn = {
 
-    val serviceColumnTry = Model withTx { implicit session =>
+    val serviceColumnTry = Schema withTx { implicit session =>
       val serviceOpt = Service.findByName(serviceName, useCache = false)
       serviceOpt match {
         case None => throw new RuntimeException(s"create service $serviceName has not been created.")
@@ -432,7 +432,7 @@
     if (hTableName.isEmpty && hTableTTL.isDefined) throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
 
     val labelOpt = Label.findByName(label, useCache = false)
-    Model withTx { implicit session =>
+    Schema withTx { implicit session =>
       if (labelOpt.isDefined) throw new LabelAlreadyExistException(s"Label name ${label} already exist.")
 
       /* create all models */
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 8e4be5b..f7a54a9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -23,7 +23,7 @@
 
 import com.google.protobuf.ByteString
 import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 6d24c3f..21aca12 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -23,7 +23,7 @@
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.DuplicatePolicy.DuplicatePolicy
 import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.parsers.{Where, WhereParser}
 import org.apache.s2graph.core.rest.TemplateHelper
 import org.apache.s2graph.core.storage.serde.StorageSerializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 37ddf06..be57017 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core
 
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
 import org.apache.s2graph.core.utils.logger
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 58b1ce1..10752da 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -22,7 +22,7 @@
 
 import org.apache.s2graph.core.S2Edge.{Props, State}
 import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.io.Conversions._
 import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph, Property, T, Vertex}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
index 85321d3..a8c92df 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 import org.apache.s2graph.core.S2Edge.State
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.types.{InnerValLike, TargetVertexId, VertexId}
 import org.apache.tinkerpop.gremlin.structure.Property
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index 2321ac8..413c1e9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -22,7 +22,7 @@
 import java.util.function.BiConsumer
 
 import org.apache.s2graph.core.S2Edge.{Props, State}
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.types._
 import org.apache.tinkerpop.gremlin.structure
 import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph, Property, T, Vertex}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
index 1e0a95b..8c609a0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala
@@ -22,7 +22,7 @@
 import java.util.function.BiConsumer
 
 import org.apache.s2graph.core.S2Edge.Props
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
 import org.apache.tinkerpop.gremlin.structure.Property
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 7f19cb4..7816a63 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -27,7 +27,7 @@
 import org.apache.commons.configuration.{BaseConfiguration, Configuration}
 import org.apache.s2graph.core.index.IndexProvider
 import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.storage.rocks.RocksStorage
 import org.apache.s2graph.core.storage.{MutateResponse, Storage}
@@ -181,8 +181,8 @@
     config.getString("s2graph.storage.backend")
   }.getOrElse("hbase")
 
-  Model.apply(config)
-  Model.loadCache()
+  Schema.apply(config)
+  Schema.loadCache()
 
   override val management = new Management(this)
 
@@ -258,7 +258,7 @@
   override def shutdown(modelDataDelete: Boolean = false): Unit =
     if (running.compareAndSet(true, false)) {
       flushStorage()
-      Model.shutdown(modelDataDelete)
+      Schema.shutdown(modelDataDelete)
       defaultStorage.shutdown()
       localLongId.set(0l)
     }
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
index 64108db..cce05af 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
@@ -22,7 +22,7 @@
 import org.apache.commons.configuration.BaseConfiguration
 import org.apache.s2graph.core.Management.JsonModel.Prop
 import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
-import org.apache.s2graph.core.mysqls.{ColumnMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, ServiceColumn}
 import org.apache.s2graph.core.types.HBaseType
 import org.apache.tinkerpop.gremlin.structure.T
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index f639e84..cbd31cc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -31,7 +31,7 @@
 import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
 import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
 import org.apache.s2graph.core.index.IndexProvider
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn}
 import org.apache.s2graph.core.storage.{MutateResponse, Storage}
 import org.apache.s2graph.core.types.{InnerValLike, VertexId}
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index 50b94de..874924c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.{CanInnerValLike, InnerValLikeWithTs, VertexId}
 import org.apache.tinkerpop.gremlin.structure.Graph.Features
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
index 954bab0..2de8e92 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
@@ -22,7 +22,7 @@
 import java.util.function.BiConsumer
 
 import org.apache.s2graph.core.S2Vertex.Props
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types._
 import org.apache.tinkerpop.gremlin.structure.Vertex
 import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala
index 50d6526..ffd16e9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexBuilder.scala
@@ -23,7 +23,7 @@
 import java.util.function.BiConsumer
 
 import org.apache.s2graph.core.S2Vertex.Props
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
 import org.apache.s2graph.core.types.VertexId
 
 class S2VertexBuilder(vertex: S2VertexLike) {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index 4608ce7..7ece8c2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -22,7 +22,7 @@
 import java.util.function.{BiConsumer, Consumer}
 
 import org.apache.s2graph.core.S2Vertex.Props
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Label, Service, ServiceColumn}
 import org.apache.s2graph.core.types.{InnerValLike, VertexId}
 import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
 import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, T, Vertex, VertexProperty}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
index e0abfba..01842d7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
@@ -21,7 +21,7 @@
 
 import java.util
 
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
 import org.apache.s2graph.core.types.{CanInnerValLike, InnerValLike}
 import org.apache.tinkerpop.gremlin.structure.{Property, VertexProperty}
 import play.api.libs.json.Json
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
index bdb3c00..c46dc9c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexPropertyHelper.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core
 
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
 import org.apache.s2graph.core.types.InnerValLike
 
 object S2VertexPropertyHelper {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index c40078e..0dc2aa2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -22,7 +22,7 @@
 import java.util
 
 import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, VertexId}
 import org.apache.s2graph.core.utils.{Extensions, logger}
 import org.apache.tinkerpop.gremlin.structure.Edge
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
index e67d529..be1ad6e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
@@ -25,13 +25,10 @@
 import com.sksamuel.elastic4s.http.ElasticDsl._
 import com.sksamuel.elastic4s.http.HttpClient
 import com.typesafe.config.Config
-import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait.Predicate
 import org.apache.s2graph.core.io.Conversions
-import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core._
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import org.apache.tinkerpop.gremlin.structure.Property
 import play.api.libs.json.{Json, Reads}
 
 import scala.collection.JavaConverters._
@@ -40,14 +37,10 @@
 import scala.util.Try
 
 class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends IndexProvider {
-
-  import GlobalIndex._
   import IndexProvider._
 
   import scala.collection.mutable
 
-  implicit val executor = ec
-
   val esClientUri = Try(config.getString("es.index.provider.client.uri")).getOrElse("localhost")
   val client = HttpClient(ElasticsearchClientUri(esClientUri, 9200))
 
@@ -106,10 +99,10 @@
 
   override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = {
     val bulkRequests = vertices.flatMap { vertex =>
-      toFields(vertex, forceToIndex).toSeq.map { fields =>
-        update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields)
+        toFields(vertex, forceToIndex).toSeq.map { fields =>
+          update(vertex.id.toString()).in(new IndexAndType(VertexIndexName, TypeName)).docAsUpsert(fields)
+        }
       }
-    }
 
     if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true))
     else {
@@ -135,7 +128,7 @@
   override def mutateEdgesAsync(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = {
     val bulkRequests = edges.flatMap { edge =>
       toFields(edge, forceToIndex).toSeq.map { fields =>
-        update(edge.edgeId.toString()).in(new IndexAndType(GlobalIndex.EdgeIndexName, GlobalIndex.TypeName)).docAsUpsert(fields)
+        update(edge.edgeId.toString()).in(new IndexAndType(EdgeIndexName, TypeName)).docAsUpsert(fields)
       }
     }
 
@@ -185,7 +178,7 @@
       queryString,
       0,
       1000,
-      GlobalIndex.EdgeIndexName,
+      EdgeIndexName,
       field,
       Conversions.s2EdgeIdReads)(e => EdgeId.isValid(e).isDefined)
   }
@@ -200,7 +193,7 @@
     fetchInner[VertexId](queryString,
       0,
       1000,
-      GlobalIndex.VertexIndexName,
+      VertexIndexName,
       field,
       Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined)
   }
@@ -215,7 +208,7 @@
           queryString,
           vertexQueryParam.offset,
           vertexQueryParam.limit,
-          GlobalIndex.VertexIndexName,
+          VertexIndexName,
           field,
           Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined)
       case None => Future.successful(empty)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index f573ff6..0220ff8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -23,7 +23,7 @@
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types.VertexId
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
 import org.apache.tinkerpop.gremlin.process.traversal.util.{AndP, OrP}
@@ -34,8 +34,23 @@
 import scala.util.Try
 
 object IndexProvider {
-  import GlobalIndex._
+
+  //TODO: Fix Me
+  val hitsPerPage = 100000
   val IdField = "id"
+  val vidField = "_vid_"
+  val eidField = "_eid_"
+  val labelField = "_label_"
+  val serviceField = "_service_"
+  val serviceColumnField = "_serviceColumn_"
+  val EdgeType = "edge"
+  val VertexType = "vertex"
+  val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField)
+
+  //  val IndexName = "global_indices"
+  val VertexIndexName = "global_vertex_index"
+  val EdgeIndexName = "global_edge_index"
+  val TypeName = "test"
 
   def apply(config: Config)(implicit ec: ExecutionContext): IndexProvider = {
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
index c417022..4a3d044 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
@@ -31,7 +31,6 @@
 import org.apache.lucene.store.{BaseDirectory, RAMDirectory, SimpleFSDirectory}
 import org.apache.lucene.search.TopScoreDocCollector
 import org.apache.s2graph.core.io.Conversions
-import org.apache.s2graph.core.mysqls.GlobalIndex
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.utils.logger
 import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike, VertexQueryParam}
@@ -42,8 +41,6 @@
 
 
 class LuceneIndexProvider(config: Config) extends IndexProvider {
-
-  import GlobalIndex._
   import IndexProvider._
 
   import scala.collection.JavaConverters._
@@ -140,7 +137,7 @@
     Future.successful(mutateEdges(edges, forceToIndex))
 
   override def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean] = {
-    val writer = getOrElseCreateIndexWriter(GlobalIndex.VertexIndexName)
+    val writer = getOrElseCreateIndexWriter(VertexIndexName)
 
     vertices.foreach { vertex =>
       toDocument(vertex, forceToIndex).foreach { doc =>
@@ -160,7 +157,7 @@
     Future.successful(mutateVertices(vertices, forceToIndex))
 
   override def mutateEdges(edges: Seq[S2EdgeLike], forceToIndex: Boolean = false): Seq[Boolean] = {
-    val writer = getOrElseCreateIndexWriter(GlobalIndex.EdgeIndexName)
+    val writer = getOrElseCreateIndexWriter(EdgeIndexName)
 
     edges.foreach { edge =>
       toDocument(edge, forceToIndex).foreach { doc =>
@@ -208,7 +205,7 @@
 
     try {
       val q = new QueryParser(field, analyzer).parse(queryString)
-      fetchInner[VertexId](q, 0, 100, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads)
+      fetchInner[VertexId](q, 0, 100, VertexIndexName, vidField, Conversions.s2VertexIdReads)
     } catch {
       case ex: ParseException =>
         logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
@@ -222,7 +219,7 @@
 
     try {
       val q = new QueryParser(field, analyzer).parse(queryString)
-      fetchInner[EdgeId](q, 0, 100, GlobalIndex.EdgeIndexName, field, Conversions.s2EdgeIdReads)
+      fetchInner[EdgeId](q, 0, 100, EdgeIndexName, field, Conversions.s2EdgeIdReads)
     } catch {
       case ex: ParseException =>
         logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
@@ -239,7 +236,7 @@
       val field = vidField
       try {
         val q = new QueryParser(field, analyzer).parse(queryString)
-        fetchInner[VertexId](q, vertexQueryParam.offset, vertexQueryParam.limit, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads)
+        fetchInner[VertexId](q, vertexQueryParam.offset, vertexQueryParam.limit, VertexIndexName, vidField, Conversions.s2VertexIdReads)
       } catch {
         case ex: ParseException =>
           logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
index 83159e2..15f1231 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.io
 
 import org.apache.s2graph.core.{EdgeId, JSONParser, S2VertexPropertyId}
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Service, ServiceColumn}
 import org.apache.s2graph.core.types.{HBaseType, InnerValLike, VertexId}
 import play.api.libs.json._
 import play.api.libs.json.Reads._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
deleted file mode 100644
index e71bbce..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Bucket.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import scalikejdbc._
-
-import scala.util.Try
-
-object Bucket extends Model[Bucket] {
-
-  val rangeDelimiter = "~"
-  val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.")
-
-  def apply(rs: WrappedResultSet): Bucket = {
-    Bucket(rs.intOpt("id"),
-      rs.int("experiment_id"),
-      rs.string("modular"),
-      rs.string("http_verb"),
-      rs.string("api_path"),
-      rs.string("request_body"),
-      rs.int("timeout"),
-      rs.string("impression_id"),
-      rs.boolean("is_graph_query"),
-      rs.boolean("is_empty"))
-  }
-
-  def finds(experimentId: Int)(implicit session: DBSession = AutoSession): List[Bucket] = {
-    val cacheKey = "experimentId=" + experimentId
-    withCaches(cacheKey) {
-      sql"""select * from buckets where experiment_id = $experimentId"""
-        .map { rs => Bucket(rs) }.list().apply()
-    }
-  }
-
-  def toRange(str: String): Option[(Int, Int)] = {
-    val range = str.split(rangeDelimiter)
-    if (range.length == 2) Option((range.head.toInt, range.last.toInt))
-    else None
-  }
-
-  def findByImpressionId(impressionId: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Bucket] = {
-    val cacheKey = "impressionId=" + impressionId
-    val sql = sql"""select * from buckets where impression_id=$impressionId"""
-      .map { rs => Bucket(rs)}
-    if (useCache) {
-      withCache(cacheKey) {
-        sql.single().apply()
-      }
-    } else {
-      sql.single().apply()
-    }
-  }
-
-  def insert(experiment: Experiment, modular: String, httpVerb: String, apiPath: String,
-             requestBody: String, timeout: Int, impressionId: String,
-             isGraphQuery: Boolean, isEmpty: Boolean)
-            (implicit session: DBSession = AutoSession): Try[Bucket] = {
-    Try {
-      sql"""
-            INSERT INTO buckets(experiment_id, modular, http_verb, api_path, request_body, timeout, impression_id,
-             is_graph_query, is_empty)
-            VALUES (${experiment.id.get}, $modular, $httpVerb, $apiPath, $requestBody, $timeout, $impressionId,
-             $isGraphQuery, $isEmpty)
-        """
-        .updateAndReturnGeneratedKey().apply()
-    }.map { newId =>
-      Bucket(Some(newId.toInt), experiment.id.get, modular, httpVerb, apiPath, requestBody, timeout, impressionId,
-        isGraphQuery, isEmpty)
-    }
-  }
-}
-
-case class Bucket(id: Option[Int],
-                  experimentId: Int,
-                  modular: String,
-                  httpVerb: String, apiPath: String,
-                  requestBody: String, timeout: Int, impressionId: String,
-                  isGraphQuery: Boolean = true,
-                  isEmpty: Boolean = false) {
-
-  import Bucket._
-
-  lazy val rangeOpt = toRange(modular)
-}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
deleted file mode 100644
index 501a964..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import scalikejdbc.{AutoSession, DBSession, WrappedResultSet}
-import scalikejdbc._
-
-object GlobalIndex extends Model[GlobalIndex] {
-  val vidField = "_vid_"
-  val eidField = "_eid_"
-  val labelField = "_label_"
-  val serviceField = "_service_"
-  val serviceColumnField = "_serviceColumn_"
-  val EdgeType = "edge"
-  val VertexType = "vertex"
-  val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField)
-
-//  val IndexName = "global_indices"
-  val VertexIndexName = "global_vertex_index"
-  val EdgeIndexName = "global_edge_index"
-  val TypeName = "test"
-
-  def apply(rs: WrappedResultSet): GlobalIndex = {
-    GlobalIndex(rs.intOpt("id"),
-      rs.string("element_type"),
-      rs.string("prop_names").split(",").sorted,
-      rs.string("index_name"))
-  }
-
-  def findBy(elementType: String, indexName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[GlobalIndex] = {
-    val cacheKey = s"elementType=$elementType:indexName=$indexName"
-    lazy val sql = sql"""select * from global_indices where element_type = ${elementType} and index_name = $indexName""".map { rs => GlobalIndex(rs) }.single.apply()
-
-    if (useCache) withCache(cacheKey){sql}
-    else sql
-  }
-
-  def insert(elementType: String, indexName: String, propNames: Seq[String])(implicit session: DBSession = AutoSession): Long = {
-    val allPropNames = (hiddenIndexFields.toSeq ++ propNames).sorted
-    sql"""insert into global_indices(element_type, prop_names, index_name)
-         values($elementType, ${allPropNames.mkString(",")}, $indexName)"""
-      .updateAndReturnGeneratedKey.apply()
-  }
-
-  def findAll(elementType: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[GlobalIndex] = {
-    lazy val ls = sql"""select * from global_indices where element_type = $elementType""".map { rs => GlobalIndex(rs) }.list.apply
-    if (useCache) {
-      listCache.withCache(s"findAll:elementType=$elementType") {
-        putsToCache(ls.map { globalIndex =>
-          val cacheKey = s"elementType=${globalIndex.elementType}:indexName=${globalIndex.indexName}"
-          cacheKey -> globalIndex
-        })
-        ls
-      }
-    } else {
-      ls
-    }
-  }
-
-  def findGlobalIndex(elementType: String, hasContainers: java.util.List[HasContainer])(implicit session: DBSession = AutoSession): Option[GlobalIndex] = {
-    import scala.collection.JavaConversions._
-    val indices = findAll(elementType, useCache = true)
-    val keys = hasContainers.map(_.getKey)
-
-    val sorted = indices.map { index =>
-      val matched = keys.filter(index.propNamesSet)
-      index -> matched.length
-    }.filter(_._2 > 0).sortBy(_._2 * -1)
-
-    sorted.headOption.map(_._1)
-  }
-
-}
-
-case class GlobalIndex(id: Option[Int],
-                       elementType: String,
-                       propNames: Seq[String],
-                       indexName: String)  {
-  val backendIndexName = indexName + "_" + elementType
-  val backendIndexNameWithType = backendIndexName + "/test1"
-  lazy val propNamesSet = propNames.toSet
-}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala
deleted file mode 100644
index 38e1761..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import scalikejdbc.{AutoSession, DBSession, WrappedResultSet, _}
-
-object ServiceColumnIndex extends Model[ServiceColumnIndex] {
-  val dbTableName = "service_column_indices"
-  val DefaultName = "_PK"
-  val DefaultSeq = 1.toByte
-  val MaxOrderSeq = 7
-
-  def apply(rs: WrappedResultSet): ServiceColumnIndex = {
-    ServiceColumnIndex(rs.intOpt("id"), rs.int("service_id"), rs.int("service_column_id"),
-      rs.string("name"),
-      rs.byte("seq"), rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match {
-        case metaSeqsList => metaSeqsList
-      },
-      rs.stringOpt("options")
-    )
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "id=" + id
-    lazy val sql = sql"""select * from $dbTableName where id = ${id}"""
-    withCache(cacheKey) {
-      sql.map { rs => ServiceColumnIndex(rs) }.single.apply
-    }.get
-  }
-
-  def findBySeqs(serviceId: Int, serviceColumnId: Int, seqs: List[Byte])(implicit session: DBSession = AutoSession): Option[ServiceColumnIndex] = {
-    val cacheKey = "serviceId=" + serviceId + ":serviceColumnId=" + serviceColumnId + ":seqs=" + seqs.mkString(",")
-    lazy val sql =
-      sql"""
-      select * from $dbTableName where service_id = $serviceId and service_column_id = $serviceColumnId and meta_seqs = ${seqs.mkString(",")}
-      """
-    withCache(cacheKey) {
-      sql.map { rs => ServiceColumnIndex(rs) }.single.apply
-    }
-  }
-
-  def findBySeq(serviceId: Int,
-                serviceColumnId: Int,
-                seq: Byte,
-                useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "serviceId=" + serviceId + ":serviceColumnId=" + serviceColumnId + ":seq=" + seq
-    lazy val sql =
-      sql"""
-      select * from $dbTableName where service_id = $serviceId and service_column_id = $serviceColumnId and seq = ${seq}
-      """
-    if (useCache) {
-      withCache(cacheKey)(sql.map { rs => ServiceColumnIndex(rs) }.single.apply)
-    } else {
-      sql.map { rs => ServiceColumnIndex(rs) }.single.apply
-    }
-  }
-
-
-  def findAll(serviceId: Int, serviceColumnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = s"serviceId=$serviceId:serviceColumnId=$serviceColumnId"
-    lazy val sql =
-      sql"""
-          select * from $dbTableName where service_id = ${serviceId} and seq > 0 order by seq ASC
-        """
-    if (useCache) {
-      withCaches(cacheKey)(
-        sql.map { rs => ServiceColumnIndex(rs) }.list.apply
-      )
-    } else {
-      sql.map { rs => LabelIndex(rs) }.list.apply
-    }
-  }
-
-  def insert(serviceId: Int,
-             serviceColumnId: Int,
-             indexName: String,
-             seq: Byte, metaSeqs: List[Byte], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
-    sql"""
-    	insert into $dbTableName(service_id, service_column_id, name, seq, meta_seqs, options)
-    	values (${serviceId}, ${serviceColumnId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${options})
-    """
-      .updateAndReturnGeneratedKey.apply()
-  }
-
-  def findOrInsert(serviceId: Int,
-                   serviceColumnId: Int,
-                   indexName: String,
-                   metaSeqs: List[Byte],
-                   options: Option[String])(implicit session: DBSession = AutoSession): ServiceColumnIndex = {
-    findBySeqs(serviceId, serviceColumnId, metaSeqs) match {
-      case Some(s) => s
-      case None =>
-        val orders = findAll(serviceId, serviceColumnId, false)
-        val seq = (orders.size + 1).toByte
-        assert(seq <= MaxOrderSeq)
-        val createdId = insert(serviceId, serviceColumnId, indexName, seq, metaSeqs, options)
-        val cacheKeys = toCacheKeys(createdId.toInt, serviceId, serviceColumnId, seq, metaSeqs)
-
-        cacheKeys.foreach { key =>
-          expireCache(key)
-          expireCaches(key)
-        }
-        findBySeq(serviceId, serviceColumnId, seq).get
-    }
-  }
-
-  def toCacheKeys(id: Int, serviceId: Int, serviceColumnId: Int, seq: Byte, seqs: Seq[Byte]): Seq[String] = {
-    Seq(s"id=$id",
-      s"serviceId=$serviceId:serviceColumnId=$serviceColumnId:seq=$seq",
-      s"serviceId=$serviceId:serviceColumnId=$serviceColumnId:seqs=$seqs",
-      s"serviceId=$serviceId:serviceColumnId=$serviceColumnId")
-  }
-
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val me = findById(id)
-    val seqs = me.metaSeqs.mkString(",")
-    val (serviceId, serviceColumnId, seq) = (me.serviceId, me.serviceColumnId, me.seq)
-    lazy val sql = sql"""delete from $dbTableName where id = ${id}"""
-
-    sql.execute.apply()
-
-    val cacheKeys = toCacheKeys(id, serviceId, serviceColumnId, seq, me.metaSeqs)
-
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-//  def findAll()(implicit session: DBSession = AutoSession) = {
-//    val ls = sql"""select * from $dbTableName""".map { rs => ServiceColumnIndex(rs) }.list.apply
-//    val singles = ls.flatMap { x =>
-//      val cacheKeys = toCacheKeys(x.id.get, x.serviceId, x.serviceColumnId, x.seq, x.metaSeqs).dropRight(1)
-//      cacheKeys.map { cacheKey =>
-//        cacheKey -> x
-//      }
-//    }
-//    val multies = ls.groupBy(x => (x.serviceId, x.serviceColumnId)).map { case ((serviceId, serviceColumnId), ls) =>
-//      val cacheKey = s"serviceId=$serviceId:serviceColumnId=$serviceColumnId"
-//      cacheKey -> ls
-//    }.toList
-//
-//    putsToCache(singles)
-//    putsToCaches(multies)
-//
-//  }
-}
-
-case class ServiceColumnIndex(id: Option[Int],
-                              serviceId: Int,
-                              serviceColumnId: Int,
-                              name: String,
-                              seq: Byte,
-                              metaSeqs: Seq[Byte],
-                              options: Option[String]) {
-
-}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index 00ef233..75e9657 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.parsers
 
 import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException}
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.types.InnerValLike
 import org.apache.s2graph.core.{S2EdgeLike}
 import org.apache.s2graph.core.JSONParser._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 6b1ce75..f59abc0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -26,7 +26,7 @@
 import org.apache.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException}
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.parsers.{Where, WhereParser}
 import org.apache.s2graph.core.types._
 import play.api.libs.json._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 460c627..c768d81 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -24,7 +24,7 @@
 import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import org.apache.s2graph.core.schema.{Bucket, Experiment, Service}
 import org.apache.s2graph.core.utils.logger
 import play.api.libs.json._
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala
new file mode 100644
index 0000000..c88f854
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import play.api.libs.json.{JsValue, Json}
+import scalikejdbc._
+
+import scala.util.Try
+
+object Bucket extends SQLSyntaxSupport[Bucket] {
+  import Schema._
+  val className = Bucket.getClass.getSimpleName
+
+  val rangeDelimiter = "~"
+  val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.")
+  val InActiveModulars = Set("0~0")
+
+  def valueOf(rs: WrappedResultSet): Bucket = {
+    Bucket(rs.intOpt("id"),
+      rs.int("experiment_id"),
+      rs.string("modular"),
+      rs.string("http_verb"),
+      rs.string("api_path"),
+      rs.string("request_body"),
+      rs.int("timeout"),
+      rs.string("impression_id"),
+      rs.boolean("is_graph_query"),
+      rs.boolean("is_empty"))
+  }
+
+  def finds(experimentId: Int)(implicit session: DBSession = AutoSession): List[Bucket] = {
+    val cacheKey = className + "experimentId=" + experimentId
+
+    withCaches(cacheKey, broadcast = false) {
+      sql"""select * from buckets where experiment_id = $experimentId"""
+        .map { rs => Bucket.valueOf(rs) }.list().apply()
+    }
+  }
+
+  def toRange(str: String): Option[(Int, Int)] = {
+    val range = str.split(rangeDelimiter)
+    if (range.length == 2) Option((range.head.toInt, range.last.toInt))
+    else None
+  }
+
+  def findByImpressionId(impressionId: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Bucket] = {
+    val cacheKey = className + "impressionId=" + impressionId
+
+    lazy val sql = sql"""select * from buckets where impression_id=$impressionId"""
+      .map { rs => Bucket.valueOf(rs)}.single().apply()
+
+    if (useCache) withCache(cacheKey)(sql)
+    else sql
+
+  }
+
+  def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Bucket = {
+    val cacheKey = className + "id=" + id
+    lazy val sql = sql"""select * from buckets where id = $id""".map { rs => Bucket.valueOf(rs)}.single().apply()
+    if (useCache) withCache(cacheKey, false) { sql }.get
+    else sql.get
+  }
+
+  def update(id: Int,
+             experimentId: Int,
+             modular: String,
+             httpVerb: String,
+             apiPath: String,
+             requestBody: String,
+             timeout: Int,
+             impressionId: String,
+             isGraphQuery: Boolean,
+             isEmpty: Boolean)(implicit session: DBSession = AutoSession): Try[Bucket] = {
+    Try {
+      sql"""
+            UPDATE buckets set experiment_id = $experimentId, modular = $modular, http_verb = $httpVerb, api_path = $apiPath,
+            request_body = $requestBody, timeout = $timeout, impression_id = $impressionId,
+            is_graph_query = $isGraphQuery, is_empty = $isEmpty WHERE id = $id
+        """
+        .update().apply()
+    }.map { cnt =>
+      findById(id)
+    }
+  }
+
+  def insert(experimentId: Int, modular: String, httpVerb: String, apiPath: String,
+             requestBody: String, timeout: Int, impressionId: String,
+             isGraphQuery: Boolean, isEmpty: Boolean)
+            (implicit session: DBSession = AutoSession): Try[Bucket] = {
+    Try {
+      sql"""
+            INSERT INTO buckets(experiment_id, modular, http_verb, api_path, request_body, timeout, impression_id,
+             is_graph_query, is_empty)
+            VALUES (${experimentId}, $modular, $httpVerb, $apiPath, $requestBody, $timeout, $impressionId,
+             $isGraphQuery, $isEmpty)
+        """
+        .updateAndReturnGeneratedKey().apply()
+    }.map { newId =>
+      Bucket(Some(newId.toInt), experimentId, modular, httpVerb, apiPath, requestBody, timeout, impressionId,
+        isGraphQuery, isEmpty)
+    }
+  }
+}
+
+case class Bucket(id: Option[Int],
+                  experimentId: Int,
+                  modular: String,
+                  httpVerb: String, apiPath: String,
+                  requestBody: String, timeout: Int, impressionId: String,
+                  isGraphQuery: Boolean = true,
+                  isEmpty: Boolean = false) {
+
+  import Bucket._
+
+  lazy val rangeOpt = toRange(modular)
+
+  def toJson(): JsValue =
+    Json.obj("id" -> id, "experimentId" -> experimentId, "modular" -> modular, "httpVerb" -> httpVerb,
+      "requestBody" -> requestBody, "isGraphQuery" -> isGraphQuery, "isEmpty" -> isEmpty)
+
+}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
similarity index 81%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
index 51f4a93..e850541 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
@@ -17,16 +17,19 @@
  * under the License.
  */
 
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
 
 import play.api.libs.json.Json
 import scalikejdbc._
 
 import scala.util.Try
 
-object ColumnMeta extends Model[ColumnMeta] {
+object ColumnMeta extends SQLSyntaxSupport[ColumnMeta] {
+  import Schema._
+  val className = ColumnMeta.getClass.getSimpleName
 
-  val timeStampSeq = -1.toByte
+  val timeStampSeq = 0.toByte
+  val countSeq = -1.toByte
   val lastModifiedAtColumnSeq = 0.toByte
   val lastModifiedAtColumn = ColumnMeta(Some(0), 0, "lastModifiedAt", lastModifiedAtColumnSeq, "long", "-1")
   val maxValue = Byte.MaxValue
@@ -44,8 +47,7 @@
   }
 
   def findById(id: Int)(implicit session: DBSession = AutoSession) = {
-    //    val cacheKey = s"id=$id"
-    val cacheKey = "id=" + id
+    val cacheKey = className + "id=" + id
     withCache(cacheKey) {
       sql"""select * from column_metas where id = ${id}""".map { rs => ColumnMeta.valueOf(rs) }.single.apply
     }.get
@@ -53,7 +55,7 @@
 
   def findAllByColumn(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
     //    val cacheKey = s"columnId=$columnId"
-    val cacheKey = "columnId=" + columnId
+    val cacheKey = className + "columnId=" + columnId
     if (useCache) {
       withCaches(cacheKey)( sql"""select *from column_metas where column_id = ${columnId} order by seq ASC"""
         .map { rs => ColumnMeta.valueOf(rs) }.list.apply())
@@ -65,14 +67,12 @@
 
   def findByName(columnId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
     //    val cacheKey = s"columnId=$columnId:name=$name"
-    val cacheKey = "columnId=" + columnId + ":name=" + name
-    if (useCache) {
-      withCache(cacheKey)( sql"""select * from column_metas where column_id = ${columnId} and name = ${name}"""
-          .map { rs => ColumnMeta.valueOf(rs) }.single.apply())
-    } else {
-      sql"""select * from column_metas where column_id = ${columnId} and name = ${name}"""
-          .map { rs => ColumnMeta.valueOf(rs) }.single.apply()
-    }
+    val cacheKey = className + "columnId=" + columnId + ":name=" + name
+    lazy val sql =  sql"""select * from column_metas where column_id = ${columnId} and name = ${name}"""
+      .map { rs => ColumnMeta.valueOf(rs) }.single.apply()
+
+    if (useCache) withCache(cacheKey)(sql)
+    else sql
   }
 
   def insert(columnId: Int, name: String, dataType: String, defaultValue: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = {
@@ -95,13 +95,13 @@
       case Some(c) => c
       case None =>
         insert(columnId, name, dataType, defaultValue, storeInGlobalIndex)
-        expireCache(s"columnId=$columnId:name=$name")
+        expireCache(className + s"columnId=$columnId:name=$name")
         findByName(columnId, name).get
     }
   }
 
   def findByIdAndSeq(columnId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "columnId=" + columnId + ":seq=" + seq
+    val cacheKey = className + "columnId=" + columnId + ":seq=" + seq
     lazy val columnMetaOpt = sql"""
         select * from column_metas where column_id = ${columnId} and seq = ${seq}
     """.map { rs => ColumnMeta.valueOf(rs) }.single.apply()
@@ -114,30 +114,26 @@
     val columnMeta = findById(id)
     val (columnId, name) = (columnMeta.columnId, columnMeta.name)
     sql"""delete from column_metas where id = ${id}""".execute.apply()
-    val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", s"columnId=$columnId")
+    val cacheKeys = List(s"id=$id", s"columnId=$columnId:name=$name", s"columnId=$columnId:seq=${columnMeta.seq}", s"columnId=$columnId")
     cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
+      expireCache(className + key)
+      expireCaches(className + key)
     }
   }
 
   def findAll()(implicit session: DBSession = AutoSession) = {
     val ls = sql"""select * from column_metas""".map { rs => ColumnMeta.valueOf(rs) }.list().apply()
 
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"columnId=${x.columnId}:name=${x.name}",
+        s"columnId=${x.columnId}:seq=${x.seq}"
+      ).map(cacheKey => (className + cacheKey, x))
     })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"columnId=${x.columnId}:name=${x.name}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"columnId=${x.columnId}:seq=${x.seq}"
-      (cacheKey -> x)
-    })
+
     putsToCaches(ls.groupBy(x => x.columnId).map { case (columnId, ls) =>
-      val cacheKey = s"columnId=${columnId}"
+      val cacheKey = className + s"columnId=${columnId}"
       (cacheKey -> ls)
     }.toList)
   }
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
similarity index 88%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
index 0b16449..860bf70 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Experiment.scala
@@ -17,14 +17,17 @@
  * under the License.
  */
 
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
 
 import org.apache.s2graph.core.GraphUtil
 import scalikejdbc._
 
 import scala.util.{Try, Random}
 
-object Experiment extends Model[Experiment] {
+object Experiment extends SQLSyntaxSupport[Experiment] {
+  import Schema._
+  val className = Experiment.getClass.getSimpleName
+
   val ImpressionKey = "S2-Impression-Id"
   val ImpressionId = "Impression-Id"
 
@@ -38,24 +41,24 @@
   }
 
   def finds(serviceId: Int)(implicit session: DBSession = AutoSession): List[Experiment] = {
-    val cacheKey = "serviceId=" + serviceId
-    withCaches(cacheKey) {
+    val cacheKey = className + "serviceId=" + serviceId
+    withCaches(cacheKey, false) {
       sql"""select * from experiments where service_id = ${serviceId}"""
         .map { rs => Experiment(rs) }.list().apply()
     }
   }
 
   def findBy(serviceId: Int, name: String)(implicit session: DBSession = AutoSession): Option[Experiment] = {
-    val cacheKey = "serviceId=" + serviceId + ":name=" + name
-    withCache(cacheKey) {
+    val cacheKey = className + "serviceId=" + serviceId + ":name=" + name
+    withCache(cacheKey, false) {
       sql"""select * from experiments where service_id = ${serviceId} and name = ${name}"""
         .map { rs => Experiment(rs) }.single.apply
     }
   }
 
   def findById(id: Int)(implicit session: DBSession = AutoSession): Option[Experiment] = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey)(
+    val cacheKey = className + "id=" + id
+    withCache(cacheKey, false)(
       sql"""select * from experiments where id = ${id}"""
         .map { rs => Experiment(rs) }.single.apply
     )
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
similarity index 93%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
index c128163..7fb1183 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
 
 import java.util.Calendar
 
@@ -31,7 +31,9 @@
 import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
 import scalikejdbc._
 
-object Label extends Model[Label] {
+object Label extends SQLSyntaxSupport[Label] {
+  import Schema._
+  val className = Label.getClass.getSimpleName
 
   val maxHBaseTableNames = 2
 
@@ -51,9 +53,8 @@
     Label.delete(id.get)
   }
 
-
   def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
-    val cacheKey = "label=" + labelName
+    val cacheKey = className + "label=" + labelName
     lazy val labelOpt =
       sql"""
         select *
@@ -98,7 +99,7 @@
   }
 
   def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): Option[Label] = {
-    val cacheKey = "id=" + id
+    val cacheKey = className + "id=" + id
     withCache(cacheKey)(
       sql"""
         select 	*
@@ -109,7 +110,7 @@
   }
 
   def findById(id: Int)(implicit session: DBSession = AutoSession): Label = {
-    val cacheKey = "id=" + id
+    val cacheKey = className + "id=" + id
     withCache(cacheKey)(
       sql"""
         select 	*
@@ -120,7 +121,7 @@
   }
 
   def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
-    val cacheKey = "tgtColumnId=" + columnId
+    val cacheKey = className + "tgtColumnId=" + columnId
     val col = ServiceColumn.findById(columnId)
     withCaches(cacheKey)(
       sql"""
@@ -133,7 +134,7 @@
   }
 
   def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
-    val cacheKey = "srcColumnId=" + columnId
+    val cacheKey = className + "srcColumnId=" + columnId
     val col = ServiceColumn.findById(columnId)
     withCaches(cacheKey)(
       sql"""
@@ -146,14 +147,14 @@
   }
 
   def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
-    val cacheKey = "srcServiceId=" + serviceId
+    val cacheKey = className + "srcServiceId=" + serviceId
     withCaches(cacheKey)(
       sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
     )
   }
 
   def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
-    val cacheKey = "tgtServiceId=" + serviceId
+    val cacheKey = className + "tgtServiceId=" + serviceId
     withCaches(cacheKey)(
       sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
     )
@@ -189,9 +190,9 @@
         val tgtServiceId = tgtService.id.get
         val serviceId = service.id.get
 
-        /* insert serviceColumn */
-        val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType))
-        val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType))
+        /** insert serviceColumn */
+        val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion)
+        val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion)
 
         if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
         if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")
@@ -221,7 +222,7 @@
 
           val cacheKeys = List(s"id=$createdId", s"label=$labelName")
           val ret = findByName(labelName, useCache = false).get
-          putsToCache(cacheKeys.map(k => k -> ret))
+          putsToCacheOption(cacheKeys.map(k => className + k -> ret))
           ret
         }
       }
@@ -231,17 +232,12 @@
 
   def findAll()(implicit session: DBSession = AutoSession) = {
     val ls = sql"""select * from labels where deleted_at is null""".map { rs => Label(rs) }.list().apply()
-
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"label=${x.label}"
+      ).map(cacheKey => (className + cacheKey, x))
     })
-
-    putsToCache(ls.map { x =>
-      val cacheKey = s"label=${x.label}"
-      (cacheKey -> x)
-    })
-
     ls
   }
 
@@ -255,10 +251,10 @@
     val cnt = sql"""update labels set hbase_table_name = $newHTableName where label = $labelName""".update().apply()
     val label = Label.findByName(labelName, useCache = false).get
 
-    val cacheKeys = List(s"id=${label.id}", s"label=${label.label}")
+    val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}")
     cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
+      expireCache(className + key)
+      expireCaches(className + key)
     }
     cnt
   }
@@ -269,8 +265,8 @@
     val cnt = sql"""delete from labels where id = ${label.id.get}""".update().apply()
     val cacheKeys = List(s"id=$id", s"label=${label.label}")
     cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
+      expireCache(className + key)
+      expireCaches(className + key)
     }
     cnt
   }
@@ -282,10 +278,10 @@
     val now = Calendar.getInstance().getTime
     val newName = s"deleted_${now.getTime}_"+ label.label
     val cnt = sql"""update labels set label = ${newName}, deleted_at = ${now} where id = ${label.id.get}""".update.apply()
-    val cacheKeys = List(s"id=${label.id}", s"label=${oldName}")
+    val cacheKeys = List(s"id=${label.id.get}", s"label=${oldName}")
     cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
+      expireCache(className + key)
+      expireCaches(className + key)
     }
     cnt
   }
@@ -387,17 +383,16 @@
       Set.empty[String]
   }
 
-  lazy val extraOptions = Model.extraOptions(options)
+  lazy val extraOptions = Schema.extraOptions(options)
 
   lazy val durability = extraOptions.get("durability").map(_.as[Boolean]).getOrElse(true)
 
   lazy val storageConfigOpt: Option[Config] = toStorageConfig
 
   def toStorageConfig: Option[Config] = {
-    Model.toStorageConfig(extraOptions)
+    Schema.toStorageConfig(extraOptions)
   }
 
-
   def srcColumnWithDir(dir: Int) = {
     // GraphUtil.directions("out"
     if (dir == 0) srcColumn else tgtColumn
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
similarity index 79%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
index 1da0e55..bb8425f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
@@ -17,15 +17,18 @@
  * under the License.
  */
 
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
 
 import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.mysqls.LabelIndex.LabelIndexMutateOption
+import org.apache.s2graph.core.schema.LabelIndex.LabelIndexMutateOption
 import org.apache.s2graph.core.utils.logger
 import play.api.libs.json.{JsObject, JsString, Json}
 import scalikejdbc._
 
-object LabelIndex extends Model[LabelIndex] {
+object LabelIndex extends SQLSyntaxSupport[LabelIndex] {
+  import Schema._
+  val className = LabelIndex.getClass.getSimpleName
+
   val DefaultName = "_PK"
   val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
   val DefaultSeq = 1.toByte
@@ -64,14 +67,14 @@
   }
 
   def findById(id: Int)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "id=" + id
+    val cacheKey = className + "id=" + id
     withCache(cacheKey) {
       sql"""select * from label_indices where id = ${id}""".map { rs => LabelIndex(rs) }.single.apply
     }.get
   }
 
   def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "labelId=" + labelId
+    val cacheKey = className + "labelId=" + labelId
     if (useCache) {
       withCaches(cacheKey)( sql"""
         select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
@@ -105,8 +108,8 @@
           s"labelId=$labelId:seqs=$metaSeqs:dir=$direction", s"labelId=$labelId:seq=$seq", s"id=$createdId")
 
         cacheKeys.foreach { key =>
-          expireCache(key)
-          expireCaches(key)
+          expireCache(className + key)
+          expireCaches(className + key)
         }
 
         findByLabelIdAndSeq(labelId, seq).get
@@ -114,7 +117,7 @@
   }
 
   def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
-    val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction
+    val cacheKey = className + "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction
     withCache(cacheKey) {
       sql"""
       select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and dir = ${direction}
@@ -124,7 +127,7 @@
 
   def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
     //    val cacheKey = s"labelId=$labelId:seq=$seq"
-    val cacheKey = "labelId=" + labelId + ":seq=" + seq
+    val cacheKey = className + "labelId=" + labelId + ":seq=" + seq
     if (useCache) {
       withCache(cacheKey)( sql"""
       select * from label_indices where label_id = ${labelId} and seq = ${seq}
@@ -144,69 +147,29 @@
 
     val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}")
     cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
+      expireCache(className + key)
+      expireCaches(className + key)
     }
   }
 
   def findAll()(implicit session: DBSession = AutoSession) = {
     val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      var cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"labelId=${x.labelId}:seq=${x.seq}",
+        s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
+      ).map(cacheKey => (className + cacheKey, x))
     })
-    putsToCache(ls.map { x =>
-      var cacheKey = s"labelId=${x.labelId}:seq=${x.seq}}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
-      (cacheKey -> x)
-    })
+
     putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
       val cacheKey = s"labelId=${labelId}"
-      (cacheKey -> ls)
+      (className + cacheKey -> ls)
     }.toList)
   }
 }
-/**
-mgmt.buildIndex('nameAndAge',Vertex.class)
-.addKey(name,Mapping.TEXT.getParameter())
-.addKey(age,Mapping.TEXT.getParameter())
-.buildMixedIndex("search")
 
-v: {name: abc} - E1: {age: 20}, E2, E3....
 
-Management.createServiceColumn(
-		serviceName = serviceName, columnName = "person", columnType = "integer",
-    props = Seq(
-    	Prop("name", "-", "string"),
-    	Prop("age", "0", "integer"),
-    	Prop("location", "-", "string")
-    )
-)
-
-management.createLabel(
-		label = "bought",
-    srcServiceName = serviceName, srcColumnName = "person", srcColumnType = "integer",
-    tgtServiceName = serviceName, tgtColumnName = "product", tgtColumnType = "integer", idDirected = true,
-    serviceName = serviceName,
-    indices = Seq(
-    	Index("PK", Seq("amount", "created_at"), IndexType("mixed", propsMapping: Map[String, String]),
-{"in": {}, "out": {}})
-    ),
-    props = Seq(
-    	Prop("amount", "0.0", "double"),
-    	Prop("created_at", "2000-01-01", "string")
-    ),
-    consistencyLevel = "strong"
-)
-
-mgmt.buildIndex('PK', Edge.class)
-  .addKey(amount, Double)
-  .buildCompositeIndex
-
-*/
 case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
                       dir: Option[Int], options: Option[String]) {
   // both
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
similarity index 91%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
index 3f54f49..f25be5f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelMeta.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
 
 /**
  * Created by shon on 6/3/15.
@@ -30,8 +30,9 @@
 
 import scala.util.Try
 
-object LabelMeta extends Model[LabelMeta] {
-
+object LabelMeta extends SQLSyntaxSupport[LabelMeta] {
+  import Schema._
+  val className = LabelMeta.getClass.getSimpleName
   /** dummy sequences */
 
   val fromSeq = (-4).toByte
@@ -93,7 +94,7 @@
   def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq // || seq == fromHashSeq
 
   def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = {
-    val cacheKey = "id=" + id
+    val cacheKey = className + "id=" + id
 
     withCache(cacheKey) {
       sql"""select * from label_metas where id = ${id}""".map { rs => LabelMeta(rs) }.single.apply
@@ -101,7 +102,7 @@
   }
 
   def findAllByLabelId(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[LabelMeta] = {
-    val cacheKey = "labelId=" + labelId
+    val cacheKey = className + "labelId=" + labelId
     lazy val labelMetas = sql"""select *
     		  						from label_metas
     		  						where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply()
@@ -116,7 +117,7 @@
       case from.name => Some(from)
       case to.name => Some(to)
       case _ =>
-        val cacheKey = "labelId=" + labelId + ":name=" + name
+        val cacheKey = className + "labelId=" + labelId + ":name=" + name
         lazy val labelMeta = sql"""
             select *
             from label_metas where label_id = ${labelId} and name = ${name}"""
@@ -132,8 +133,8 @@
     val seq = ls.size + 1
 
     if (seq < maxValue) {
-      sql"""insert into label_metas(label_id, name, seq, default_value, data_type, store_in_global_index)
-    select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}, ${storeInGlobalIndex}""".updateAndReturnGeneratedKey.apply()
+      sql"""insert into label_metas(label_id, name, seq, default_value, data_type)
+    select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}""".updateAndReturnGeneratedKey.apply()
     } else {
       throw MaxPropSizeReachedException("max property size reached")
     }
@@ -151,8 +152,8 @@
         insert(labelId, name, defaultValue, dataType, storeInGlobalIndex)
         val cacheKey = "labelId=" + labelId + ":name=" + name
         val cacheKeys = "labelId=" + labelId
-        expireCache(cacheKey)
-        expireCaches(cacheKeys)
+        expireCache(className + cacheKey)
+        expireCaches(className + cacheKeys)
         findByName(labelId, name, useCache = false).get
     }
   }
@@ -163,24 +164,19 @@
     sql"""delete from label_metas where id = ${id}""".execute.apply()
     val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:name=$name")
     cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
+      expireCache(className + key)
+      expireCaches(className + key)
     }
   }
 
   def findAll()(implicit session: DBSession = AutoSession) = {
     val ls = sql"""select * from label_metas""".map { rs => LabelMeta(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      cacheKey -> x
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"labelId=${x.labelId}:name=${x.name}"
-      cacheKey -> x
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"labelId=${x.labelId}:seq=${x.seq}"
-      cacheKey -> x
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"labelId=${x.labelId}:name=${x.name}",
+        s"labelId=${x.labelId}:seq=${x.seq}"
+      ).map(cacheKey => (className + cacheKey, x))
     })
 
     putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
similarity index 79%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
index e21072e..5b9e0ae 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
 
 import java.util.concurrent.Executors
 import java.util.concurrent.atomic.AtomicLong
@@ -33,19 +33,21 @@
 import scala.language.{higherKinds, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
-object Model {
+object Schema {
   var maxSize = 10000
   var ttl = 60
+  var safeUpdateCache: SafeUpdateCache = _
+
   val numOfThread = Runtime.getRuntime.availableProcessors()
   val threadPool = Executors.newFixedThreadPool(numOfThread)
   val ec = ExecutionContext.fromExecutor(threadPool)
-  val useUTF8Encoding = "?useUnicode=true&characterEncoding=utf8"
 
   private val ModelReferenceCount = new AtomicLong(0L)
 
   def apply(config: Config) = {
     maxSize = config.getInt("cache.max.size")
     ttl = config.getInt("cache.ttl.seconds")
+
     Class.forName(config.getString("db.default.driver"))
 
     val settings = ConnectionPoolSettings(
@@ -61,7 +63,7 @@
       settings)
 
     checkSchema()
-
+    safeUpdateCache = new SafeUpdateCache(maxSize, ttl)(ec)
     ModelReferenceCount.incrementAndGet()
   }
 
@@ -140,7 +142,8 @@
             throw new IllegalStateException(s"Failed to list models", e)
         }
       }
-      clearCache()
+//      clearCache()
+      safeUpdateCache.shutdown()
       ConnectionPool.closeAll()
     }
 
@@ -153,14 +156,14 @@
     ColumnMeta.findAll()
   }
 
-  def clearCache() = {
-    Service.expireAll()
-    ServiceColumn.expireAll()
-    Label.expireAll()
-    LabelMeta.expireAll()
-    LabelIndex.expireAll()
-    ColumnMeta.expireAll()
-  }
+//  def clearCache() = {
+//    Service.expireAll()
+//    ServiceColumn.expireAll()
+//    Label.expireAll()
+//    LabelMeta.expireAll()
+//    LabelIndex.expireAll()
+//    ColumnMeta.expireAll()
+//  }
 
   def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
     case None => Map.empty
@@ -181,6 +184,7 @@
         val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, value) =>
           key -> JSONParser.jsValueToAny(value).getOrElse(throw new RuntimeException("!!"))
         }
+
         ConfigFactory.parseMap(configMap.asJava)
       }
     } catch {
@@ -189,43 +193,33 @@
         None
     }
   }
-}
 
-trait Model[V] extends SQLSyntaxSupport[V] {
+  private def toMultiKey(key: String): String = key + ".__m__"
 
-  import Model._
+  def withCache[T <: AnyRef](key: String, broadcast: Boolean = true)(op: => T) = safeUpdateCache.withCache(key, broadcast)(op)
 
-  implicit val ec: ExecutionContext = Model.ec
+  def withCaches[T <: AnyRef](key: String, broadcast: Boolean = true)(op: => T) = safeUpdateCache.withCache(toMultiKey(key), broadcast)(op)
 
-  val cName = this.getClass.getSimpleName()
-  logger.info(s"LocalCache[$cName]: TTL[$ttl], MaxSize[$maxSize]")
+  def expireCache(key: String) = safeUpdateCache.invalidate(key)
 
-  val optionCache = new SafeUpdateCache[Option[V]](cName, maxSize, ttl)
-  val listCache = new SafeUpdateCache[List[V]](cName, maxSize, ttl)
+  def expireCaches(key: String) = safeUpdateCache.invalidate(toMultiKey(key))
 
-  val withCache = optionCache.withCache _
-
-  val withCaches = listCache.withCache _
-
-  val expireCache = optionCache.invalidate _
-
-  val expireCaches = listCache.invalidate _
-
-  def expireAll() = {
-    listCache.invalidateAll()
-    optionCache.invalidateAll()
+  def putsToCacheOption[T <: AnyRef](kvs: List[(String, T)]) = kvs.foreach {
+    case (key, value) => safeUpdateCache.put(key, Option(value))
   }
 
-  def putsToCache(kvs: List[(String, V)]) = kvs.foreach {
-    case (key, value) => optionCache.put(key, Option(value))
+  def putsToCaches[T <: AnyRef](kvs: List[(String, T)]) = kvs.foreach {
+    case (key, values) => safeUpdateCache.put(toMultiKey(key), values)
   }
 
-  def putsToCaches(kvs: List[(String, List[V])]) = kvs.foreach {
-    case (key, values) => listCache.put(key, values)
+  def getCacheSize(): Int = safeUpdateCache.asMap().size()
+
+  def getAllCacheData[T <: AnyRef](): (List[(String, T)], List[(String, List[T])]) = {
+    (Nil, Nil)
   }
 
-  def getAllCacheData() : (List[(String, Option[_])], List[(String, List[_])]) = {
-    (optionCache.getAllData(), listCache.getAllData())
-  }
+  def toBytes(): Array[Byte] = safeUpdateCache.toBytes()
+
+  def fromBytes(safeUpdateCache: SafeUpdateCache, bytes: Array[Byte]): Unit = SafeUpdateCache.fromBytes(safeUpdateCache, bytes)
 }
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
similarity index 82%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
index 5b4f494..611a746 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
 
 import java.util.UUID
 
@@ -26,24 +26,32 @@
 import play.api.libs.json.Json
 import scalikejdbc._
 
-object Service extends Model[Service] {
+object Service extends SQLSyntaxSupport[Service] {
+  import Schema._
+  val className = Service.getClass.getSimpleName
+
   def valueOf(rs: WrappedResultSet): Service = {
-    Service(rs.intOpt("id"), rs.string("service_name"), rs.string("access_token"),
-      rs.string("cluster"), rs.string("hbase_table_name"), rs.int("pre_split_size"), rs.intOpt("hbase_table_ttl"))
+    Service(rs.intOpt("id"),
+      rs.string("service_name"),
+      rs.string("access_token"),
+      rs.string("cluster"),
+      rs.string("hbase_table_name"),
+      rs.int("pre_split_size"),
+      rs.intOpt("hbase_table_ttl"))
   }
 
   def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = {
-    val cacheKey = s"accessToken=$accessToken"
+    val cacheKey = className + s"accessToken=$accessToken"
     withCache(cacheKey)( sql"""select * from services where access_token = ${accessToken}""".map { rs => Service.valueOf(rs) }.single.apply)
   }
 
   def findById(id: Int)(implicit session: DBSession = AutoSession): Service = {
-    val cacheKey = "id=" + id
+    val cacheKey = className + "id=" + id
     withCache(cacheKey)( sql"""select * from services where id = ${id}""".map { rs => Service.valueOf(rs) }.single.apply).get
   }
 
   def findByName(serviceName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Service] = {
-    val cacheKey = "serviceName=" + serviceName
+    val cacheKey = className + "serviceName=" + serviceName
     lazy val serviceOpt = sql"""
         select * from services where service_name = ${serviceName}
       """.map { rs => Service.valueOf(rs) }.single.apply()
@@ -67,8 +75,8 @@
     sql"""delete from service_columns where id = ${id}""".execute.apply()
     val cacheKeys = List(s"id=$id", s"serviceName=$serviceName")
     cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
+      expireCache(className + key)
+      expireCaches(className + key)
     }
   }
 
@@ -79,21 +87,18 @@
       case None =>
         insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
         val cacheKey = "serviceName=" + serviceName
-        expireCache(cacheKey)
+        expireCache(className + cacheKey)
         findByName(serviceName).get
     }
   }
 
   def findAll()(implicit session: DBSession = AutoSession) = {
     val ls = sql"""select * from services""".map { rs => Service.valueOf(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-
-    putsToCache(ls.map { x =>
-      val cacheKey = s"serviceName=${x.serviceName}"
-      (cacheKey -> x)
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"serviceName=${x.serviceName}"
+      ).map(cacheKey => (className + cacheKey, x))
     })
 
     ls
@@ -121,8 +126,8 @@
         Json.parse("{}")
     }
 
-  lazy val extraOptions = Model.extraOptions(options)
+  lazy val extraOptions = Schema.extraOptions(options)
   lazy val storageConfigOpt: Option[Config] = toStorageConfig
   def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache)
-  def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions)
+  def toStorageConfig: Option[Config] = Schema.toStorageConfig(extraOptions)
 }
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
similarity index 86%
rename from s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
rename to s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
index e8bec06..cc1698a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.s2graph.core.mysqls
+package org.apache.s2graph.core.schema
 
 import org.apache.s2graph.core.JSONParser
 import org.apache.s2graph.core.JSONParser._
@@ -25,7 +25,10 @@
 import play.api.libs.json.Json
 import scalikejdbc._
 
-object ServiceColumn extends Model[ServiceColumn] {
+object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
+  import Schema._
+  val className = ServiceColumn.getClass.getSimpleName
+
   val Default = ServiceColumn(Option(0), -1, "default", "string", "v4")
 
   def valueOf(rs: WrappedResultSet): ServiceColumn = {
@@ -42,18 +45,14 @@
   }
 
   def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
-    val cacheKey = "id=" + id
-
-    if (useCache) {
-      withCache(cacheKey)(sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply).get
-    } else {
-      sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply.get
-    }
+    val cacheKey = className + "id=" + id
+    lazy val sql = sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply
+    if (useCache) withCache(cacheKey)(sql).get
+    else sql.get
   }
 
   def find(serviceId: Int, columnName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = {
-//    val cacheKey = s"serviceId=$serviceId:columnName=$columnName"
-    val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
+    val cacheKey = className + "serviceId=" + serviceId + ":columnName=" + columnName
     if (useCache) {
       withCache(cacheKey) {
         sql"""
@@ -76,8 +75,8 @@
     sql"""delete from service_columns where id = ${id}""".execute.apply()
     val cacheKeys = List(s"id=$id", s"serviceId=$serviceId:columnName=$columnName")
     cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
+      expireCache(className + key)
+      expireCaches(className + key)
     }
   }
   def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
@@ -87,20 +86,19 @@
         insert(serviceId, columnName, columnType, schemaVersion)
 //        val cacheKey = s"serviceId=$serviceId:columnName=$columnName"
         val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
-        expireCache(cacheKey)
+        expireCache(className + cacheKey)
         find(serviceId, columnName).get
     }
   }
   def findAll()(implicit session: DBSession = AutoSession) = {
     val ls = sql"""select * from service_columns""".map { rs => ServiceColumn.valueOf(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      var cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
+    putsToCacheOption(ls.flatMap { x =>
+      Seq(
+        s"id=${x.id.get}",
+        s"serviceId=${x.serviceId}:columnName=${x.columnName}"
+      ).map(cacheKey => (className + cacheKey, x))
     })
-    putsToCache(ls.map { x =>
-      var cacheKey = s"serviceId=${x.serviceId}:columnName=${x.columnName}"
-      (cacheKey -> x)
-    })
+
     ls
   }
 }
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 2d74a7c..2b02bdd 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -22,7 +22,7 @@
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.TraversalHelper._
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.parsers.WhereParser
 import org.apache.s2graph.core.utils.logger
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index b1fdd7c..0526042 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -26,7 +26,7 @@
 import com.typesafe.config.Config
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.storage.serde._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
index e575c5b..5db02cc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
@@ -24,7 +24,7 @@
 import com.typesafe.config.Config
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
 import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange}
 import org.apache.s2graph.core.storage.serde.StorageSerializable
 import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, StorageSerDe}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
index 9871b36..0748efb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core.storage.serde
 
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.utils.logger
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala
index dc7690b..884ca11 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageDeserializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala
index 219d097..20e676a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/StorageSerializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{ColumnMeta, LabelMeta}
+import org.apache.s2graph.core.schema.{ColumnMeta, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 945f246..44ae52f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -21,7 +21,7 @@
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.storage.serde.Deserializable
 import org.apache.s2graph.core.storage.serde.StorageDeserializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
index 28982dc..6e467b6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.indexedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.{GraphUtil, IndexEdge}
 import org.apache.s2graph.core.storage.serde.StorageSerializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index e533b4b..1051c6e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -21,7 +21,7 @@
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage.serde.StorageDeserializable._
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.storage.serde.Deserializable
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index 34e9a6e..91127f9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.indexedge.wide
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.{GraphUtil, IndexEdge}
 import org.apache.s2graph.core.storage.serde.StorageSerializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index f5c10a7..580acd7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.snapshotedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelMeta, ServiceColumn}
 import org.apache.s2graph.core.storage.serde.StorageDeserializable._
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.types._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 02b2977..7937de8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -21,7 +21,7 @@
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.{S2Edge, SnapshotEdge}
-import org.apache.s2graph.core.mysqls.LabelIndex
+import org.apache.s2graph.core.schema.LabelIndex
 import org.apache.s2graph.core.storage.serde._
 import org.apache.s2graph.core.storage.serde.StorageSerializable._
 import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index 7dec6d9..e0243cc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.snapshotedge.wide
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.serde.StorageDeserializable._
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceVertexId, TargetVertexId}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index 44d4a2a..c5daa99 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -21,7 +21,7 @@
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.{S2Edge, SnapshotEdge}
-import org.apache.s2graph.core.mysqls.LabelIndex
+import org.apache.s2graph.core.schema.LabelIndex
 import org.apache.s2graph.core.storage.serde.Serializable
 import org.apache.s2graph.core.storage.serde.StorageSerializable._
 import org.apache.s2graph.core.types.VertexId
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
index 2a7cd6a..a1f5705 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core.storage.serde.vertex.tall
 
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.storage.serde.Deserializable
 import org.apache.s2graph.core.storage.serde.StorageDeserializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
index d1d4d7d..f27cc5c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core.storage.serde.vertex.wide
 
-import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.schema.ColumnMeta
 import org.apache.s2graph.core.storage.CanSKeyValue
 import org.apache.s2graph.core.storage.serde.Deserializable
 import org.apache.s2graph.core.storage.serde.StorageDeserializable._
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
index 77951c0..f88509f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.types
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.schema.LabelMeta
 
 object HBaseType {
   val VERSION4 = "v4"
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
index b1de0c4..1ad56b3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
@@ -21,7 +21,7 @@
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.{GraphUtil, S2Vertex}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types.HBaseType._
 import org.apache.s2graph.core.io.Conversions._
 import play.api.libs.json.Json
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
index b402c0f..36c1a6d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
@@ -19,12 +19,15 @@
 
 package org.apache.s2graph.core.utils
 
+import com.typesafe.config.ConfigFactory
 import org.slf4j.LoggerFactory
 import play.api.libs.json.JsValue
 
 import scala.language.{higherKinds, implicitConversions}
+import scala.util.{Random, Try}
 
 object logger {
+  val conf = ConfigFactory.load()
 
   trait Loggable[T] {
     def toLogMessage(msg: T): String
@@ -53,12 +56,17 @@
   private val metricLogger = LoggerFactory.getLogger("metrics")
   private val queryLogger = LoggerFactory.getLogger("query")
   private val malformedLogger = LoggerFactory.getLogger("malformed")
+  private val syncLogger = LoggerFactory.getLogger("meta_sync")
 
   def metric[T: Loggable](msg: => T) = metricLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
 
+  def syncInfo[T: Loggable](msg: => T) = syncLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
+
   def info[T: Loggable](msg: => T) = logger.info(implicitly[Loggable[T]].toLogMessage(msg))
 
-  def debug[T: Loggable](msg: => T) = logger.debug(implicitly[Loggable[T]].toLogMessage(msg))
+  def warn[T: Loggable](msg: => T) = logger.warn(implicitly[Loggable[T]].toLogMessage(msg))
+
+  def debug[T: Loggable](msg: => T) = if (logger.isDebugEnabled) logger.debug(implicitly[Loggable[T]].toLogMessage(msg))
 
   def error[T: Loggable](msg: => T, exception: => Throwable) = errorLogger.error(implicitly[Loggable[T]].toLogMessage(msg), exception)
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index a98104c..cc9bd4b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -19,60 +19,123 @@
 
 package org.apache.s2graph.core.utils
 
+import java.io._
 import java.util.concurrent.atomic.AtomicBoolean
 
 import com.google.common.cache.CacheBuilder
-
+import com.google.common.hash.Hashing
 import scala.collection.JavaConversions._
 import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 object SafeUpdateCache {
 
   case class CacheKey(key: String)
 
-}
+  def serialise(value: AnyRef): Try[Array[Byte]] = {
+    import scala.pickling.Defaults._
+    import scala.pickling.binary._
 
-class SafeUpdateCache[T](prefix: String, maxSize: Int, ttl: Int)(implicit executionContext: ExecutionContext) {
+    val result = Try(value.pickle.value)
+    result.failed.foreach { e =>
+      logger.syncInfo(s"[Serialise failed]: ${value}, ${e}")
+    }
 
-  import SafeUpdateCache._
-
-  implicit class StringOps(key: String) {
-    def toCacheKey = new CacheKey(prefix + ":" + key)
+    result
   }
 
-  def toTs() = (System.currentTimeMillis() / 1000).toInt
+  def deserialise(bytes: Array[Byte]): Try[AnyRef] = {
+    import scala.pickling.Defaults._
+    import scala.pickling.binary._
 
-  private val cache = CacheBuilder.newBuilder().maximumSize(maxSize).build[CacheKey, (T, Int, AtomicBoolean)]()
+    Try(BinaryPickle(bytes).unpickle[AnyRef])
+  }
 
-  def put(key: String, value: T) = cache.put(key.toCacheKey, (value, toTs, new AtomicBoolean(false)))
+  def fromBytes(cache: SafeUpdateCache, bytes: Array[Byte]): Unit = {
+    import org.apache.hadoop.io.WritableUtils
 
-  def invalidate(key: String) = cache.invalidate(key.toCacheKey)
+    val bais = new ByteArrayInputStream(bytes)
+    val input = new DataInputStream(bais)
 
-  def withCache(key: String)(op: => T): T = {
-    val cacheKey = key.toCacheKey
+    try {
+      val size = WritableUtils.readVInt(input)
+      (1 to size).foreach { ith =>
+        val cacheKey = WritableUtils.readVLong(input)
+        val value = deserialise(WritableUtils.readCompressedByteArray(input))
+        value.foreach { dsv =>
+          cache.putInner(cacheKey, dsv)
+        }
+      }
+    } finally {
+      bais.close()
+      input.close()
+    }
+  }
+}
+
+class SafeUpdateCache(maxSize: Int,
+                      ttl: Int)
+                     (implicit executionContext: ExecutionContext) {
+
+  import java.lang.{Long => JLong}
+  import SafeUpdateCache._
+
+  private val cache = CacheBuilder.newBuilder().maximumSize(maxSize)
+    .build[JLong, (AnyRef, Int, AtomicBoolean)]()
+
+  private def toCacheKey(key: String): Long = {
+    Hashing.murmur3_128().hashBytes(key.getBytes("UTF-8")).asLong()
+  }
+
+  private def toTs() = (System.currentTimeMillis() / 1000).toInt
+
+  def put(key: String, value: AnyRef, broadcast: Boolean = false): Unit = {
+    val cacheKey = toCacheKey(key)
+    cache.put(cacheKey, (value, toTs, new AtomicBoolean(false)))
+  }
+
+  def putInner(cacheKey: Long, value: AnyRef): Unit = {
+    cache.put(cacheKey, (value, toTs, new AtomicBoolean(false)))
+  }
+
+  def invalidate(key: String, broadcast: Boolean = true) = {
+    val cacheKey = toCacheKey(key)
+    cache.invalidate(cacheKey)
+  }
+
+  def invalidateInner(cacheKey: Long): Unit = {
+    cache.invalidate(cacheKey)
+  }
+
+  def withCache[T <: AnyRef](key: String, broadcast: Boolean)(op: => T): T = {
+    val cacheKey = toCacheKey(key)
     val cachedValWithTs = cache.getIfPresent(cacheKey)
 
     if (cachedValWithTs == null) {
+      val value = op
       // fetch and update cache.
-      val newValue = op
-      cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false)))
-      newValue
+      put(key, value, broadcast)
+      value
     } else {
-      val (cachedVal, updatedAt, isUpdating) = cachedValWithTs
+      val (_cachedVal, updatedAt, isUpdating) = cachedValWithTs
+      val cachedVal = _cachedVal.asInstanceOf[T]
+
       if (toTs() < updatedAt + ttl) cachedVal // in cache TTL
       else {
         val running = isUpdating.getAndSet(true)
+
         if (running) cachedVal
         else {
-          Future(op)(executionContext) onComplete {
+          val value = op
+          Future(value)(executionContext) onComplete {
             case Failure(ex) =>
-              cache.put(cacheKey, (cachedVal, toTs(), new AtomicBoolean(false))) // keep old value
-              logger.error(s"withCache update failed: $cacheKey")
+              put(key, cachedVal, false)
+              logger.error(s"withCache update failed: $cacheKey", ex)
             case Success(newValue) =>
-              cache.put(cacheKey, (newValue, toTs(), new AtomicBoolean(false))) // update new value
+              put(key, newValue, broadcast = (broadcast && newValue != cachedVal))
               logger.info(s"withCache update success: $cacheKey")
           }
+
           cachedVal
         }
       }
@@ -81,10 +144,35 @@
 
   def invalidateAll() = cache.invalidateAll()
 
-  def getAllData() : List[(String, T)] = {
-    cache.asMap().map { case (key, value) =>
-      (key.key.substring(prefix.size + 1), value._1)
-    }.toList
+  def asMap() = cache.asMap()
+
+  def get(key: String) = cache.asMap().get(toCacheKey(key))
+
+  def toBytes(): Array[Byte] = {
+    import org.apache.hadoop.io.WritableUtils
+    val baos = new ByteArrayOutputStream()
+    val output = new DataOutputStream(baos)
+
+    try {
+      val m = cache.asMap()
+      val size = m.size()
+
+      WritableUtils.writeVInt(output, size)
+      for (key <- m.keys) {
+        val (value, _, _) = m.get(key)
+        WritableUtils.writeVLong(output, key)
+        serialise(value).foreach { sv =>
+          WritableUtils.writeCompressedByteArray(output, sv)
+        }
+      }
+      output.flush()
+      baos.toByteArray
+    } finally {
+      baos.close()
+      output.close()
+    }
   }
+
+  def shutdown() = {}
 }
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala
new file mode 100644
index 0000000..fb5dbe6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Sync.scala
@@ -0,0 +1,181 @@
+//package org.apache.s2graph.core.utils
+//
+//import java.util.Properties
+//
+//import io.reactivex._
+//import io.reactivex.schedulers.Schedulers
+//import org.apache.kafka.clients.consumer.KafkaConsumer
+//import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
+//import org.apache.s2graph.core.ExceptionHandler
+//
+//
+//trait Sync[VALUE] {
+//  type KV = (String, VALUE)
+//
+//  val EmptyValue: VALUE
+//  val SyncId: String
+//  val SkipSelfBroadcast: Boolean
+//
+//  val delim = "___" // ! do not change
+//
+//  // public
+//  val emitter = io.reactivex.processors.PublishProcessor.create[KV]()
+//
+//  def send(key: String): Unit = send(key, EmptyValue)
+//
+//  def send(key: String, value: VALUE): Unit = sendImpl(encodeKey(key), value)
+//
+//  def shutdown(): Unit = {
+//    emitter.onComplete()
+//  }
+//
+//  protected def emit(kv: KV): Unit = {
+//    val (encodedKey, value) = kv
+//    val (sid, key) = decodeKey(encodedKey)
+//
+//    if (SkipSelfBroadcast && sid == SyncId) {
+//      logger.syncInfo(s"[Message ignore] sent by self: ${SyncId}")
+//      // pass
+//    } else {
+//      emitter.onNext(key -> value)
+//      logger.syncInfo(s"[Message emit success]: selfId: ${SyncId}, from: ${sid}")
+//    }
+//  }
+//
+//  protected def sendImpl(key: String, value: VALUE): Unit
+//
+//  private def encodeKey(key: String): String = s"${SyncId}${delim}${key}"
+//
+//  private def decodeKey(key: String): (String, String) = {
+//    val Array(cid, decodedKey) = key.split(delim)
+//    cid -> decodedKey
+//  }
+//}
+//
+//class MemorySync(syncId: String) extends Sync[Array[Byte]] {
+//
+//  override val EmptyValue: Array[Byte] = Array.empty[Byte]
+//
+//  override val SyncId: String = syncId
+//
+//  override val SkipSelfBroadcast: Boolean = false
+//
+//  override protected def sendImpl(key: String, value: Array[Byte]): Unit = emit(key -> value)
+//}
+//
+//object KafkaSync {
+//
+//  import java.util.Properties
+//
+//  val keySerializer = "org.apache.kafka.common.serialization.StringSerializer"
+//  val valueSerializer = "org.apache.kafka.common.serialization.ByteArraySerializer"
+//  val keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"
+//  val valueDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+//
+//  val maxMessageSize = "15728640" // 15 MB
+//
+//  def producerConfig(brokers: String): Properties = {
+//    val props = ExceptionHandler.toKafkaProp(brokers)
+//
+//    props.setProperty("key.serializer", keySerializer)
+//    props.setProperty("value.serializer", valueSerializer)
+//    props.setProperty("message.max.bytes", maxMessageSize)
+//    props.setProperty("max.request.size", maxMessageSize)
+//
+//
+//    props
+//  }
+//
+//  def consumerConfig(brokers: String, groupId: String): Properties = {
+//    val props = new Properties()
+//
+//    props.put("bootstrap.servers", brokers)
+//    props.put("group.id", groupId)
+//    props.put("enable.auto.commit", "false")
+//    props.put("key.deserializer", keyDeserializer)
+//    props.put("value.deserializer", valueDeserializer)
+//    props.put("max.partition.fetch.bytes", maxMessageSize)
+//    props.put("fetch.message.max.bytes", maxMessageSize)
+//
+//    props
+//  }
+//}
+//
+//class KafkaSync(topic: String,
+//                syncId: String,
+//                producerConfig: Properties,
+//                consumerConfig: Properties,
+//                skipSelfBroadcast: Boolean = true,
+//                seekTo: String = "end"
+//               ) extends Sync[Array[Byte]] {
+//
+//  type VALUE = Array[Byte]
+//
+//  val consumerTimeout: Int = 1000
+//
+//  override val EmptyValue = Array.empty[Byte]
+//
+//  override val SyncId = syncId
+//
+//  override val SkipSelfBroadcast: Boolean = skipSelfBroadcast
+//
+//  lazy val producer = new KafkaProducer[String, VALUE](producerConfig)
+//
+//  lazy val consumer = {
+//    import scala.collection.JavaConverters._
+//
+//    val kc = new KafkaConsumer[String, VALUE](consumerConfig)
+//    kc.subscribe(Seq(topic).asJava)
+//    kc.poll(consumerTimeout * 10) // Just for meta info sync
+//
+//    if (seekTo == "end") {
+//      kc.seekToEnd(kc.assignment())
+//    } else if (seekTo == "beginning") {
+//      kc.seekToBeginning(kc.assignment())
+//    } else {
+//      // pass
+//    }
+//
+//    kc
+//  }
+//
+//  // Emit event from kafka consumer: Flowable is just for while loop, not for Observable
+//  Flowable.create(new FlowableOnSubscribe[KV] {
+//    import scala.collection.JavaConverters._
+//
+//    override def subscribe(e: FlowableEmitter[KV]) = {
+//      while (true) {
+//        val ls = consumer.poll(consumerTimeout).asScala.map(record => record.key() -> record.value())
+//        ls.foreach(emit)
+//
+//        if (ls.nonEmpty) {
+//          consumer.commitSync()
+//          logger.syncInfo(s"[Kafka consume success]: message size: ${ls.size}")
+//        }
+//      }
+//
+//      e.onComplete()
+//    }
+//  }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.single()).subscribe()
+//
+//  override protected def sendImpl(key: String, value: VALUE): Unit = {
+//    val record = new ProducerRecord[String, VALUE](topic, key, value)
+//
+//    producer.send(record, new Callback {
+//      override def onCompletion(metadata: RecordMetadata, exception: Exception) = {
+//        if (exception != null) {
+//          logger.syncInfo(s"[Kafka produce failed]: from: key: ${key} e: ${exception}")
+//        } else {
+//          logger.syncInfo(s"[Kafka produce success]: from: ${SyncId} ${metadata}")
+//        }
+//      }
+//    })
+//  }
+//
+//  override def shutdown(): Unit = {
+//    super.shutdown()
+//
+//    producer.close()
+//    consumer.close()
+//  }
+//}
diff --git a/s2core/src/test/resources/reference.conf b/s2core/src/test/resources/reference.conf
new file mode 100644
index 0000000..d7fe546
--- /dev/null
+++ b/s2core/src/test/resources/reference.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# APP PHASE
+phase = test
+
+host = localhost
+
+# Hbase
+hbase.table.compression.algorithm="gz"
+hbase.zookeeper.quorum=${host}
+
+# Asynchbase
+hbase.client.retries.number=100
+hbase.rpcs.buffered_flush_interval=100
+hbase.rpc.timeout=0
+
+# local retry number
+max.retry.number=100
+max.back.off=50
+
+# Future cache.
+future.cache.max.size=100000
+future.cache.expire.after.write=10000
+future.cache.expire.after.access=5000
+future.cache.metric.interval=60000
+
+# Local Cache
+cache.ttl.seconds=60
+cache.max.size=100000
+
+# DB
+s2graph.models.table.name = "models-dev"
+
+db.default.driver = "org.h2.Driver"
+db.default.url = "jdbc:h2:file:./var/metastore;MODE=MYSQL"
+db.default.user = "sa"
+db.default.password = "sa"
+
+
+akka {
+  loggers = ["akka.event.slf4j.Slf4jLogger"]
+  loglevel = "DEBUG"
+}
+
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
index 4ed7905..1baa89d 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/CrudTest.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core.Integrate
 
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.utils.logger
 import play.api.libs.json.{JsObject, Json}
 
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
index c7d9f59..0177543 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/IntegrateCommon.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.Integrate
 
 import com.typesafe.config._
-import org.apache.s2graph.core.mysqls.{GlobalIndex, Label, Model}
+import org.apache.s2graph.core.schema.{GlobalIndex, Label, Schema}
 import org.apache.s2graph.core.rest.{RequestParser, RestHandler}
 import org.apache.s2graph.core.utils.logger
 import org.apache.s2graph.core._
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala
index b2c1113..c2ca367 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/ManagementTest.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 import org.apache.s2graph.core.Integrate.IntegrateCommon
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
 import play.api.libs.json.Json
 
 class ManagementTest extends IntegrateCommon {
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
index 2b439bc..05096e1 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{ServiceColumn, LabelMeta}
+import org.apache.s2graph.core.schema.{ServiceColumn, LabelMeta}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
 import org.apache.s2graph.core.utils.logger
 import org.scalatest.FunSuite
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
index 3f6cd2a..ea43a76 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommon.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import org.apache.s2graph.core.schema.{LabelIndex, LabelMeta}
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLikeWithTs, LabelWithDirection}
 
 
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index 6ac77e4..488b294 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -21,7 +21,7 @@
 
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{Label, LabelIndex, Service, ServiceColumn}
 import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
 import scalikejdbc.AutoSession
 
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
index 03ea50a..87e09bc 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
@@ -21,7 +21,7 @@
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.mysqls.ServiceColumn
+import org.apache.s2graph.core.schema.ServiceColumn
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, SourceVertexId}
 
 import scala.collection.mutable
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
index a5349dc..4a87cb2 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
@@ -23,7 +23,7 @@
 import org.apache.s2graph.core._
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
 import org.apache.tinkerpop.gremlin.process.traversal.{Order, P}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs}
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper
 import org.apache.tinkerpop.gremlin.structure.T
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/io/ConversionTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/io/ConversionTest.scala
index bc53e2f..d44bbe7 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/io/ConversionTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/io/ConversionTest.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.io
 
 //import org.apache.s2graph.core.{EdgeId, S2VertexPropertyId}
-import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{ColumnMeta, Service, ServiceColumn}
 import org.apache.s2graph.core.types.{InnerVal, VertexId}
 import org.apache.s2graph.core.utils.logger
 import org.scalatest.{FunSuite, Matchers}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
index 0920a89..342d9c6 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.parsers
 
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelMeta}
+import org.apache.s2graph.core.schema.{ServiceColumn, Label, LabelMeta}
 import org.apache.s2graph.core.rest.TemplateHelper
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.logger
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala
similarity index 69%
rename from s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala
rename to s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala
index 87a84ae..1843a57 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/models/ModelTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/schema/SchemaTest.scala
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-package org.apache.s2graph.core.models
+package org.apache.s2graph.core.schema
 
 import org.apache.s2graph.core.TestCommonWithModels
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.utils.SafeUpdateCache
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
-class ModelTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll {
+class SchemaTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll {
   override def beforeAll(): Unit = {
     initTests()
   }
@@ -56,4 +56,22 @@
     val tgtColumn = labelOpt.get.tgtService
     println(tgtColumn)
   }
+
+  test("serialize/deserialize Schema.") {
+    import scala.collection.JavaConverters._
+    val originalMap = Schema.safeUpdateCache.asMap().asScala
+    val newCache = new SafeUpdateCache(Schema.maxSize, Schema.ttl)(scala.concurrent.ExecutionContext.Implicits.global)
+    Schema.fromBytes(newCache, Schema.toBytes())
+    val newMap = newCache.asMap().asScala
+
+    originalMap.size shouldBe newMap.size
+    originalMap.keySet shouldBe newMap.keySet
+
+    originalMap.keySet.foreach { key =>
+      val (originalVal, _, _) = originalMap(key)
+      val (newVal, _, _) = newMap(key)
+
+      originalVal shouldBe newVal
+    }
+  }
 }
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala
index f1331e3..05dfbf3 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/StorageIOTest.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core.storage
 
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorageSerDe
 import org.apache.s2graph.core.storage.serde.{StorageDeserializable, StorageSerializable}
 import org.apache.s2graph.core.{S2Vertex, S2VertexLike, TestCommonWithModels}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
index 4f38e18..23ad143 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core.storage.hbase
 
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.TestCommonWithModels
 import org.scalatest.{FunSuite, Matchers}
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala
index 4aecb33..b871dd6 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/rocks/RocksStorageTest.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.rocks
 
 import org.apache.s2graph.core.TestCommonWithModels
-import org.apache.s2graph.core.mysqls.{Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{Service, ServiceColumn}
 import org.apache.tinkerpop.gremlin.structure.T
 import org.scalatest.{FunSuite, Matchers}
 
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
index f52f86a..0467f7d 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
@@ -24,7 +24,7 @@
 import org.apache.s2graph.core.Management.JsonModel.Prop
 import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, VertexId}
 import org.apache.s2graph.core.utils.logger
 import org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
index 45060cc..04268d6 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
@@ -24,7 +24,7 @@
 import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.Management.JsonModel.Prop
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{GlobalIndex, Service, ServiceColumn}
+import org.apache.s2graph.core.schema.{GlobalIndex, Service, ServiceColumn}
 import org.apache.s2graph.core.tinkerpop.S2GraphProvider
 import org.apache.s2graph.core.utils.logger
 import org.apache.tinkerpop.gremlin.process.traversal.{Compare, P, Scope}
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
index caddf8d..2f599e0 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala
@@ -23,7 +23,7 @@
 import akka.stream.ActorMaterializer
 import com.typesafe.config.Config
 import org.apache.http.HttpStatus
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
 import org.apache.s2graph.counter.config.S2CounterConfig
 import org.apache.s2graph.counter.core
 import org.apache.s2graph.counter.core.ExactCounter.ExactValueMap
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
index b9ca9a4..1439118 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala
@@ -24,7 +24,7 @@
 import com.typesafe.config.Config
 import org.apache.commons.httpclient.HttpStatus
 import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
 import org.apache.s2graph.counter.config.S2CounterConfig
 import org.apache.s2graph.counter.core.RankingCounter.RankingValueMap
 import org.apache.s2graph.counter.core.v2.ExactStorageGraph._
diff --git a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
index 17ecc87..d61f70b 100644
--- a/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
+++ b/s2counter_core/src/main/scala/org/apache/s2graph/counter/helper/CounterAdmin.scala
@@ -22,7 +22,7 @@
 import com.typesafe.config.Config
 import org.apache
 import org.apache.s2graph.core.S2Graph
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
 import org.apache.s2graph.counter
 import org.apache.s2graph.counter.config.S2CounterConfig
 import org.apache.s2graph.counter.core.{RankingCounter, ExactCounter}
diff --git a/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
index 2ab4823..aba035a 100644
--- a/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
+++ b/s2counter_core/src/test/scala/org/apache/s2graph/counter/core/RankingCounterSpec.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.counter.core
 
 import com.typesafe.config.ConfigFactory
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
 import org.apache.s2graph.core.{Management, S2Graph, S2Graph$}
 import org.apache.s2graph.counter.config.S2CounterConfig
 import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
diff --git a/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSchemaSpec.scala
similarity index 97%
rename from s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala
rename to s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSchemaSpec.scala
index be79e23..2de7248 100644
--- a/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterModelSpec.scala
+++ b/s2counter_core/src/test/scala/org/apache/s2graph/counter/models/CounterSchemaSpec.scala
@@ -22,7 +22,7 @@
 import com.typesafe.config.ConfigFactory
 import org.specs2.mutable.Specification
 
-class CounterModelSpec extends Specification {
+class CounterSchemaSpec extends Specification {
   val config = ConfigFactory.load()
 
   DBModel.initialize(config)
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
index f941224..4d9f5f5 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala
@@ -22,7 +22,7 @@
 import akka.actor.ActorSystem
 import akka.stream.ActorMaterializer
 import org.apache.commons.httpclient.HttpStatus
-import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import org.apache.s2graph.core.schema.{Bucket, Experiment, Service}
 import org.apache.s2graph.counter.loader.config.StreamingConfig
 import org.apache.s2graph.counter.models.Counter
 import org.apache.s2graph.counter.util.{RetryAsync, CollectionCache, CollectionCacheConfig}
diff --git a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
index 35575fb..8d58692 100644
--- a/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
+++ b/s2counter_loader/src/test/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctionsSpec.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.counter.loader.core
 
 import com.typesafe.config.ConfigFactory
-import org.apache.s2graph.core.mysqls.{Label, Service}
+import org.apache.s2graph.core.schema.{Label, Service}
 import org.apache.s2graph.core.types.HBaseType
 import org.apache.s2graph.core.{S2Graph, Management}
 import org.apache.s2graph.counter.models.DBModel
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index c2597d6..ddb21b2 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -44,6 +44,7 @@
 
 object GraphQLServer {
 
+  val className = Schema.getClass.getName
   val logger = LoggerFactory.getLogger(this.getClass)
 
   // Init s2graph
@@ -56,7 +57,7 @@
   val s2graph = new S2Graph(config)
   val schemaCacheTTL = Try(config.getInt("schemaCacheTTL")).getOrElse(-1)
   val s2Repository = new GraphRepository(s2graph)
-  val schemaCache = new SafeUpdateCache[Schema[GraphRepository, Any]]("schema", maxSize = 1, ttl = schemaCacheTTL)
+  val schemaCache = new SafeUpdateCache(1, ttl = schemaCacheTTL)
 
   def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = {
 
@@ -97,7 +98,8 @@
   }
 
   private def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
-    val s2schema = schemaCache.withCache("s2Schema")(createNewSchema())
+    val cacheKey = className + "s2Schema"
+    val s2schema = schemaCache.withCache(cacheKey, broadcast = false)(createNewSchema())
     import GraphRepository._
     val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher)
 
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/bind/Unmarshaller.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/bind/Unmarshaller.scala
index 807fe48..61c58e8 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/bind/Unmarshaller.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/bind/Unmarshaller.scala
@@ -20,8 +20,8 @@
 package org.apache.s2graph.graphql.bind
 
 import org.apache.s2graph.core.Management.JsonModel._
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
-import org.apache.s2graph.core.{QueryParam, S2EdgeLike, S2VertexLike}
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.{S2EdgeLike, S2VertexLike}
 import org.apache.s2graph.graphql.repository.GraphRepository
 import org.apache.s2graph.graphql.types.S2Type._
 import sangria.marshalling._
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
index 3bfa556..90037cf 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
@@ -21,7 +21,7 @@
 
 import org.apache.s2graph.core.Management.JsonModel._
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.rest.RequestParser
 import org.apache.s2graph.core.storage.MutateResponse
 import org.apache.s2graph.core.types._
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
index cb6238f..478517f 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
@@ -20,7 +20,8 @@
 package org.apache.s2graph.graphql.types
 
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.index.IndexProvider
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.graphql.bind.AstHelper
 import org.apache.s2graph.graphql.repository.GraphRepository
 import org.apache.s2graph.graphql.types.S2Type.EdgeQueryParam
@@ -70,7 +71,7 @@
   }
 
   def serviceColumnOnService(column: ServiceColumn, c: Context[GraphRepository, Any]): VertexQueryParam = {
-    val prefix = s"${GlobalIndex.serviceField}:${column.service.serviceName} AND ${GlobalIndex.serviceColumnField}:${column.columnName}"
+    val prefix = s"${IndexProvider.serviceField}:${column.service.serviceName} AND ${IndexProvider.serviceColumnField}:${column.columnName}"
 
     val ids = c.argOpt[Any]("id").toSeq ++ c.argOpt[List[Any]]("ids").toList.flatten
     val offset = c.arg[Int]("offset")
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
index 0312abf..1475280 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.graphql.types
 
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.graphql.repository.GraphRepository
 import sangria.schema._
 
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
index c10d85e..49e6b82 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
@@ -21,7 +21,7 @@
 
 import org.apache.s2graph.core.Management.JsonModel._
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.graphql
 import org.apache.s2graph.graphql.repository.GraphRepository
 import sangria.schema._
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/StaticType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/StaticType.scala
index aca832c..83168f4 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/StaticType.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/StaticType.scala
@@ -21,7 +21,7 @@
 
 import org.apache.s2graph.core.Management.JsonModel._
 import org.apache.s2graph.core.{JSONParser, S2EdgeLike, S2VertexLike}
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.storage.MutateResponse
 import org.apache.s2graph.graphql.repository.GraphRepository
 import sangria.macros.derive._
diff --git a/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala b/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala
index acd6f66..72412f5 100644
--- a/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala
+++ b/s2graphql/src/test/scala/org/apache/s2graph/graphql/TestGraph.scala
@@ -22,7 +22,7 @@
 import com.typesafe.config.Config
 import org.apache.s2graph
 import org.apache.s2graph.core.Management.JsonModel.Prop
-import org.apache.s2graph.core.mysqls.{Label, Model, Service}
+import org.apache.s2graph.core.schema.{Label, Schema, Service}
 import org.apache.s2graph.core.rest.RequestParser
 import org.apache.s2graph.core.{Management, S2Graph}
 import org.apache.s2graph.graphql
@@ -77,7 +77,7 @@
 }
 
 class EmptyGraph(config: Config) extends TestGraph {
-  Model.apply(config)
+  Schema.apply(config)
 
   lazy val graph = new S2Graph(config)(scala.concurrent.ExecutionContext.Implicits.global)
   lazy val management = new Management(graph)
@@ -92,7 +92,7 @@
   override def repository: GraphRepository = s2Repository
 
   override def open(): Unit = {
-    Model.shutdown(true)
+    Schema.shutdown(true)
   }
 
 }
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
index 561c676..27747f6 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/DegreeKey.scala
@@ -22,7 +22,7 @@
 import org.apache.s2graph.core._
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.hadoop.hbase.{KeyValue => HKeyValue}
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.types.{InnerValLikeWithTs, SourceVertexId}
 import play.api.libs.json.Json
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
index b0b4aed..975714a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/S2GraphHelper.scala
@@ -21,6 +21,7 @@
 
 import com.typesafe.config.Config
 import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.storage.SKeyValue
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 3fa8cea..e8a3e86 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@ -23,9 +23,9 @@
 
 import com.holdenkarau.spark.testing.DataFrameSuiteBase
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.mysqls.{Label, ServiceColumn}
-import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
 import org.apache.s2graph.core.{Management, S2Graph}
+import org.apache.s2graph.core.types.HBaseType
 import org.apache.s2graph.s2jobs.loader.GraphFileOptions
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
diff --git a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
index 126f193..6615cec 100644
--- a/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
+++ b/s2rest_netty/src/main/scala/org/apache/s2graph/rest/netty/Server.scala
@@ -34,7 +34,7 @@
 import io.netty.handler.logging.{LogLevel, LoggingHandler}
 import io.netty.util.CharsetUtil
 import org.apache.s2graph.core.GraphExceptions.{BadQueryException}
-import org.apache.s2graph.core.mysqls.Experiment
+import org.apache.s2graph.core.schema.Experiment
 import org.apache.s2graph.core.rest.RestHandler
 import org.apache.s2graph.core.rest.RestHandler.{CanLookup, HandlerResult}
 import org.apache.s2graph.core.utils.Extensions._
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
index 23bda0a..00702cf 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.rest.play.controllers
 
 import org.apache.s2graph.core.Management
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.rest.RequestParser
 import org.apache.s2graph.core.utils.logger
 import play.api.libs.functional.syntax._
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
index b3ac89d..c751250 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala
@@ -22,7 +22,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.s2graph.core.ExceptionHandler
 import org.apache.s2graph.core.ExceptionHandler.KafkaMessage
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
 import org.apache.s2graph.counter
 import org.apache.s2graph.counter.config.S2CounterConfig
 import org.apache.s2graph.counter.core.TimedQualifier.IntervalUnit
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 69878f8..310b098 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -22,7 +22,7 @@
 import com.fasterxml.jackson.databind.JsonMappingException
 import org.apache.s2graph.core.ExceptionHandler.KafkaMessage
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.schema.Label
 import org.apache.s2graph.core.rest.RequestParser
 import org.apache.s2graph.core.utils.logger
 import org.apache.s2graph.core.storage.{IncrementResponse, MutateResponse}