Merge branch 'master' into S2GRAPH-226
diff --git a/.gitignore b/.gitignore
index 0066e18..442e6a7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,6 +17,9 @@
 src_managed/
 project/boot/
 project/plugins/project/
+doc/build/*
+doc/ENV/*
+doc/s2graph_doc/*
 
 # Scala-IDE specific
 .scala_dependencies
diff --git a/.travis.yml b/.travis.yml
index 448bc64..bc5e6cb 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -32,9 +32,3 @@
   - oraclejdk8
 
 sbt_args: -J-Xmx4G -J-Xms1G -J-Xss1M
-
-before_install:
-  - .travis/install-hbase.sh
-
-before_script:
-  - $HOME/hbase-$HBASE_VERSION/bin/start-hbase.sh
diff --git a/.travis/install-hbase.sh b/.travis/install-hbase.sh
index f55437c..5744fd8 100755
--- a/.travis/install-hbase.sh
+++ b/.travis/install-hbase.sh
@@ -14,8 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-set -xe
-
-if [ ! -d "$HOME/hbase-$HBASE_VERSION/bin" ]; then
-  cd $HOME && wget -q -O - http://mirror.navercorp.com/apache/hbase/stable/hbase-$HBASE_VERSION-bin.tar.gz | tar xz
-fi
+#set -xe
+#
+#if [ ! -d "$HOME/hbase-$HBASE_VERSION/bin" ]; then
+#  cd $HOME && wget -q -O - http://mirror.apache-kr.org/hbase/$HBASE_VERSION/hbase-$HBASE_VERSION-bin.tar.gz | tar xz
+#fi
diff --git a/CHANGES b/CHANGES
index 734f498..bdc3da4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -38,6 +38,7 @@
     * [S2GRAPH-205] - too many initialize S2Graph when writeBatchMutate on S2GraphSink
     * [S2GRAPH-201] - Provide S2GraphSource
     * [S2GRAPH-218] - add operations not supported on sql
+    * [S2GRAPH-225] - support custom udf class
 
 ** Bug
     * [S2GRAPH-159] - Wrong syntax at a bash script under Linux
@@ -58,6 +59,7 @@
     * [S2GRAPH-196] - Apply Query Parameter to Label Fetch in S2GraphQL
     * [S2GRAPH-220] - Filter clause is not working on AnnoyModelFetcher
     * [S2GRAPH-228] - GraphQL empty variable fields on post request cause spray.json.DeserializationException
+    * [S2GRAPH-243] - Limit bug on 'graph/getEdges' 
 
 ** Improvement
     * [S2GRAPH-72] - Support Apache TinkerPop and Gremlin
@@ -85,6 +87,8 @@
     * [S2GRAPH-221] - Unify configurations for bulk and mutate in S2GraphSink.
     * [S2GRAPH-230] - ResourceManager onEvict cause segmentation fault with AnnoyModelFetcher 
     * [S2GRAPH-231] - Change the GraphQL type name to a valid string.
+    * [S2GRAPH-232] - Elimination of inefficiency due to duplication in GraphQL schema generation.
+    * [S2GRAPH-235] - Fix typo errors in S2GraphQL MD.
 
 ** New Feature
     * [S2GRAPH-123] - Support different index on out/in direction.
@@ -107,6 +111,8 @@
     * [S2GRAPH-211] - Include 's2jobs' test in CI
     * [S2GRAPH-212] - Fix broken markdown on README.md.
     * [S2GRAPH-229] - 'Step' abstraction for combinable queries
+    * [S2GRAPH-245] - Remove install hbase step on travis CI.
+    * [S2GRAPH-246] - Integration of documents into the S2Graph project.
 
 Release 0.1.0 - Released
 
diff --git a/doc/Makefile b/doc/Makefile
new file mode 100644
index 0000000..69fe55e
--- /dev/null
+++ b/doc/Makefile
@@ -0,0 +1,19 @@
+# Minimal makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line.
+SPHINXOPTS    =
+SPHINXBUILD   = sphinx-build
+SOURCEDIR     = source
+BUILDDIR      = build
+
+# Put it first so that "make" without argument is like "make help".
+help:
+	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+
+.PHONY: help Makefile
+
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option.  $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
\ No newline at end of file
diff --git a/doc/make.bat b/doc/make.bat
new file mode 100644
index 0000000..543c6b1
--- /dev/null
+++ b/doc/make.bat
@@ -0,0 +1,35 @@
+@ECHO OFF

+

+pushd %~dp0

+

+REM Command file for Sphinx documentation

+

+if "%SPHINXBUILD%" == "" (

+	set SPHINXBUILD=sphinx-build

+)

+set SOURCEDIR=source

+set BUILDDIR=build

+

+if "%1" == "" goto help

+

+%SPHINXBUILD% >NUL 2>NUL

+if errorlevel 9009 (

+	echo.

+	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx

+	echo.installed, then set the SPHINXBUILD environment variable to point

+	echo.to the full path of the 'sphinx-build' executable. Alternatively you

+	echo.may add the Sphinx directory to PATH.

+	echo.

+	echo.If you don't have Sphinx installed, grab it from

+	echo.http://sphinx-doc.org/

+	exit /b 1

+)

+

+%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%

+goto end

+

+:help

+%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%

+

+:end

+popd

diff --git a/doc/readme.md b/doc/readme.md
new file mode 100644
index 0000000..75a9891
--- /dev/null
+++ b/doc/readme.md
@@ -0,0 +1,56 @@
+# S2Graph Documentation
+
+## Dependencies
+  [Python](https://www.python.org/)
+  
+  [Sphinx](http://www.sphinx-doc.org/en/master/)
+
+  [Read the Docs Sphinx Theme](https://sphinx-rtd-theme.readthedocs.io/en/latest/index.html)
+
+I used [`pip`](https://pip.pypa.io/en/stable/installing/) to install Python module.
+I used [`virtualenv`](https://virtualenv.pypa.io/en/latest/) to isolate the Python environment.
+
+> Depending on your environment, the tools(pip, virtualenv) may not be required
+
+## Quickstart
+
+All work is done under the `s2graph/doc` folder.
+
+```
+cd doc
+```
+
+### Creating a virtualenv environment for documnet build
+
+If `pip` is not installed, you need to install it first by referring to the link: https://pip.pypa.io/en/stable/installing/
+
+```
+pip install virtualenv
+
+virtualenv -p python s2graph_doc
+source s2graph_doc/bin/activate
+```
+
+### install sphinx and theme
+```
+pip install Sphinx
+pip install sphinx_rtd_theme 
+```
+
+### Building
+```
+make html
+```
+
+### Viewing
+```
+# python 2
+pushd build/html && python -m SimpleHTTPServer 3000 
+
+# python 3
+pushd build/html && python -m http.server 3000 
+```
+
+### Screenshot
+
+<img src="https://user-images.githubusercontent.com/1182522/48395569-04995d00-e75b-11e8-87b8-2f28662ef3ca.png">
diff --git a/doc/source/conf.py b/doc/source/conf.py
new file mode 100644
index 0000000..afa4c10
--- /dev/null
+++ b/doc/source/conf.py
@@ -0,0 +1,186 @@
+# -*- coding: utf-8 -*-
+#
+# Configuration file for the Sphinx documentation builder.
+#
+# This file does only contain a selection of the most common options. For a
+# full list see the documentation:
+# http://www.sphinx-doc.org/en/master/config
+
+# -- Path setup --------------------------------------------------------------
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+#
+# import os
+# import sys
+# sys.path.insert(0, os.path.abspath('.'))
+
+
+# -- Project information -----------------------------------------------------
+
+project = 'S2Graph'
+copyright = '2018, s2graph'
+author = 's2graph'
+
+# The short X.Y version
+version = ''
+# The full version, including alpha/beta/rc tags
+release = '0.0.2'
+
+
+# -- General configuration ---------------------------------------------------
+
+# If your documentation needs a minimal Sphinx version, state it here.
+#
+# needs_sphinx = '1.0'
+
+# Add any Sphinx extension module names here, as strings. They can be
+# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
+# ones.
+extensions = [
+    'sphinx.ext.autodoc',
+    'sphinx.ext.intersphinx',
+    'sphinx.ext.ifconfig',
+    'sphinx.ext.viewcode',
+    'sphinx.ext.githubpages',
+]
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ['_templates']
+
+# The suffix(es) of source filenames.
+# You can specify multiple suffix as a list of string:
+#
+# source_suffix = ['.rst', '.md']
+source_suffix = '.rst'
+
+# The master toctree document.
+master_doc = 'index'
+
+# The language for content autogenerated by Sphinx. Refer to documentation
+# for a list of supported languages.
+#
+# This is also used if you do content translation via gettext catalogs.
+# Usually you set "language" from the command line for these cases.
+language = None
+
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+# This pattern also affects html_static_path and html_extra_path.
+exclude_patterns = []
+
+# The name of the Pygments (syntax highlighting) style to use.
+pygments_style = None
+
+
+# -- Options for HTML output -------------------------------------------------
+
+# The theme to use for HTML and HTML Help pages.  See the documentation for
+# a list of builtin themes.
+#
+html_theme = 'sphinx_rtd_theme'
+
+# Theme options are theme-specific and customize the look and feel of a theme
+# further.  For a list of options available for each theme, see the
+# documentation.
+#
+# html_theme_options = {}
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ['_static']
+
+# Custom sidebar templates, must be a dictionary that maps document names
+# to template names.
+#
+# The default sidebars (for documents that don't match any pattern) are
+# defined by theme itself.  Builtin themes are using these templates by
+# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
+# 'searchbox.html']``.
+#
+# html_sidebars = {}
+
+
+# -- Options for HTMLHelp output ---------------------------------------------
+
+# Output file base name for HTML help builder.
+htmlhelp_basename = 'S2Graphdoc'
+
+
+# -- Options for LaTeX output ------------------------------------------------
+
+latex_elements = {
+    # The paper size ('letterpaper' or 'a4paper').
+    #
+    # 'papersize': 'letterpaper',
+
+    # The font size ('10pt', '11pt' or '12pt').
+    #
+    # 'pointsize': '10pt',
+
+    # Additional stuff for the LaTeX preamble.
+    #
+    # 'preamble': '',
+
+    # Latex figure (float) alignment
+    #
+    # 'figure_align': 'htbp',
+}
+
+# Grouping the document tree into LaTeX files. List of tuples
+# (source start file, target name, title,
+#  author, documentclass [howto, manual, or own class]).
+latex_documents = [
+    (master_doc, 'S2Graph.tex', 'S2Graph Documentation',
+     's2graph', 'manual'),
+]
+
+
+# -- Options for manual page output ------------------------------------------
+
+# One entry per manual page. List of tuples
+# (source start file, name, description, authors, manual section).
+man_pages = [
+    (master_doc, 's2graph', 'S2Graph Documentation',
+     [author], 1)
+]
+
+
+# -- Options for Texinfo output ----------------------------------------------
+
+# Grouping the document tree into Texinfo files. List of tuples
+# (source start file, target name, title, author,
+#  dir menu entry, description, category)
+texinfo_documents = [
+    (master_doc, 'S2Graph', 'S2Graph Documentation',
+     author, 'S2Graph', 'One line description of project.',
+     'Miscellaneous'),
+]
+
+
+# -- Options for Epub output -------------------------------------------------
+
+# Bibliographic Dublin Core info.
+epub_title = project
+
+# The unique identifier of the text. This can be a ISBN number
+# or the project homepage.
+#
+# epub_identifier = ''
+
+# A unique identification for the text.
+#
+# epub_uid = ''
+
+# A list of files that should not be packed into the epub file.
+epub_exclude_files = ['search.html']
+
+
+# -- Extension configuration -------------------------------------------------
+
+# -- Options for intersphinx extension ---------------------------------------
+
+# Example configuration for intersphinx: refer to the Python standard library.
+intersphinx_mapping = {'https://docs.python.org/': None}
diff --git a/doc/source/getting_started/index.rst b/doc/source/getting_started/index.rst
new file mode 100644
index 0000000..5f54cfc
--- /dev/null
+++ b/doc/source/getting_started/index.rst
@@ -0,0 +1,43 @@
+Getting Started
+===============
+
+********************
+The Directory Layout
+********************
+
+Once extracted the downloaded binary release of S2Graph or built from the source as described below, the following files and directories should be found in the directory.
+
+.. code::
+
+  DISCLAIMER
+  LICENCE               # the Apache License 2.0
+  NOTICE
+  bin                   # scripts to manage the lifecycle of S2Graph
+  conf                  # configuration files
+  lib                   # contains the binary
+  logs                  # application logs
+  var                   # application data
+
+*****************
+Launching S2Graph
+*****************
+
+The following will launch S2Graph, using HBase in the standalone mode for data storage and H2 as the metadata storage.
+
+.. code::
+
+  sh bin/start-s2graph.sh
+
+To connect to a remote HBase cluster or use MySQL as the metastore, refer to the instructions in ``conf/application.conf``
+
+************************
+Building from the Source
+************************
+
+We use SBT to build the project, which can be installed using Homebrew on MacOS (brew install sbt). For other operating systems, refer to the SBT Tutorial. Once SBT is installed, running the following command on the source root will build the project from the source:
+
+.. code:: bash
+
+  sbt package
+
+Depending on the internet connection, the initial run might take a while downloading the required dependencies. Once the build is complete, the same directory layout as in the top of this document can be found at ``target/apache-s2graph-$version-bin``, where ``$version`` is the current version of the project, e.g. ``0.1.0-incubating``
diff --git a/doc/source/getting_started/your_first_graph.rst b/doc/source/getting_started/your_first_graph.rst
new file mode 100644
index 0000000..72d6e08
--- /dev/null
+++ b/doc/source/getting_started/your_first_graph.rst
@@ -0,0 +1,178 @@
+Your First S2Graph
+==================
+
+Once the S2Graph server has been set up, you can now start to send HTTP queries to the server to create a graph and pour data in it. This tutorial goes over a simple toy problem to get a sense of how S2Graph's API looks like. For the exact definitions of the terminology used here, refer to The Data Model document.
+The toy problem is to create a timeline feature for a simple social media, like a simplified version of Facebook's timeline. Using simple S2Graph queries it is possible to keep track of each user's friends and their posts.
+
+First, we need a name for the new service.
+---------------------------------------------
+
+The following POST query will create a service named ``KakaoFavorites``
+
+.. code:: bash
+
+  curl -XPOST localhost:9000/graphs/createService -H 'Content-Type: Application/json' -d '
+  {
+    "serviceName": "KakaoFavorites",
+    "compressionAlgorithm" : "gz"
+  }'
+
+To make sure the service is created correctly, check out the following
+
+.. code:: bash
+
+  curl -XGET localhost:9000/graphs/getService/KakaoFavorites
+
+Next, we will need some friends.
+---------------------------------------------
+In S2Graph, relationships are organized as labels. Create a ``friends`` label with the following ``createLabel`` API call:
+
+.. code:: bash
+
+  curl -XPOST localhost:9000/graphs/createLabel -H 'Content-Type: Application/json' -d '
+  {
+    "label": "friends",
+    "srcServiceName": "KakaoFavorites",
+    "srcColumnName": "userName",
+    "srcColumnType": "string",
+    "tgtServiceName": "KakaoFavorites",
+    "tgtColumnName": "userName",
+    "tgtColumnType": "string",
+    "isDirected": "false",
+    "indices": [],
+    "props": [],
+    "consistencyLevel": "strong"
+  }'
+
+Check if the label has been created correctly:
+
+.. code:: bash
+
+   curl -XGET localhost:9000/graphs/getLabel/friends
+
+Now that the label ``friends`` is ready, we can store the friendship data. Entries of a label are called edges, and you can add edges with ``edges/insert`` API:
+
+.. code:: bash
+
+   curl -XPOST localhost:9000/graphs/edges/insert -H 'Content-Type: Application/json' -d '
+   [
+      {"from":"Elmo","to":"Big Bird","label":"friends","props":{},"timestamp":1444360152477},
+      {"from":"Elmo","to":"Ernie","label":"friends","props":{},"timestamp":1444360152478},
+      {"from":"Elmo","to":"Bert","label":"friends","props":{},"timestamp":1444360152479},
+      {"from":"Cookie Monster","to":"Grover","label":"friends","props":{},"timestamp":1444360152480},
+      {"from":"Cookie Monster","to":"Kermit","label":"friends","props":{},"timestamp":1444360152481},
+      {"from":"Cookie Monster","to":"Oscar","label":"friends","props":{},"timestamp":1444360152482}
+   ]'
+
+Query friends of Elmo with ``getEdges`` API:
+
+.. code:: bash
+
+  curl -XPOST localhost:9000/graphs/getEdges -H 'Content-Type: Application/json' -d '
+  {
+    "srcVertices": [{"serviceName": "KakaoFavorites", "columnName": "userName", "id":"Elmo"}],
+    "steps": [
+      {"step": [{"label": "friends", "direction": "out", "offset": 0, "limit": 10}]}
+    ]
+  }'
+
+Now query friends of Cookie Monster:
+
+.. code:: bash
+
+  curl -XPOST localhost:9000/graphs/getEdges -H 'Content-Type: Application/json' -d '
+  {
+    "srcVertices": [{"serviceName": "KakaoFavorites", "columnName": "userName", "id":"Cookie Monster"}],
+    "steps": [
+      {"step": [{"label": "friends", "direction": "out", "offset": 0, "limit": 10}]}
+    ]
+  }'
+
+Users of Kakao Favorites will be able to ``post`` URLs of their favorite websites.
+----------------------------------------------------------------------------------
+
+We will need a new label ``post`` for this data:
+
+.. code:: bash
+
+  curl -XPOST localhost:9000/graphs/createLabel -H 'Content-Type: Application/json' -d '
+  {
+    "label": "post",
+    "srcServiceName": "KakaoFavorites",
+    "srcColumnName": "userName",
+    "srcColumnType": "string",
+    "tgtServiceName": "KakaoFavorites",
+    "tgtColumnName": "url",
+    "tgtColumnType": "string",
+    "isDirected": "true",
+    "indices": [],
+    "props": [],
+    "consistencyLevel": "strong"
+  }'
+
+Now, insert some posts of the users:
+
+.. code:: bash
+
+  curl -XPOST localhost:9000/graphs/edges/insert -H 'Content-Type: Application/json' -d '
+  [
+    {"from":"Big Bird","to":"www.kakaocorp.com/en/main","label":"post","props":{},"timestamp":1444360152477},
+    {"from":"Big Bird","to":"github.com/kakao/s2graph","label":"post","props":{},"timestamp":1444360152478},
+    {"from":"Ernie","to":"groups.google.com/forum/#!forum/s2graph","label":"post","props":{},"timestamp":1444360152479},
+    {"from":"Grover","to":"hbase.apache.org/forum/#!forum/s2graph","label":"post","props":{},"timestamp":1444360152480},
+    {"from":"Kermit","to":"www.playframework.com","label":"post","props":{},"timestamp":1444360152481},
+    {"from":"Oscar","to":"www.scala-lang.org","label":"post","props":{},"timestamp":1444360152482}
+  ]'
+
+
+Query posts of Big Bird:
+
+.. code:: bash
+
+  curl -XPOST localhost:9000/graphs/getEdges -H 'Content-Type: Application/json' -d '
+  {
+    "srcVertices": [{"serviceName": "KakaoFavorites", "columnName": "userName", "id":"Big Bird"}],
+    "steps": [
+      {"step": [{"label": "post", "direction": "out", "offset": 0, "limit": 10}]}
+    ]
+  }'
+
+
+So far, we have designed a label schema for the labels ``friends`` and ``post``, and stored some edges to them.
+---------------------------------------------------------------------------------------------------------------
+
+This should be enough for creating the timeline feature! The following two-step query will return the URLs for Elmo's timeline, which are the posts of Elmo's friends:
+
+
+.. code:: bash
+
+  curl -XPOST localhost:9000/graphs/getEdges -H 'Content-Type: Application/json' -d '
+  {
+    "srcVertices": [{"serviceName": "KakaoFavorites", "columnName": "userName", "id":"Elmo"}],
+    "steps": [
+      {"step": [{"label": "friends", "direction": "out", "offset": 0, "limit": 10}]},
+      {"step": [{"label": "post", "direction": "out", "offset": 0, "limit": 10}]}
+    ]
+  }'
+
+Also try Cookie Monster's timeline:
+
+.. code:: bash
+
+  curl -XPOST localhost:9000/graphs/getEdges -H 'Content-Type: Application/json' -d '
+  {
+    "srcVertices": [{"serviceName": "KakaoFavorites", "columnName": "userName", "id":"Cookie Monster"}],
+    "steps": [
+      {"step": [{"label": "friends", "direction": "out", "offset": 0, "limit": 10}]},
+      {"step": [{"label": "post", "direction": "out", "offset": 0, "limit": 10}]}
+    ]
+  }'
+
+
+The example above is by no means a full blown social network timeline, but it gives you an idea of how to represent, store and query graph data with S2Graph.
+
+We also provide a simple script under ``script/test.sh`` so that you can see if everything is setup correctly.
+
+.. code:: bash
+
+  sh script/test.sh
diff --git a/doc/source/index.rst b/doc/source/index.rst
new file mode 100644
index 0000000..83bf8d5
--- /dev/null
+++ b/doc/source/index.rst
@@ -0,0 +1,30 @@
+.. S2Graph documentation master file, created by
+   sphinx-quickstart on Mon Nov 12 15:53:14 2018.
+   You can adapt this file completely to your liking, but it should at least
+   contain the root `toctree` directive.
+
+Introduction
+===================================
+
+S2Graph is a graph database designed to handle transactional graph processing at scale. Its REST API allows you to store, manage and query relational information using edge and vertex representations in a fully asynchronous and non-blocking manner. This document covers the basic concepts and terminology of S2Graph to help you get a feel for the S2Graph API.
+
+Mailing Lists
+Everyone interested in using and developing S2Graph is welcome to join our mailing lists!
+
+users@s2graph.incubator.apache.org is for usage questions and announcements.
+dev@s2graph.incubator.apache.org is for people who want to contribute to S2Graph.
+
+.. toctree::
+   :caption: S2Graph Documentation
+   :maxdepth: 3
+
+   getting_started/index
+   getting_started/your_first_graph
+
+..
+   Indices and tables
+   ==================
+
+   * :ref:`genindex`
+   * :ref:`modindex`
+   * :ref:`search`
diff --git a/project/Common.scala b/project/Common.scala
index e4a1b61..1008b2c 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -26,10 +26,11 @@
   val specs2Version = "3.8.5"
 
   val hbaseVersion = "1.2.6.1"
+  val asynchbaseVersion = "1.7.2"
   val hadoopVersion = "2.7.3"
   val tinkerpopVersion = "3.2.5"
 
-  val elastic4sVersion = "6.1.1"
+  val elastic4sVersion = "6.2.4"
 
   val KafkaVersion = "0.10.2.1"
 
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 0368715..76ec221 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -41,7 +41,7 @@
   "com.h2database" % "h2" % "1.4.192",
   "com.stumbleupon" % "async" % "1.4.1",
   "io.netty" % "netty" % "3.9.4.Final" force(),
-  "org.hbase" % "asynchbase" % "1.7.2" excludeLogging(),
+  "org.hbase" % "asynchbase" % asynchbaseVersion excludeLogging(),
   "net.bytebuddy" % "byte-buddy" % "1.4.26",
   "org.apache.tinkerpop" % "gremlin-core" % tinkerpopVersion excludeLogging(),
   "org.apache.tinkerpop" % "gremlin-test" % tinkerpopVersion % "test",
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
index e850541..b1aba3e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ColumnMeta.scala
@@ -136,6 +136,8 @@
       val cacheKey = className + s"columnId=${columnId}"
       (cacheKey -> ls)
     }.toList)
+
+    ls
   }
 
   def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try {
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
index bb8425f..4209ca5 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/LabelIndex.scala
@@ -166,6 +166,8 @@
       val cacheKey = s"labelId=${labelId}"
       (className + cacheKey -> ls)
     }.toList)
+
+    ls
   }
 }
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index f670e9c..0c49e71 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -297,7 +297,7 @@
         get.setMaxTimestamp(maxTs)
         get.setTimeout(queryParam.rpcTimeout)
 
-        val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset)
+        val pagination = new ColumnPaginationFilter(queryParam.innerLimit, queryParam.innerOffset)
         val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
           new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
         }
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/ExampleTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/ExampleTest.scala
new file mode 100644
index 0000000..76e30d9
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/ExampleTest.scala
@@ -0,0 +1,130 @@
+package org.apache.s2graph.core.Integrate
+
+import org.apache.s2graph.core.schema._
+import play.api.libs.json.{JsObject, Json}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+class ExampleTest extends IntegrateCommon {
+
+  import TestUtil._
+
+  private def prepareTestData() = {
+    // create service KakaoFavorites
+    val createServicePayload = Json.parse(
+      """
+        |{"serviceName": "KakaoFavorites", "compressionAlgorithm" : "gz"}
+        |""".stripMargin)
+
+    val createFriendsPayload = Json.parse(
+      s"""{
+         |  "label": "friends",
+         |  "srcServiceName": "KakaoFavorites",
+         |  "srcColumnName": "userName",
+         |  "srcColumnType": "string",
+         |  "tgtServiceName": "KakaoFavorites",
+         |  "tgtColumnName": "userName",
+         |  "tgtColumnType": "string",
+         |  "isDirected": "false",
+         |  "indices": [],
+         |  "props": [],
+         |  "consistencyLevel": "strong"
+         |}""".stripMargin)
+
+    val createPostPayload = Json.parse(
+      """{
+        |  "label": "post",
+        |  "srcServiceName": "KakaoFavorites",
+        |  "srcColumnName": "userName",
+        |  "srcColumnType": "string",
+        |  "tgtServiceName": "KakaoFavorites",
+        |  "tgtColumnName": "url",
+        |  "tgtColumnType": "string",
+        |  "isDirected": "true",
+        |  "indices": [],
+        |  "props": [],
+        |  "consistencyLevel": "strong"
+        |}""".stripMargin)
+
+    val insertFriendsPayload = Json.parse(
+      """[
+        |  {"from":"Elmo","to":"Big Bird","label":"friends","props":{},"timestamp":1444360152477},
+        |  {"from":"Elmo","to":"Ernie","label":"friends","props":{},"timestamp":1444360152478},
+        |  {"from":"Elmo","to":"Bert","label":"friends","props":{},"timestamp":1444360152479},
+        |
+        |  {"from":"Cookie Monster","to":"Grover","label":"friends","props":{},"timestamp":1444360152480},
+        |  {"from":"Cookie Monster","to":"Kermit","label":"friends","props":{},"timestamp":1444360152481},
+        |  {"from":"Cookie Monster","to":"Oscar","label":"friends","props":{},"timestamp":1444360152482}
+        |]""".stripMargin)
+
+    val insertPostPayload = Json.parse(
+      """[
+        |  {"from":"Big Bird","to":"www.kakaocorp.com/en/main","label":"post","props":{},"timestamp":1444360152477},
+        |  {"from":"Big Bird","to":"github.com/kakao/s2graph","label":"post","props":{},"timestamp":1444360152478},
+        |  {"from":"Ernie","to":"groups.google.com/forum/#!forum/s2graph","label":"post","props":{},"timestamp":1444360152479},
+        |  {"from":"Grover","to":"hbase.apache.org/forum/#!forum/s2graph","label":"post","props":{},"timestamp":1444360152480},
+        |  {"from":"Kermit","to":"www.playframework.com","label":"post","props":{},"timestamp":1444360152481},
+        |  {"from":"Oscar","to":"www.scala-lang.org","label":"post","props":{},"timestamp":1444360152482}
+        |]""".stripMargin)
+
+
+    val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = parser.toServiceElements(createServicePayload)
+    management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
+
+    Service.findByName("KakaoFavorites", useCache = false)
+
+    Label.findByName("friends", useCache = false).foreach { label =>
+      Label.delete(label.id.get)
+    }
+    parser.toLabelElements(createFriendsPayload)
+
+    Label.findByName("post", useCache = false).foreach { label =>
+      Label.delete(label.id.get)
+    }
+
+    parser.toLabelElements(createPostPayload)
+    Await.result(graph.mutateEdges(parser.parseJsonFormat(insertFriendsPayload, operation = "insert").map(_._1), withWait = true), Duration("10 seconds"))
+    Await.result(graph.mutateEdges(parser.parseJsonFormat(insertPostPayload, operation = "insert").map(_._1), withWait = true), Duration("10 seconds"))
+  }
+
+  override def initTestData(): Unit = {
+    prepareTestData()
+  }
+
+  test("[S2GRAPH-243]: Limit bug on 'graph/getEdges' offset 0, limit 3") {
+    val queryJson = Json.parse(
+      """{
+        |    "select": ["to"],
+        |    "srcVertices": [{"serviceName": "KakaoFavorites", "columnName": "userName", "id":"Elmo"}],
+        |    "steps": [
+        |      {"step": [{"label": "friends", "direction": "out", "offset": 0, "limit": 3}]}
+        |    ]
+        |}""".stripMargin)
+
+    val response = getEdgesSync(queryJson)
+
+    val expectedFriends = Seq("Bert", "Ernie", "Big Bird")
+    val friends = (response \ "results").as[Seq[JsObject]].map(obj => (obj \ "to").as[String])
+
+    friends shouldBe expectedFriends
+  }
+
+  test("[S2GRAPH-243]: Limit bug on 'graph/getEdges' offset 1, limit 2") {
+    val queryJson = Json.parse(
+      """{
+        |    "select": ["to"],
+        |    "srcVertices": [{"serviceName": "KakaoFavorites", "columnName": "userName", "id":"Elmo"}],
+        |    "steps": [
+        |      {"step": [{"label": "friends", "direction": "out", "offset": 1, "limit": 2}]}
+        |    ]
+        |}""".stripMargin)
+
+    val response = getEdgesSync(queryJson)
+
+    val expectedFriends = Seq("Ernie", "Big Bird")
+    val friends = (response \ "results").as[Seq[JsObject]].map(obj => (obj \ "to").as[String])
+
+    friends shouldBe expectedFriends
+  }
+}
diff --git a/s2graphql/README.md b/s2graphql/README.md
index 21973d9..0180df7 100644
--- a/s2graphql/README.md
+++ b/s2graphql/README.md
@@ -20,10 +20,10 @@
 --->
 # Suggest to implement GraphQL as standard web interface for S2Graph.
 
-  - To support GraphQL i used [Akka HTTP](https://github.com/akka/akka-http) and [Sangria](https://github.com/sangria-graphql). each is an HTTP Server and GraphQL Scala implementation.
-  - I also used [GraphiQL](https://github.com/graphql/graphiql) as a tool for GraphQL queries.
+  - To support GraphQL through [Akka HTTP](https://github.com/akka/akka-http) and [Sangria](https://github.com/sangria-graphql). Akka HTTP and Sangria each are an HTTP Server and GraphQL Scala implementation.
+  - It is also used [GraphiQL](https://github.com/graphql/graphiql) as a tool for GraphQL queries.
 
-## Wroking example
+## Working example
 
 ![mutation](https://user-images.githubusercontent.com/1182522/35611013-f551f2b6-06a6-11e8-8f48-e39e667a8849.gif)
 
@@ -32,13 +32,13 @@
 
 ## Overview
   
-  The reason I started this work is because the `Label` used by S2Graph has a strong type system, so I think it will work well with the `schema` provided by GraphQL.
+  The reason for supporting GraphQL is that the `Label` used by S2Graph has a strong type system, so it works well with the `schema` provided by GraphQL.
   
-  To do this, we converted S2Graph Model (Label, Service ...) into GraphLQL schema whenever added (changed).
+  So far, whenever the S2Graph Model (Service, Label ...) has been changed, the change has been reflected in the GraphQL schema.
 
 ## Setup
   Assume that hbase is running on localhost.  
-  If the hbase environment is not set up, you can run it with the following command
+  If the hbase environment is not set up, you should type the following commands.
 
 ```bash
 sbt package
@@ -63,7 +63,7 @@
 graphiql.html
 ```
 
-And let's run http server.
+Then let's run http server.
 
 ```bash
 sbt -DschemaCacheTTL=-1 -Dhttp.port=8000 'project s2graphql' '~re-start'
@@ -83,16 +83,16 @@
 ## Your First Grpah (GraphQL version)
 
 [S2Graph tutorial](https://github.com/apache/incubator-s2graph#your-first-graph)
-I have ported the contents of `Your first graph` provided by S2Graph based on GraphQL.
+The following content is `Your first graph` rewritten in the GraphQL version.
 
 ### Start by connecting to `http://localhost:8000`.
 
-The environment for this example is Mac OS and Chrome.
+The environment for these examples is Mac OS and Chrome.
 You can get help with schema-based `Autocompletion` using the `ctrl + space` key.
 
 If you add a `label` or `service`, etc. you will need to `refresh` (`cmd + r`) your browser because the schema will change dynamically.
 
-1. First, we need a name for the new service.
+#### 1. First, we need a name for the new service.
 
     The following POST query will create a service named "KakaoFavorites".
 
@@ -128,7 +128,7 @@
 ```
 
 
-1.1 And create a `service column`` which is meta information for storing vertex.
+#### 1.1 And create a `service column` which is meta information for storing vertices.
 
     The following POST query will create a service column with the age attribute named "user"
 
@@ -191,7 +191,7 @@
 ```graphql
 query {
   Management {
-    Service(name:KakaoFavorites) {    
+    Services(name:KakaoFavorites) {    
       name
       serviceColumns {
         name
@@ -231,7 +231,7 @@
 }
 ```
 
-2. Next, we will need some friends.
+#### 2. Next, we will need some friends.
 
     In S2Graph, relationships are organized as labels. Create a label called friends using the following createLabel API call:
 
@@ -292,7 +292,7 @@
 ```graphql
 query {
   Management {
-    Label(name: friends) {
+    Labels(name: friends) {
       name
       srcColumnName
       tgtColumnName
@@ -376,7 +376,7 @@
   KakaoFavorites {    
     user(id: "Elmo") {
       friends {
-        to {
+        user {
           id
         }
       }
@@ -418,7 +418,7 @@
   KakaoFavorites {    
     user(id: "Elmo") {      
       friends {        
-        to {
+        user {
           id
         }
       }
@@ -459,8 +459,46 @@
 }
 ```
 
+Before the next examples, you should add a `url` serviceColumn.
 
-3. Users of Kakao Favorites will be able to post URLs of their favorite websites.
+Request
+
+```graphql
+mutation {
+  Management {
+    createServiceColumn(
+      serviceName: KakaoFavorites
+      columnName: "url"
+      columnType: string
+    ) {
+      isSuccess
+      object {
+        name
+      }
+    }
+  }
+}
+```
+
+Response
+
+```json
+{
+  "data": {
+    "Management": {
+      "createServiceColumn": {
+        "isSuccess": true,
+        "object": {
+          "name": "url"
+        }
+      }
+    }
+  }
+}
+```
+
+
+#### 3. Users of Kakao Favorites will be able to post URLs of their favorite websites.
 
 Request
 
@@ -474,9 +512,9 @@
           columnName: user
         }
       }
-			targetService: {
+      targetService: {
         KakaoFavorites: {
-          columnName: user
+          columnName: url
         }
       }
       consistencyLevel: strong
@@ -561,7 +599,7 @@
 }
 ```
 
-4. So far, we have designed a label schema for the labels friends and post, and stored some edges to them.+
+#### 4. So far, we have designed a label schema for the labels friends and post, and stored some edges to them.
 
     This should be enough for creating the timeline feature! The following two-step query will return the URLs for Elmo's timeline, which are the posts of Elmo's friends:
 
@@ -573,10 +611,10 @@
     user(id: "Elmo") {
       id
       friends {
-        to {
+        user {
           id
           post {
-            to {
+            url {
               id
             }
           }
@@ -597,11 +635,11 @@
           "id": "Elmo",
           "friends": [
             {
-              "to": {
+              "user": {
                 "id": "Ernie",
                 "post": [
                   {
-                    "to": {
+                    "url": {
                       "id": "groups.google.com/forum/#!forum/s2graph"
                     }
                   }
@@ -609,23 +647,23 @@
               }
             },
             {
-              "to": {
+              "user": {
                 "id": "Bert",
                 "post": []
               }
             },
             {
-              "to": {
+              "user": {
                 "id": "Big Bird",
                 "post": [
                   {
-                    "to": {
-                      "id": "www.kakaocorp.com/en/main"
+                    "url": {
+                      "id": "github.com/kakao/s2graph"
                     }
                   },
                   {
-                    "to": {
-                      "id": "github.com/kakao/s2graph"
+                    "url": {
+                      "id": "www.kakaocorp.com/en/main"
                     }
                   }
                 ]
@@ -647,10 +685,10 @@
   KakaoFavorites {
     user(id: "Cookie Monster") {
       friends {      
-        to {
+        user {
           id
           post {
-            to {
+            url {
               id
             }
           }
@@ -670,11 +708,11 @@
         {
           "friends": [
             {
-              "to": {
+              "user": {
                 "id": "Oscar",
                 "post": [
                   {
-                    "to": {
+                    "url": {
                       "id": "www.scala-lang.org"
                     }
                   }
@@ -682,11 +720,11 @@
               }
             },
             {
-              "to": {
+              "user": {
                 "id": "Kermit",
                 "post": [
                   {
-                    "to": {
+                    "url": {
                       "id": "www.playframework.com"
                     }
                   }
@@ -694,11 +732,11 @@
               }
             },
             {
-              "to": {
+              "user": {
                 "id": "Grover",
                 "post": [
                   {
-                    "to": {
+                    "url": {
                       "id": "hbase.apache.org/forum/#!forum/s2graph"
                     }
                   }
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index 5f1c225..b650714 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -45,7 +45,7 @@
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
 
-object GraphQLServer {
+class GraphQLServer() {
   val className = Schema.getClass.getName
   val logger = LoggerFactory.getLogger(this.getClass)
 
@@ -57,12 +57,14 @@
 
   val config = ConfigFactory.load()
   val s2graph = new S2Graph(config)
-  val schemaCacheTTL = Try(config.getInt("schemaCacheTTL")).getOrElse(-1)
-  val s2Repository = new GraphRepository(s2graph)
+  val schemaCacheTTL = Try(config.getInt("schemaCacheTTL")).getOrElse(3000)
+  val enableMutation = Try(config.getBoolean("enableMutation")).getOrElse(false)
+
   val schemaConfig = ConfigFactory.parseMap(Map(
     SafeUpdateCache.MaxSizeKey -> 1, SafeUpdateCache.TtlKey -> schemaCacheTTL
   ).asJava)
 
+  // Manage schema instance lifecycle
   val schemaCache = new SafeUpdateCache(schemaConfig)
 
   def updateEdgeFetcher(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Try[Unit] = {
@@ -77,17 +79,26 @@
     ret
   }
 
+  val schemaCacheKey = className + "s2Schema"
+
+  schemaCache.put(schemaCacheKey, createNewSchema(enableMutation))
+
   /**
-    * In development mode(schemaCacheTTL = -1),
+    * In development mode(schemaCacheTTL = 1),
     * a new schema is created for each request.
     */
-  logger.info(s"schemaCacheTTL: ${schemaCacheTTL}")
 
-  private def createNewSchema(): Schema[GraphRepository, Any] = {
-    val newSchema = new SchemaDef(s2Repository).S2GraphSchema
-    logger.info(s"Schema updated: ${System.currentTimeMillis()}")
+  private def createNewSchema(withAdmin: Boolean): (SchemaDef, GraphRepository) = {
+    logger.info(s"Schema update start")
 
-    newSchema
+    val ts = System.currentTimeMillis()
+
+    val s2Repository = new GraphRepository(s2graph)
+    val newSchema = new SchemaDef(s2Repository, withAdmin)
+
+    logger.info(s"Schema updated: ${(System.currentTimeMillis() - ts) / 1000} sec")
+
+    newSchema -> s2Repository
   }
 
   def formatError(error: Throwable): JsValue = error match {
@@ -115,15 +126,16 @@
   def executeGraphQLQuery(query: Document, op: Option[String], vars: JsObject)(implicit e: ExecutionContext) = {
     import GraphRepository._
 
-    val cacheKey = className + "s2Schema"
-    val s2schema = schemaCache.withCache(cacheKey, broadcast = false, onEvict = onEvictSchema)(createNewSchema())
+    val (schemaDef, s2Repository) =
+      schemaCache.withCache(schemaCacheKey, broadcast = false, onEvict = onEvictSchema)(createNewSchema(enableMutation))
+
     val resolver: DeferredResolver[GraphRepository] = DeferredResolver.fetchers(vertexFetcher, edgeFetcher)
 
     val includeGrpaph = vars.fields.get("includeGraph").contains(spray.json.JsBoolean(true))
     val middleWares = if (includeGrpaph) GraphFormatted :: TransformMiddleWare else TransformMiddleWare
 
     Executor.execute(
-      s2schema,
+      schemaDef.schema,
       query,
       s2Repository,
       variables = vars,
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
index 65ff348..8b89c73 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
@@ -54,12 +54,14 @@
 
   import spray.json.DefaultJsonProtocol._
 
+  val graphQLServer = new GraphQLServer()
+
   val route: Route =
     get {
       getFromResource("assets/graphiql.html")
     } ~ (post & path("updateEdgeFetcher")) {
       entity(as[JsValue]) { body =>
-        GraphQLServer.updateEdgeFetcher(body) match {
+        graphQLServer.updateEdgeFetcher(body) match {
           case Success(_) => complete(StatusCodes.OK -> JsString("Update fetcher finished"))
           case Failure(e) =>
             logger.error("Error on execute", e)
@@ -70,11 +72,11 @@
       parameters('operationName.?, 'variables.?) { (operationNameParam, variablesParam) =>
         entity(as[Document]) { document ⇒
           variablesParam.map(parseJson) match {
-            case None ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, JsObject()))
-            case Some(Right(js)) ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationNameParam, js.asJsObject))
+            case None ⇒ complete(graphQLServer.executeGraphQLQuery(document, operationNameParam, JsObject()))
+            case Some(Right(js)) ⇒ complete(graphQLServer.executeGraphQLQuery(document, operationNameParam, js.asJsObject))
             case Some(Left(e)) ⇒
               logger.error("Error on execute", e)
-              complete(StatusCodes.BadRequest -> GraphQLServer.formatError(e))
+              complete(StatusCodes.BadRequest -> graphQLServer.formatError(e))
           }
         } ~ entity(as[JsValue]) { body ⇒
           val fields = body.asJsObject.fields
@@ -84,13 +86,13 @@
           val variables = fields.get("variables").filterNot(_ == JsNull)
 
           query.map(QueryParser.parse(_)) match {
-            case None ⇒ complete(StatusCodes.BadRequest -> GraphQLServer.formatError("No query to execute"))
+            case None ⇒ complete(StatusCodes.BadRequest -> graphQLServer.formatError("No query to execute"))
             case Some(Failure(error)) ⇒
               logger.error("Error on execute", error)
-              complete(StatusCodes.BadRequest -> GraphQLServer.formatError(error))
+              complete(StatusCodes.BadRequest -> graphQLServer.formatError(error))
             case Some(Success(document)) => variables match {
-              case Some(js) ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationName, js.asJsObject))
-              case None ⇒ complete(GraphQLServer.executeGraphQLQuery(document, operationName, JsObject()))
+              case Some(js) ⇒ complete(graphQLServer.executeGraphQLQuery(document, operationName, js.asJsObject))
+              case None ⇒ complete(graphQLServer.executeGraphQLQuery(document, operationName, JsObject()))
             }
           }
         }
@@ -101,7 +103,9 @@
 
   logger.info(s"Starting GraphQL server... $port")
 
-  Http().bindAndHandle(route, "0.0.0.0", port)
+  Http().bindAndHandle(route, "0.0.0.0", port).foreach { binding =>
+    logger.info(s"GraphQL server ready for connect")
+  }
 
   def shutdown(): Unit = {
     logger.info("Terminating...")
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
index b5e65dc..460a817 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
@@ -79,17 +79,80 @@
 
     tryObj
   }
+
+  def services(): List[Service] = {
+    Service.findAll().distinct
+  }
+
+  def serviceColumns(): List[ServiceColumn] = {
+    val allServices = services().toSet
+
+    ServiceColumn
+      .findAll()
+      .filter(sc => allServices(sc.service))
+      .distinct
+  }
+
+  def labels() = {
+    val allServiceColumns = serviceColumns().toSet
+
+    Label
+      .findAll()
+      .filter(l => allServiceColumns(l.srcColumn) || allServiceColumns(l.tgtColumn))
+      .distinct
+  }
+
+  def labelIndices() = {
+    LabelIndex.findAll()
+  }
+
+  def labelMetas() = {
+    LabelMeta.findAll()
+  }
+
+  def columnMetas() = {
+    ColumnMeta.findAll()
+  }
 }
 
 class GraphRepository(val graph: S2GraphLike) {
+
   implicit val logger = LoggerFactory.getLogger(this.getClass)
 
   import GraphRepository._
 
+  implicit val ec = graph.ec
+
   val management = graph.management
   val parser = new RequestParser(graph)
 
-  implicit val ec = graph.ec
+  val services = GraphRepository.services()
+  val serviceColumns = GraphRepository.serviceColumns()
+  val columnMetas = GraphRepository.columnMetas()
+
+  val labels = GraphRepository.labels
+  val labelMetas = GraphRepository.labelMetas()
+  val labelIndices = GraphRepository.labelIndices()
+
+  val serviceColumnMap = services.map { s =>
+    s -> serviceColumns.filter(s.id.get == _.serviceId)
+  }.toMap
+
+  val labelMetaMap = labels.map { l =>
+    l -> labelMetas.filter(l.id.get == _.labelId)
+  }.toMap
+
+  val labelIndiceMap = labels.map { l =>
+    l -> labelIndices.filter(l.id.get == _.labelId)
+  }.toMap
+
+  val columnMetaMap = serviceColumns.map { sc =>
+    sc -> columnMetas.filter(sc.id.get == _.columnId)
+  }.toMap
+
+  val columnLabelMap = serviceColumns.map { sc =>
+    sc -> labels.filter(l => l.srcColumn == sc || l.tgtColumn == sc)
+  }.toMap
 
   def toS2EdgeLike(labelName: String, param: AddEdgeParam): S2EdgeLike = {
     graph.toEdge(
@@ -130,14 +193,14 @@
 
   def getEdges(vertices: Seq[S2VertexLike], queryParam: QueryParam): Future[Seq[S2EdgeLike]] = {
     val step = Step(Seq(queryParam))
-    val q = Query(vertices, steps = Vector(step))
+    val q = Query(vertices, steps = Vector(step), QueryOption(returnDegree = false))
 
     graph.getEdges(q).map(_.edgeWithScores.map(_.edge))
   }
 
   def getEdges(vertex: S2VertexLike, queryParam: QueryParam): Future[Seq[S2EdgeLike]] = {
     val step = Step(Seq(queryParam))
-    val q = Query(Seq(vertex), steps = Vector(step))
+    val q = Query(Seq(vertex), steps = Vector(step), QueryOption(returnDegree = false))
 
     graph.getEdges(q).map(_.edgeWithScores.map(_.edge))
   }
@@ -273,14 +336,8 @@
 
   def deleteLabel(args: Args): Try[Label] = {
     val labelName = args.arg[String]("name")
-
     val deleteLabelTry = Management.deleteLabel(labelName)
+
     withLogTryResponse("deleteLabel", deleteLabelTry)
   }
-
-  def services(): List[Service] = Service.findAll()
-
-  def serviceColumns(): List[ServiceColumn] = ServiceColumn.findAll()
-
-  def labels() = Label.findAll()
 }
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
index 1994e5e..4de1b2e 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
@@ -53,7 +53,7 @@
     val vertex = c.value.asInstanceOf[S2VertexLike]
 
     val dir = c.arg[String]("direction")
-    val offset = c.arg[Int]("offset") + 1 // +1 for skip degree edge: currently not support
+    val offset = c.arg[Int]("offset") // no +1: degree-edge skip offset removed, offset is applied as-is (S2GRAPH-243)
     val limit = c.arg[Int]("limit")
     val whereClauseOpt = c.argOpt[String]("filter")
     val where = c.ctx.parser.extractWhere(label, whereClauseOpt)
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
index 9baec93..deb7664 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/ManagementType.scala
@@ -68,7 +68,7 @@
   import org.apache.s2graph.graphql.bind.Unmarshaller._
   import org.apache.s2graph.graphql.types.StaticTypes._
 
-  lazy val serviceColumnOnServiceWithPropInputObjectFields = repo.services().map { service =>
+  lazy val serviceColumnOnServiceWithPropInputObjectFields = repo.services.map { service =>
     InputField(service.serviceName.toValidName, OptionInputType(InputObjectType(
       s"Input_${service.serviceName.toValidName}_ServiceColumn_Props",
       description = "desc here",
@@ -79,7 +79,7 @@
     )))
   }
 
-  lazy val serviceColumnOnServiceInputObjectFields = repo.services().map { service =>
+  lazy val serviceColumnOnServiceInputObjectFields = repo.services.map { service =>
     InputField(service.serviceName.toValidName, OptionInputType(InputObjectType(
       s"Input_${service.serviceName.toValidName}_ServiceColumn",
       description = "desc here",
@@ -90,17 +90,20 @@
   }
 
   def makeServiceColumnEnumTypeOnService(service: Service): EnumType[String] = {
-    val columns = service.serviceColumns(false).toList
+    val columns = repo.serviceColumnMap(service)
+
     EnumType(
-      s"Enum_${service.serviceName}_ServiceColumn",
+      s"Enum_${service.serviceName.toValidName}_ServiceColumn",
       description = Option("desc here"),
-      values = dummyEnum +: columns.map { column =>
-        EnumValue(column.columnName.toValidName, value = column.columnName.toValidName)
-      }
+      values =
+        if (columns.isEmpty) dummyEnum :: Nil
+        else columns.map { column =>
+          EnumValue(column.columnName.toValidName, value = column.columnName)
+        }
     )
   }
 
-  lazy val labelPropsInputFields = repo.labels().map { label =>
+  lazy val labelPropsInputFields = repo.labels.map { label =>
     InputField(label.label.toValidName, OptionInputType(InputObjectType(
       s"Input_${label.label.toValidName}_props",
       description = "desc here",
@@ -115,7 +118,7 @@
     ObjectTypeDescription("desc here"),
     RenameField("serviceName", "name"),
     AddFields(
-      Field("serviceColumns", ListType(ServiceColumnType), resolve = c => c.value.serviceColumns(false).toList)
+      Field("serviceColumns", ListType(ServiceColumnType), resolve = c => c.value.serviceColumns(true).toList)
     )
   )
 
@@ -135,28 +138,34 @@
   lazy val ServiceListType = EnumType(
     s"Enum_Service",
     description = Option("desc here"),
-    values =
-      dummyEnum +: repo.services().map { service =>
+    values = {
+      if (repo.services.isEmpty) dummyEnum :: Nil
+      else repo.services.map { service =>
         EnumValue(service.serviceName.toValidName, value = service.serviceName)
       }
+    }
   )
 
   lazy val ServiceColumnListType = EnumType(
     s"Enum_ServiceColumn",
     description = Option("desc here"),
-    values =
-      dummyEnum +: repo.serviceColumns().map { serviceColumn =>
+    values = {
+      if (repo.serviceColumns.isEmpty) dummyEnum :: Nil
+      else repo.serviceColumns.map { serviceColumn =>
         EnumValue(serviceColumn.columnName.toValidName, value = serviceColumn.columnName)
       }
+    }
   )
 
   lazy val EnumLabelsType = EnumType(
     s"Enum_Label",
     description = Option("desc here"),
-    values =
-      dummyEnum +: repo.labels().map { label =>
+    values = {
+      if (repo.labels.isEmpty) dummyEnum :: Nil
+      else repo.labels.map { label =>
         EnumValue(label.label.toValidName, value = label.label)
       }
+    }
   )
 
   lazy val ServiceMutationResponseType = makeMutationResponseType[Service](
@@ -184,8 +193,8 @@
     arguments = List(LabelNameArg),
     resolve = { c =>
       c.argOpt[String]("name") match {
-        case Some(name) => c.ctx.labels().filter(_.label == name)
-        case None => c.ctx.labels()
+        case Some(name) => repo.labels.filter(_.label == name)
+        case None => repo.labels
       }
     }
   )
@@ -201,19 +210,25 @@
   val AddPropServiceType = InputObjectType[ServiceColumnParam](
     "Input_Service_ServiceColumn_Props",
     description = "desc",
-    fields = DummyInputField +: serviceColumnOnServiceWithPropInputObjectFields
+    fields =
+      if (serviceColumnOnServiceWithPropInputObjectFields.isEmpty) DummyInputField :: Nil
+      else serviceColumnOnServiceWithPropInputObjectFields
   )
 
   val ServiceColumnSelectType = InputObjectType[ServiceColumnParam](
     "Input_Service_ServiceColumn",
     description = "desc",
-    fields = DummyInputField +: serviceColumnOnServiceInputObjectFields
+    fields =
+      if (serviceColumnOnServiceInputObjectFields.isEmpty) DummyInputField :: Nil
+      else serviceColumnOnServiceInputObjectFields
   )
 
   val InputServiceType = InputObjectType[ServiceColumnParam](
     "Input_Service",
     description = "desc",
-    fields = DummyInputField +: serviceColumnOnServiceInputObjectFields
+    fields =
+      if (serviceColumnOnServiceInputObjectFields.isEmpty) DummyInputField :: Nil
+      else serviceColumnOnServiceInputObjectFields
   )
 
   lazy val servicesField: Field[GraphRepository, Any] = Field(
@@ -223,8 +238,8 @@
     arguments = List(ServiceNameArg),
     resolve = { c =>
       c.argOpt[String]("name") match {
-        case Some(name) => c.ctx.services().filter(_.serviceName.toValidName == name)
-        case None => c.ctx.services()
+        case Some(name) => repo.services.filter(_.serviceName.toValidName == name)
+        case None => repo.services
       }
     }
   )
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
index a18fc4e..3632525 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
@@ -68,10 +68,13 @@
     }
   }
 
-  def makeInputFieldsOnService(service: Service): Seq[InputField[Any]] = {
-    val inputFields = service.serviceColumns(false).map { serviceColumn =>
+  def makeInputFieldsOnService(service: Service)(implicit repo: GraphRepository): Seq[InputField[Any]] = {
+    val serviceColumns = repo.serviceColumnMap(service)
+
+    val inputFields = serviceColumns.map { serviceColumn =>
+      val columnMetas = repo.columnMetaMap(serviceColumn)
       val idField = InputField("id", toScalarType(serviceColumn.columnType))
-      val propFields = serviceColumn.metasWithoutCache.filter(ColumnMeta.isValid).map { lm =>
+      val propFields = columnMetas.filter(ColumnMeta.isValid).map { lm =>
         InputField(lm.name.toValidName, OptionInputType(toScalarType(lm.dataType)))
       }
 
@@ -87,8 +90,10 @@
     inputFields
   }
 
-  def makeInputFieldsOnLabel(label: Label): Seq[InputField[Any]] = {
-    val propFields = label.labelMetaSet.toList.filterNot(_.name == "timestamp").map { lm =>
+  def makeInputFieldsOnLabel(label: Label)(implicit repo: GraphRepository): Seq[InputField[Any]] = {
+    val labelMetaSet = repo.labelMetaMap(label)
+
+    val propFields = labelMetaSet.filterNot(_.name == "timestamp").map { lm =>
       InputField(lm.name.toValidName, OptionInputType(toScalarType(lm.dataType)))
     }
 
@@ -102,35 +107,119 @@
     labelFields.asInstanceOf[Seq[InputField[Any]]] ++ propFields.asInstanceOf[Seq[InputField[Any]]]
   }
 
-  def makeServiceColumnFields(column: ServiceColumn, allLabels: Seq[Label]): List[Field[GraphRepository, Any]] = {
-    val reservedFields = List("id" -> column.columnType, "timestamp" -> "long")
-    val columnMetasKv = column.metasWithoutCache.filter(ColumnMeta.isValid).map { columnMeta => columnMeta.name -> columnMeta.dataType }
+  def makeServiceColumnFields(column: ServiceColumn)
+                             (implicit repo: GraphRepository): List[Field[GraphRepository, Any]] = {
 
-    val (sameLabel, diffLabel) = allLabels.toList.partition(l => l.srcColumn == l.tgtColumn)
+    val columnMetas = repo.columnMetaMap(column)
+    val relatedLabels = repo.columnLabelMap(column)
 
-    val outLabels = diffLabel.filter(l => column == l.srcColumn).distinct.toList
-    val inLabels = diffLabel.filter(l => column == l.tgtColumn).distinct.toList
+    val reservedFields = Vector("id" -> column.columnType, "timestamp" -> "long")
+    val columnMetasKv = columnMetas.filter(ColumnMeta.isValid).map { columnMeta => columnMeta.name -> columnMeta.dataType }
+
+    val (sameLabel, diffLabel) = relatedLabels.toList.partition(l => l.srcColumn == l.tgtColumn)
+
+    val outLabels = diffLabel.filter(l => column == l.srcColumn).distinct
+    val inLabels = diffLabel.filter(l => column == l.tgtColumn).distinct
     val inOutLabels = sameLabel.filter(l => l.srcColumn == column && l.tgtColumn == column)
 
-    lazy val columnFields = (reservedFields ++ columnMetasKv).map { case (k, v) => makeGraphElementField(k.toValidName, v) }
+    val columnFields = reservedFields.map { case (k, v) => makeGraphElementField(k.toValidName, v) }
+    val propFields = columnMetasKv.map { case (k, v) => makeGraphElementField(k.toValidName, v) }
 
-    lazy val outLabelFields: List[Field[GraphRepository, Any]] = outLabels.map(l => makeLabelField("out", l, allLabels))
-    lazy val inLabelFields: List[Field[GraphRepository, Any]] = inLabels.map(l => makeLabelField("in", l, allLabels))
-    lazy val inOutLabelFields: List[Field[GraphRepository, Any]] = inOutLabels.map(l => makeLabelField("both", l, allLabels))
-    lazy val propsType = wrapField(s"ServiceColumn_${column.service.serviceName}_${column.columnName}_props", "props", columnFields)
+    val outLabelFields: List[Field[GraphRepository, Any]] = outLabels.map(l => toLabelFieldOnColumn("out", l))
+    val inLabelFields: List[Field[GraphRepository, Any]] = inLabels.map(l => toLabelFieldOnColumn("in", l))
+    val inOutLabelFields: List[Field[GraphRepository, Any]] = inOutLabels.map(l => toLabelFieldOnColumn("both", l))
 
-    lazy val labelFieldNameSet = (outLabels ++ inLabels ++ inOutLabels).map(_.label).toSet
+    val propsType =
+      if (propFields.isEmpty) Nil
+      else List(wrapField(
+        s"ServiceColumn_${column.service.serviceName.toValidName}_${column.columnName.toValidName}_props", "props", propFields))
 
-    propsType :: inLabelFields ++ outLabelFields ++ inOutLabelFields ++ columnFields.filterNot(cf => labelFieldNameSet(cf.name))
+    lazy val labelFieldNameSet = (outLabels ++ inLabels ++ inOutLabels).map(_.label.toValidName).toSet
+
+    propsType ++ inLabelFields ++ outLabelFields ++ inOutLabelFields ++ columnFields
   }
 
-  def makeServiceField(service: Service, allLabels: List[Label])(implicit repo: GraphRepository): List[Field[GraphRepository, Any]] = {
-    val columnsOnService = service.serviceColumns(false).toList.map { column =>
-      lazy val serviceColumnFields = makeServiceColumnFields(column, allLabels)
-      lazy val ColumnType = ObjectType(
-        s"ServiceColumn_${service.serviceName}_${column.columnName}",
-        () => fields[GraphRepository, Any](serviceColumnFields: _*)
-      )
+  def toLabelFieldOnColumn(dir: String, label: Label)
+                          (implicit repo: GraphRepository): Field[GraphRepository, Any] = {
+
+    val LabelType = makeLabelType(dir, label)
+
+    val dirArgs = dir match {
+      case "in" => Argument("direction", OptionInputType(InDirectionType), "desc here", defaultValue = "in") :: Nil
+      case "out" => Argument("direction", OptionInputType(OutDirectionType), "desc here", defaultValue = "out") :: Nil
+      case "both" => Argument("direction", OptionInputType(BothDirectionType), "desc here", defaultValue = "out") :: Nil
+    }
+
+    val indices = repo.labelIndiceMap(label)
+
+    val indexEnumType = EnumType(
+      s"Label_Index_${label.label.toValidName}",
+      description = Option("desc here"),
+      values =
+        if (indices.isEmpty) EnumValue("_", value = "_") :: Nil
+        else indices.map(idx => EnumValue(idx.name.toValidName, value = idx.name))
+    )
+
+    val paramArgs = List(
+      Argument("offset", OptionInputType(IntType), "desc here", defaultValue = 0),
+      Argument("limit", OptionInputType(IntType), "desc here", defaultValue = 100),
+      Argument("index", OptionInputType(indexEnumType), "desc here"),
+      Argument("filter", OptionInputType(StringType), "desc here")
+    )
+
+    val edgeTypeField: Field[GraphRepository, Any] = Field(
+      s"${label.label.toValidName}",
+      ListType(LabelType),
+      arguments = dirArgs ++ paramArgs,
+      description = Some("fetch edges"),
+      resolve = { c =>
+        implicit val ec = c.ctx.ec
+
+        val edgeQueryParam = graphql.types.FieldResolver.label(label, c)
+        val empty = Seq.empty[S2EdgeLike]
+
+        DeferredValue(
+          GraphRepository.edgeFetcher.deferOpt(edgeQueryParam)
+        ).map(m => m.fold(empty)(m => m._2))
+      }
+    )
+
+    edgeTypeField
+  }
+
+
+  def makeColumnType(column: ServiceColumn)
+                    (implicit repo: GraphRepository): ObjectType[GraphRepository, Any] = {
+
+    val objectName = s"ServiceColumn_${column.service.serviceName.toValidName}_${column.columnName.toValidName}"
+
+    lazy val serviceColumnFields = makeServiceColumnFields(column)
+
+    val ColumnType = ObjectType(
+      objectName,
+      () => fields[GraphRepository, Any](serviceColumnFields: _*)
+    )
+
+    ColumnType
+  }
+
+  def makeServiceType(service: Service)(implicit repo: GraphRepository): ObjectType[GraphRepository, Any] = {
+
+    val _serviceFields = makeServiceFields(service)
+    val serviceFields = if (_serviceFields.isEmpty) DummyObjectTypeField :: _serviceFields else _serviceFields
+
+    ObjectType(
+      s"Service_${service.serviceName.toValidName}",
+      fields[GraphRepository, Any](serviceFields: _*)
+    )
+  }
+
+  def makeServiceFields(service: Service)(implicit repo: GraphRepository): List[Field[GraphRepository, Any]] = {
+
+    val serviceColumns = repo.serviceColumnMap(service)
+    val columnsOnService = serviceColumns.map { column =>
+
+      val ColumnType = makeColumnType(column)
 
       Field(column.columnName.toValidName,
         ListType(ColumnType),
@@ -152,110 +241,121 @@
       ): Field[GraphRepository, Any]
     }
 
-    columnsOnService
+    columnsOnService.toList
   }
 
-  def makeLabelField(dir: String, label: Label, allLabels: Seq[Label]): Field[GraphRepository, Any] = {
+  def makeLabelType(dir: String, label: Label)
+                   (implicit repo: GraphRepository): ObjectType[GraphRepository, Any] = {
+
+    val objectName = s"Label_${label.label.toValidName}_${dir}"
+
+    lazy val labelFields = makeLabelFields(dir, label)
+
+    val LabelType = ObjectType(
+      objectName,
+      () => fields[GraphRepository, Any](labelFields: _*)
+    )
+
+    LabelType
+  }
+
+  def makeLabelFields(dir: String, label: Label)
+                     (implicit repo: GraphRepository): List[Field[GraphRepository, Any]] = {
+
+    val relatedMetas = repo.labelMetaMap(label)
     val labelReserved = List("direction" -> "string", "timestamp" -> "long")
-    val labelProps = label.labelMetas.map { lm => lm.name -> lm.dataType }
+
+    val labelProps = relatedMetas
+      .filterNot(l => labelReserved.exists(kv => kv._1 == l.name))
+      .map { lm => lm.name -> lm.dataType }
 
     val column = if (dir == "out") label.tgtColumn else label.srcColumn
 
-    lazy val labelFields: List[Field[GraphRepository, Any]] =
-      (labelReserved ++ labelProps).map { case (k, v) => makeGraphElementField(k.toValidName, v) }
+    val labelFields = labelReserved.map { case (k, v) => makeGraphElementField(k.toValidName, v) }
+    val propFields = labelProps.map { case (k, v) => makeGraphElementField(k.toValidName, v) }
 
-    lazy val labelPropField = wrapField(s"Label_${label.label.toValidName}_props", "props", labelFields)
+    val labelPropField =
+      if (propFields.isEmpty) Nil
+      else List(wrapField(s"Label_${label.label.toValidName}_props", "props", propFields))
 
-    lazy val labelColumnType = ObjectType(s"Label_${label.label.toValidName}_${column.columnName.toValidName}",
-      () => makeServiceColumnFields(column, allLabels)
-    )
+    val labelColumnType = makeColumnType(column)
 
-    lazy val serviceColumnField: Field[GraphRepository, Any] = Field(column.columnName, labelColumnType, resolve = c => {
-      implicit val ec = c.ctx.ec
-      val vertexQueryParam = FieldResolver.serviceColumnOnLabel(c)
-
-      DeferredValue(GraphRepository.vertexFetcher.defer(vertexQueryParam)).map(m => m._2.head)
-    })
-
-    lazy val EdgeType = ObjectType(
-      s"Label_${label.label.toValidName}_${column.columnName.toValidName}_${dir}",
-      () => fields[GraphRepository, Any](
-        List(serviceColumnField, labelPropField) ++ labelFields.filterNot(_.name == column.columnName): _*)
-    )
-
-    val dirArgs = dir match {
-      case "in" => Argument("direction", OptionInputType(InDirectionType), "desc here", defaultValue = "in") :: Nil
-      case "out" => Argument("direction", OptionInputType(OutDirectionType), "desc here", defaultValue = "out") :: Nil
-      case "both" => Argument("direction", OptionInputType(BothDirectionType), "desc here", defaultValue = "out") :: Nil
-    }
-
-    val idxNames = label.indices.map { idx =>
-      EnumValue(idx.name.toValidName, value = idx.name.toValidName)
-    }
-
-    val indexEnumType = EnumType(
-      s"Label_Index_${label.label.toValidName}",
-      description = Option("desc here"),
-      values = idxNames
-    )
-
-    val paramArgs = List(
-      Argument("offset", OptionInputType(IntType), "desc here", defaultValue = 0),
-      Argument("limit", OptionInputType(IntType), "desc here", defaultValue = 100),
-      Argument("index", OptionInputType(indexEnumType), "desc here"),
-      Argument("filter", OptionInputType(StringType), "desc here")
-    )
-
-    lazy val edgeTypeField: Field[GraphRepository, Any] = Field(
-      s"${label.label.toValidName}",
-      ListType(EdgeType),
-      arguments = dirArgs ++ paramArgs,
-      description = Some("fetch edges"),
-      resolve = { c =>
+    val serviceColumnField: Field[GraphRepository, Any] =
+      Field(column.columnName.toValidName, labelColumnType, resolve = c => {
         implicit val ec = c.ctx.ec
+        val vertexQueryParam = FieldResolver.serviceColumnOnLabel(c)
 
-        val edgeQueryParam = graphql.types.FieldResolver.label(label, c)
-        val empty = Seq.empty[S2EdgeLike]
+        DeferredValue(GraphRepository.vertexFetcher.defer(vertexQueryParam)).map(m => m._2.head)
+      })
 
-        DeferredValue(GraphRepository.edgeFetcher.deferOpt(edgeQueryParam)).map(m => m.fold(empty)(_._2))
-      }
-    )
+    List(serviceColumnField) ++ labelPropField ++ labelFields.filterNot(_.name.toValidName == column.columnName.toValidName)
+  }
 
-    edgeTypeField
+  def services(): List[Service] = {
+    Service.findAll().distinct
+  }
+
+  def serviceColumns(): List[ServiceColumn] = {
+    val allServices = services().toSet
+
+    ServiceColumn
+      .findAll()
+      .filter(sc => allServices(sc.service))
+      .distinct
+  }
+
+  def labels() = {
+    val allServiceColumns = serviceColumns().toSet
+
+    Label
+      .findAll()
+      .filter(l => allServiceColumns(l.srcColumn) || allServiceColumns(l.tgtColumn))
+      .distinct
+  }
+
+  def labelIndices() = {
+    LabelIndex.findAll()
+  }
+
+  def labelMetas() = {
+    LabelMeta.findAll()
+  }
+
+  def columnMetas() = {
+    ColumnMeta.findAll()
   }
 }
 
-class S2Type(repo: GraphRepository) {
+class S2Type(_repo: GraphRepository) {
+  implicit val repo = _repo
 
   import S2Type._
   import org.apache.s2graph.graphql.bind.Unmarshaller._
 
-  implicit val graphRepository = repo
-
   /**
     * fields
     */
-  lazy val serviceFields: List[Field[GraphRepository, Any]] = repo.services().map { service =>
-    lazy val serviceFields = DummyObjectTypeField :: makeServiceField(service, repo.labels())
+  val serviceFields: List[Field[GraphRepository, Any]] = {
 
-    lazy val ServiceType = ObjectType(
-      s"Service_${service.serviceName.toValidName}",
-      fields[GraphRepository, Any](serviceFields: _*)
-    )
+    repo.services.flatMap { service =>
+      val ServiceType = makeServiceType(service)
 
-    Field(
-      service.serviceName.toValidName,
-      ServiceType,
-      description = Some(s"serviceName: ${service.serviceName}"),
-      resolve = _ => service
-    ): Field[GraphRepository, Any]
+      val f = Field(
+        service.serviceName.toValidName,
+        ServiceType,
+        description = Some(s"serviceName: ${service.serviceName}"),
+        resolve = _ => service
+      ): Field[GraphRepository, Any]
+
+      List(f)
+    }
   }
 
   /**
     * arguments
     */
   lazy val addVertexArg = {
-    val serviceArguments = repo.services().map { service =>
+    val serviceArguments = repo.services.map { service =>
       val serviceFields = DummyInputField +: makeInputFieldsOnService(service)
 
       val ServiceInputType = InputObjectType[List[AddVertexParam]](
@@ -269,7 +369,7 @@
   }
 
   lazy val addEdgeArg = {
-    val labelArguments = repo.labels().map { label =>
+    val labelArguments = repo.labels.map { label =>
       val labelFields = DummyInputField +: makeInputFieldsOnLabel(label)
       val labelInputType = InputObjectType[AddEdgeParam](
         s"Input_label_${label.label.toValidName}_param",
@@ -287,7 +387,7 @@
     * Provide s2graph query / mutate API
     * - Fields is created(or changed) for metadata is changed.
     */
-  lazy val queryFields = serviceFields
+  val queryFields = serviceFields
 
   lazy val mutationFields: List[Field[GraphRepository, Any]] = List(
     Field("addVertex",
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
index 7451023..835cd0b 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/SchemaDef.scala
@@ -20,13 +20,16 @@
 package org.apache.s2graph.graphql.types
 
 import org.apache.s2graph.graphql.repository.GraphRepository
+import sangria.schema.ObjectType
+
+import scala.collection.mutable
 
 /**
   * S2Graph GraphQL schema.
   *
   * When a Label or Service is created, the GraphQL schema is created dynamically.
   */
-class SchemaDef(g: GraphRepository) {
+class SchemaDef(g: GraphRepository, withAdmin: Boolean = false) {
 
   import sangria.schema._
 
@@ -39,19 +42,26 @@
     fields(s2Type.queryFields ++ queryManagementFields: _*)
   )
 
-  val mutateManagementFields = List(wrapField("MutationManagement", "Management", s2ManagementType.mutationFields))
-  val S2MutationType = ObjectType[GraphRepository, Any](
-    "Mutation",
-    fields(s2Type.mutationFields ++ mutateManagementFields: _*)
-  )
+  lazy val mutateManagementFields = List(wrapField("MutationManagement", "Management", s2ManagementType.mutationFields))
+
+  val S2MutationType =
+    if (!withAdmin) None
+    else {
+      val mutationTpe = ObjectType[GraphRepository, Any](
+        "Mutation",
+        fields(s2Type.mutationFields ++ mutateManagementFields: _*)
+      )
+
+      Option(mutationTpe)
+    }
 
   val directives = S2Directive.Transform :: BuiltinDirectives
 
   private val s2Schema = Schema(
     S2QueryType,
-    Option(S2MutationType),
+    S2MutationType,
     directives = directives
   )
 
-  val S2GraphSchema = s2Schema
+  val schema = s2Schema
 }
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index f647040..47ec835 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -38,6 +38,7 @@
   "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
   "org.elasticsearch" % "elasticsearch-spark-20_2.11" % elastic4sVersion,
   "com.github.scopt" %% "scopt" % "3.7.0",
+  "io.thekraken" % "grok" % "0.1.5",
   "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test
 )
 
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
index 6d9f509..026b688 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/Job.scala
@@ -35,7 +35,7 @@
 
       dfMap.put(source.conf.name, df)
     }
-    logger.debug(s"valid source DF set : ${dfMap.keySet}")
+    logger.info(s"valid source DF set : ${dfMap.keySet}")
 
     // process
     var processRst:Seq[(String, DataFrame)] = Nil
@@ -45,7 +45,7 @@
 
     } while(processRst.nonEmpty)
 
-    logger.debug(s"valid named DF set : ${dfMap.keySet}")
+    logger.info(s"valid named DF set : ${dfMap.keySet}")
 
     // sinks
     jobDesc.sinks.foreach { s =>
@@ -63,7 +63,7 @@
     val dfKeys = dfMap.keySet
 
     processes.filter{ p =>
-        val existAllInput = p.conf.inputs.forall { input => dfKeys(input) }
+        val existAllInput = p.conf.inputs.forall{ input => dfKeys(input) }
         !dfKeys(p.conf.name) && existAllInput
     }
     .map { p =>
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 9a529aa..0943056 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -21,28 +21,32 @@
 
 import play.api.libs.json.{JsValue, Json}
 import org.apache.s2graph.s2jobs.task._
+import org.apache.s2graph.s2jobs.udfs.UdfOption
 
 case class JobDescription(
                            name:String,
+                           udfs: Seq[UdfOption],
                            sources:Seq[Source],
                            processes:Seq[task.Process],
                            sinks:Seq[Sink]
                          )
 
 object JobDescription extends Logger {
-  val dummy = JobDescription("dummy", Nil, Nil, Nil)
+  val dummy = JobDescription("dummy", Nil, Nil, Nil, Nil)
 
   def apply(jsVal:JsValue):JobDescription = {
     implicit val TaskConfReader = Json.reads[TaskConf]
+    implicit val UdfOptionReader = Json.reads[UdfOption]
 
     logger.debug(s"JobDescription: ${jsVal}")
 
     val jobName = (jsVal \ "name").as[String]
+    val udfs = (jsVal \ "udfs").asOpt[Seq[UdfOption]].getOrElse(Nil)
     val sources = (jsVal \ "source").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSource(conf))
     val processes = (jsVal \ "process").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getProcess(conf))
     val sinks = (jsVal \ "sink").asOpt[Seq[TaskConf]].getOrElse(Nil).map(conf => getSink(jobName, conf))
 
-    JobDescription(jobName, sources, processes, sinks)
+    JobDescription(jobName, udfs, sources, processes, sinks)
   }
 
   def getSource(conf:TaskConf):Source = {
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
index a64a399..0a76274 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobLauncher.scala
@@ -19,6 +19,7 @@
 
 package org.apache.s2graph.s2jobs
 
+import org.apache.s2graph.s2jobs.udfs.Udf
 import org.apache.spark.sql.SparkSession
 import play.api.libs.json.{JsValue, Json}
 
@@ -82,6 +83,13 @@
       .enableHiveSupport()
       .getOrCreate()
 
+    // register udfs
+    jobDescription.udfs.foreach{ udfOption =>
+      val udf = Class.forName(udfOption.`class`).newInstance().asInstanceOf[Udf]
+      logger.info((s"[udf register] ${udfOption}"))
+      udf.register(ss, udfOption.name, udfOption.params.getOrElse(Map.empty))
+    }
+
     val job = new Job(ss, jobDescription)
     job.run()
   }
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 4de585c..dd6f41b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -20,22 +20,17 @@
 package org.apache.s2graph.s2jobs.task
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
-import org.apache.hadoop.util.ToolRunner
-import org.apache.s2graph.core.{GraphUtil, Management}
+import org.apache.s2graph.core.GraphUtil
 import org.apache.s2graph.s2jobs.S2GraphHelper
-import org.apache.s2graph.s2jobs.loader.{GraphFileOptions, HFileGenerator, SparkBulkLoaderTransformer}
+import org.apache.s2graph.s2jobs.loader.{HFileGenerator, SparkBulkLoaderTransformer}
 import org.apache.s2graph.s2jobs.serde.reader.RowBulkFormatReader
 import org.apache.s2graph.s2jobs.serde.writer.KeyValueWriter
-import org.apache.s2graph.spark.sql.streaming.S2SinkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.streaming.{DataStreamWriter, OutputMode, Trigger}
 import org.elasticsearch.spark.sql.EsSparkSQL
 
-import scala.collection.mutable.ListBuffer
-import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 
 /**
   * Sink
@@ -132,6 +127,8 @@
     logger.debug(s"${LOG_PREFIX} schema: ${df.schema}")
 
     conf.options.getOrElse("format", "json") match {
+      case "raw" =>
+        df
       case "tsv" =>
         val delimiter = conf.options.getOrElse("delimiter", "\t")
 
@@ -212,9 +209,10 @@
   * @param conf
   */
 class S2GraphSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
-  import scala.collection.JavaConversions._
-  import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
   import org.apache.s2graph.core.S2GraphConfigs._
+  import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+  import scala.collection.JavaConversions._
 
   override def mandatoryOptions: Set[String] = Set()
 
@@ -261,6 +259,10 @@
   }
 
   private def writeBatchWithMutate(df:DataFrame):Unit = {
+    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+
+    import scala.collection.JavaConversions._
+
     // TODO: FIX THIS. overwrite local cache config.
     val mergedOptions = conf.options ++ TaskConf.parseLocalCacheConfigs(conf)
     val graphConfig: Config = ConfigFactory.parseMap(mergedOptions).withFallback(ConfigFactory.load())
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
new file mode 100644
index 0000000..ebcb41d
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Grok.scala
@@ -0,0 +1,36 @@
+package org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.utils.GrokHelper
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{DataType, StructType}
+import play.api.libs.json.{JsValue, Json}
+
+class Grok extends Udf {
+  import org.apache.spark.sql.functions.udf
+
+  def register(ss: SparkSession, name:String, options:Map[String, String]) = {
+    // grok
+    val patternDir = options.getOrElse("patternDir", "/tmp")
+    val patternFiles = options.getOrElse("patternFiles", "").split(",").toSeq
+    val patterns = Json.parse(options.getOrElse("patterns", "{}")).asOpt[Map[String, String]].getOrElse(Map.empty)
+    val compilePattern = options("compilePattern")
+    val schemaOpt = options.get("schema")
+
+    patternFiles.foreach { patternFile =>
+      ss.sparkContext.addFile(s"${patternDir}/${patternFile}")
+    }
+
+    implicit val grok = GrokHelper.getGrok(name, patternFiles, patterns, compilePattern)
+
+    val f = if(schemaOpt.isDefined) {
+      val schema = DataType.fromJson(schemaOpt.get)
+      implicit val keys:Array[String] = schema.asInstanceOf[StructType].fieldNames
+      udf(GrokHelper.grokMatchWithSchema _, schema)
+    } else {
+      udf(GrokHelper.grokMatch _)
+    }
+
+
+    ss.udf.register(name, f)
+  }
+}
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
new file mode 100644
index 0000000..821527c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/udfs/Udf.scala
@@ -0,0 +1,14 @@
+package org.apache.s2graph.s2jobs.udfs
+
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.sql.SparkSession
+
+case class UdfOption(name:String, `class`:String, params:Option[Map[String, String]] = None)
+trait Udf extends Serializable with Logger {
+  def register(ss: SparkSession, name:String, options:Map[String, String])
+}
+
+
+
+
+
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
new file mode 100644
index 0000000..37485c8
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/utils/GrokHelper.scala
@@ -0,0 +1,59 @@
+package org.apache.s2graph.s2jobs.utils
+
+import io.thekraken.grok.api.Grok
+import org.apache.s2graph.s2jobs.Logger
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.Row
+
+import scala.collection.mutable
+
+object GrokHelper extends Logger {
+  private val grokPool:mutable.Map[String, Grok] = mutable.Map.empty
+
+  def getGrok(name:String, patternFiles:Seq[String], patterns:Map[String, String], compilePattern:String):Grok = {
+    if (grokPool.get(name).isEmpty) {
+      println(s"Grok '$name' initialized..")
+      val grok = new Grok()
+      patternFiles.foreach { patternFile =>
+        val filePath = SparkFiles.get(patternFile)
+        println(s"[Grok][$name] add pattern file : $patternFile  ($filePath)")
+        grok.addPatternFromFile(filePath)
+      }
+      patterns.foreach { case (name, pattern) =>
+        println(s"[Grok][$name] add pattern : $name ($pattern)")
+        grok.addPattern(name, pattern)
+      }
+
+      grok.compile(compilePattern)
+      println(s"[Grok][$name] patterns: ${grok.getPatterns}")
+      grokPool.put(name, grok)
+    }
+
+    grokPool(name)
+  }
+
+  def grokMatch(text:String)(implicit grok:Grok):Option[Map[String, String]] = {
+    import scala.collection.JavaConverters._
+
+    val m = grok.`match`(text)
+    m.captures()
+    val rstMap = m.toMap.asScala.toMap
+      .filter(_._2 != null)
+      .map{ case (k, v) =>  k -> v.toString}
+    if (rstMap.isEmpty) None else Some(rstMap)
+  }
+
+  def grokMatchWithSchema(text:String)(implicit grok:Grok, keys:Array[String]):Option[Row] = {
+    import scala.collection.JavaConverters._
+
+    val m = grok.`match`(text)
+    m.captures()
+
+    val rstMap = m.toMap.asScala.toMap
+    if (rstMap.isEmpty) None
+    else {
+      val l = keys.map { key => rstMap.getOrElse(key, null)}
+      Some(Row.fromSeq(l))
+    }
+  }
+}