[Gearpump 311] refactor state management
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
index e1999ed..edbe9a1 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
@@ -19,7 +19,6 @@
package org.apache.gearpump.streaming.refactor.coder;
import com.google.common.base.Joiner;
-import com.google.common.base.Objects;
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingOutputStream;
@@ -130,5 +129,4 @@
coder, Joiner.on("%n ").join(reasons));
}
}
-
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
index e706f4f..3fb8034 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
@@ -20,16 +20,10 @@
import org.apache.gearpump.Message
import org.apache.gearpump.streaming.dsl.window.api.Trigger
-<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
trait ReduceFnRunner {
def process(message: Message): Unit
def onTrigger(trigger: Trigger): Unit
-=======
-trait State {
- def clear: Unit
->>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
-
-}
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
index 6665766..e79f271 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
package org.apache.gearpump.streaming.refactor.sink
import akka.actor.ActorSystem
@@ -30,24 +29,8 @@
parallelism: Int = 1,
description: String = "",
taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
- : Processor[DataSinkTask] = {
+ : Processor[DataSinkTask] = {
Processor[DataSinkTask](parallelism, description = description,
taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink))
}
-=======
-package org.apache.gearpump.streaming.refactor.state
-
-import org.apache.gearpump.streaming.refactor.state.api.State
-
-trait StateTag[StateT <: State] extends Serializable {
-
- def appendTo(sb: Appendable)
-
- def getId: String
-
- def getSpec: StateSpec[StateT]
-
- def bind(binder: StateBinder): StateT
-
->>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
-}
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
index 8832aee..f538400 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
@@ -23,16 +23,7 @@
import org.apache.gearpump.streaming.refactor.coder.Coder
import org.apache.gearpump.streaming.refactor.state.api.StateInternals
-<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
-/**
- *
- */
trait RuntimeContext {
-=======
-trait StateSpec[StateT <: State] extends Serializable {
-
- def bind(id: String, binder: StateBinder): StateT
->>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
new file mode 100644
index 0000000..91cdbe5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.State
+
+trait StateSpec[StateT <: State] extends Serializable {
+
+ def bind(id: String, binder: StateBinder): StateT
+
+ def offerCoders(coders: Array[Coder[StateT]]): Unit
+
+ def finishSpecifying: Unit
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
new file mode 100644
index 0000000..9fa865d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import org.apache.gearpump.streaming.refactor.state.api.State
+
+trait StateTag[StateT <: State] extends Serializable {
+
+ def appendTo(sb: Appendable)
+
+ def getId: String
+
+ def getSpec: StateSpec[StateT]
+
+ def bind(binder: StateBinder): StateT
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
index 0f94052..531ff66 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
@@ -31,16 +31,8 @@
import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
-import org.apache.gearpump.util.LogUtil
import org.apache.gearpump.{Message, TimeStamp}
-<<<<<<< HEAD
-=======
-object StatefulTask {
- val LOG = LogUtil.getLogger(getClass)
-}
-
->>>>>>> e6ce91c... [Gearpump 311] refactor state management
abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {
@@ -60,11 +52,7 @@
// core state data
var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null
-<<<<<<< HEAD
def open(runtimeContext: RuntimeContext): Unit = {}
-=======
- def open: Unit = {}
->>>>>>> e6ce91c... [Gearpump 311] refactor state management
def invoke(message: Message): Unit
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
new file mode 100644
index 0000000..5c01977
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait State {
+
+ def clear: Unit
+
+}