[Gearpump 311] refactor state management
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
index e1999ed..edbe9a1 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
@@ -19,7 +19,6 @@
 package org.apache.gearpump.streaming.refactor.coder;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.CountingOutputStream;
 
@@ -130,5 +129,4 @@
                     coder, Joiner.on("%n  ").join(reasons));
         }
     }
-
 }
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
index e706f4f..3fb8034 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
@@ -20,16 +20,10 @@
 import org.apache.gearpump.Message
 import org.apache.gearpump.streaming.dsl.window.api.Trigger
 
-<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
 trait ReduceFnRunner {
 
   def process(message: Message): Unit
 
   def onTrigger(trigger: Trigger): Unit
-=======
-trait State {
 
-  def clear: Unit
->>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
-
-}
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
index 6665766..e79f271 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
 package org.apache.gearpump.streaming.refactor.sink
 
 import akka.actor.ActorSystem
@@ -30,24 +29,8 @@
       parallelism: Int = 1,
       description: String = "",
       taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
-    : Processor[DataSinkTask] = {
+  : Processor[DataSinkTask] = {
     Processor[DataSinkTask](parallelism, description = description,
       taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink))
   }
-=======
-package org.apache.gearpump.streaming.refactor.state
-
-import org.apache.gearpump.streaming.refactor.state.api.State
-
-trait StateTag[StateT <: State] extends Serializable {
-
-  def appendTo(sb: Appendable)
-
-  def getId: String
-
-  def getSpec: StateSpec[StateT]
-
-  def bind(binder: StateBinder): StateT
-
->>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
-}
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
index 8832aee..f538400 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
@@ -23,16 +23,7 @@
 import org.apache.gearpump.streaming.refactor.coder.Coder
 import org.apache.gearpump.streaming.refactor.state.api.StateInternals
 
-<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
-/**
- *
- */
 trait RuntimeContext {
-=======
-trait StateSpec[StateT <: State] extends Serializable {
-
-  def bind(id: String, binder: StateBinder): StateT
->>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
 
   def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals
 
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
new file mode 100644
index 0000000..91cdbe5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.State
+/** Serializable trait over a `State` subtype `StateT`: one `bind` plus two `Unit` hooks. */
+trait StateSpec[StateT <: State] extends Serializable {
+  /** Produces a `StateT` from the string `id` and the given `StateBinder`. */
+  def bind(id: String, binder: StateBinder): StateT
+  /** Takes an array of `Coder[StateT]`; presumably candidate coders to adopt (TODO confirm). */
+  def offerCoders(coders: Array[Coder[StateT]]): Unit
+  /** Parameterless `Unit` hook; presumably run once specifying is complete (TODO confirm). */
+  def finishSpecifying: Unit
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
new file mode 100644
index 0000000..9fa865d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import org.apache.gearpump.streaming.refactor.state.api.State
+/** Serializable tag over a `State` subtype `StateT`; exposes an id, a spec, and a `bind`. */
+trait StateTag[StateT <: State] extends Serializable {
+  /** Takes an `Appendable`, returns `Unit`; presumably writes this tag to `sb` (TODO confirm). */
+  def appendTo(sb: Appendable): Unit
+  /** Returns a `String`; presumably this tag's identifier (TODO confirm). */
+  def getId: String
+  /** Returns a `StateSpec` for the same state type `StateT`. */
+  def getSpec: StateSpec[StateT]
+  /** Produces a `StateT` from the given `StateBinder`; details are left to implementations. */
+  def bind(binder: StateBinder): StateT
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
index 0f94052..531ff66 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
@@ -31,16 +31,8 @@
 import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
 import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
-import org.apache.gearpump.util.LogUtil
 import org.apache.gearpump.{Message, TimeStamp}
 
-<<<<<<< HEAD
-=======
-object StatefulTask {
-  val LOG = LogUtil.getLogger(getClass)
-}
-
->>>>>>> e6ce91c... [Gearpump 311] refactor state management
 abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
 
@@ -60,11 +52,7 @@
   // core state data
   var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null
 
-<<<<<<< HEAD
   def open(runtimeContext: RuntimeContext): Unit = {}
-=======
-  def open: Unit = {}
->>>>>>> e6ce91c... [Gearpump 311] refactor state management
 
   def invoke(message: Message): Unit
 
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
new file mode 100644
index 0000000..5c01977
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+/** Base trait that `StateSpec` and `StateTag` use as the upper bound of their state type. */
+trait State {
+  /** Parameterless, returns `Unit`; presumably discards this state's contents (TODO confirm). */
+  def clear: Unit
+
+}