[Gearpump 311] refactor state management
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java
new file mode 100644
index 0000000..e152b48
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/AtomicCoder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AtomicCoder<T> extends StructuredCoder<T> {
+
+ @Override
+ public void verifyDeterministic() {}
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public final List<? extends Coder<?>> getComponents() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public final boolean equals(Object other) {
+ return other != null && this.getClass().equals(other.getClass());
+ }
+
+ @Override
+ public final int hashCode() {
+ return this.getClass().hashCode();
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java
new file mode 100644
index 0000000..27ec539
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianIntegerCoder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+public class BigEndianIntegerCoder extends AtomicCoder<Integer> {
+
+ public static BigEndianIntegerCoder of() {
+ return INSTANCE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final BigEndianIntegerCoder INSTANCE = new BigEndianIntegerCoder();
+
+ private BigEndianIntegerCoder() {}
+
+ @Override
+ public void encode(Integer value, OutputStream outStream)
+ throws CoderException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Integer");
+ }
+
+ try {
+ new DataOutputStream(outStream).writeInt(value);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Integer decode(InputStream inStream)
+ throws CoderException {
+ try {
+ return new DataInputStream(inStream).readInt();
+ } catch (EOFException | UTFDataFormatException exn) {
+ // These exceptions correspond to decoding problems, so change
+ // what kind of exception they're branded as.
+ throw new CoderException(exn);
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public void verifyDeterministic() {}
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(Integer value) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(Integer value) {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Integer");
+ }
+ return 4;
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java
new file mode 100644
index 0000000..c788729
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigEndianLongCoder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+public class BigEndianLongCoder extends AtomicCoder<Long> {
+
+ public static BigEndianLongCoder of() {
+ return INSTANCE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final BigEndianLongCoder INSTANCE = new BigEndianLongCoder();
+
+ private BigEndianLongCoder() {}
+
+ @Override
+ public void encode(Long value, OutputStream outStream)
+ throws CoderException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Long");
+ }
+ try {
+ new DataOutputStream(outStream).writeLong(value);
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public Long decode(InputStream inStream)
+ throws CoderException {
+ try {
+ return new DataInputStream(inStream).readLong();
+ } catch (EOFException | UTFDataFormatException exn) {
+ // These exceptions correspond to decoding problems, so change
+ // what kind of exception they're branded as.
+ throw new CoderException(exn);
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public void verifyDeterministic() {
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(Long value) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(Long value) {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Long");
+ }
+ return 8;
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java
new file mode 100644
index 0000000..4a65992
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BigIntegerCoder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigInteger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class BigIntegerCoder extends AtomicCoder<BigInteger> {
+
+ public static BigIntegerCoder of() {
+ return INSTANCE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final BigIntegerCoder INSTANCE = new BigIntegerCoder();
+ private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+
+ private BigIntegerCoder() {
+ }
+
+ @Override
+ public void encode(BigInteger value, OutputStream outStream)
+ throws CoderException {
+ checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
+ BYTE_ARRAY_CODER.encode(value.toByteArray(), outStream);
+ }
+
+ @Override
+ public BigInteger decode(InputStream inStream)
+ throws CoderException {
+ return new BigInteger(BYTE_ARRAY_CODER.decode(inStream));
+ }
+
+ @Override
+ public void verifyDeterministic() {
+ BYTE_ARRAY_CODER.verifyDeterministic();
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(BigInteger value) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(BigInteger value) {
+ checkNotNull(value, String.format("cannot encode a null %s", BigInteger.class.getSimpleName()));
+ return BYTE_ARRAY_CODER.getEncodedElementByteSize(value.toByteArray());
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java
new file mode 100644
index 0000000..119e6eb
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/BufferedElementCountingOutputStream.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class BufferedElementCountingOutputStream extends OutputStream {
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private final ByteBuffer buffer;
+ private final OutputStream os;
+ private boolean finished;
+ private long count;
+
+ public BufferedElementCountingOutputStream(OutputStream os) {
+ this(os, DEFAULT_BUFFER_SIZE);
+ }
+
+ BufferedElementCountingOutputStream(OutputStream os, int bufferSize) {
+ this.buffer = ByteBuffer.allocate(bufferSize);
+ this.os = os;
+ this.finished = false;
+ this.count = 0;
+ }
+
+ public void finish() throws IOException {
+ if (finished) {
+ return;
+ }
+ flush();
+ // Finish the stream by stating that there are 0 elements that follow.
+ VarInt.encode(0, os);
+ finished = true;
+ }
+
+ public void markElementStart() throws IOException {
+ if (finished) {
+ throw new IOException("Stream has been finished. Can not add any more elements.");
+ }
+ count++;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (finished) {
+ throw new IOException("Stream has been finished. Can not write any more data.");
+ }
+ if (count == 0) {
+ os.write(b);
+ return;
+ }
+
+ if (buffer.hasRemaining()) {
+ buffer.put((byte) b);
+ } else {
+ outputBuffer();
+ os.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (finished) {
+ throw new IOException("Stream has been finished. Can not write any more data.");
+ }
+ if (count == 0) {
+ os.write(b, off, len);
+ return;
+ }
+
+ if (buffer.remaining() >= len) {
+ buffer.put(b, off, len);
+ } else {
+ outputBuffer();
+ os.write(b, off, len);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (finished) {
+ return;
+ }
+ outputBuffer();
+ os.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ finish();
+ os.close();
+ }
+
+ // Output the buffer if it contains any data.
+ private void outputBuffer() throws IOException {
+ if (count > 0) {
+ VarInt.encode(count, os);
+ // We are using a heap based buffer and not a direct buffer so it is safe to access
+ // the underlying array.
+ os.write(buffer.array(), buffer.arrayOffset(), buffer.position());
+ buffer.clear();
+ // The buffer has been flushed so we must write to the underlying stream until
+ // we learn of the next element. We reset the count to zero marking that we should
+ // not use the buffer.
+ count = 0;
+ }
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java
new file mode 100644
index 0000000..6b1af05
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteArrayCoder.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.io.ByteStreams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class ByteArrayCoder extends AtomicCoder<byte[]> {
+
+ public static ByteArrayCoder of() {
+ return INSTANCE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final ByteArrayCoder INSTANCE = new ByteArrayCoder();
+
+ private ByteArrayCoder() {
+ }
+
+ @Override
+ public void encode(byte[] value, OutputStream outStream)
+ throws CoderException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null byte[]");
+ }
+
+ try {
+ VarInt.encode(value.length, outStream);
+ outStream.write(value);
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public byte[] decode(InputStream inStream)
+ throws CoderException {
+ byte[] value = null;
+ try {
+ int length = VarInt.decodeInt(inStream);
+ if (length < 0) {
+ throw new CoderException("invalid length " + length);
+ }
+ value = new byte[length];
+
+ ByteStreams.readFully(inStream, value);
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ return value;
+ }
+
+ @Override
+ public void verifyDeterministic() {
+ }
+
+ @Override
+ public Object structuralValue(byte[] value) {
+ return new StructuralByteArray(value);
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(byte[] value) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(byte[] value) {
+ if (value == null) {
+ throw new CoderException("cannot encode a null byte[]");
+ }
+ return VarInt.getLength(value.length) + value.length;
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java
new file mode 100644
index 0000000..e3cb7e4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ByteCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+public class ByteCoder extends AtomicCoder<Byte> {
+
+ public static ByteCoder of() {
+ return INSTANCE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final ByteCoder INSTANCE = new ByteCoder();
+
+ private ByteCoder() {
+ }
+
+ @Override
+ public void encode(Byte value, OutputStream outStream)
+ throws CoderException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Byte");
+ }
+ try {
+ outStream.write(value.byteValue());
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public Byte decode(InputStream inStream) throws CoderException {
+ try {
+ // value will be between 0-255, -1 for EOF
+ int value = inStream.read();
+ if (value == -1) {
+ throw new EOFException("EOF encountered decoding 1 byte from input stream");
+ }
+ return (byte) value;
+ } catch (EOFException | UTFDataFormatException exn) {
+ // These exceptions correspond to decoding problems, so change
+ // what kind of exception they're branded as.
+ throw new CoderException(exn);
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public void verifyDeterministic() {
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(Byte value) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(Byte value) {
+ if (value == null) {
+ throw new CoderException("cannot estimate size for unsupported null value");
+ }
+ return 1;
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
new file mode 100644
index 0000000..e1999ed
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/Coder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+
+import java.io.*;
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public abstract class Coder<T> implements Serializable {
+
+ public abstract void encode(T value, OutputStream outStream)
+ throws CoderException;
+
+ public abstract T decode(InputStream inStream) throws CoderException;
+
+ public abstract List<? extends Coder<?>> getCoderArguments();
+
+ public abstract void verifyDeterministic() throws Coder.NonDeterministicException;
+
+ public static void verifyDeterministic(Coder<?> target, String message, Iterable<Coder<?>> coders)
+ throws NonDeterministicException {
+ for (Coder<?> coder : coders) {
+ try {
+ coder.verifyDeterministic();
+ } catch (NonDeterministicException e) {
+ throw new NonDeterministicException(target, message, e);
+ }
+ }
+ }
+
+ public static void verifyDeterministic(Coder<?> target, String message, Coder<?>... coders)
+ throws NonDeterministicException {
+ verifyDeterministic(target, message, Arrays.asList(coders));
+ }
+
+ public boolean consistentWithEquals() {
+ return false;
+ }
+
+ public Object structuralValue(T value) {
+ if (value != null && consistentWithEquals()) {
+ return value;
+ } else {
+ try {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ encode(value, os);
+ return new StructuralByteArray(os.toByteArray());
+ } catch (Exception exn) {
+ throw new IllegalArgumentException(
+ "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+ }
+ }
+ }
+
+ public boolean isRegisterByteSizeObserverCheap(T value) {
+ return false;
+ }
+
+ public void registerByteSizeObserver(T value, ElementByteSizeObserver observer) {
+ observer.update(getEncodedElementByteSize(value));
+ }
+
+ protected long getEncodedElementByteSize(T value) {
+ try (CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream())) {
+ encode(value, os);
+ return os.getCount();
+ } catch (Exception exn) {
+ throw new IllegalArgumentException(
+ "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+ }
+ }
+
+ public static class NonDeterministicException extends RuntimeException {
+ private Coder<?> coder;
+ private List<String> reasons;
+
+ public NonDeterministicException(
+ Coder<?> coder, String reason, NonDeterministicException e) {
+ this(coder, Arrays.asList(reason), e);
+ }
+
+ public NonDeterministicException(Coder<?> coder, String reason) {
+ this(coder, Arrays.asList(reason), null);
+ }
+
+ public NonDeterministicException(Coder<?> coder, List<String> reasons) {
+ this(coder, reasons, null);
+ }
+
+ public NonDeterministicException(
+ Coder<?> coder,
+ List<String> reasons,
+ NonDeterministicException cause) {
+ super(cause);
+ checkArgument(reasons.size() > 0, "Reasons must not be empty.");
+ this.reasons = reasons;
+ this.coder = coder;
+ }
+
+ public Iterable<String> getReasons() {
+ return reasons;
+ }
+
+ @Override
+ public String getMessage() {
+ return String.format("%s is not deterministic because:%n %s",
+ coder, Joiner.on("%n ").join(reasons));
+ }
+ }
+
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java
new file mode 100644
index 0000000..8213e42
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.coder;
+
+public class CoderException extends RuntimeException {
+ public CoderException(String message) {
+ super(message);
+ }
+
+ public CoderException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CoderException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java
new file mode 100644
index 0000000..2126c48
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/CoderUtils.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.io.BaseEncoding;
+
+import java.io.*;
+import java.lang.ref.SoftReference;
+
+public final class CoderUtils {
+ private CoderUtils() {
+ } // Non-instantiable
+
+ private static ThreadLocal<SoftReference<ExposedByteArrayOutputStream>>
+ threadLocalOutputStream = new ThreadLocal<>();
+
+ private static ThreadLocal<Boolean> threadLocalOutputStreamInUse = new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return false;
+ }
+ };
+
+ public static <T> byte[] encodeToByteArray(Coder<T> coder, T value)
+ throws CoderException {
+ if (threadLocalOutputStreamInUse.get()) {
+ // encodeToByteArray() is called recursively and the thread local stream is in use,
+ // allocating a new one.
+ ByteArrayOutputStream stream = new ExposedByteArrayOutputStream();
+ encodeToSafeStream(coder, value, stream);
+ return stream.toByteArray();
+ } else {
+ threadLocalOutputStreamInUse.set(true);
+ try {
+ ByteArrayOutputStream stream = getThreadLocalOutputStream();
+ encodeToSafeStream(coder, value, stream);
+ return stream.toByteArray();
+ } finally {
+ threadLocalOutputStreamInUse.set(false);
+ }
+ }
+ }
+
+ private static <T> void encodeToSafeStream(
+ Coder<T> coder, T value, OutputStream stream) throws CoderException {
+ try {
+ coder.encode(value, new UnownedOutputStream(stream));
+ } catch (CoderException exn) {
+ throw new IllegalArgumentException(
+ "Forbidden IOException when writing to OutputStream", exn);
+ }
+ }
+
+ public static <T> T decodeFromByteArray(
+ Coder<T> coder, byte[] encodedValue) throws CoderException {
+ try (ExposedByteArrayInputStream stream = new ExposedByteArrayInputStream(encodedValue)) {
+ T result = decodeFromSafeStream(coder, stream);
+ if (stream.available() != 0) {
+ throw new CoderException(
+ stream.available() + " unexpected extra bytes after decoding " + result);
+ }
+ return result;
+ }
+ }
+
+ private static <T> T decodeFromSafeStream(
+ Coder<T> coder, InputStream stream) throws CoderException {
+ try {
+ return coder.decode(new UnownedInputStream(stream));
+ } catch (CoderException exn) {
+ throw new IllegalArgumentException(
+ "Forbidden IOException when reading from InputStream", exn);
+ }
+ }
+
+ private static ByteArrayOutputStream getThreadLocalOutputStream() {
+ SoftReference<ExposedByteArrayOutputStream> refStream = threadLocalOutputStream.get();
+ ExposedByteArrayOutputStream stream = refStream == null ? null : refStream.get();
+ if (stream == null) {
+ stream = new ExposedByteArrayOutputStream();
+ threadLocalOutputStream.set(new SoftReference<>(stream));
+ }
+ stream.reset();
+ return stream;
+ }
+
+ public static <T> T clone(Coder<T> coder, T value) throws CoderException {
+ return decodeFromByteArray(coder, encodeToByteArray(coder, value));
+ }
+
+ public static <T> String encodeToBase64(Coder<T> coder, T value)
+ throws CoderException {
+ byte[] rawValue = encodeToByteArray(coder, value);
+ return BaseEncoding.base64Url().omitPadding().encode(rawValue);
+ }
+
+ public static <T> T decodeFromBase64(Coder<T> coder, String encodedValue) throws CoderException {
+ return decodeFromSafeStream(
+ coder,
+ new ByteArrayInputStream(BaseEncoding.base64Url().omitPadding().decode(encodedValue)));
+ }
+
+ public static class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
+
+ private byte[] swappedBuffer;
+
+ private boolean isFallback = false;
+
+ private void fallback() {
+ isFallback = true;
+ if (swappedBuffer != null) {
+ // swappedBuffer != null means buf is actually provided by the caller of writeAndOwn(),
+ // while swappedBuffer is the original buffer.
+ // Recover the buffer and copy the bytes from buf.
+ byte[] tempBuffer = buf;
+ count = 0;
+ buf = swappedBuffer;
+ super.write(tempBuffer, 0, tempBuffer.length);
+ swappedBuffer = null;
+ }
+ }
+
+ public void writeAndOwn(byte[] b) throws IOException {
+ if (b.length == 0) {
+ return;
+ }
+ if (count == 0) {
+ // Optimized first-time whole write.
+ // The original buffer will be swapped to swappedBuffer, while the input b is used as buf.
+ swappedBuffer = buf;
+ buf = b;
+ count = b.length;
+ } else {
+ fallback();
+ super.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) {
+ fallback();
+ super.write(b, off, len);
+ }
+
+ @Override
+ public void write(int b) {
+ fallback();
+ super.write(b);
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ // Note: count == buf.length is not a correct criteria to "return buf;", because the internal
+ // buf may be reused after reset().
+ if (!isFallback && count > 0) {
+ return buf;
+ } else {
+ return super.toByteArray();
+ }
+ }
+
+ @Override
+ public void reset() {
+ if (count == 0) {
+ return;
+ }
+ count = 0;
+ if (isFallback) {
+ isFallback = false;
+ } else {
+ buf = swappedBuffer;
+ swappedBuffer = null;
+ }
+ }
+ }
+
+ public static class ExposedByteArrayInputStream extends ByteArrayInputStream {
+
+ public ExposedByteArrayInputStream(byte[] buf) {
+ super(buf);
+ }
+
+ public byte[] readAll() throws IOException {
+ if (pos == 0 && count == buf.length) {
+ pos = count;
+ return buf;
+ }
+ byte[] ret = new byte[count - pos];
+ super.read(ret);
+ return ret;
+ }
+
+ @Override
+ public void close() {
+ try {
+ super.close();
+ } catch (IOException exn) {
+ throw new RuntimeException("Unexpected IOException closing ByteArrayInputStream", exn);
+ }
+ }
+ }
+
+ public static class UnownedOutputStream extends FilterOutputStream {
+ public UnownedOutputStream(OutputStream delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException("Caller does not own the underlying output stream "
+ + " and should not call close().");
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof UnownedOutputStream
+ && ((UnownedOutputStream) obj).out.equals(out);
+ }
+
+ @Override
+ public int hashCode() {
+ return out.hashCode();
+ }
+
+ }
+
+ public static class UnownedInputStream extends FilterInputStream {
+ public UnownedInputStream(InputStream delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException("Caller does not own the underlying input stream "
+ + " and should not call close().");
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof UnownedInputStream
+ && ((UnownedInputStream) obj).in.equals(in);
+ }
+
+ @Override
+ public int hashCode() {
+ return in.hashCode();
+ }
+
+ @SuppressWarnings("UnsynchronizedOverridesSynchronized")
+ @Override
+ public void mark(int readlimit) {
+ throw new UnsupportedOperationException("Caller does not own the underlying input stream "
+ + " and should not call mark().");
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @SuppressWarnings("UnsynchronizedOverridesSynchronized")
+ @Override
+ public void reset() throws IOException {
+ throw new UnsupportedOperationException("Caller does not own the underlying input stream "
+ + " and should not call reset().");
+ }
+
+ }
+
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java
new file mode 100644
index 0000000..981bee2
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/DoubleCoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+public class DoubleCoder extends AtomicCoder<Double> {
+
+ public static DoubleCoder of() {
+ return INSTANCE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final DoubleCoder INSTANCE = new DoubleCoder();
+
+ private DoubleCoder() {
+ }
+
+ @Override
+ public void encode(Double value, OutputStream outStream)
+ throws CoderException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Double");
+ }
+ try {
+ new DataOutputStream(outStream).writeDouble(value);
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public Double decode(InputStream inStream)
+ throws CoderException {
+ try {
+ return new DataInputStream(inStream).readDouble();
+ } catch (EOFException | UTFDataFormatException exn) {
+ // These exceptions correspond to decoding problems, so change
+ // what kind of exception they're branded as.
+ throw new CoderException(exn);
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public void verifyDeterministic() {
+ throw new NonDeterministicException(this,
+ "Floating point encodings are not guaranteed to be deterministic.");
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(Double value) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(Double value) {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Double");
+ }
+ return 8;
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java
new file mode 100644
index 0000000..29b4aa5
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Observer;
+
+public abstract class ElementByteSizeObservableIterable<
+ V, InputT extends ElementByteSizeObservableIterator<V>>
+ implements Iterable<V> {
+ private List<Observer> observers = new ArrayList<>();
+
+ protected abstract InputT createIterator();
+
+ public void addObserver(Observer observer) {
+ observers.add(observer);
+ }
+
+ @Override
+ public InputT iterator() {
+ InputT iterator = createIterator();
+ for (Observer observer : observers) {
+ iterator.addObserver(observer);
+ }
+ observers.clear();
+ return iterator;
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java
new file mode 100644
index 0000000..946882b
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObservableIterator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.Iterator;
+import java.util.Observable;
+
+public abstract class ElementByteSizeObservableIterator<V>
+ extends Observable implements Iterator<V> {
+ protected final void notifyValueReturned(long byteSize) {
+ setChanged();
+ notifyObservers(byteSize);
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java
new file mode 100644
index 0000000..5464067
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ElementByteSizeObserver.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.Observable;
+import java.util.Observer;
+
+public abstract class ElementByteSizeObserver implements Observer {
+ private boolean isLazy = false;
+ private long totalSize = 0;
+ private double scalingFactor = 1.0;
+
+ public ElementByteSizeObserver() {
+ }
+
+ protected abstract void reportElementSize(long elementByteSize);
+
+ public void setLazy() {
+ isLazy = true;
+ }
+
+ public boolean getIsLazy() {
+ return isLazy;
+ }
+
+ public void update(Object obj) {
+ update(null, obj);
+ }
+
+ public void setScalingFactor(double scalingFactor) {
+ this.scalingFactor = scalingFactor;
+ }
+
+ @Override
+ public void update(Observable obs, Object obj) {
+ if (obj instanceof Long) {
+ totalSize += scalingFactor * (Long) obj;
+ } else if (obj instanceof Integer) {
+ totalSize += scalingFactor * (Integer) obj;
+ } else {
+ throw new AssertionError("unexpected parameter object");
+ }
+ }
+
+ public void advance() {
+ reportElementSize(totalSize);
+ totalSize = 0;
+ isLazy = false;
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java
new file mode 100644
index 0000000..d069068
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableCoder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.List;
+
+public class IterableCoder<T> extends IterableLikeCoder<T, Iterable<T>> {
+
+ public static <T> IterableCoder<T> of(Coder<T> elemCoder) {
+ return new IterableCoder<>(elemCoder);
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // Internal operations below here.
+
+ @Override
+ protected final Iterable<T> decodeToIterable(List<T> decodedElements) {
+ return decodedElements;
+ }
+
+ protected IterableCoder(Coder<T> elemCoder) {
+ super(elemCoder, "Iterable");
+ }
+
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java
new file mode 100644
index 0000000..5bb6c66
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IterableLikeCoder.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+import java.util.*;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public abstract class IterableLikeCoder<T, IterableT extends Iterable<T>>
+ extends StructuredCoder<IterableT> {
+ public Coder<T> getElemCoder() {
+ return elementCoder;
+ }
+
+ protected abstract IterableT decodeToIterable(List<T> decodedElements);
+
+ /////////////////////////////////////////////////////////////////////////////
+ // Internal operations below here.
+
+ private final Coder<T> elementCoder;
+ private final String iterableName;
+
+ protected IterableLikeCoder(Coder<T> elementCoder, String iterableName) {
+ checkArgument(elementCoder != null, "element Coder for IterableLikeCoder must not be null");
+ checkArgument(iterableName != null, "iterable name for IterableLikeCoder must not be null");
+ this.elementCoder = elementCoder;
+ this.iterableName = iterableName;
+ }
+
+ @Override
+ public void encode(IterableT iterable, OutputStream outStream) {
+ if (iterable == null) {
+ throw new CoderException("cannot encode a null " + iterableName);
+ }
+ DataOutputStream dataOutStream = new DataOutputStream(outStream);
+ try {
+ if (iterable instanceof Collection) {
+ // We can know the size of the Iterable. Use an encoding with a
+ // leading size field, followed by that many elements.
+ Collection<T> collection = (Collection<T>) iterable;
+ dataOutStream.writeInt(collection.size());
+ for (T elem : collection) {
+ elementCoder.encode(elem, dataOutStream);
+ }
+ } else {
+ // We don't know the size without traversing it so use a fixed size buffer
+ // and encode as many elements as possible into it before outputting the size followed
+ // by the elements.
+ dataOutStream.writeInt(-1);
+ BufferedElementCountingOutputStream countingOutputStream =
+ new BufferedElementCountingOutputStream(dataOutStream);
+ for (T elem : iterable) {
+ countingOutputStream.markElementStart();
+ elementCoder.encode(elem, countingOutputStream);
+ }
+ countingOutputStream.finish();
+ }
+ // Make sure all our output gets pushed to the underlying outStream.
+ dataOutStream.flush();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public IterableT decode(InputStream inStream) {
+ try {
+ DataInputStream dataInStream = new DataInputStream(inStream);
+ int size = dataInStream.readInt();
+ if (size >= 0) {
+ List<T> elements = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ elements.add(elementCoder.decode(dataInStream));
+ }
+ return decodeToIterable(elements);
+ }
+ List<T> elements = new ArrayList<>();
+ // We don't know the size a priori. Check if we're done with
+ // each block of elements.
+ long count = VarInt.decodeLong(dataInStream);
+ while (count > 0L) {
+ elements.add(elementCoder.decode(dataInStream));
+ --count;
+ if (count == 0L) {
+ count = VarInt.decodeLong(dataInStream);
+ }
+ }
+ return decodeToIterable(elements);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(elementCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws Coder.NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "IterableLikeCoder can not guarantee deterministic ordering.");
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(
+ IterableT iterable) {
+ return iterable instanceof ElementByteSizeObservableIterable;
+ }
+
+ @Override
+ public void registerByteSizeObserver(
+ IterableT iterable, ElementByteSizeObserver observer) {
+ if (iterable == null) {
+ throw new CoderException("cannot encode a null Iterable");
+ }
+
+ if (iterable instanceof ElementByteSizeObservableIterable) {
+ observer.setLazy();
+ ElementByteSizeObservableIterable<?, ?> observableIterable =
+ (ElementByteSizeObservableIterable<?, ?>) iterable;
+ observableIterable.addObserver(
+ new IteratorObserver(observer, iterable instanceof Collection));
+ } else {
+ if (iterable instanceof Collection) {
+ // We can know the size of the Iterable. Use an encoding with a
+ // leading size field, followed by that many elements.
+ Collection<T> collection = (Collection<T>) iterable;
+ observer.update(4L);
+ for (T elem : collection) {
+ elementCoder.registerByteSizeObserver(elem, observer);
+ }
+ } else {
+ // TODO: (BEAM-1537) Update to use an accurate count depending on size and count,
+ // currently we are under estimating the size by up to 10 bytes per block of data since we
+ // are not encoding the count prefix which occurs at most once per 64k of data and is upto
+ // 10 bytes long. Since we include the total count we can upper bound the underestimate
+ // to be 10 / 65536 ~= 0.0153% of the actual size.
+ observer.update(4L);
+ long count = 0;
+ for (T elem : iterable) {
+ count += 1;
+ elementCoder.registerByteSizeObserver(elem, observer);
+ }
+ if (count > 0) {
+ // Update the length based upon the number of counted elements, this helps
+ // eliminate the case where all the elements are encoded in the first block and
+ // it is quite short (e.g. Long.MAX_VALUE nulls encoded with VoidCoder).
+ observer.update(VarInt.getLength(count));
+ }
+ // Update with the terminator byte.
+ observer.update(1L);
+ }
+ }
+ }
+
+ private class IteratorObserver implements Observer {
+ private final ElementByteSizeObserver outerObserver;
+ private final boolean countable;
+
+ public IteratorObserver(ElementByteSizeObserver outerObserver,
+ boolean countable) {
+ this.outerObserver = outerObserver;
+ this.countable = countable;
+
+ if (countable) {
+ // Additional 4 bytes are due to size.
+ outerObserver.update(4L);
+ } else {
+ // Additional 5 bytes are due to size = -1 (4 bytes) and
+ // hasNext = false (1 byte).
+ outerObserver.update(5L);
+ }
+ }
+
+ @Override
+ public void update(Observable obs, Object obj) {
+ if (!(obj instanceof Long)) {
+ throw new AssertionError("unexpected parameter object");
+ }
+
+ if (countable) {
+ outerObserver.update(obs, obj);
+ } else {
+ // Additional 1 byte is due to hasNext = true flag.
+ outerObserver.update(obs, 1 + (long) obj);
+ }
+ }
+ }
+
+ public static abstract class ElementByteSizeObservableIterable<
+ V, InputT extends ElementByteSizeObservableIterator<V>>
+ implements Iterable<V> {
+ private List<Observer> observers = new ArrayList<>();
+
+ protected abstract InputT createIterator();
+
+ public void addObserver(Observer observer) {
+ observers.add(observer);
+ }
+
+ @Override
+ public InputT iterator() {
+ InputT iterator = createIterator();
+ for (Observer observer : observers) {
+ iterator.addObserver(observer);
+ }
+ observers.clear();
+ return iterator;
+ }
+ }
+
+ public static abstract class ElementByteSizeObservableIterator<V>
+ extends Observable implements Iterator<V> {
+ protected final void notifyValueReturned(long byteSize) {
+ setChanged();
+ notifyObservers(byteSize);
+ }
+ }
+
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java
new file mode 100644
index 0000000..f1be5fb
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/IteratorObserver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.Observable;
+import java.util.Observer;
+
+public class IteratorObserver implements Observer {
+ private final ElementByteSizeObserver outerObserver;
+ private final boolean countable;
+
+ public IteratorObserver(ElementByteSizeObserver outerObserver,
+ boolean countable) {
+ this.outerObserver = outerObserver;
+ this.countable = countable;
+
+ if (countable) {
+ // Additional 4 bytes are due to size.
+ outerObserver.update(4L);
+ } else {
+ // Additional 5 bytes are due to size = -1 (4 bytes) and
+ // hasNext = false (1 byte).
+ outerObserver.update(5L);
+ }
+ }
+
+ @Override
+ public void update(Observable obs, Object obj) {
+ if (!(obj instanceof Long)) {
+ throw new AssertionError("unexpected parameter object");
+ }
+
+ if (countable) {
+ outerObserver.update(obs, obj);
+ } else {
+ // Additional 1 byte is due to hasNext = true flag.
+ outerObserver.update(obs, 1 + (long) obj);
+ }
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java
new file mode 100644
index 0000000..3858ec6
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/ListCoder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.List;
+
+public class ListCoder<T> extends IterableLikeCoder<T, List<T>> {
+
+ public static <T> ListCoder<T> of(Coder<T> elemCoder) {
+ return new ListCoder<>(elemCoder);
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // Internal operations below here.
+
+ @Override
+ protected final List<T> decodeToIterable(List<T> decodedElements) {
+ return decodedElements;
+ }
+
+ protected ListCoder(Coder<T> elemCoder) {
+ super(elemCoder, "List");
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ verifyDeterministic(this, "ListCoder.elemCoder must be deterministic",
+ (Iterable<Coder<?>>)getElemCoder());
+ }
+
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java
new file mode 100644
index 0000000..66b983c
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/MapCoder.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.collect.Maps;
+
+import java.io.*;
+import java.util.*;
+
+public class MapCoder<K, V> extends StructuredCoder<Map<K, V>> {
+
+ public static <K, V> MapCoder<K, V> of(
+ Coder<K> keyCoder,
+ Coder<V> valueCoder) {
+ return new MapCoder<>(keyCoder, valueCoder);
+ }
+
+ public Coder<K> getKeyCoder() {
+ return keyCoder;
+ }
+
+ public Coder<V> getValueCoder() {
+ return valueCoder;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private Coder<K> keyCoder;
+ private Coder<V> valueCoder;
+
+ private MapCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+ this.keyCoder = keyCoder;
+ this.valueCoder = valueCoder;
+ }
+
+ @Override
+ public void encode(Map<K, V> map, OutputStream outStream) throws CoderException {
+ if (map == null) {
+ throw new CoderException("cannot encode a null Map");
+ }
+ DataOutputStream dataOutStream = new DataOutputStream(outStream);
+
+ int size = map.size();
+ try {
+ dataOutStream.writeInt(size);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (size == 0) {
+ return;
+ }
+
+ // Since we handled size == 0 above, entry is guaranteed to exist before and after loop
+ Iterator<Map.Entry<K, V>> iterator = map.entrySet().iterator();
+ Map.Entry<K, V> entry = iterator.next();
+ while (iterator.hasNext()) {
+ keyCoder.encode(entry.getKey(), outStream);
+ valueCoder.encode(entry.getValue(), outStream);
+ entry = iterator.next();
+ }
+
+ keyCoder.encode(entry.getKey(), outStream);
+ valueCoder.encode(entry.getValue(), outStream);
+ // no flush needed as DataOutputStream does not buffer
+ }
+
+ @Override
+ public Map<K, V> decode(InputStream inStream) {
+ DataInputStream dataInStream = new DataInputStream(inStream);
+ int size = 0;
+ try {
+ size = dataInStream.readInt();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (size == 0) {
+ return Collections.emptyMap();
+ }
+
+ Map<K, V> retval = Maps.newHashMapWithExpectedSize(size);
+ for (int i = 0; i < size - 1; ++i) {
+ K key = keyCoder.decode(inStream);
+ V value = valueCoder.decode(inStream);
+ retval.put(key, value);
+ }
+
+ K key = keyCoder.decode(inStream);
+ V value = valueCoder.decode(inStream);
+ retval.put(key, value);
+ return retval;
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(keyCoder, valueCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "Ordering of entries in a Map may be non-deterministic.");
+ }
+
+ @Override
+ public void registerByteSizeObserver(
+ Map<K, V> map, ElementByteSizeObserver observer) {
+ observer.update(4L);
+ if (map.isEmpty()) {
+ return;
+ }
+ Iterator<Map.Entry<K, V>> entries = map.entrySet().iterator();
+ Map.Entry<K, V> entry = entries.next();
+ while (entries.hasNext()) {
+ keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+ valueCoder.registerByteSizeObserver(entry.getValue(), observer);
+ entry = entries.next();
+ }
+ keyCoder.registerByteSizeObserver(entry.getKey(), observer);
+ valueCoder.registerByteSizeObserver(entry.getValue(), observer);
+ }
+
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java
new file mode 100644
index 0000000..4147732
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/SetCoder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class SetCoder<T> extends IterableLikeCoder<T, Set<T>> {
+
+ public static <T> SetCoder<T> of(Coder<T> elementCoder) {
+ return new SetCoder<>(elementCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "Ordering of elements in a set may be non-deterministic.");
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // Internal operations below here.
+
+ @Override
+ protected final Set<T> decodeToIterable(List<T> decodedElements) {
+ return new HashSet<>(decodedElements);
+ }
+
+ protected SetCoder(Coder<T> elemCoder) {
+ super(elemCoder, "Set");
+ }
+
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java
new file mode 100644
index 0000000..73ea8eb
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StringUtf8Coder.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.base.Utf8;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+public class StringUtf8Coder extends AtomicCoder<String> {
+
+ public static StringUtf8Coder of() {
+ return INSTANCE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final StringUtf8Coder INSTANCE = new StringUtf8Coder();
+
+ private static void writeString(String value, DataOutputStream dos)
+ throws IOException {
+ byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+ VarInt.encode(bytes.length, dos);
+ dos.write(bytes);
+ }
+
+ private static String readString(DataInputStream dis) throws IOException {
+ int len = VarInt.decodeInt(dis);
+ if (len < 0) {
+ throw new CoderException("Invalid encoded string length: " + len);
+ }
+ byte[] bytes = new byte[len];
+ dis.readFully(bytes);
+ return new String(bytes, StandardCharsets.UTF_8);
+ }
+
+ private StringUtf8Coder() {
+ }
+
+ @Override
+ public void encode(String value, OutputStream outStream) throws CoderException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null String");
+ }
+ try {
+ writeString(value, new DataOutputStream(outStream));
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public String decode(InputStream inStream)
+ throws CoderException {
+ try {
+ return readString(new DataInputStream(inStream));
+ } catch (EOFException | UTFDataFormatException exn) {
+ // These exceptions correspond to decoding problems, so change
+ // what kind of exception they're branded as.
+ throw new CoderException(exn);
+ } catch (Exception e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public void verifyDeterministic() {
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public long getEncodedElementByteSize(String value) {
+ if (value == null) {
+ throw new CoderException("cannot encode a null String");
+ }
+ int size = Utf8.encodedLength(value);
+ return VarInt.getLength(size) + size;
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java
new file mode 100644
index 0000000..6a371f6
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuralByteArray.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import com.google.common.io.BaseEncoding;
+
+import java.util.Arrays;
+
+public class StructuralByteArray {
+ byte[] value;
+
+ public StructuralByteArray(byte[] value) {
+ this.value = value;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof StructuralByteArray) {
+ StructuralByteArray that = (StructuralByteArray) o;
+ return Arrays.equals(this.value, that.value);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(value);
+ }
+
+ @Override
+ public String toString() {
+ return "base64:" + BaseEncoding.base64().encode(value);
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java
new file mode 100644
index 0000000..3e299a6
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/StructuredCoder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class StructuredCoder<T> extends Coder<T> {
+ protected StructuredCoder() {}
+
+ public List<? extends Coder<?>> getComponents() {
+ List<? extends Coder<?>> coderArguments = getCoderArguments();
+ if (coderArguments == null) {
+ return Collections.emptyList();
+ } else {
+ return coderArguments;
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || this.getClass() != o.getClass()) {
+ return false;
+ }
+ StructuredCoder<?> that = (StructuredCoder<?>) o;
+ return this.getComponents().equals(that.getComponents());
+ }
+
+ @Override
+ public int hashCode() {
+ return getClass().hashCode() * 31 + getComponents().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ String s = getClass().getName();
+ builder.append(s.substring(s.lastIndexOf('.') + 1));
+
+ List<? extends Coder<?>> componentCoders = getComponents();
+ if (!componentCoders.isEmpty()) {
+ builder.append('(');
+ boolean first = true;
+ for (Coder<?> componentCoder : componentCoders) {
+ if (first) {
+ first = false;
+ } else {
+ builder.append(',');
+ }
+ builder.append(componentCoder.toString());
+ }
+ builder.append(')');
+ }
+ return builder.toString();
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return false;
+ }
+
+ @Override
+ public Object structuralValue(T value) {
+ if (value != null && consistentWithEquals()) {
+ return value;
+ } else {
+ try {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ encode(value, os);
+ return new StructuralByteArray(os.toByteArray());
+ } catch (Exception exn) {
+ throw new IllegalArgumentException(
+ "Unable to encode element '" + value + "' with coder '" + this + "'.", exn);
+ }
+ }
+ }
+
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java
new file mode 100644
index 0000000..bebc1e4
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarInt.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class VarInt {
+
+ private static long convertIntToLongNoSignExtend(int v) {
+ return v & 0xFFFFFFFFL;
+ }
+
+ public static void encode(int v, OutputStream stream) throws IOException {
+ encode(convertIntToLongNoSignExtend(v), stream);
+ }
+
+ public static void encode(long v, OutputStream stream) throws IOException {
+ do {
+ // Encode next 7 bits + terminator bit
+ long bits = v & 0x7F;
+ v >>>= 7;
+ byte b = (byte) (bits | ((v != 0) ? 0x80 : 0));
+ stream.write(b);
+ } while (v != 0);
+ }
+
+ public static int decodeInt(InputStream stream) throws IOException {
+ long r = decodeLong(stream);
+ if (r < 0 || r >= 1L << 32) {
+ throw new IOException("varint overflow " + r);
+ }
+ return (int) r;
+ }
+
+ public static long decodeLong(InputStream stream) throws IOException {
+ long result = 0;
+ int shift = 0;
+ int b;
+ do {
+ // Get 7 bits from next byte
+ b = stream.read();
+ if (b < 0) {
+ if (shift == 0) {
+ throw new EOFException();
+ } else {
+ throw new IOException("varint not terminated");
+ }
+ }
+ long bits = b & 0x7F;
+ if (shift >= 64 || (shift == 63 && bits > 1)) {
+ // Out of range
+ throw new IOException("varint too long");
+ }
+ result |= bits << shift;
+ shift += 7;
+ } while ((b & 0x80) != 0);
+ return result;
+ }
+
+ public static int getLength(int v) {
+ return getLength(convertIntToLongNoSignExtend(v));
+ }
+
+ public static int getLength(long v) {
+ int result = 0;
+ do {
+ result++;
+ v >>>= 7;
+ } while (v != 0);
+ return result;
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarIntCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarIntCoder.java
new file mode 100644
index 0000000..7dac822
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarIntCoder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+
+public class VarIntCoder extends AtomicCoder<Integer> {
+
+ public static VarIntCoder of() {
+ return INSTANCE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final VarIntCoder INSTANCE = new VarIntCoder();
+
+ private VarIntCoder() {}
+
+ @Override
+ public void encode(Integer value, OutputStream outStream)
+ throws CoderException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Integer");
+ }
+ try {
+ VarInt.encode(value.intValue(), outStream);
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public Integer decode(InputStream inStream)
+ throws CoderException {
+ try {
+ return VarInt.decodeInt(inStream);
+ } catch (EOFException | UTFDataFormatException exn) {
+ // These exceptions correspond to decoding problems, so change
+ // what kind of exception they're branded as.
+ throw new CoderException(exn);
+ } catch (Exception e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public void verifyDeterministic() {}
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(Integer value) {
+ return true;
+ }
+
+ @Override
+ public long getEncodedElementByteSize(Integer value) {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Integer");
+ }
+ return VarInt.getLength(value.longValue());
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java
new file mode 100644
index 0000000..15af634
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VarLongCoder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.*;
+import java.util.Collections;
+import java.util.List;
+
+public class VarLongCoder extends StructuredCoder<Long> {
+ public static VarLongCoder of() {
+ return INSTANCE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final VarLongCoder INSTANCE = new VarLongCoder();
+
+ private VarLongCoder() {}
+
+ @Override
+ public void encode(Long value, OutputStream outStream)
+ throws CoderException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Long");
+ }
+ try {
+ VarInt.encode(value.longValue(), outStream);
+ } catch (IOException e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public Long decode(InputStream inStream)
+ throws CoderException {
+ try {
+ return VarInt.decodeLong(inStream);
+ } catch (EOFException | UTFDataFormatException exn) {
+ // These exceptions correspond to decoding problems, so change
+ // what kind of exception they're branded as.
+ throw new CoderException(exn);
+ } catch (Exception e) {
+ throw new CoderException(e);
+ }
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void verifyDeterministic() {}
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(Long value) {
+ return true;
+ }
+
+ @Override
+ public long getEncodedElementByteSize(Long value) {
+ if (value == null) {
+ throw new CoderException("cannot encode a null Long");
+ }
+ return VarInt.getLength(value.longValue());
+ }
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java
new file mode 100644
index 0000000..f4d00f1
--- /dev/null
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/coder/VoidCoder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.coder;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class VoidCoder extends AtomicCoder<Void> {
+
+ public static VoidCoder of() {
+ return INSTANCE;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ private static final VoidCoder INSTANCE = new VoidCoder();
+
+ private VoidCoder() {
+ }
+
+ @Override
+ public void encode(Void value, OutputStream outStream) {
+ // Nothing to write!
+ }
+
+ @Override
+ public Void decode(InputStream inStream) {
+ // Nothing to read!
+ return null;
+ }
+
+ @Override
+ public void verifyDeterministic() {
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(Void value) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(Void value) {
+ return 0;
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
index 17c93bd..e706f4f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
@@ -20,10 +20,16 @@
import org.apache.gearpump.Message
import org.apache.gearpump.streaming.dsl.window.api.Trigger
+<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
trait ReduceFnRunner {
def process(message: Message): Unit
def onTrigger(trigger: Trigger): Unit
+=======
+trait State {
+
+ def clear: Unit
+>>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/State.scala
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
index d0b84cb..6665766 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
package org.apache.gearpump.streaming.refactor.sink
import akka.actor.ActorSystem
@@ -33,4 +34,20 @@
Processor[DataSinkTask](parallelism, description = description,
taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink))
}
+=======
+package org.apache.gearpump.streaming.refactor.state
+
+import org.apache.gearpump.streaming.refactor.state.api.State
+
+trait StateTag[StateT <: State] extends Serializable {
+
+ def appendTo(sb: Appendable)
+
+ def getId: String
+
+ def getSpec: StateSpec[StateT]
+
+ def bind(binder: StateBinder): StateT
+
+>>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTag.scala
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala
new file mode 100644
index 0000000..4dbb07f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util
+import java.util.Map.Entry
+import java.util.{ArrayList, HashSet, List, Set}
+import java.lang.Iterable
+
+import com.google.common.collect.{HashBasedTable, Table}
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.InMemoryGlobalStateInternals.InMemoryStateBinder
+import org.apache.gearpump.streaming.refactor.state.api._
+
+class InMemoryGlobalStateInternals[K] protected(key: K) extends StateInternals {
+
+ protected val inMemoryStateTable: InMemoryGlobalStateInternals.StateTable =
+ new InMemoryGlobalStateInternals.StateTable {
+ override def binderForNamespace(namespace: StateNamespace): StateBinder = {
+ new InMemoryStateBinder
+ }
+ }
+
+ override def getKey: Any = key
+
+ override def state[T <: State](namespace: StateNamespace, address: StateTag[T]): T =
+ inMemoryStateTable.get(namespace, address)
+
+}
+
+object InMemoryGlobalStateInternals {
+
+ abstract class StateTable {
+
+ val stateTable: Table[StateNamespace, StateTag[_], State] = HashBasedTable.create()
+
+ def get[StateT <: State](namespace: StateNamespace, tag: StateTag[StateT]): StateT = {
+ val storage: State = stateTable.get(namespace, tag)
+ if (storage != null) {
+ storage.asInstanceOf[StateT]
+ }
+
+ val typedStorage: StateT = tag.getSpec.bind(tag.getId, binderForNamespace(namespace))
+ stateTable.put(namespace, tag, typedStorage)
+ typedStorage
+ }
+
+ def clearNamespace(namespace: StateNamespace): Unit = stateTable.rowKeySet().remove(namespace)
+
+ def clear: Unit = stateTable.clear()
+
+ def values: Iterable[State] = stateTable.values().asInstanceOf[Iterable[State]]
+
+ def isNamespaceInUse(namespace: StateNamespace): Boolean = stateTable.containsRow(namespace)
+
+ def getTagsInUse(namespace: StateNamespace): java.util.Map[StateTag[_], State]
+ = stateTable.row(namespace)
+
+ def getNamespacesInUse(): java.util.Set[StateNamespace] = stateTable.rowKeySet()
+
+ def binderForNamespace(namespace: StateNamespace): StateBinder
+
+ }
+
+ class InMemoryStateBinder extends StateBinder {
+
+ override def bindValue[T](id: String, spec: StateSpec[ValueState[T]],
+ coder: Coder[T]): ValueState[T] = new InMemoryValueState[T]()
+
+ override def bindBag[T](id: String, spec: StateSpec[BagState[T]],
+ elemCoder: Coder[T]): BagState[T] = new InMemoryBagState[T]()
+
+ override def bindSet[T](id: String, spec: StateSpec[SetState[T]],
+ elemCoder: Coder[T]): SetState[T] = new InMemorySetState[T]()
+
+ override def bindMap[KeyT, ValueT](id: String, spec: StateSpec[MapState[KeyT, ValueT]],
+ mapKeyCoder: Coder[KeyT], mapValueCoder: Coder[ValueT]): MapState[KeyT, ValueT] =
+ new InMemoryMapState[KeyT, ValueT]()
+ }
+
+ trait InMemoryState[T <: InMemoryState[T]] {
+
+ def isCleared: Boolean
+
+ def copy: T
+
+ }
+
+ class InMemoryBagState[T] extends BagState[T] with InMemoryState[InMemoryBagState[T]] {
+
+ private var contents: List[T] = new ArrayList[T]
+
+ override def readLater: BagState[T] = this
+
+ override def isCleared: Boolean = contents.isEmpty
+
+ override def copy: InMemoryBagState[T] = {
+ val that: InMemoryBagState[T] = new InMemoryBagState[T]
+ that.contents.addAll(this.contents)
+ that
+ }
+
+ override def add(value: T): Unit = contents.add(value)
+
+ override def isEmpty: ReadableState[Boolean] = {
+ new ReadableState[Boolean] {
+ override def readLater: ReadableState[Boolean] = {
+ this
+ }
+
+ override def read: Boolean = {
+ contents.isEmpty
+ }
+ }
+ }
+
+ override def clear: Unit = contents = new ArrayList[T]
+
+ override def read: Iterable[T] = contents.asInstanceOf[Iterable[T]]
+
+ }
+
+ class InMemoryValueState[T] extends ValueState[T] with InMemoryState[InMemoryValueState[T]] {
+
+ private var cleared: Boolean = true
+ private var value: T = _
+
+ def write(input: T): Unit = {
+ cleared = false
+ this.value = input
+ }
+
+ def readLater: InMemoryValueState[T] = this
+
+ def isCleared: Boolean = cleared
+
+ def copy: InMemoryValueState[T] = {
+ val that: InMemoryValueState[T] = new InMemoryValueState[T]
+ if (!this.cleared) {
+ that.cleared = this.cleared
+ that.value = this.value
+ }
+
+ that
+ }
+
+ def clear: Unit = {
+ value = null.asInstanceOf[T]
+ cleared = true
+ }
+
+ def read: T = value
+
+ }
+
+ class InMemoryMapState[K, V] extends MapState[K, V] with InMemoryState[InMemoryMapState[K, V]] {
+
+ private var contents: util.Map[K, V] = new util.HashMap[K, V]
+
+ override def put(key: K, value: V): Unit = contents.put(key, value)
+
+ override def putIfAbsent(key: K, value: V): ReadableState[V] = {
+ var v: V = contents.get(key)
+ if (v == null) {
+ v = contents.put(key, value)
+ }
+
+ ReadableStates.immediate(v)
+ }
+
+ override def remove(key: K): Unit = contents.remove(key)
+
+ override def get(key: K): ReadableState[V] = ReadableStates.immediate(contents.get(key))
+
+ override def keys: ReadableState[Iterable[K]] =
+ ReadableStates.immediate(contents.keySet().asInstanceOf[Iterable[K]])
+
+ override def values: ReadableState[Iterable[V]] =
+ ReadableStates.immediate(contents.values().asInstanceOf[Iterable[V]])
+
+ override def entries: ReadableState[Iterable[Entry[K, V]]] =
+ ReadableStates.immediate(contents.entrySet().asInstanceOf[Iterable[util.Map.Entry[K, V]]])
+
+ override def isCleared: Boolean = contents.isEmpty
+
+ override def copy: InMemoryMapState[K, V] = {
+ val that: InMemoryMapState[K, V] = new InMemoryMapState
+ that.contents.putAll(this.contents)
+ that
+ }
+
+ override def clear: Unit = contents = new util.HashMap[K, V]
+
+ }
+
+ class InMemorySetState[T] extends SetState[T] with InMemoryState[InMemorySetState[T]] {
+
+ private var contents: Set[T] = new HashSet[T]
+
+ override def contains(t: T): ReadableState[Boolean] =
+ ReadableStates.immediate(contents.contains(t))
+
+ override def addIfAbsent(t: T): ReadableState[Boolean] = {
+ val alreadyContained: Boolean = contents.contains(t)
+ contents.add(t)
+ ReadableStates.immediate(!alreadyContained)
+ }
+
+ override def remove(t: T): Unit = contents.remove(t)
+
+ override def readLater: SetState[T] = this
+
+ override def isCleared: Boolean = contents.isEmpty
+
+ override def copy: InMemorySetState[T] = {
+ val that: InMemorySetState[T] = new InMemorySetState[T]
+ that.contents.addAll(this.contents)
+ that
+ }
+
+ override def add(value: T): Unit = contents.add(value)
+
+ override def isEmpty: ReadableState[Boolean] = {
+ new ReadableState[Boolean] {
+
+ override def readLater: ReadableState[Boolean] = this
+
+ override def read: Boolean = contents.isEmpty
+ }
+ }
+
+ override def clear: Unit = contents = new HashSet[T]
+
+ override def read: Iterable[T] = contents.asInstanceOf[Iterable[T]]
+ }
+
+}
+
+object ReadableStates {
+
+ def immediate[T](value: T): ReadableState[T] = {
+ new ReadableState[T] {
+ override def readLater: ReadableState[T] = {
+ this
+ }
+
+ override def read: T = {
+ value
+ }
+ }
+ }
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
index c387960..8832aee 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
@@ -23,10 +23,16 @@
import org.apache.gearpump.streaming.refactor.coder.Coder
import org.apache.gearpump.streaming.refactor.state.api.StateInternals
+<<<<<<< HEAD:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
/**
*
*/
trait RuntimeContext {
+=======
+trait StateSpec[StateT <: State] extends Serializable {
+
+ def bind(id: String, binder: StateBinder): StateT
+>>>>>>> e6ce91c... [Gearpump 311] refactor state management:streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpec.scala
def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala
new file mode 100644
index 0000000..db39142
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateBinder.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState}
+
+trait StateBinder {
+
+ def bindValue[T](id: String, spec: StateSpec[ValueState[T]], coder: Coder[T]): ValueState[T]
+
+ def bindBag[T](id: String, spec: StateSpec[BagState[T]], elemCoder: Coder[T]): BagState[T]
+
+ def bindSet[T](id: String, spec: StateSpec[SetState[T]], elemCoder: Coder[T]): SetState[T]
+
+ def bindMap[KeyT, ValueT](id: String, spec: StateSpec[MapState[KeyT, ValueT]],
+ mapKeyCoder: Coder[KeyT], mapValueCoder: Coder[ValueT]): MapState[KeyT, ValueT]
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespace.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespace.scala
new file mode 100644
index 0000000..dbc2320
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespace.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+trait StateNamespace {
+
+ def stringKey: String
+
+ def appendTo(sb: Appendable): Unit
+
+ def getCacheKey: Object
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespaces.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespaces.scala
new file mode 100644
index 0000000..c2cba51
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateNamespaces.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+object StateNamespaces {
+
+ def global: StateNamespace = {
+ new GlobalNameSpace
+ }
+
+ private object NameSpace extends Enumeration {
+ type NameSpace = Value
+ val GLOBAL, WINDOW, WINDOW_AND_TRIGGER = Value
+ }
+
+ class GlobalNameSpace extends StateNamespace {
+
+ private val GLOBAL_STRING: String = "/"
+
+ override def stringKey: String = {
+ GLOBAL_STRING
+ }
+
+ override def appendTo(sb: Appendable): Unit = {
+ sb.append(GLOBAL_STRING)
+ }
+
+ override def getCacheKey: AnyRef = {
+ GLOBAL_STRING
+ }
+
+ override def equals(obj: Any): Boolean = {
+ obj == this || obj.isInstanceOf[GlobalNameSpace]
+ }
+
+ override def hashCode(): Int = {
+ Objects.hash(NameSpace.GLOBAL)
+ }
+ }
+
+ // TODO : implement WindowNamespace & WindowAndTriggerNamespace
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
new file mode 100644
index 0000000..f056915
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState}
+
+object StateSpecs {
+
+ private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] {
+
+ var aCoder: Coder[T] = coder
+
+ override def bind(id: String, binder: StateBinder): ValueState[T] = {
+ binder.bindValue(id, this, aCoder)
+ }
+
+ override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = {
+ if (this.aCoder == null) {
+ if (coders(0) != null) {
+ this.aCoder = coders(0).asInstanceOf[Coder[T]]
+ }
+ }
+ }
+
+ override def finishSpecifying: Unit = {
+ if (aCoder == null) throw new IllegalStateException(
+ "Unable to infer a coder for ValueState and no Coder"
+ + " was specified. Please set a coder by either invoking"
+ + " StateSpecs.value(Coder<T> valueCoder) or by registering the coder in the"
+ + " Pipeline's CoderRegistry.")
+ }
+
+ override def equals(obj: Any): Boolean = {
+ var result = false
+ if (obj == this) result = true
+
+ if (!(obj.isInstanceOf[ValueStateSpec[T]])) result = false
+
+ val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]]
+ result = Objects.equals(this.aCoder, that.aCoder)
+ result
+ }
+
+ override def hashCode(): Int = {
+ Objects.hashCode(this.aCoder)
+ }
+ }
+
+ private class BagStateSpec[T](coder: Coder[T]) extends StateSpec[BagState[T]] {
+
+ private implicit var elemCoder = coder
+
+ override def bind(id: String, binder: StateBinder): BagState[T] =
+ binder.bindBag(id, this, elemCoder)
+
+ override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = {
+ if (this.elemCoder == null) {
+ if (coders(0) != null) {
+ this.elemCoder = coders(0).asInstanceOf[Coder[T]]
+ }
+ }
+ }
+
+ override def finishSpecifying: Unit = {
+ if (elemCoder == null) {
+ throw new IllegalStateException("Unable to infer a coder for BagState and no Coder"
+ + " was specified. Please set a coder by either invoking"
+ + " StateSpecs.bag(Coder<T> elemCoder) or by registering the coder in the"
+ + " Pipeline's CoderRegistry.");
+ }
+ }
+
+ override def equals(obj: Any): Boolean = {
+ var result = false
+ if (obj == this) result = true
+
+ if (!obj.isInstanceOf[BagStateSpec[_]]) result = false
+
+ val that = obj.asInstanceOf[BagStateSpec[_]]
+ result = Objects.equals(this.elemCoder, that.elemCoder)
+ result
+ }
+
+ override def hashCode(): Int = Objects.hash(getClass, elemCoder)
+ }
+
+ private class MapStateSpec[K, V](keyCoder: Coder[K], valueCoder: Coder[V])
+ extends StateSpec[MapState[K, V]] {
+
+ private implicit var kCoder = keyCoder
+ private implicit var vCoder = valueCoder
+
+ override def bind(id: String, binder: StateBinder): MapState[K, V] =
+ binder.bindMap(id, this, keyCoder, valueCoder)
+
+ override def offerCoders(coders: Array[Coder[MapState[K, V]]]): Unit = {
+ if (this.kCoder == null) {
+ if (coders(0) != null) {
+ this.kCoder = coders(0).asInstanceOf[Coder[K]]
+ }
+ }
+
+ if (this.vCoder == null) {
+ if (coders(1) != null) {
+ this.vCoder = coders(1).asInstanceOf[Coder[V]]
+ }
+ }
+ }
+
+ override def finishSpecifying: Unit = {
+ if (keyCoder == null || valueCoder == null) {
+ throw new IllegalStateException("Unable to infer a coder for MapState and no Coder"
+ + " was specified. Please set a coder by either invoking"
+ + " StateSpecs.map(Coder<K> keyCoder, Coder<V> valueCoder) or by registering the"
+ + " coder in the Pipeline's CoderRegistry.");
+ }
+ }
+
+ override def hashCode(): Int = Objects.hash(getClass, kCoder, vCoder)
+
+ override def equals(obj: Any): Boolean = {
+ var result = false
+ if (obj == this) result = true
+
+ if (!obj.isInstanceOf[MapStateSpec[_, _]]) result = false
+
+ implicit var that = obj.asInstanceOf[MapStateSpec[_, _]]
+ result = Objects.equals(this.kCoder, that.vCoder) && Objects.equals(this.vCoder, that.vCoder)
+ result
+ }
+ }
+
+ private class SetStateSpec[T](coder: Coder[T]) extends StateSpec[SetState[T]] {
+
+ private implicit var elemCoder = coder
+
+ override def bind(id: String, binder: StateBinder): SetState[T] =
+ binder.bindSet(id, this, elemCoder)
+
+ override def offerCoders(coders: Array[Coder[SetState[T]]]): Unit = {
+ if (this.elemCoder == null) {
+ if (coders(0) != null) {
+ this.elemCoder = coders(0).asInstanceOf[Coder[T]]
+ }
+ }
+ }
+
+ override def finishSpecifying: Unit = {
+ if (elemCoder == null) {
+ throw new IllegalStateException("Unable to infer a coder for SetState and no Coder"
+ + " was specified. Please set a coder by either invoking"
+ + " StateSpecs.set(Coder<T> elemCoder) or by registering the coder in the"
+ + " Pipeline's CoderRegistry.");
+ }
+ }
+
+ override def equals(obj: Any): Boolean = {
+ var result = false
+ if (obj == this) result = true
+
+ if (!obj.isInstanceOf[SetStateSpec[_]]) result = false
+
+ implicit var that = obj.asInstanceOf[SetStateSpec[_]]
+ result = Objects.equals(this.elemCoder, that.elemCoder)
+ result
+ }
+
+ override def hashCode(): Int = Objects.hash(getClass, elemCoder)
+ }
+
+ def value[T]: StateSpec[ValueState[T]] = new ValueStateSpec[T](null)
+
+ def value[T](valueCoder: Coder[T]): StateSpec[ValueState[T]] = {
+ if (valueCoder == null) {
+ throw new NullPointerException("valueCoder should not be null. Consider value() instead")
+ }
+
+ new ValueStateSpec[T](valueCoder)
+ }
+
+ def bag[T]: StateSpec[BagState[T]] = new BagStateSpec[T](null)
+
+ def bag[T](elemCoder: Coder[T]): StateSpec[BagState[T]] = new BagStateSpec[T](elemCoder)
+
+ def set[T]: StateSpec[SetState[T]] = new SetStateSpec[T](null)
+
+ def set[T](elemCoder: Coder[T]): StateSpec[SetState[T]] = new SetStateSpec[T](elemCoder)
+
+ def map[K, V]: StateSpec[MapState[K, V]] = new MapStateSpec[K, V](null, null)
+
+ def map[K, V](keyCoder: Coder[K], valueCoder: Coder[V]): StateSpec[MapState[K, V]] =
+ new MapStateSpec[K, V](keyCoder, valueCoder)
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala
new file mode 100644
index 0000000..cbd050a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateTags.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.StateTags.StateKind.StateKind
+import org.apache.gearpump.streaming.refactor.state.api._
+
+object StateTags {
+
+ object StateKind extends Enumeration {
+ type StateKind = Value
+ val SYSTEM = Value("s")
+ val USER = Value("u")
+ }
+
+ private trait SystemStateTag[StateT <: State] {
+ def asKind(kind: StateKind): StateTag[StateT]
+ }
+
+ def tagForSpec[StateT <: State](id: String, spec: StateSpec[StateT]): StateTag[StateT] =
+ new SimpleStateTag[StateT](new StructureId(id), spec)
+
+ def value[T](id: String, valueCoder: Coder[T]): StateTag[ValueState[T]] =
+ new SimpleStateTag[ValueState[T]](new StructureId(id), StateSpecs.value(valueCoder))
+
+ def bag[T](id: String, elemCoder: Coder[T]): StateTag[BagState[T]] =
+ new SimpleStateTag[BagState[T]](new StructureId(id), StateSpecs.bag(elemCoder))
+
+ def set[T](id: String, elemCoder: Coder[T]): StateTag[SetState[T]] =
+ new SimpleStateTag[SetState[T]](new StructureId(id), StateSpecs.set(elemCoder))
+
+ def map[K, V](id: String, keyCoder: Coder[K], valueCoder: Coder[V]): StateTag[MapState[K, V]] =
+ new SimpleStateTag[MapState[K, V]](new StructureId(id), StateSpecs.map(keyCoder, valueCoder))
+
+ private class SimpleStateTag[StateT <: State](id: StructureId, spec: StateSpec[StateT])
+ extends StateTag[StateT] with SystemStateTag[StateT] {
+
+ val aSpec: StateSpec[StateT] = spec
+ val aId: StructureId = id
+
+ override def appendTo(sb: Appendable): Unit = aId.appendTo(sb)
+
+
+ override def getId: String = id.getRawId
+
+ override def getSpec: StateSpec[StateT] = aSpec
+
+ override def bind(binder: StateBinder): StateT = aSpec.bind(aId.getRawId, binder)
+
+ override def asKind(kind: StateKind): StateTag[StateT] =
+ new SimpleStateTag[StateT](aId.asKind(kind), aSpec)
+
+ override def hashCode(): Int = Objects.hash(getClass, getId, getSpec)
+
+ override def equals(obj: Any): Boolean = {
+ if (!(obj.isInstanceOf[SimpleStateTag[_]])) false
+
+ val otherTag: SimpleStateTag[_] = obj.asInstanceOf[SimpleStateTag[_]]
+ Objects.equals(getId, otherTag.getId) && Objects.equals(getSpec, otherTag.getSpec)
+ }
+ }
+
+ private class StructureId(kind: StateKind, rawId: String) extends Serializable {
+
+ private val k: StateKind = kind
+ private val r: String = rawId
+
+ def this(rawId: String) {
+ this(StateKind.USER, rawId)
+ }
+
+ def asKind(kind: StateKind): StructureId = new StructureId(kind, r)
+
+ def appendTo(sb: Appendable): Unit = sb.append(k.toString).append(r)
+
+ def getRawId: String = r
+
+ override def hashCode(): Int = Objects.hash(k, r)
+
+ override def equals(obj: Any): Boolean = {
+ if (obj == this) true
+
+ if (!(obj.isInstanceOf[StructureId])) false
+
+ val that : StructureId = obj.asInstanceOf[StructureId]
+ Objects.equals(k, that.k) && Objects.equals(r, that.r)
+ }
+ }
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
index 6d72e78..0f94052 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
@@ -34,6 +34,13 @@
import org.apache.gearpump.util.LogUtil
import org.apache.gearpump.{Message, TimeStamp}
+<<<<<<< HEAD
+=======
+object StatefulTask {
+ val LOG = LogUtil.getLogger(getClass)
+}
+
+>>>>>>> e6ce91c... [Gearpump 311] refactor state management
abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
extends Task(taskContext, conf) {
@@ -53,7 +60,11 @@
// core state data
var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null
+<<<<<<< HEAD
def open(runtimeContext: RuntimeContext): Unit = {}
+=======
+ def open: Unit = {}
+>>>>>>> e6ce91c... [Gearpump 311] refactor state management
def invoke(message: Message): Unit
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala
new file mode 100644
index 0000000..38d918e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/BagState.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import java.lang.Iterable
+
+trait BagState[T] extends GroupingState[T, Iterable[T]] {
+
+ def readLater: BagState[T]
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala
new file mode 100644
index 0000000..640cc9e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/CombiningState.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait CombiningState[InputT, AccumT, OutputT] extends GroupingState[InputT, OutputT] {
+
+ def getAccum: AccumT
+
+ def addAccum(accumT: AccumT)
+
+ def mergeAccumulators(accumulators: Iterable[AccumT]): AccumT
+
+ def readLater: CombiningState[InputT, AccumT, OutputT]
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala
new file mode 100644
index 0000000..2f8939a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/GroupingState.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait GroupingState[InputT, OutputT] extends ReadableState[OutputT] with State {
+
+ def add(value: InputT): Unit
+
+ def isEmpty: ReadableState[Boolean]
+
+ def readLater: GroupingState[InputT, OutputT]
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala
new file mode 100644
index 0000000..25de704
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/MapState.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import java.lang.Iterable
+
+trait MapState[K, V] extends State {
+
+ def put(key : K, value : V): Unit
+
+ def putIfAbsent(key : K, value : V): ReadableState[V]
+
+ def remove(key : K): Unit
+
+ def get(key : K): ReadableState[V]
+
+ def keys: ReadableState[Iterable[K]]
+
+ def values: ReadableState[Iterable[V]]
+
+ def entries: ReadableState[Iterable[java.util.Map.Entry[K, V]]]
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala
new file mode 100644
index 0000000..f6f4d98
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ReadableState.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait ReadableState[T] {
+
+ def read: T
+
+ def readLater: ReadableState[T]
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala
new file mode 100644
index 0000000..e1990b2
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/SetState.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import java.lang.Iterable
+
+trait SetState[T] extends GroupingState[T, Iterable[T]]{
+
+ def contains(t: T): ReadableState[Boolean]
+
+ def addIfAbsent(t: T): ReadableState[Boolean]
+
+ def remove(t: T): Unit
+
+ def readLater: SetState[T]
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala
new file mode 100644
index 0000000..e3a136d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternals.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+import org.apache.gearpump.streaming.refactor.state.{StateNamespace, StateTag}
+
+trait StateInternals {
+
+ def getKey: Any
+
+ def state[T <: State](namespace: StateNamespace, address: StateTag[T]): T
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala
new file mode 100644
index 0000000..215528c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/StateInternalsFactory.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait StateInternalsFactory[K] extends Serializable {
+
+ def stateInternalsForKey(key: K): StateInternals
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala
new file mode 100644
index 0000000..3555ec4
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/api/ValueState.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.api
+
+trait ValueState[T] extends ReadableState[T] with State {
+
+ def write(input : T): Unit
+
+ def readLater: ValueState[T]
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala
new file mode 100644
index 0000000..12b6e42
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
+import java.lang.Iterable
+import java.util
+import java.util.Map.Entry
+import java.util._
+import java.util.Objects
+
+import com.google.common.collect.Table
+import org.apache.gearpump.streaming.refactor.coder.{Coder, ListCoder, MapCoder, SetCoder}
+import org.apache.gearpump.streaming.refactor.state.{StateBinder, StateNamespace, StateSpec, StateTag}
+import org.apache.gearpump.streaming.refactor.state.api._
+import org.apache.gearpump.util.LogUtil
+
+class HeapStateInternals[K](key: K, stateTable: Table[String, String, Array[Byte]])
+ extends StateInternals {
+
+ val LOG = LogUtil.getLogger(getClass)
+
+ private class HeapStateBinder(namespace: StateNamespace, address: StateTag[_])
+ extends StateBinder {
+
+ private val ns: StateNamespace = namespace
+ private val addr: StateTag[_] = address
+
+ override def bindValue[T](id: String, spec: StateSpec[ValueState[T]],
+ coder: Coder[T]): ValueState[T] =
+ new HeapValueState[T](ns, addr.asInstanceOf[StateTag[ValueState[T]]], coder)
+
+ override def bindBag[T](id: String, spec: StateSpec[BagState[T]],
+ elemCoder: Coder[T]): BagState[T] =
+ new HeapBagState[T](ns, addr.asInstanceOf[StateTag[BagState[T]]], elemCoder)
+
+ override def bindSet[T](id: String, spec: StateSpec[SetState[T]],
+ elemCoder: Coder[T]): SetState[T] =
+ new HeapSetState[T](ns, addr.asInstanceOf[StateTag[SetState[T]]], elemCoder)
+
+ override def bindMap[KeyT, ValueT](id: String, spec: StateSpec[MapState[KeyT, ValueT]],
+ mapKeyCoder: Coder[KeyT], mapValueCoder: Coder[ValueT]): MapState[KeyT, ValueT] =
+ new HeapMapState[KeyT, ValueT](ns,
+ addr.asInstanceOf[StateTag[MapState[KeyT, ValueT]]], mapKeyCoder, mapValueCoder)
+
+ }
+
+ override def getKey: Any = key
+
+ override def state[T <: State](namespace: StateNamespace, address: StateTag[T]): T =
+ address.bind(new HeapStateBinder(namespace, address))
+
+ private class AbstractState[T](namespace: StateNamespace, address: StateTag[_ <: State],
+ coder: Coder[T]) {
+
+ protected val ns: StateNamespace = namespace
+ protected val addr: StateTag[_ <: State] = address
+ protected val c: Coder[T] = coder
+
+ protected def readValue: T = {
+ var value: T = null.asInstanceOf[T]
+ val buf: Array[Byte] = stateTable.get(ns.stringKey, addr.getId)
+ if (buf != null) {
+ val is: InputStream = new ByteArrayInputStream(buf)
+ try {
+ value = c.decode(is)
+ } catch {
+ case ex: Exception => throw new RuntimeException(ex)
+ }
+ }
+
+ value
+ }
+
+ def writeValue(input: T): Unit = {
+ val output: ByteArrayOutputStream = new ByteArrayOutputStream();
+ try {
+ c.encode(input, output)
+ stateTable.put(ns.stringKey, addr.getId, output.toByteArray)
+ } catch {
+ case ex: Exception => throw new RuntimeException(ex)
+ }
+ }
+
+ def clear: Unit = stateTable.remove(ns.stringKey, addr.getId)
+
+ override def hashCode(): Int = Objects.hash(ns, addr)
+
+ override def equals(obj: Any): Boolean = {
+ if (obj == this) true
+
+ if (null == obj || getClass != obj.getClass) false
+
+ val that: AbstractState[_] = obj.asInstanceOf[AbstractState[_]]
+ Objects.equals(ns, that.ns) && Objects.equals(addr, that.addr)
+ }
+ }
+
+ private class HeapValueState[T](namespace: StateNamespace,
+ address: StateTag[ValueState[T]], coder: Coder[T])
+ extends AbstractState[T](namespace, address, coder) with ValueState[T] {
+
+ override def write(input: T): Unit = writeValue(input)
+
+ override def readLater: ValueState[T] = this
+
+ override def read: T = readValue
+ }
+
+ private class HeapMapState[MapKT, MapVT](namespace: StateNamespace,
+ address: StateTag[MapState[MapKT, MapVT]], mapKCoder: Coder[MapKT], mapVCoder: Coder[MapVT])
+ extends AbstractState[Map[MapKT, MapVT]](
+ namespace, address, MapCoder.of(mapKCoder, mapVCoder))
+ with MapState[MapKT, MapVT] {
+
+ private def readMap: Map[MapKT, MapVT] = {
+ implicit var map = super.readValue
+ if (map == null || map.size() == 0) {
+ map = new util.HashMap[MapKT, MapVT]
+ }
+
+ map
+ }
+
+ override def put(key: MapKT, value: MapVT): Unit = {
+ implicit var map = readMap
+ map.put(key, value)
+ super.writeValue(map)
+ }
+
+ override def putIfAbsent(key: MapKT, value: MapVT): ReadableState[MapVT] = {
+ implicit var map = readMap
+ implicit val previousVal = map.putIfAbsent(key, value)
+ super.writeValue(map)
+ new ReadableState[MapVT] {
+
+ override def readLater: ReadableState[MapVT] = this
+
+ override def read: MapVT = previousVal
+ }
+ }
+
+ override def remove(key: MapKT): Unit = {
+ implicit var map = readMap
+ map.remove(key)
+ super.writeValue(map)
+ }
+
+ override def get(key: MapKT): ReadableState[MapVT] = {
+ implicit var map = readMap
+ new ReadableState[MapVT] {
+
+ override def read: MapVT = map.get(key)
+
+ override def readLater: ReadableState[MapVT] = this
+ }
+ }
+
+ override def keys: ReadableState[Iterable[MapKT]] = {
+ implicit val map = readMap
+ new ReadableState[Iterable[MapKT]] {
+
+ override def readLater: ReadableState[Iterable[MapKT]] = this
+
+ override def read: Iterable[MapKT] = map.keySet()
+ }
+ }
+
+ override def values: ReadableState[Iterable[MapVT]] = {
+ implicit val map = readMap
+ new ReadableState[Iterable[MapVT]] {
+
+ override def readLater: ReadableState[Iterable[MapVT]] = this
+
+ override def read: Iterable[MapVT] = map.values()
+ }
+ }
+
+ override def entries: ReadableState[Iterable[Entry[MapKT, MapVT]]] = {
+ implicit var map = readMap
+ new ReadableState[Iterable[Entry[MapKT, MapVT]]] {
+
+ override def readLater: ReadableState[Iterable[Entry[MapKT, MapVT]]] = this
+
+ override def read: Iterable[Entry[MapKT, MapVT]] = map.entrySet()
+ }
+ }
+
+ override def clear: Unit = {
+ implicit var map = readMap
+ map.clear()
+ super.writeValue(map)
+ }
+}
+
+ private class HeapBagState[T](namespace: StateNamespace,
+ address: StateTag[BagState[T]], coder: Coder[T])
+ extends AbstractState[List[T]](namespace, address, ListCoder.of(coder)) with BagState[T] {
+
+ override def readLater: BagState[T] = this
+
+ override def add(input: T): Unit = {
+ val value: List[T] = read
+ value.add(input)
+ writeValue(value)
+ }
+
+ override def isEmpty: ReadableState[Boolean] = {
+ new ReadableState[Boolean] {
+
+ override def readLater: ReadableState[Boolean] = this
+
+ override def read: Boolean = stateTable.get(ns.stringKey, addr.getId) == null
+ }
+ }
+
+ override def read: List[T] = {
+ var value: List[T] = super.readValue
+ if (value == null || value.size() == 0) {
+ value = new ArrayList[T]
+ }
+
+ value
+ }
+ }
+
+ private class HeapSetState[T](namespace: StateNamespace,
+ address: StateTag[SetState[T]], coder: Coder[T])
+ extends AbstractState[Set[T]](namespace, address, SetCoder.of(coder)) with SetState[T] {
+
+ override def contains(t: T): ReadableState[Boolean] = {
+ implicit val set = read
+ new ReadableState[Boolean] {
+
+ override def readLater: ReadableState[Boolean] = this
+
+ override def read: Boolean = set.contains(t)
+ }
+ }
+
+ override def addIfAbsent(t: T): ReadableState[Boolean] = {
+ implicit val set = read
+ val success = set.add(t)
+ super.writeValue(set)
+ new ReadableState[Boolean] {
+
+ override def readLater: ReadableState[Boolean] = this
+
+ override def read: Boolean = success
+ }
+ }
+
+ override def remove(t: T): Unit = {
+ implicit var set = read
+ set.remove(t)
+ writeValue(set)
+ }
+
+ override def readLater: SetState[T] = this
+
+ override def add(value: T): Unit = {
+ implicit var set = read
+ set.add(value)
+ writeValue(set)
+ }
+
+ override def isEmpty: ReadableState[Boolean] = {
+ implicit val set = read
+ new ReadableState[Boolean] {
+
+ override def readLater: ReadableState[Boolean] = this
+
+ override def read: Boolean = set.isEmpty
+ }
+ }
+
+ override def read: Set[T] = {
+ var value: Set[T] = super.readValue
+ if (value == null || value.size() == 0) {
+ value = new util.HashSet[T]()
+ }
+
+ value
+ }
+ }
+
+}
+
+
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala
new file mode 100644
index 0000000..db20d66
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsFactory.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import org.apache.gearpump.streaming.refactor.coder.{Coder, CoderException, CoderUtils}
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, StateInternalsFactory}
+import java.util._
+
+import com.google.common.collect.{HashBasedTable, Table}
+import org.apache.gearpump.util.LogUtil
+
+class HeapStateInternalsFactory[K](keyCoder: Coder[K],
+ map: Map[String, Table[String, String, Array[Byte]]])
+ extends StateInternalsFactory[K] with Serializable {
+
+ private val LOG = LogUtil.getLogger(getClass)
+
+ private val kc: Coder[K] = keyCoder
+ private val perKeyState: Map[String, Table[String, String, Array[Byte]]] = map
+
+ def getKeyCoder: Coder[K] = {
+ this.kc
+ }
+
+ override def stateInternalsForKey(key: K): StateInternals = {
+ var keyBytes: Option[Array[Byte]] = None
+ if (key != null) {
+ keyBytes = Some(CoderUtils.encodeToByteArray(kc, key))
+ }
+
+ if (keyBytes.isEmpty) {
+ throw new RuntimeException("key bytes is null or empty, encode key occurs a error")
+ }
+
+ val keyBased64Str = Base64.getEncoder.encodeToString(keyBytes.get)
+ var stateTable: Table[String, String, Array[Byte]] = perKeyState.get(keyBased64Str)
+ if (stateTable == null) {
+ LOG.info("stateTable is null, will create!")
+ stateTable = HashBasedTable.create()
+ perKeyState.put(keyBased64Str, stateTable)
+ }
+
+ new HeapStateInternals[K](key, stateTable)
+ }
+
+}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala
new file mode 100644
index 0000000..2f85dd9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsProxy.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.{StateNamespace, StateTag}
+import org.apache.gearpump.streaming.refactor.state.api.{State, StateInternals, StateInternalsFactory}
+
+class HeapStateInternalsProxy[K](heapStateInternalsFactory: HeapStateInternalsFactory[K])
+ extends StateInternals with Serializable {
+
+ private val factory: HeapStateInternalsFactory[K] = heapStateInternalsFactory
+
+ @transient
+ private var currentKey: K = _
+
+ def getFactory: StateInternalsFactory[K] = {
+ factory
+ }
+
+ def getKeyCoder: Coder[K] = {
+ factory.getKeyCoder
+ }
+
+ override def getKey: K = {
+ currentKey
+ }
+
+ def setKey(key: K): Unit = {
+ currentKey = key
+ }
+
+ override def state[T <: State](namespace: StateNamespace, address: StateTag[T]): T = {
+ factory.stateInternalsForKey(currentKey).state(namespace, address)
+ }
+}
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala
new file mode 100644
index 0000000..f11299e
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternalsSpec.scala
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import java.util
+import java.util.{Iterator, Map}
+
+import com.google.common.collect.Table
+import org.apache.gearpump.streaming.refactor.coder.StringUtf8Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, SetState, ValueState}
+import org.apache.gearpump.streaming.refactor.state.{StateNamespaces, StateTags}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class HeapStateInternalsSpec
+ extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+ property("HeapStateInternalsProxy should return correct key coder") {
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory: HeapStateInternalsFactory[String] =
+ new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+ val proxy: HeapStateInternalsProxy[String] = new HeapStateInternalsProxy[String](factory)
+
+ factory.getKeyCoder shouldBe StringUtf8Coder.of
+ }
+
+ // region value state
+ property("test heap value state: write heap state should equals read state") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val value = "hello world"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+ val stateInternals = factory.stateInternalsForKey(key)
+ val valueState = stateInternals.state[ValueState[String]](namespace,
+ StateTags.value(stateId, StringUtf8Coder.of))
+
+ valueState.write(value)
+ valueState.read shouldBe value
+ }
+
+ property("test heap value state: write heap state should not equals read state " +
+ "for different state id") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val newStateId = "02"
+ implicit val value = "hello world"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+ val stateInternals = factory.stateInternalsForKey(key)
+ val valueState = stateInternals.state[ValueState[String]](namespace,
+ StateTags.value(stateId, StringUtf8Coder.of))
+
+ valueState.write(value)
+
+ val newValueState = stateInternals.state[ValueState[String]](namespace,
+ StateTags.value(newStateId, StringUtf8Coder.of))
+ newValueState.read shouldNot be(value)
+ }
+
+ property("test heap value state: write heap state should equals read state " +
+ "for different key") {
+ implicit val key = "key"
+ implicit val newKey = "newKey"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val value = "hello world"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val valueState = stateInternals.state[ValueState[String]](namespace,
+ StateTags.value(stateId, StringUtf8Coder.of))
+
+ val newStateInternals = factory.stateInternalsForKey(newKey)
+ val newValueState = newStateInternals.state[ValueState[String]](namespace,
+ StateTags.value(stateId, StringUtf8Coder.of))
+
+ valueState.write(value)
+ newValueState.read shouldNot be(value)
+ }
+ // endregion
+
+ // region bag state
+ property("test heap Bag state: write heap state should equals read state") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val bagValue1 = "bagValue1"
+ implicit val bagValue2 = "bagValue2"
+ implicit val stateId = "01"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val bagState = stateInternals.state[BagState[String]](namespace,
+ StateTags.bag(stateId, StringUtf8Coder.of))
+
+ bagState.add(bagValue1)
+ bagState.add(bagValue2)
+
+ val bagIterator: Iterator[String] = bagState.read.iterator()
+
+ implicit var counter = 0
+
+ while (bagIterator.hasNext) {
+ counter += 1
+ if (counter == 1) {
+ bagIterator.next() shouldBe bagValue1
+ }
+ if (counter == 2) {
+ bagIterator.next() shouldBe bagValue2
+ }
+ }
+
+ counter shouldBe 2
+ }
+
+ property("test heap Bag state: write heap state should not equal read state with " +
+ "different key") {
+ implicit val key = "key"
+ implicit val newKey = "newKey"
+ implicit val namespace = StateNamespaces.global
+ implicit val bagValue1 = "bagValue1"
+ implicit val bagValue2 = "bagValue2"
+ implicit val stateId = "01"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val newStateInternals = factory.stateInternalsForKey(newKey)
+
+ val bagState = stateInternals.state[BagState[String]](namespace,
+ StateTags.bag(stateId, StringUtf8Coder.of))
+ val newBagState = newStateInternals.state[BagState[String]](namespace,
+ StateTags.bag(stateId, StringUtf8Coder.of))
+
+ bagState.add(bagValue1)
+ bagState.add(bagValue2)
+
+ val newBagIterator: Iterator[String] = newBagState.read.iterator()
+
+ implicit var counter = 0
+
+ while (newBagIterator.hasNext) {
+ counter += 1
+ newBagIterator.next()
+ }
+
+ counter shouldBe 0
+ }
+
+ property("test heap Bag state: write heap state should not equal read state " +
+ "with different stateId") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val bagValue1 = "bagValue1"
+ implicit val bagValue2 = "bagValue2"
+ implicit val stateId = "01"
+ implicit val newStateId = "02"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val bagState = stateInternals.state[BagState[String]](namespace,
+ StateTags.bag(stateId, StringUtf8Coder.of))
+ val newBagState = stateInternals.state[BagState[String]](namespace,
+ StateTags.bag(newStateId, StringUtf8Coder.of))
+
+ bagState.add(bagValue1)
+ bagState.add(bagValue2)
+
+ val newBagIterator: Iterator[String] = newBagState.read.iterator()
+
+ implicit var counter = 0
+
+ while (newBagIterator.hasNext) {
+ counter += 1
+ newBagIterator.next()
+ }
+
+ counter shouldBe 0
+ }
+ // endregion
+
+ // region set state
+ property("test heap set state, generic methods") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val setValue1 = "setValue1"
+ implicit val setValue2 = "setValue2"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val setState = stateInternals.state[SetState[String]](namespace,
+ StateTags.set(stateId, StringUtf8Coder.of))
+
+ setState.add(setValue1)
+ setState.add(setValue2)
+
+ implicit var setStateIterator = setState.read.iterator()
+
+ implicit var counter = 0
+ while (setStateIterator.hasNext) {
+ counter += 1
+ setStateIterator.next()
+ }
+
+ counter shouldBe 2
+
+ setState.addIfAbsent(setValue2).read shouldBe false
+
+ setStateIterator = setState.read.iterator()
+
+ counter = 0
+ while (setStateIterator.hasNext) {
+ counter += 1
+ setStateIterator.next()
+ }
+
+ counter shouldBe 2
+
+ setState.contains(setValue1).read shouldBe true
+ setState.contains("setValue03").read shouldBe false
+
+ setState.isEmpty.read shouldBe false
+
+ setState.remove(setValue1)
+ setState.remove(setValue2)
+
+ setState.isEmpty.read shouldBe true
+ }
+
+ property("test heap set state, write state should not equal read state " +
+ "with different key") {
+ implicit val key = "key"
+ implicit val newKey = "newKey"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val setValue1 = "setValue1"
+ implicit val setValue2 = "setValue2"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val newStateInternals = factory.stateInternalsForKey(newKey)
+
+ val setState = stateInternals.state[SetState[String]](namespace,
+ StateTags.set(stateId, StringUtf8Coder.of))
+ val newSetState = newStateInternals.state(namespace,
+ StateTags.set(stateId, StringUtf8Coder.of))
+
+ setState.add(setValue1)
+ setState.add(setValue2)
+
+ implicit val newSetStateIterator = newSetState.read.iterator()
+
+ var counter = 0
+ while (newSetStateIterator.hasNext) {
+ counter += 1
+ newSetStateIterator.next()
+ }
+
+ counter shouldBe 0
+ }
+
+ property("test heap set state, write state shuold not equal read state " +
+ "with different state id") {
+ implicit val key = "key"
+ implicit val newKey = "newKey"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val newStateId = "02"
+ implicit val setValue1 = "setValue1"
+ implicit val setValue2 = "setValue2"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+
+ val setState = stateInternals.state(namespace, StateTags.set(stateId, StringUtf8Coder.of))
+ val newSetState = stateInternals.state(namespace,
+ StateTags.set(newStateId, StringUtf8Coder.of))
+
+ setState.add(setValue1)
+ setState.addIfAbsent(setValue2)
+
+ implicit val setStateIterator = newSetState.read.iterator()
+
+ var counter = 0
+ while (setStateIterator.hasNext) {
+ counter += 1
+ setStateIterator.next()
+ }
+
+ counter shouldBe 0
+ }
+ // endregion
+
+ // region map state
+ property("test map state, generic methods") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val mapStateKey1 = "mapKey01"
+ implicit val mapStateValue1 = "mapValue01"
+ implicit val mapStateKey2 = "mapKey02"
+ implicit val mapStateValue2 = "mapValue02"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val mapState = stateInternals.state(namespace,
+ StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+
+ mapState.put(mapStateKey1, mapStateValue1)
+
+ implicit var mapKeysIterator = mapState.keys.read.iterator()
+
+ mapState.putIfAbsent(mapStateKey1, mapStateValue2).read shouldBe mapStateValue1
+
+ var counter = 0
+ while (mapKeysIterator.hasNext) {
+ counter += 1
+ mapKeysIterator.next()
+ }
+
+ counter shouldBe 1
+
+ counter = 0
+ implicit val mapValuesIterator = mapState.values.read.iterator()
+ while (mapValuesIterator.hasNext) {
+ counter += 1
+ mapValuesIterator.next()
+ }
+
+ counter shouldBe 1
+
+ counter = 0
+ implicit val mapEntriesIterator = mapState.entries.read.iterator()
+ while (mapEntriesIterator.hasNext) {
+ counter += 1
+ mapEntriesIterator.next()
+ }
+
+ counter shouldBe 1
+
+ mapState.get(mapStateKey1).read shouldBe mapStateValue1
+ mapState.get("test01").read shouldBe null
+
+ mapState.remove(mapStateKey1)
+
+ counter = 0
+ mapKeysIterator = mapState.keys.read.iterator()
+ while (mapKeysIterator.hasNext) {
+ counter += 1
+ mapKeysIterator.next()
+ }
+
+ counter shouldBe 0
+
+ mapState.putIfAbsent(mapStateKey2, mapStateValue2)
+
+ counter = 0
+ mapKeysIterator = mapState.keys.read.iterator()
+ while (mapKeysIterator.hasNext) {
+ counter += 1
+ mapKeysIterator.next()
+ }
+
+ counter shouldBe 1
+
+ mapState.clear
+
+ counter = 0
+ mapKeysIterator = mapState.keys.read.iterator()
+ while (mapKeysIterator.hasNext) {
+ counter += 1
+ mapKeysIterator.next()
+ }
+
+ counter shouldBe 0
+ }
+
+ property("test map state, write state should not equal read state " +
+ "with different key") {
+ implicit val key = "key"
+ implicit val newKey = "newKey"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val mapStateKey1 = "mapKey01"
+ implicit val mapStateValue1 = "mapValue01"
+ implicit val mapStateKey2 = "mapKey02"
+ implicit val mapStateValue2 = "mapValue02"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+ val newStateInternals = factory.stateInternalsForKey(newKey)
+
+ val mapState = stateInternals.state(namespace,
+ StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+ val newMapState = newStateInternals.state(namespace,
+ StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+
+ mapState.put(mapStateKey1, mapStateValue1)
+
+ mapState.get(mapStateKey1).read shouldBe mapStateValue1
+ newMapState.get(mapStateKey1).read shouldNot be(mapStateValue1)
+ }
+
+ property("test map state, write state should not equal read state " +
+ "with different state id") {
+ implicit val key = "key"
+ implicit val namespace = StateNamespaces.global
+ implicit val stateId = "01"
+ implicit val newStateId = "02"
+ implicit val mapStateKey1 = "mapKey01"
+ implicit val mapStateValue1 = "mapValue01"
+ implicit val mapStateKey2 = "mapKey02"
+ implicit val mapStateValue2 = "mapValue02"
+
+ val map: Map[String, Table[String, String, Array[Byte]]]
+ = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+ val factory = new HeapStateInternalsFactory[String](StringUtf8Coder.of, map)
+
+ val stateInternals = factory.stateInternalsForKey(key)
+
+ val mapState = stateInternals.state(namespace,
+ StateTags.map(stateId, StringUtf8Coder.of, StringUtf8Coder.of))
+ val newMapState = stateInternals.state(namespace,
+ StateTags.map(newStateId, StringUtf8Coder.of, StringUtf8Coder.of))
+
+ mapState.put(mapStateKey1, mapStateValue1)
+
+ mapState.get(mapStateKey1).read shouldBe mapStateValue1
+ newMapState.get(mapStateKey1).read shouldNot be(mapStateValue1)
+ }
+ // endregion
+
+}