blob: 4dbb07f4a36875c67e87e82301345ea0c5c30f58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.streaming.refactor.state
import java.util
import java.util.Map.Entry
import java.util.{ArrayList, HashSet, List, Set}
import java.lang.Iterable
import com.google.common.collect.{HashBasedTable, Table}
import org.apache.gearpump.streaming.refactor.coder.Coder
import org.apache.gearpump.streaming.refactor.state.InMemoryGlobalStateInternals.InMemoryStateBinder
import org.apache.gearpump.streaming.refactor.state.api._
/**
 * [[StateInternals]] implementation that keeps every state cell in an in-memory table
 * owned by this instance.
 *
 * @param key the key reported by [[getKey]]
 * @tparam K type of the key
 */
class InMemoryGlobalStateInternals[K] protected(key: K) extends StateInternals {

  import InMemoryGlobalStateInternals.StateTable

  /** Returns the key this instance was constructed with. */
  override def getKey: Any = key

  /** Backing table; each namespace is served by a newly created [[InMemoryStateBinder]]. */
  protected val inMemoryStateTable: StateTable = new StateTable {
    override def binderForNamespace(namespace: StateNamespace): StateBinder =
      new InMemoryStateBinder
  }

  /**
   * Looks up the state stored under `namespace` and `address` by delegating to the
   * backing table's `get`.
   */
  override def state[T <: State](namespace: StateNamespace, address: StateTag[T]): T = {
    inMemoryStateTable.get(namespace, address)
  }
}
object InMemoryGlobalStateInternals {
/**
 * Table of [[State]] cells addressed by a ([[StateNamespace]], [[StateTag]]) pair.
 *
 * Subclasses decide how a missing cell is created by implementing
 * [[binderForNamespace]].
 */
abstract class StateTable {

  /** Backing storage: rows are namespaces, columns are tags. */
  val stateTable: Table[StateNamespace, StateTag[_], State] = HashBasedTable.create()

  /**
   * Returns the state stored under `namespace` / `tag`, creating and storing it first
   * when the table has no entry for that pair.
   *
   * An existing entry is returned as-is. Earlier, the lookup result was computed and
   * then discarded, so every call bound a fresh state and overwrote the stored one,
   * losing whatever had been written to it.
   *
   * @param namespace row to look in
   * @param tag       column to look in; its spec is used to bind a new state on a miss
   * @return the existing state cast to `StateT`, or the newly bound one
   */
  def get[StateT <: State](namespace: StateNamespace, tag: StateTag[StateT]): StateT = {
    // Table.get yields null on a miss; Option(...) maps that to None.
    Option(stateTable.get(namespace, tag)) match {
      case Some(existing) =>
        existing.asInstanceOf[StateT]
      case None =>
        val created: StateT = tag.getSpec.bind(tag.getId, binderForNamespace(namespace))
        stateTable.put(namespace, tag, created)
        created
    }
  }

  /** Removes `namespace` from the table's row key set. */
  def clearNamespace(namespace: StateNamespace): Unit = stateTable.rowKeySet().remove(namespace)

  /** Removes every entry from the table. */
  def clear: Unit = stateTable.clear()

  /** All states currently held by the table. */
  def values: Iterable[State] = stateTable.values()

  /** Whether the table contains a row for `namespace`. */
  def isNamespaceInUse(namespace: StateNamespace): Boolean = stateTable.containsRow(namespace)

  /** The tag-to-state mapping of the row for `namespace`. */
  def getTagsInUse(namespace: StateNamespace): java.util.Map[StateTag[_], State] =
    stateTable.row(namespace)

  /** The set of namespaces that are row keys of the table. */
  def getNamespacesInUse(): java.util.Set[StateNamespace] = stateTable.rowKeySet()

  /** Supplies the binder used to create a state in `namespace` when [[get]] misses. */
  def binderForNamespace(namespace: StateNamespace): StateBinder
}
/**
 * [[StateBinder]] that answers every bind request with a new in-memory state object.
 * The `id`, `spec` and coder arguments are not used by any of the methods.
 */
class InMemoryStateBinder extends StateBinder {

  /** Creates a new [[InMemoryValueState]]. */
  override def bindValue[T](
      id: String,
      spec: StateSpec[ValueState[T]],
      coder: Coder[T]): ValueState[T] = {
    new InMemoryValueState[T]
  }

  /** Creates a new [[InMemoryBagState]]. */
  override def bindBag[T](
      id: String,
      spec: StateSpec[BagState[T]],
      elemCoder: Coder[T]): BagState[T] = {
    new InMemoryBagState[T]
  }

  /** Creates a new [[InMemorySetState]]. */
  override def bindSet[T](
      id: String,
      spec: StateSpec[SetState[T]],
      elemCoder: Coder[T]): SetState[T] = {
    new InMemorySetState[T]
  }

  /** Creates a new [[InMemoryMapState]]. */
  override def bindMap[KeyT, ValueT](
      id: String,
      spec: StateSpec[MapState[KeyT, ValueT]],
      mapKeyCoder: Coder[KeyT],
      mapValueCoder: Coder[ValueT]): MapState[KeyT, ValueT] = {
    new InMemoryMapState[KeyT, ValueT]
  }
}
/**
 * Operations shared by the in-memory state implementations.
 *
 * @tparam T the concrete state type, which is also what [[copy]] returns
 */
trait InMemoryState[T <: InMemoryState[T]] {

  /** Produces a copy of this state as a `T`. */
  def copy: T

  /** Reports whether this state is cleared. */
  def isCleared: Boolean
}
/**
 * [[BagState]] backed by a `java.util.ArrayList`.
 *
 * @tparam T element type
 */
class InMemoryBagState[T] extends BagState[T] with InMemoryState[InMemoryBagState[T]] {

  // Replaced wholesale by `clear`, hence a var.
  private var contents: List[T] = new ArrayList[T]

  /** Appends `value` to the bag. */
  override def add(value: T): Unit = {
    contents.add(value)
  }

  /** Returns the backing list itself, not a snapshot. */
  override def read: Iterable[T] = contents

  /** Returns this instance. */
  override def readLater: BagState[T] = this

  /**
   * Returns a view whose `read` checks emptiness of the current contents each time it
   * is called.
   */
  override def isEmpty: ReadableState[Boolean] = new ReadableState[Boolean] {
    override def read: Boolean = contents.isEmpty

    override def readLater: ReadableState[Boolean] = this
  }

  /** True when the bag holds no elements. */
  override def isCleared: Boolean = contents.isEmpty

  /** Swaps in a fresh, empty list. */
  override def clear: Unit = {
    contents = new ArrayList[T]
  }

  /** Creates a new bag and adds all current elements to it. */
  override def copy: InMemoryBagState[T] = {
    val duplicate = new InMemoryBagState[T]
    duplicate.contents.addAll(contents)
    duplicate
  }
}
/**
 * [[ValueState]] holding a single value plus a flag recording whether it is cleared.
 *
 * @tparam T value type
 */
class InMemoryValueState[T] extends ValueState[T] with InMemoryState[InMemoryValueState[T]] {

  private var value: T = _
  // Starts out cleared; flipped by `write` and restored by `clear`.
  private var cleared: Boolean = true

  /** Returns the stored value (the default value of `T` when nothing is stored). */
  def read: T = value

  /** Returns this instance. */
  def readLater: InMemoryValueState[T] = this

  /** Stores `input` and marks the state as not cleared. */
  def write(input: T): Unit = {
    value = input
    cleared = false
  }

  /** Resets the stored value to null and marks the state as cleared. */
  def clear: Unit = {
    cleared = true
    value = null.asInstanceOf[T]
  }

  /** Whether the state is currently marked as cleared. */
  def isCleared: Boolean = cleared

  /**
   * Creates a new state; the value and the not-cleared flag are carried over only when
   * this state is not cleared.
   */
  def copy: InMemoryValueState[T] = {
    val duplicate = new InMemoryValueState[T]
    if (!cleared) {
      duplicate.value = value
      duplicate.cleared = false
    }
    duplicate
  }
}
/**
 * [[MapState]] backed by a `java.util.HashMap`.
 *
 * @tparam K key type
 * @tparam V value type
 */
class InMemoryMapState[K, V] extends MapState[K, V] with InMemoryState[InMemoryMapState[K, V]] {

  // Replaced wholesale by `clear`, hence a var.
  private var contents: util.Map[K, V] = new util.HashMap[K, V]

  /** Wraps the current lookup result for `key` (null when there is none). */
  override def get(key: K): ReadableState[V] = {
    val current: V = contents.get(key)
    ReadableStates.immediate(current)
  }

  /** Associates `value` with `key`. */
  override def put(key: K, value: V): Unit = {
    contents.put(key, value)
  }

  /**
   * Stores `value` only when the lookup for `key` yields null.
   *
   * @return the value found for `key` before the call, wrapped; null when there was none
   */
  override def putIfAbsent(key: K, value: V): ReadableState[V] = {
    val existing: V = contents.get(key)
    // Map.put returns the previous mapping, which is null on this branch.
    val previous: V = if (existing == null) contents.put(key, value) else existing
    ReadableStates.immediate(previous)
  }

  /** Removes the mapping for `key`. */
  override def remove(key: K): Unit = {
    contents.remove(key)
  }

  /** Wraps the key set of the backing map. */
  override def keys: ReadableState[Iterable[K]] =
    ReadableStates.immediate[Iterable[K]](contents.keySet())

  /** Wraps the value collection of the backing map. */
  override def values: ReadableState[Iterable[V]] =
    ReadableStates.immediate[Iterable[V]](contents.values())

  /** Wraps the entry set of the backing map. */
  override def entries: ReadableState[Iterable[Entry[K, V]]] =
    ReadableStates.immediate[Iterable[Entry[K, V]]](contents.entrySet())

  /** True when the map holds no entries. */
  override def isCleared: Boolean = contents.isEmpty

  /** Swaps in a fresh, empty map. */
  override def clear: Unit = {
    contents = new util.HashMap[K, V]
  }

  /** Creates a new map state and puts all current entries into it. */
  override def copy: InMemoryMapState[K, V] = {
    val duplicate = new InMemoryMapState[K, V]
    duplicate.contents.putAll(contents)
    duplicate
  }
}
/**
 * [[SetState]] backed by a `java.util.HashSet`.
 *
 * @tparam T element type
 */
class InMemorySetState[T] extends SetState[T] with InMemoryState[InMemorySetState[T]] {

  // Replaced wholesale by `clear`, hence a var.
  private var contents: Set[T] = new HashSet[T]

  /** Adds `value` to the set. */
  override def add(value: T): Unit = {
    contents.add(value)
  }

  /**
   * Adds `t` to the set.
   *
   * @return wrapped `true` when `t` was not in the set before the call
   */
  override def addIfAbsent(t: T): ReadableState[Boolean] = {
    // Set.add returns true exactly when the element was not already present.
    val inserted: Boolean = contents.add(t)
    ReadableStates.immediate(inserted)
  }

  /** Wraps whether `t` is currently in the set. */
  override def contains(t: T): ReadableState[Boolean] = {
    val present: Boolean = contents.contains(t)
    ReadableStates.immediate(present)
  }

  /** Removes `t` from the set. */
  override def remove(t: T): Unit = {
    contents.remove(t)
  }

  /** Returns the backing set itself, not a snapshot. */
  override def read: Iterable[T] = contents

  /** Returns this instance. */
  override def readLater: SetState[T] = this

  /**
   * Returns a view whose `read` checks emptiness of the current contents each time it
   * is called.
   */
  override def isEmpty: ReadableState[Boolean] = new ReadableState[Boolean] {
    override def read: Boolean = contents.isEmpty

    override def readLater: ReadableState[Boolean] = this
  }

  /** True when the set holds no elements. */
  override def isCleared: Boolean = contents.isEmpty

  /** Swaps in a fresh, empty set. */
  override def clear: Unit = {
    contents = new HashSet[T]
  }

  /** Creates a new set state and adds all current elements to it. */
  override def copy: InMemorySetState[T] = {
    val duplicate = new InMemorySetState[T]
    duplicate.contents.addAll(contents)
    duplicate
  }
}
}
/** Factory for [[ReadableState]] instances. */
object ReadableStates {

  /** A [[ReadableState]] whose `read` hands back the value it was constructed with. */
  private final class Immediate[T](value: T) extends ReadableState[T] {
    override def read: T = value

    override def readLater: ReadableState[T] = this
  }

  /**
   * Wraps an already available value.
   *
   * @param value the value that `read` will return
   * @return a state whose `read` yields `value` and whose `readLater` yields itself
   */
  def immediate[T](value: T): ReadableState[T] = new Immediate[T](value)
}