Fixes #82 - Moved TypeLayer from Fluo API to Fluo Recipes
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/Encoder.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/Encoder.java
new file mode 100644
index 0000000..6b1b626
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/Encoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * Transforms Java primitives to and from bytes using desired encoding
+ *
+ * @since 1.0.0
+ */
+public interface Encoder {
+
+ /**
+ * Encodes an integer to {@link Bytes}
+ */
+ Bytes encode(int i);
+
+ /**
+ * Encodes a long to {@link Bytes}
+ */
+ Bytes encode(long l);
+
+ /**
+ * Encodes a String to {@link Bytes}
+ */
+ Bytes encode(String s);
+
+ /**
+ * Encodes a float to {@link Bytes}
+ */
+ Bytes encode(float f);
+
+ /**
+ * Encodes a double to {@link Bytes}
+ */
+ Bytes encode(double d);
+
+ /**
+ * Encodes a boolean to {@link Bytes}
+ */
+ Bytes encode(boolean b);
+
+ /**
+ * Decodes an integer from {@link Bytes}
+ */
+ int decodeInteger(Bytes b);
+
+ /**
+ * Decodes a long from {@link Bytes}
+ */
+ long decodeLong(Bytes b);
+
+ /**
+ * Decodes a String from {@link Bytes}
+ */
+ String decodeString(Bytes b);
+
+ /**
+ * Decodes a float from {@link Bytes}
+ */
+ float decodeFloat(Bytes b);
+
+ /**
+ * Decodes a double from {@link Bytes}
+ */
+ double decodeDouble(Bytes b);
+
+ /**
+ * Decodes a boolean from {@link Bytes}
+ */
+ boolean decodeBoolean(Bytes b);
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/StringEncoder.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/StringEncoder.java
new file mode 100644
index 0000000..10524ed
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/StringEncoder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.data.Bytes;
+
+/**
+ * Transforms Java primitives to and from bytes using a String encoding
+ *
+ * @since 1.0.0
+ */
+public class StringEncoder implements Encoder {
+
+ @Override
+ public Bytes encode(int i) {
+ return encode(Integer.toString(i));
+ }
+
+ @Override
+ public Bytes encode(long l) {
+ return encode(Long.toString(l));
+ }
+
+ @Override
+ public Bytes encode(String s) {
+ return Bytes.of(s);
+ }
+
+ @Override
+ public Bytes encode(float f) {
+ return encode(Float.toString(f));
+ }
+
+ @Override
+ public Bytes encode(double d) {
+ return encode(Double.toString(d));
+ }
+
+ @Override
+ public Bytes encode(boolean b) {
+ return encode(Boolean.toString(b));
+ }
+
+ @Override
+ public int decodeInteger(Bytes b) {
+ return Integer.parseInt(decodeString(b));
+ }
+
+ @Override
+ public long decodeLong(Bytes b) {
+ return Long.parseLong(decodeString(b));
+ }
+
+ @Override
+ public String decodeString(Bytes b) {
+ return b.toString();
+ }
+
+ @Override
+ public float decodeFloat(Bytes b) {
+ return Float.parseFloat(decodeString(b));
+ }
+
+ @Override
+ public double decodeDouble(Bytes b) {
+ return Double.parseDouble(decodeString(b));
+ }
+
+ @Override
+ public boolean decodeBoolean(Bytes b) {
+ return Boolean.parseBoolean(decodeString(b));
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypeLayer.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypeLayer.java
new file mode 100644
index 0000000..0bd189f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypeLayer.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.nio.ByteBuffer;
+
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+
+/**
+ * A simple convenience layer for Fluo. This layer attempts to make the following common operations
+ * easier.
+ *
+ * <UL>
+ * <LI>Working with different types.
+ * <LI>Supplying default values
+ * <LI>Dealing with null return types.
+ * <LI>Working with row/column and column maps
+ * </UL>
+ *
+ * <p>
+ * This layer was intentionally loosely coupled with the basic API. This allows other convenience
+ * layers for Fluo to build directly on the basic API w/o having to consider the particulars of this
+ * layer. Also its expected that integration with other languages may only use the basic API.
+ * </p>
+ *
+ * <h3>Using</h3>
+ *
+ * <p>
+ * A TypeLayer is created with a certain encoder that is used for converting from bytes to
+ * primitives and visa versa. In order to ensure that all of your code uses the same encoder, its
+ * probably best to centralize the choice of an encoder within your project. There are many ways do
+ * to this, below is an example of one way to centralize and use.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * public class MyTypeLayer extends TypeLayer {
+ * public MyTypeLayer() {
+ * super(new MyEncoder());
+ * }
+ * }
+ *
+ * public class MyObserver extends TypedObserver {
+ * MyObserver(){
+ * super(new MyTypeLayer());
+ * }
+ *
+ * public abstract void process(TypedTransaction tx, Bytes row, Column col){
+ * //do something w/ typed transaction
+ * }
+ * }
+ *
+ * public class MyUtil {
+ * //A little util to print out some stuff
+ * public void printStuff(Snapshot snap, byte[] row){
+ * TypedSnapshot tsnap = new MytTypeLayer().wrap(snap);
+ *
+ * System.out.println(tsnap.get().row(row).fam("b90000").qual(137).toString("NP"));
+ * }
+ * }
+ * </code>
+ * </pre>
+ *
+ * <h3>Working with different types</h3>
+ *
+ * <p>
+ * The following example code shows using the basic fluo API with different types.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void process(Transaction tx, byte[] row, byte[] cf, int cq, long val){
+ * tx.set(Bytes.of(row), new Column(Bytes.of(cf), Bytes.of(Integer.toString(cq))),
+ * Bytes.of(Long.toString(val));
+ * }
+ * </code>
+ * </pre>
+ *
+ * <p>
+ * Alternatively, the same thing can be written using a {@link TypedTransactionBase} in the
+ * following way. Because row(), fam(), qual(), and set() each take many different types, this
+ * enables many different permutations that would not be achievable with overloading.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void process(TypedTransaction tx, byte[] r, byte[] cf, int cq, long v){
+ * tx.mutate().row(r).fam(cf).qual(cq).set(v);
+ * }
+ * </code>
+ * </pre>
+ *
+ * <h3>Default values</h3>
+ *
+ * <p>
+ * The following example code shows using the basic fluo API to read a value and default to zero if
+ * it does not exist.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void add(Transaction tx, byte[] row, Column col, long amount){
+ *
+ * long balance = 0;
+ * Bytes bval = tx.get(Bytes.of(row), col);
+ * if(bval != null)
+ * balance = Long.parseLong(bval.toString());
+ *
+ * balance += amount;
+ *
+ * tx.set(Bytes.of(row), col, Bytes.of(Long.toString(amount)));
+ *
+ * }
+ * </code>
+ * </pre>
+ *
+ * <p>
+ * Alternatively, the same thing can be written using a {@link TypedTransactionBase} in the
+ * following way. This code avoids the null check by supplying a default value of zero.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void add(TypedTransaction tx, byte[] r, Column c, long amount){
+ * long balance = tx.get().row(r).col(c).toLong(0);
+ * balance += amount;
+ * tx.mutate().row(r).col(c).set(balance);
+ * }
+ * </code>
+ * </pre>
+ *
+ * <p>
+ * For this particular case, shorter code can be written by using the increment method.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void add(TypedTransaction tx, byte[] r, Column c, long amount){
+ * tx.mutate().row(r).col(c).increment(amount);
+ * }
+ * </code>
+ * </pre>
+ *
+ * <h3>Null return types</h3>
+ *
+ * <p>
+ * When using the basic API, you must ensure the return type is not null before converting a string
+ * or long.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void process(Transaction tx, byte[] row, Column col, long amount) {
+ * Bytes val = tx.get(Bytes.of(row), col);
+ * if(val == null)
+ * return;
+ * long balance = Long.parseLong(val.toString());
+ * }
+ * </code>
+ * </pre>
+ *
+ * <p>
+ * With {@link TypedTransactionBase} if no default value is supplied, then the null is passed
+ * through.
+ * </p>
+ *
+ * <pre>
+ * <code>
+ *
+ * void process(TypedTransaction tx, byte[] r, Column c, long amount){
+ * Long balance = tx.get().row(r).col(c).toLong();
+ * if(balance == null)
+ * return;
+ * }
+ * </code>
+ * </pre>
+ *
+ * <h3>Defaulted maps</h3>
+ *
+ * <p>
+ * The operations that return maps, return defaulted maps which make it easy to specify defaults and
+ * avoid null.
+ * </p>
+ *
+ * <pre>
+ * {@code
+ * // pretend this method has curly braces. javadoc has issues with less than.
+ *
+ * void process(TypedTransaction tx, byte[] r, Column c1, Column c2, Column c3, long amount)
+ *
+ * Map<Column, Value> columns = tx.get().row(r).columns(c1,c2,c3);
+ *
+ * // If c1 does not exist in map, a Value that wraps null will be returned.
+ * // When c1 does not exist val1 will be set to null and no NPE will be thrown.
+ * String val1 = columns.get(c1).toString();
+ *
+ * // If c2 does not exist in map, then val2 will be set to empty string.
+ * String val2 = columns.get(c2).toString("");
+ *
+ * // If c3 does not exist in map, then val9 will be set to 9.
+ * Long val3 = columns.get(c3).toLong(9);
+ * }
+ * </pre>
+ *
+ * <p>
+ * This also applies to getting sets of rows.
+ * </p>
+ *
+ * <pre>
+ * {@code
+ * // pretend this method has curly braces. javadoc has issues with less than.
+ *
+ * void process(TypedTransaction tx, List<String> rows, Column c1, Column c2, Column c3,
+ * long amount)
+ *
+ * Map<String,Map<Column,Value>> rowCols =
+ * tx.get().rowsString(rows).columns(c1,c2,c3).toStringMap();
+ *
+ * // this will set val1 to null if row does not exist in map and/or column does not
+ * // exist in child map
+ * String val1 = rowCols.get("row1").get(c1).toString();
+ * }
+ * </pre>
+ *
+ * @since 1.0.0
+ */
+public class TypeLayer {
+
+ private Encoder encoder;
+
+ static class Data {
+ Bytes row;
+ Bytes family;
+ Bytes qual;
+ Bytes vis;
+
+ Column getCol() {
+ if (qual == null) {
+ return new Column(family);
+ } else if (vis == null) {
+ return new Column(family, qual);
+ } else {
+ return new Column(family, qual, vis);
+ }
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public abstract class RowMethods<R> {
+
+ abstract R create(Data data);
+
+ public R row(String row) {
+ return row(encoder.encode(row));
+ }
+
+ public R row(int row) {
+ return row(encoder.encode(row));
+ }
+
+ public R row(long row) {
+ return row(encoder.encode(row));
+ }
+
+ public R row(byte[] row) {
+ return row(Bytes.of(row));
+ }
+
+ public R row(ByteBuffer row) {
+ return row(Bytes.of(row));
+ }
+
+ public R row(Bytes row) {
+ Data data = new Data();
+ data.row = row;
+ R result = create(data);
+ return result;
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public abstract class SimpleFamilyMethods<R1> {
+
+ protected Data data;
+
+ SimpleFamilyMethods(Data data) {
+ this.data = data;
+ }
+
+ abstract R1 create1(Data data);
+
+ public R1 fam(String family) {
+ return fam(encoder.encode(family));
+ }
+
+ public R1 fam(int family) {
+ return fam(encoder.encode(family));
+ }
+
+ public R1 fam(long family) {
+ return fam(encoder.encode(family));
+ }
+
+ public R1 fam(byte[] family) {
+ return fam(Bytes.of(family));
+ }
+
+ public R1 fam(ByteBuffer family) {
+ return fam(Bytes.of(family));
+ }
+
+ public R1 fam(Bytes family) {
+ data.family = family;
+ return create1(data);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public abstract class FamilyMethods<R1, R2> extends SimpleFamilyMethods<R1> {
+
+ FamilyMethods(Data data) {
+ super(data);
+ }
+
+ abstract R2 create2(Data data);
+
+ public R2 col(Column col) {
+ data.family = col.getFamily();
+ data.qual = col.getQualifier();
+ data.vis = col.getVisibility();
+ return create2(data);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public abstract class QualifierMethods<R> {
+
+ protected Data data;
+
+ QualifierMethods(Data data) {
+ this.data = data;
+ }
+
+ abstract R create(Data data);
+
+ public R qual(String qualifier) {
+ return qual(encoder.encode(qualifier));
+ }
+
+ public R qual(int qualifier) {
+ return qual(encoder.encode(qualifier));
+ }
+
+ public R qual(long qualifier) {
+ return qual(encoder.encode(qualifier));
+ }
+
+ public R qual(byte[] qualifier) {
+ return qual(Bytes.of(qualifier));
+ }
+
+ public R qual(ByteBuffer qualifier) {
+ return qual(Bytes.of(qualifier));
+ }
+
+ public R qual(Bytes qualifier) {
+ data.qual = qualifier;
+ return create(data);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public static class VisibilityMethods {
+
+ private Data data;
+
+ public VisibilityMethods(Data data) {
+ this.data = data;
+ }
+
+ public Column vis() {
+ return new Column(data.family, data.qual);
+ }
+
+ public Column vis(String cv) {
+ return vis(Bytes.of(cv));
+ }
+
+ public Column vis(Bytes cv) {
+ return new Column(data.family, data.qual, cv);
+ }
+
+ public Column vis(ByteBuffer cv) {
+ return vis(Bytes.of(cv));
+ }
+
+ public Column vis(byte[] cv) {
+ return vis(Bytes.of(cv));
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class CQB extends QualifierMethods<VisibilityMethods> {
+ CQB(Data data) {
+ super(data);
+ }
+
+ @Override
+ VisibilityMethods create(Data data) {
+ return new VisibilityMethods(data);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class CFB extends SimpleFamilyMethods<CQB> {
+ CFB() {
+ super(new Data());
+ }
+
+ @Override
+ CQB create1(Data data) {
+ return new CQB(data);
+ }
+ }
+
+ public TypeLayer(Encoder encoder) {
+ this.encoder = encoder;
+ }
+
+ /**
+ * Initiates the chain of calls needed to build a column.
+ *
+ * @return a column builder
+ */
+ public CFB bc() {
+ return new CFB();
+ }
+
+ public TypedSnapshot wrap(Snapshot snap) {
+ return new TypedSnapshot(snap, encoder, this);
+ }
+
+ public TypedTransactionBase wrap(TransactionBase tx) {
+ return new TypedTransactionBase(tx, encoder, this);
+ }
+
+ public TypedTransaction wrap(Transaction tx) {
+ return new TypedTransaction(tx, encoder, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedLoader.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedLoader.java
new file mode 100644
index 0000000..be5625c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedLoader.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.client.Loader;
+import org.apache.fluo.api.client.TransactionBase;
+
+/**
+ * A {@link Loader} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public abstract class TypedLoader implements Loader {
+
+ private final TypeLayer tl;
+
+ public TypedLoader() {
+ tl = new TypeLayer(new StringEncoder());
+ }
+
+ public TypedLoader(TypeLayer tl) {
+ this.tl = tl;
+ }
+
+ @Override
+ public void load(TransactionBase tx, Context context) throws Exception {
+ load(tl.wrap(tx), context);
+ }
+
+ public abstract void load(TypedTransactionBase tx, Context context) throws Exception;
+
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedObserver.java
new file mode 100644
index 0000000..799ed50
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedObserver.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+
+/**
+ * An {@link AbstractObserver} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public abstract class TypedObserver extends AbstractObserver {
+
+ private final TypeLayer tl;
+
+ public TypedObserver() {
+ tl = new TypeLayer(new StringEncoder());
+ }
+
+ public TypedObserver(TypeLayer tl) {
+ this.tl = tl;
+ }
+
+ @Override
+ public void process(TransactionBase tx, Bytes row, Column col) {
+ process(tl.wrap(tx), row, col);
+ }
+
+ public abstract void process(TypedTransactionBase tx, Bytes row, Column col);
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshot.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshot.java
new file mode 100644
index 0000000..033d4de
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshot.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.client.Snapshot;
+
+/**
+ * A {@link Snapshot} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedSnapshot extends TypedSnapshotBase implements Snapshot {
+
+ private final Snapshot closeSnapshot;
+
+ TypedSnapshot(Snapshot snapshot, Encoder encoder, TypeLayer tl) {
+ super(snapshot, encoder, tl);
+ closeSnapshot = snapshot;
+ }
+
+ @Override
+ public void close() {
+ closeSnapshot.close();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshotBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshotBase.java
new file mode 100644
index 0000000..47722bd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedSnapshotBase.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.map.DefaultedMap;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.recipes.types.TypeLayer.Data;
+import org.apache.fluo.recipes.types.TypeLayer.FamilyMethods;
+import org.apache.fluo.recipes.types.TypeLayer.QualifierMethods;
+import org.apache.fluo.recipes.types.TypeLayer.RowMethods;
+
+// TODO need to refactor column to use Encoder
+
+/**
+ * A {@link SnapshotBase} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedSnapshotBase implements SnapshotBase {
+
+ private SnapshotBase snapshot;
+ private Encoder encoder;
+ private TypeLayer tl;
+
+ /**
+ * @since 1.0.0
+ */
+ public class VisibilityMethods extends Value {
+
+ public VisibilityMethods(Data data) {
+ super(data);
+ }
+
+ public Value vis(Bytes cv) {
+ data.vis = cv;
+ return new Value(data);
+ }
+
+ public Value vis(byte[] cv) {
+ data.vis = Bytes.of(cv);
+ return new Value(data);
+ }
+
+ public Value vis(ByteBuffer bb) {
+ data.vis = Bytes.of(bb);
+ return new Value(data);
+ }
+
+ public Value vis(String cv) {
+ data.vis = Bytes.of(cv);
+ return new Value(data);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class Value {
+ private Bytes bytes;
+ private boolean gotBytes = false;
+ protected Data data;
+
+ public Bytes getBytes() {
+ if (!gotBytes) {
+ try {
+ bytes = snapshot.get(data.row, data.getCol());
+ gotBytes = true;
+ } catch (Exception e) {
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ return bytes;
+ }
+
+ private Value(Bytes bytes) {
+ this.bytes = bytes;
+ this.gotBytes = true;
+ }
+
+ private Value(Data data) {
+ this.data = data;
+ this.gotBytes = false;
+ }
+
+ public Integer toInteger() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeInteger(getBytes());
+ }
+
+ public int toInteger(int defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeInteger(getBytes());
+ }
+
+ public Long toLong() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeLong(getBytes());
+ }
+
+ public long toLong(long defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeLong(getBytes());
+ }
+
+ @Override
+ public String toString() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeString(getBytes());
+ }
+
+ public String toString(String defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeString(getBytes());
+ }
+
+ public Float toFloat() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeFloat(getBytes());
+ }
+
+ public float toFloat(float defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeFloat(getBytes());
+ }
+
+ public Double toDouble() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeDouble(getBytes());
+ }
+
+ public double toDouble(double defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeDouble(getBytes());
+ }
+
+ public Boolean toBoolean() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return encoder.decodeBoolean(getBytes());
+ }
+
+ public boolean toBoolean(boolean defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return encoder.decodeBoolean(getBytes());
+ }
+
+ public byte[] toBytes() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return getBytes().toArray();
+ }
+
+ public byte[] toBytes(byte[] defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return getBytes().toArray();
+ }
+
+ public ByteBuffer toByteBuffer() {
+ if (getBytes() == null) {
+ return null;
+ }
+ return ByteBuffer.wrap(getBytes().toArray());
+ }
+
+ public ByteBuffer toByteBuffer(ByteBuffer defaultValue) {
+ if (getBytes() == null) {
+ return defaultValue;
+ }
+ return toByteBuffer();
+ }
+
+ @Override
+ public int hashCode() {
+ if (getBytes() == null) {
+ return 0;
+ }
+
+ return getBytes().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof Value) {
+ Value ov = (Value) o;
+ if (getBytes() == null) {
+ return ov.getBytes() == null;
+ } else {
+ return getBytes().equals(ov.getBytes());
+ }
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class ValueQualifierBuilder extends QualifierMethods<VisibilityMethods> {
+
+ ValueQualifierBuilder(Data data) {
+ tl.super(data);
+ }
+
+ @Override
+ VisibilityMethods create(Data data) {
+ return new VisibilityMethods(data);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class ValueFamilyMethods extends FamilyMethods<ValueQualifierBuilder, Value> {
+
+ ValueFamilyMethods(Data data) {
+ tl.super(data);
+ }
+
+ @Override
+ ValueQualifierBuilder create1(Data data) {
+ return new ValueQualifierBuilder(data);
+ }
+
+ @Override
+ Value create2(Data data) {
+ return new Value(data);
+ }
+
+ public Map<Column, Value> columns(Set<Column> columns) {
+ try {
+ return wrap(snapshot.get(data.row, columns));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Map<Column, Value> columns(Column... columns) {
+ try {
+ return wrap(snapshot.get(data.row, new HashSet<>(Arrays.asList(columns))));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class MapConverter {
+ private Collection<Bytes> rows;
+ private Set<Column> columns;
+
+ public MapConverter(Collection<Bytes> rows, Set<Column> columns) {
+ this.rows = rows;
+ this.columns = columns;
+ }
+
+ private Map<Bytes, Map<Column, Bytes>> getInput() {
+ try {
+ return snapshot.get(rows, columns);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private Map wrap2(Map m) {
+ return Collections.unmodifiableMap(DefaultedMap.decorate(m, new DefaultedMap(new Value(
+ (Bytes) null))));
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<String, Map<Column, Value>> toStringMap() {
+ Map<Bytes, Map<Column, Bytes>> in = getInput();
+ Map<String, Map<Column, Value>> out = new HashMap<>();
+
+ for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+ out.put(encoder.decodeString(rowEntry.getKey()), wrap(rowEntry.getValue()));
+ }
+
+ return wrap2(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<Long, Map<Column, Value>> toLongMap() {
+ Map<Bytes, Map<Column, Bytes>> in = getInput();
+ Map<Long, Map<Column, Value>> out = new HashMap<>();
+
+ for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+ out.put(encoder.decodeLong(rowEntry.getKey()), wrap(rowEntry.getValue()));
+ }
+
+ return wrap2(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<Integer, Map<Column, Value>> toIntegerMap() {
+ Map<Bytes, Map<Column, Bytes>> in = getInput();
+ Map<Integer, Map<Column, Value>> out = new HashMap<>();
+
+ for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+ out.put(encoder.decodeInteger(rowEntry.getKey()), wrap(rowEntry.getValue()));
+ }
+
+ return wrap2(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<Bytes, Map<Column, Value>> toBytesMap() {
+ Map<Bytes, Map<Column, Bytes>> in = getInput();
+ Map<Bytes, Map<Column, Value>> out = new HashMap<>();
+
+ for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) {
+ out.put(rowEntry.getKey(), wrap(rowEntry.getValue()));
+ }
+
+ return wrap2(out);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class ColumnsMethods {
+ private Collection<Bytes> rows;
+
+ public ColumnsMethods(Collection<Bytes> rows) {
+ this.rows = rows;
+ }
+
+ public MapConverter columns(Set<Column> columns) {
+ return new MapConverter(rows, columns);
+ }
+
+ public MapConverter columns(Column... columns) {
+ return columns(new HashSet<>(Arrays.asList(columns)));
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class ValueRowMethods extends RowMethods<ValueFamilyMethods> {
+
+ ValueRowMethods() {
+ tl.super();
+ }
+
+ @Override
+ ValueFamilyMethods create(Data data) {
+ return new ValueFamilyMethods(data);
+ }
+
+ public ColumnsMethods rows(Collection<Bytes> rows) {
+ return new ColumnsMethods(rows);
+ }
+
+ public ColumnsMethods rows(Bytes... rows) {
+ return new ColumnsMethods(Arrays.asList(rows));
+ }
+
+ public ColumnsMethods rowsString(String... rows) {
+ return rowsString(Arrays.asList(rows));
+ }
+
+ public ColumnsMethods rowsString(Collection<String> rows) {
+ ArrayList<Bytes> conv = new ArrayList<>();
+ for (String row : rows) {
+ conv.add(encoder.encode(row));
+ }
+
+ return rows(conv);
+ }
+
+ public ColumnsMethods rowsLong(Long... rows) {
+ return rowsLong(Arrays.asList(rows));
+ }
+
+ public ColumnsMethods rowsLong(Collection<Long> rows) {
+ ArrayList<Bytes> conv = new ArrayList<>();
+ for (Long row : rows) {
+ conv.add(encoder.encode(row));
+ }
+
+ return rows(conv);
+ }
+
+ public ColumnsMethods rowsInteger(Integer... rows) {
+ return rowsInteger(Arrays.asList(rows));
+ }
+
+ public ColumnsMethods rowsInteger(Collection<Integer> rows) {
+ ArrayList<Bytes> conv = new ArrayList<>();
+ for (Integer row : rows) {
+ conv.add(encoder.encode(row));
+ }
+
+ return rows(conv);
+ }
+
+ public ColumnsMethods rowsBytes(byte[]... rows) {
+ return rowsBytes(Arrays.asList(rows));
+ }
+
+ public ColumnsMethods rowsBytes(Collection<byte[]> rows) {
+ ArrayList<Bytes> conv = new ArrayList<>();
+ for (byte[] row : rows) {
+ conv.add(Bytes.of(row));
+ }
+
+ return rows(conv);
+ }
+
+ public ColumnsMethods rowsByteBuffers(ByteBuffer... rows) {
+ return rowsByteBuffers(Arrays.asList(rows));
+ }
+
+ public ColumnsMethods rowsByteBuffers(Collection<ByteBuffer> rows) {
+ ArrayList<Bytes> conv = new ArrayList<>();
+ for (ByteBuffer row : rows) {
+ conv.add(Bytes.of(row));
+ }
+
+ return rows(conv);
+ }
+
+ }
+
+ TypedSnapshotBase(SnapshotBase snapshot, Encoder encoder, TypeLayer tl) {
+ this.snapshot = snapshot;
+ this.encoder = encoder;
+ this.tl = tl;
+ }
+
+ @Override
+ public Bytes get(Bytes row, Column column) {
+ return snapshot.get(row, column);
+ }
+
+ @Override
+ public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+ return snapshot.get(row, columns);
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+ return snapshot.get(rowColumns);
+ }
+
+ @Override
+ public RowIterator get(ScannerConfiguration config) {
+ return snapshot.get(config);
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+ return snapshot.get(rows, columns);
+ }
+
+ public ValueRowMethods get() {
+ return new ValueRowMethods();
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private Map<Column, Value> wrap(Map<Column, Bytes> map) {
+ Map<Column, Value> ret = Maps.transformValues(map, new Function<Bytes, Value>() {
+ @Override
+ public Value apply(Bytes input) {
+ return new Value(input);
+ }
+ });
+
+ return Collections.unmodifiableMap(DefaultedMap.decorate(ret, new Value((Bytes) null)));
+ }
+
+ @Override
+ public long getStartTimestamp() {
+ return snapshot.getStartTimestamp();
+ }
+
+ @Override
+ public String gets(String row, Column column) {
+ return snapshot.gets(row, column);
+ }
+
+ @Override
+ public Map<Column, String> gets(String row, Set<Column> columns) {
+ return snapshot.gets(row, columns);
+ }
+
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
+ return snapshot.gets(rows, columns);
+ }
+
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+ return snapshot.gets(rowColumns);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransaction.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransaction.java
new file mode 100644
index 0000000..1e22cd4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransaction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.exceptions.CommitException;
+
+/**
+ * A {@link Transaction} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedTransaction extends TypedTransactionBase implements Transaction {
+
+ private final Transaction closeTx;
+
+ @VisibleForTesting
+ protected TypedTransaction(Transaction tx, Encoder encoder, TypeLayer tl) {
+ super(tx, encoder, tl);
+ closeTx = tx;
+ }
+
+ @Override
+ public void commit() throws CommitException {
+ closeTx.commit();
+ }
+
+ @Override
+ public void close() {
+ closeTx.close();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransactionBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransactionBase.java
new file mode 100644
index 0000000..3247ba9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/fluo/recipes/types/TypedTransactionBase.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+import org.apache.fluo.recipes.types.TypeLayer.Data;
+import org.apache.fluo.recipes.types.TypeLayer.FamilyMethods;
+import org.apache.fluo.recipes.types.TypeLayer.QualifierMethods;
+import org.apache.fluo.recipes.types.TypeLayer.RowMethods;
+
+/**
+ * A {@link TransactionBase} that uses a {@link TypeLayer}
+ *
+ * @since 1.0.0
+ */
+public class TypedTransactionBase extends TypedSnapshotBase implements TransactionBase {
+
+ private final TransactionBase tx;
+ private final Encoder encoder;
+ private final TypeLayer tl;
+
+ /**
+ * @since 1.0.0
+ */
+ public class Mutator {
+
+ private boolean set = false;
+ protected Data data;
+
+ public Mutator(Data data) {
+ this.data = data;
+ }
+
+ void checkNotSet() {
+ if (set) {
+ throw new IllegalStateException("Already set value");
+ }
+ }
+
+ public void set(Bytes bytes) throws AlreadySetException {
+ checkNotSet();
+ tx.set(data.row, data.getCol(), bytes);
+ set = true;
+ }
+
+ public void set(String s) throws AlreadySetException {
+ set(encoder.encode(s));
+ }
+
+ public void set(int i) throws AlreadySetException {
+ set(encoder.encode(i));
+ }
+
+ public void set(long l) throws AlreadySetException {
+ set(encoder.encode(l));
+ }
+
+ public void set(float f) throws AlreadySetException {
+ set(encoder.encode(f));
+ }
+
+ public void set(double d) throws AlreadySetException {
+ set(encoder.encode(d));
+ }
+
+ public void set(boolean b) throws AlreadySetException {
+ set(encoder.encode(b));
+ }
+
+ public void set(byte[] ba) throws AlreadySetException {
+ set(Bytes.of(ba));
+ }
+
+ public void set(ByteBuffer bb) throws AlreadySetException {
+ set(Bytes.of(bb));
+ }
+
+ /**
+ * Set an empty value
+ */
+ public void set() throws AlreadySetException {
+ set(Bytes.EMPTY);
+ }
+
+ /**
+ * Reads the current value of the row/column, adds i, sets the sum. If the row/column does not
+ * have a current value, then it defaults to zero.
+ *
+ * @param i Integer increment amount
+ * @throws AlreadySetException if value was previously set in transaction
+ */
+ public void increment(int i) throws AlreadySetException {
+ checkNotSet();
+ Bytes val = tx.get(data.row, data.getCol());
+ int v = 0;
+ if (val != null) {
+ v = encoder.decodeInteger(val);
+ }
+ tx.set(data.row, data.getCol(), encoder.encode(v + i));
+ }
+
+ /**
+ * Reads the current value of the row/column, adds l, sets the sum. If the row/column does not
+ * have a current value, then it defaults to zero.
+ *
+ * @param l Long increment amount
+ * @throws AlreadySetException if value was previously set in transaction
+ */
+ public void increment(long l) throws AlreadySetException {
+ checkNotSet();
+ Bytes val = tx.get(data.row, data.getCol());
+ long v = 0;
+ if (val != null) {
+ v = encoder.decodeLong(val);
+ }
+ tx.set(data.row, data.getCol(), encoder.encode(v + l));
+ }
+
+ public void delete() throws AlreadySetException {
+ checkNotSet();
+ tx.delete(data.row, data.getCol());
+ set = true;
+ }
+
+ public void weaklyNotify() {
+ checkNotSet();
+ tx.setWeakNotification(data.row, data.getCol());
+ set = true;
+ }
+
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class VisibilityMutator extends Mutator {
+
+ public VisibilityMutator(Data data) {
+ super(data);
+ }
+
+ public Mutator vis(String cv) {
+ checkNotSet();
+ data.vis = Bytes.of(cv);
+ return new Mutator(data);
+ }
+
+ public Mutator vis(Bytes cv) {
+ checkNotSet();
+ data.vis = cv;
+ return new Mutator(data);
+ }
+
+ public Mutator vis(byte[] cv) {
+ checkNotSet();
+ data.vis = Bytes.of(cv);
+ return new Mutator(data);
+ }
+
+ public Mutator vis(ByteBuffer cv) {
+ checkNotSet();
+ data.vis = Bytes.of(cv);
+ return new Mutator(data);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class MutatorQualifierMethods extends QualifierMethods<VisibilityMutator> {
+
+ MutatorQualifierMethods(Data data) {
+ tl.super(data);
+ }
+
+ @Override
+ VisibilityMutator create(Data data) {
+ return new VisibilityMutator(data);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class MutatorFamilyMethods extends FamilyMethods<MutatorQualifierMethods, Mutator> {
+
+ MutatorFamilyMethods(Data data) {
+ tl.super(data);
+ }
+
+ @Override
+ MutatorQualifierMethods create1(Data data) {
+ return new MutatorQualifierMethods(data);
+ }
+
+ @Override
+ Mutator create2(Data data) {
+ return new Mutator(data);
+ }
+ }
+
+ /**
+ * @since 1.0.0
+ */
+ public class MutatorRowMethods extends RowMethods<MutatorFamilyMethods> {
+
+ MutatorRowMethods() {
+ tl.super();
+ }
+
+ @Override
+ MutatorFamilyMethods create(Data data) {
+ return new MutatorFamilyMethods(data);
+ }
+
+ }
+
+ @VisibleForTesting
+ protected TypedTransactionBase(TransactionBase tx, Encoder encoder, TypeLayer tl) {
+ super(tx, encoder, tl);
+ this.tx = tx;
+ this.encoder = encoder;
+ this.tl = tl;
+ }
+
+ public MutatorRowMethods mutate() {
+ return new MutatorRowMethods();
+ }
+
+ @Override
+ public void set(Bytes row, Column col, Bytes value) throws AlreadySetException {
+ tx.set(row, col, value);
+ }
+
+ @Override
+ public void set(String row, Column col, String value) throws AlreadySetException {
+ tx.set(row, col, value);
+ }
+
+ @Override
+ public void setWeakNotification(Bytes row, Column col) {
+ tx.setWeakNotification(row, col);
+ }
+
+ @Override
+ public void setWeakNotification(String row, Column col) {
+ tx.setWeakNotification(row, col);
+ }
+
+ @Override
+ public void delete(Bytes row, Column col) throws AlreadySetException {
+ tx.delete(row, col);
+ }
+
+ @Override
+ public void delete(String row, Column col) {
+ tx.delete(row, col);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshot.java b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshot.java
new file mode 100644
index 0000000..3cdc9ea
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshot.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.client.Snapshot;
+
+public class MockSnapshot extends MockSnapshotBase implements Snapshot {
+
+ MockSnapshot(String... entries) {
+ super(entries);
+ }
+
+ @Override
+ public void close() {
+ // no resources need to be closed
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshotBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshotBase.java
new file mode 100644
index 0000000..93372dc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockSnapshotBase.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.config.ScannerConfiguration;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.iterator.RowIterator;
+import org.apache.fluo.core.impl.TxStringUtil;
+
+public class MockSnapshotBase implements SnapshotBase {
+
+ final Map<Bytes, Map<Column, Bytes>> getData;
+
+ /**
+ * Initializes {@link #getData} using {@link #toRCVM(String...)}
+ */
+ MockSnapshotBase(String... entries) {
+ getData = toRCVM(entries);
+ }
+
+ @Override
+ public Bytes get(Bytes row, Column column) {
+ Map<Column, Bytes> cols = getData.get(row);
+ if (cols != null) {
+ return cols.get(column);
+ }
+
+ return null;
+ }
+
+ @Override
+ public Map<Column, Bytes> get(Bytes row, Set<Column> columns) {
+ Map<Column, Bytes> ret = new HashMap<>();
+ Map<Column, Bytes> cols = getData.get(row);
+ if (cols != null) {
+ for (Column column : columns) {
+ Bytes val = cols.get(column);
+ if (val != null) {
+ ret.put(column, val);
+ }
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) {
+
+ Map<Bytes, Map<Column, Bytes>> ret = new HashMap<>();
+
+ for (Bytes row : rows) {
+ Map<Column, Bytes> colMap = get(row, columns);
+ if (colMap != null && colMap.size() > 0) {
+ ret.put(row, colMap);
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
+ public RowIterator get(ScannerConfiguration config) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * toRCVM stands for "To Row Column Value Map". This is a convenience function that takes strings
+ * of the format {@code <row>,<col fam>:<col qual>[:col vis],
+ * <value>} and generates a row, column, value map.
+ */
+ public static Map<Bytes, Map<Column, Bytes>> toRCVM(String... entries) {
+ Map<Bytes, Map<Column, Bytes>> ret = new HashMap<>();
+
+ for (String entry : entries) {
+ String[] rcv = entry.split(",");
+ if (rcv.length != 3 && !(rcv.length == 2 && entry.trim().endsWith(","))) {
+ throw new IllegalArgumentException(
+ "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+ }
+
+ Bytes row = Bytes.of(rcv[0]);
+ String[] colFields = rcv[1].split(":");
+
+ Column col;
+ if (colFields.length == 3) {
+ col = new Column(colFields[0], colFields[1], colFields[2]);
+ } else if (colFields.length == 2) {
+ col = new Column(colFields[0], colFields[1]);
+ } else {
+ throw new IllegalArgumentException(
+ "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+ }
+
+ Bytes val;
+ if (rcv.length == 2) {
+ val = Bytes.EMPTY;
+ } else {
+ val = Bytes.of(rcv[2]);
+ }
+
+ Map<Column, Bytes> cols = ret.get(row);
+ if (cols == null) {
+ cols = new HashMap<>();
+ ret.put(row, cols);
+ }
+
+ cols.put(col, val);
+ }
+ return ret;
+ }
+
+ /**
+ * toRCM stands for "To Row Column Map". This is a convenience function that takes strings of the
+ * format {@code <row>,<col fam>:<col qual>[:col vis]} and generates a row, column map.
+ */
+ public static Map<Bytes, Set<Column>> toRCM(String... entries) {
+ Map<Bytes, Set<Column>> ret = new HashMap<>();
+
+ for (String entry : entries) {
+ String[] rcv = entry.split(",");
+ if (rcv.length != 2) {
+ throw new IllegalArgumentException(
+ "expected <row>,<col fam>:<col qual>[:col vis] but saw : " + entry);
+ }
+
+ Bytes row = Bytes.of(rcv[0]);
+ String[] colFields = rcv[1].split(":");
+
+ Column col;
+ if (colFields.length == 3) {
+ col = new Column(colFields[0], colFields[1], colFields[2]);
+ } else if (colFields.length == 2) {
+ col = new Column(colFields[0], colFields[1]);
+ } else {
+ throw new IllegalArgumentException(
+ "expected <row>,<col fam>:<col qual>[:col vis],<value> but saw : " + entry);
+ }
+
+ Set<Column> cols = ret.get(row);
+ if (cols == null) {
+ cols = new HashSet<>();
+ ret.put(row, cols);
+ }
+
+ cols.add(col);
+ }
+ return ret;
+ }
+
+ @Override
+ public long getStartTimestamp() {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public String gets(String row, Column column) {
+ return TxStringUtil.gets(this, row, column);
+ }
+
+ @Override
+ public Map<Column, String> gets(String row, Set<Column> columns) {
+ return TxStringUtil.gets(this, row, columns);
+ }
+
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) {
+ return TxStringUtil.gets(this, rows, columns);
+ }
+
+ @Override
+ public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) {
+ return TxStringUtil.gets(this, rowColumns);
+ }
+
+ @Override
+ public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransaction.java b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransaction.java
new file mode 100644
index 0000000..f187234
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransaction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.exceptions.CommitException;
+
+public class MockTransaction extends MockTransactionBase implements Transaction {
+
+ MockTransaction(String... entries) {
+ super(entries);
+ }
+
+ @Override
+ public void commit() throws CommitException {
+ // does nothing
+ }
+
+ @Override
+ public void close() {
+ // no resources to close
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransactionBase.java b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransactionBase.java
new file mode 100644
index 0000000..07a95e9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/types/MockTransactionBase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.exceptions.AlreadySetException;
+
+/**
+ * A very simple implementation of {@link TransactionBase} used for testing. All reads are serviced
+ * from {@link #getData}. Updates are stored in {@link #setData}, {@link #deletes}, or
+ * {@link #weakNotifications} depending on the update type.
+ */
+public class MockTransactionBase extends MockSnapshotBase implements TransactionBase {
+
+ final Map<Bytes, Map<Column, Bytes>> setData = new HashMap<>();
+ final Map<Bytes, Set<Column>> deletes = new HashMap<>();
+ final Map<Bytes, Set<Column>> weakNotifications = new HashMap<>();
+
+ MockTransactionBase(String... entries) {
+ super(entries);
+ }
+
+ @Override
+ public void setWeakNotification(Bytes row, Column col) {
+ Set<Column> cols = weakNotifications.get(row);
+ if (cols == null) {
+ cols = new HashSet<>();
+ weakNotifications.put(row, cols);
+ }
+
+ cols.add(col);
+ }
+
+ @Override
+ public void set(Bytes row, Column col, Bytes value) {
+ Map<Column, Bytes> cols = setData.get(row);
+ if (cols == null) {
+ cols = new HashMap<>();
+ setData.put(row, cols);
+ }
+
+ cols.put(col, value);
+ }
+
+ @Override
+ public void delete(Bytes row, Column col) {
+ Set<Column> cols = deletes.get(row);
+ if (cols == null) {
+ cols = new HashSet<>();
+ deletes.put(row, cols);
+ }
+
+ cols.add(col);
+ }
+
+ @Override
+ public void setWeakNotification(String row, Column col) {
+ setWeakNotification(Bytes.of(row), col);
+ }
+
+ @Override
+ public void set(String row, Column col, String value) throws AlreadySetException {
+ set(Bytes.of(row), col, Bytes.of(value));
+ }
+
+ @Override
+ public void delete(String row, Column col) {
+ delete(Bytes.of(row), col);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/fluo/recipes/types/TypeLayerTest.java b/modules/core/src/test/java/org/apache/fluo/recipes/types/TypeLayerTest.java
new file mode 100644
index 0000000..1139481
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/fluo/recipes/types/TypeLayerTest.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.fluo.recipes.types;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.recipes.types.TypedSnapshotBase.Value;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TypeLayerTest {
+
+ @Test
+ public void testColumns() throws Exception {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockTransactionBase tt =
+ new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+ "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+ TypedTransactionBase ttx = tl.wrap(tt);
+
+ Map<Column, Value> results =
+ ttx.get().row("r2")
+ .columns(ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7")));
+
+ Assert.assertNull(results.get(new Column("cf2", "6")).toInteger());
+ Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0));
+ Assert.assertEquals(12, (int) results.get(new Column("cf2", "7")).toInteger());
+ Assert.assertEquals(12, results.get(new Column("cf2", "7")).toInteger(0));
+
+ Assert.assertEquals(1, results.size());
+
+ results =
+ ttx.get()
+ .row("r2")
+ .columns(
+ ImmutableSet.of(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2",
+ "8")));
+
+ Assert.assertNull(results.get(new Column("cf2", "6")).toInteger());
+ Assert.assertEquals(0, results.get(new Column("cf2", "6")).toInteger(0));
+ Assert.assertEquals(12, (int) results.get(new Column("cf2", "7")).toInteger());
+ Assert.assertEquals(12, results.get(new Column("cf2", "7")).toInteger(0));
+ Assert.assertEquals(13, (int) results.get(new Column("cf2", "8")).toInteger());
+ Assert.assertEquals(13, results.get(new Column("cf2", "8")).toInteger(0));
+
+ Assert.assertEquals(2, results.size());
+
+ // test var args
+ Map<Column, Value> results2 =
+ ttx.get().row("r2")
+ .columns(new Column("cf2", "6"), new Column("cf2", "7"), new Column("cf2", "8"));
+ Assert.assertEquals(results, results2);
+ }
+
+ @Test
+ public void testVis() throws Exception {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockTransactionBase tt = new MockTransactionBase("r1,cf1:cq1:A,v1", "r1,cf1:cq2:A&B,v2");
+
+ TypedTransactionBase ttx = tl.wrap(tt);
+
+ Assert.assertNull(ttx.get().row("r1").fam("cf1").qual("cq1").toString());
+ Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A").toString());
+ Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A".getBytes())
+ .toString());
+ Assert.assertEquals("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A"))
+ .toString());
+ Assert.assertEquals("v1",
+ ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A".getBytes())).toString());
+
+ Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString());
+ Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes())
+ .toString());
+ Assert.assertNull("v1", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B"))
+ .toString());
+ Assert.assertNull("v1",
+ ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes()))
+ .toString());
+
+ Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B").toString("v3"));
+ Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis("A&B".getBytes())
+ .toString("v3"));
+ Assert.assertEquals("v3", ttx.get().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&B"))
+ .toString("v3"));
+ Assert.assertEquals(
+ "v3",
+ ttx.get().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&B".getBytes()))
+ .toString("v3"));
+
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").set(3);
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).set(4);
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&D")).set(5);
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&F".getBytes())).set(7);
+
+ Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1:A&B,3", "r1,cf1:cq1:A&C,4",
+ "r1,cf1:cq1:A&D,5", "r1,cf1:cq1:A&F,7"), tt.setData);
+ tt.setData.clear();
+
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&B").delete();
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis("A&C".getBytes()).delete();
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(Bytes.of("A&D")).delete();
+ ttx.mutate().row("r1").fam("cf1").qual("cq1").vis(ByteBuffer.wrap("A&F".getBytes())).delete();
+
+ Assert.assertEquals(MockTransactionBase.toRCM("r1,cf1:cq1:A&B", "r1,cf1:cq1:A&C",
+ "r1,cf1:cq1:A&D", "r1,cf1:cq1:A&F"), tt.deletes);
+ tt.deletes.clear();
+ Assert.assertEquals(0, tt.setData.size());
+ Assert.assertEquals(0, tt.weakNotifications.size());
+
+ }
+
+ @Test
+ public void testBuildColumn() {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0".getBytes()).qual("q0".getBytes())
+ .vis());
+ Assert.assertEquals(new Column("f0", "q0"), tl.bc().fam("f0").qual("q0").vis());
+ Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5).qual(7).vis());
+ Assert.assertEquals(new Column("5", "7"), tl.bc().fam(5l).qual(7l).vis());
+ Assert.assertEquals(new Column("5", "7"), tl.bc().fam(Bytes.of("5")).qual(Bytes.of("7")).vis());
+ Assert.assertEquals(new Column("5", "7"),
+ tl.bc().fam(ByteBuffer.wrap("5".getBytes())).qual(ByteBuffer.wrap("7".getBytes())).vis());
+
+ Assert.assertEquals(new Column("f0", "q0", "A&B"),
+ tl.bc().fam("f0".getBytes()).qual("q0".getBytes()).vis("A&B"));
+ Assert.assertEquals(new Column("f0", "q0", "A&C"),
+ tl.bc().fam("f0").qual("q0").vis("A&C".getBytes()));
+ Assert.assertEquals(new Column("5", "7", "A&D"), tl.bc().fam(5).qual(7).vis(Bytes.of("A&D")));
+ Assert.assertEquals(new Column("5", "7", "A&D"),
+ tl.bc().fam(5).qual(7).vis(ByteBuffer.wrap("A&D".getBytes())));
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockSnapshot ms =
+ new MockSnapshot("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+ "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20",
+ "r3,cf3:cq3,28.195", "r4,cf4:cq4,true");
+
+ TypedSnapshot tts = tl.wrap(ms);
+
+ Assert.assertEquals("v1", tts.get().row("r1").fam("cf1").qual("cq1").toString());
+ Assert.assertEquals("v1", tts.get().row("r1").fam("cf1").qual("cq1").toString("b"));
+ Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual("8").toString());
+ Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual("8").toString("b"));
+ Assert.assertEquals("28.195", tts.get().row("r3").fam("cf3").qual("cq3").toString());
+ Assert.assertEquals("28.195", tts.get().row("r3").fam("cf3").qual("cq3").toString("b"));
+ Assert.assertEquals("true", tts.get().row("r4").fam("cf4").qual("cq4").toString());
+ Assert.assertEquals("true", tts.get().row("r4").fam("cf4").qual("cq4").toString("b"));
+
+ // try converting to different types
+ Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual(8).toString());
+ Assert.assertEquals("13", tts.get().row("r2").fam("cf2").qual(8).toString("b"));
+ Assert.assertEquals((Integer) 13, tts.get().row("r2").fam("cf2").qual(8).toInteger());
+ Assert.assertEquals(13, tts.get().row("r2").fam("cf2").qual(8).toInteger(14));
+ Assert.assertEquals((Long) 13l, tts.get().row("r2").fam("cf2").qual(8).toLong());
+ Assert.assertEquals(13l, tts.get().row("r2").fam("cf2").qual(8).toLong(14l));
+ Assert.assertEquals("13", new String(tts.get().row("r2").fam("cf2").qual(8).toBytes()));
+ Assert.assertEquals("13",
+ new String(tts.get().row("r2").fam("cf2").qual(8).toBytes("14".getBytes())));
+ Assert
+ .assertEquals("13", new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes()));
+ Assert.assertEquals("13",
+ new String(tts.get().row("r2").col(new Column("cf2", "8")).toBytes("14".getBytes())));
+ Assert.assertEquals("13",
+ Bytes.of(tts.get().row("r2").col(new Column("cf2", "8")).toByteBuffer()).toString());
+ Assert.assertEquals(
+ "13",
+ Bytes.of(
+ tts.get().row("r2").col(new Column("cf2", "8"))
+ .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString());
+
+ // test non-existent
+ Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toInteger());
+ Assert.assertEquals(14, tts.get().row("r2").fam("cf3").qual(8).toInteger(14));
+ Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toLong());
+ Assert.assertEquals(14l, tts.get().row("r2").fam("cf3").qual(8).toLong(14l));
+ Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toString());
+ Assert.assertEquals("14", tts.get().row("r2").fam("cf3").qual(8).toString("14"));
+ Assert.assertNull(tts.get().row("r2").fam("cf3").qual(8).toBytes());
+ Assert.assertEquals("14",
+ new String(tts.get().row("r2").fam("cf3").qual(8).toBytes("14".getBytes())));
+ Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toBytes());
+ Assert.assertEquals("14",
+ new String(tts.get().row("r2").col(new Column("cf3", "8")).toBytes("14".getBytes())));
+ Assert.assertNull(tts.get().row("r2").col(new Column("cf3", "8")).toByteBuffer());
+ Assert.assertEquals(
+ "14",
+ Bytes.of(
+ tts.get().row("r2").col(new Column("cf3", "8"))
+ .toByteBuffer(ByteBuffer.wrap("14".getBytes()))).toString());
+
+ // test float & double
+ Assert.assertEquals((Float) 28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat());
+ Assert.assertEquals(28.195f, tts.get().row("r3").fam("cf3").qual("cq3").toFloat(39.383f), 0.0);
+ Assert.assertEquals((Double) 28.195d, tts.get().row("r3").fam("cf3").qual("cq3").toDouble());
+ Assert.assertEquals(28.195d, tts.get().row("r3").fam("cf3").qual("cq3").toDouble(39.383d), 0.0);
+
+ // test boolean
+ Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean());
+ Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean());
+ Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean(false));
+ Assert.assertEquals(true, tts.get().row("r4").fam("cf4").qual("cq4").toBoolean(false));
+
+ // try different types for row
+ Assert.assertEquals("20", tts.get().row(13).fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row(13l).fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13".getBytes()).fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row(ByteBuffer.wrap("13".getBytes())).fam("9").qual("17")
+ .toString());
+
+ // try different types for cf
+ Assert.assertEquals("20", tts.get().row("13").fam(9).qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam(9l).qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9".getBytes()).qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam(ByteBuffer.wrap("9".getBytes())).qual("17")
+ .toString());
+
+ // try different types for cq
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17").toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17l).toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual(17).toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual("17".getBytes()).toString());
+ Assert.assertEquals("20", tts.get().row("13").fam("9").qual(ByteBuffer.wrap("17".getBytes()))
+ .toString());
+
+ ms.close();
+ tts.close();
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockTransactionBase tt =
+ new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+ "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+ TypedTransactionBase ttx = tl.wrap(tt);
+
+ // test increments data
+ ttx.mutate().row("13").fam("9").qual("17").increment(1);
+ ttx.mutate().row("13").fam("9").qual(18).increment(2);
+ ttx.mutate().row("13").fam("9").qual(19l).increment(3);
+ ttx.mutate().row("13").fam("9").qual("20".getBytes()).increment(4);
+ ttx.mutate().row("13").fam("9").qual(Bytes.of("21")).increment(5); // increment non existent
+ ttx.mutate().row("13").col(new Column("9", "22")).increment(6); // increment non existent
+ ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7); // increment
+ // non
+ // existent
+
+ Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,22", "13,9:19,23",
+ "13,9:20,24", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData);
+ tt.setData.clear();
+
+ // test increments long
+ ttx.mutate().row("13").fam("9").qual("17").increment(1l);
+ ttx.mutate().row("13").fam("9").qual(18).increment(2l);
+ ttx.mutate().row("13").fam("9").qual(19l).increment(3l);
+ ttx.mutate().row("13").fam("9").qual("20".getBytes()).increment(4l);
+ ttx.mutate().row("13").fam("9").qual(Bytes.of("21")).increment(5l); // increment non existent
+ ttx.mutate().row("13").col(new Column("9", "22")).increment(6l); // increment non existent
+ ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("23".getBytes())).increment(7l); // increment
+ // non
+ // existent
+
+ Assert.assertEquals(MockTransactionBase.toRCVM("13,9:17,21", "13,9:18,22", "13,9:19,23",
+ "13,9:20,24", "13,9:21,5", "13,9:22,6", "13,9:23,7"), tt.setData);
+ tt.setData.clear();
+
+ // test setting data
+ ttx.mutate().row("13").fam("9").qual("16").set();
+ ttx.mutate().row("13").fam("9").qual("17").set(3);
+ ttx.mutate().row("13").fam("9").qual(18).set(4l);
+ ttx.mutate().row("13").fam("9").qual(19l).set("5");
+ ttx.mutate().row("13").fam("9").qual("20".getBytes()).set("6".getBytes());
+ ttx.mutate().row("13").col(new Column("9", "21")).set("7".getBytes());
+ ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes()))
+ .set(ByteBuffer.wrap("8".getBytes()));
+ ttx.mutate().row("13").fam("9").qual("23").set(2.54f);
+ ttx.mutate().row("13").fam("9").qual("24").set(-6.135d);
+ ttx.mutate().row("13").fam("9").qual("25").set(false);
+
+ Assert.assertEquals(MockTransactionBase.toRCVM("13,9:16,", "13,9:17,3", "13,9:18,4",
+ "13,9:19,5", "13,9:20,6", "13,9:21,7", "13,9:22,8", "13,9:23,2.54", "13,9:24,-6.135",
+ "13,9:25,false"), tt.setData);
+ tt.setData.clear();
+
+ // test deleting data
+ ttx.mutate().row("13").fam("9").qual("17").delete();
+ ttx.mutate().row("13").fam("9").qual(18).delete();
+ ttx.mutate().row("13").fam("9").qual(19l).delete();
+ ttx.mutate().row("13").fam("9").qual("20".getBytes()).delete();
+ ttx.mutate().row("13").col(new Column("9", "21")).delete();
+ ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).delete();
+
+ Assert
+ .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20",
+ "13,9:21", "13,9:22"), tt.deletes);
+ tt.deletes.clear();
+ Assert.assertEquals(0, tt.setData.size());
+ Assert.assertEquals(0, tt.weakNotifications.size());
+
+ // test weak notifications
+ ttx.mutate().row("13").fam("9").qual("17").weaklyNotify();
+ ttx.mutate().row("13").fam("9").qual(18).weaklyNotify();
+ ttx.mutate().row("13").fam("9").qual(19l).weaklyNotify();
+ ttx.mutate().row("13").fam("9").qual("20".getBytes()).weaklyNotify();
+ ttx.mutate().row("13").col(new Column("9", "21")).weaklyNotify();
+ ttx.mutate().row("13").fam("9").qual(ByteBuffer.wrap("22".getBytes())).weaklyNotify();
+
+ Assert
+ .assertEquals(MockTransactionBase.toRCM("13,9:17", "13,9:18", "13,9:19", "13,9:20",
+ "13,9:21", "13,9:22"), tt.weakNotifications);
+ tt.weakNotifications.clear();
+ Assert.assertEquals(0, tt.setData.size());
+ Assert.assertEquals(0, tt.deletes.size());
+ }
+
+ @Test
+ public void testMultiRow() throws Exception {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockTransactionBase tt =
+ new MockTransactionBase("11,cf1:cq1,1", "11,cf1:cq2,2", "12,cf1:cq1,3", "12,cf1:cq2,4",
+ "13,cf1:cq1,5", "13,cf1:cq2,6");
+
+ TypedTransactionBase ttx = tl.wrap(tt);
+
+ Bytes br1 = Bytes.of("11");
+ Bytes br2 = Bytes.of("12");
+ Bytes br3 = Bytes.of("13");
+
+ Column c1 = new Column("cf1", "cq1");
+ Column c2 = new Column("cf1", "cq2");
+
+ Map<Bytes, Map<Column, Value>> map1 =
+ ttx.get().rows(Arrays.asList(br1, br2)).columns(c1).toBytesMap();
+
+ Assert.assertEquals(map1, ttx.get().rows(br1, br2).columns(c1).toBytesMap());
+
+ Assert.assertEquals("1", map1.get(br1).get(c1).toString());
+ Assert.assertEquals("1", map1.get(br1).get(c1).toString("5"));
+ Assert.assertEquals((Long) (1l), map1.get(br1).get(c1).toLong());
+ Assert.assertEquals(1l, map1.get(br1).get(c1).toLong(5));
+ Assert.assertEquals((Integer) (1), map1.get(br1).get(c1).toInteger());
+ Assert.assertEquals(1, map1.get(br1).get(c1).toInteger(5));
+
+ Assert.assertEquals("5", map1.get(br3).get(c1).toString("5"));
+ Assert.assertNull(map1.get(br3).get(c1).toString());
+ Assert.assertEquals(5l, map1.get(br3).get(c1).toLong(5l));
+ Assert.assertNull(map1.get(br3).get(c1).toLong());
+ Assert.assertEquals(5, map1.get(br1).get(c2).toInteger(5));
+ Assert.assertNull(map1.get(br1).get(c2).toInteger());
+
+ Assert.assertEquals(2, map1.size());
+ Assert.assertEquals(1, map1.get(br1).size());
+ Assert.assertEquals(1, map1.get(br2).size());
+ Assert.assertEquals("3", map1.get(br2).get(c1).toString());
+
+ Map<String, Map<Column, Value>> map2 =
+ ttx.get().rowsString(Arrays.asList("11", "13")).columns(c1).toStringMap();
+
+ Assert.assertEquals(map2, ttx.get().rowsString("11", "13").columns(c1).toStringMap());
+
+ Assert.assertEquals(2, map2.size());
+ Assert.assertEquals(1, map2.get("11").size());
+ Assert.assertEquals(1, map2.get("13").size());
+ Assert.assertEquals((Long) (1l), map2.get("11").get(c1).toLong());
+ Assert.assertEquals(5l, map2.get("13").get(c1).toLong(6));
+
+ Map<Long, Map<Column, Value>> map3 =
+ ttx.get().rowsLong(Arrays.asList(11l, 13l)).columns(c1).toLongMap();
+
+ Assert.assertEquals(map3, ttx.get().rowsLong(11l, 13l).columns(c1).toLongMap());
+
+ Assert.assertEquals(2, map3.size());
+ Assert.assertEquals(1, map3.get(11l).size());
+ Assert.assertEquals(1, map3.get(13l).size());
+ Assert.assertEquals((Long) (1l), map3.get(11l).get(c1).toLong());
+ Assert.assertEquals(5l, map3.get(13l).get(c1).toLong(6));
+
+ Map<Integer, Map<Column, Value>> map4 =
+ ttx.get().rowsInteger(Arrays.asList(11, 13)).columns(c1).toIntegerMap();
+
+ Assert.assertEquals(map4, ttx.get().rowsInteger(11, 13).columns(c1).toIntegerMap());
+
+ Assert.assertEquals(2, map4.size());
+ Assert.assertEquals(1, map4.get(11).size());
+ Assert.assertEquals(1, map4.get(13).size());
+ Assert.assertEquals((Long) (1l), map4.get(11).get(c1).toLong());
+ Assert.assertEquals(5l, map4.get(13).get(c1).toLong(6));
+
+ Map<Integer, Map<Column, Value>> map5 =
+ ttx.get().rowsBytes(Arrays.asList("11".getBytes(), "13".getBytes())).columns(c1)
+ .toIntegerMap();
+
+ Assert.assertEquals(map5, ttx.get().rowsBytes("11".getBytes(), "13".getBytes()).columns(c1)
+ .toIntegerMap());
+
+ Assert.assertEquals(2, map5.size());
+ Assert.assertEquals(1, map5.get(11).size());
+ Assert.assertEquals(1, map5.get(13).size());
+ Assert.assertEquals((Long) (1l), map5.get(11).get(c1).toLong());
+ Assert.assertEquals(5l, map5.get(13).get(c1).toLong(6));
+
+ Map<Integer, Map<Column, Value>> map6 =
+ ttx.get()
+ .rowsByteBuffers(
+ Arrays.asList(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes())))
+ .columns(c1).toIntegerMap();
+
+ Assert.assertEquals(
+ map6,
+ ttx.get()
+ .rowsByteBuffers(ByteBuffer.wrap("11".getBytes()), ByteBuffer.wrap("13".getBytes()))
+ .columns(c1).toIntegerMap());
+
+ Assert.assertEquals(2, map6.size());
+ Assert.assertEquals(1, map6.get(11).size());
+ Assert.assertEquals(1, map6.get(13).size());
+ Assert.assertEquals((Long) (1l), map6.get(11).get(c1).toLong());
+ Assert.assertEquals(5l, map6.get(13).get(c1).toLong(6));
+
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ TypeLayer tl = new TypeLayer(new StringEncoder());
+
+ MockTransactionBase tt =
+ new MockTransactionBase("r1,cf1:cq1,v1", "r1,cf1:cq2,v2", "r1,cf1:cq3,9", "r2,cf2:7,12",
+ "r2,cf2:8,13", "13,9:17,20", "13,9:18,20", "13,9:19,20", "13,9:20,20");
+
+ TypedTransactionBase ttx = tl.wrap(tt);
+
+ Assert.assertEquals(Bytes.of("12"), ttx.get(Bytes.of("r2"), new Column("cf2", "7")));
+ Assert.assertNull(ttx.get(Bytes.of("r2"), new Column("cf2", "9")));
+
+ Map<Column, Bytes> map =
+ ttx.get(Bytes.of("r2"), ImmutableSet.of(new Column("cf2", "7"), new Column("cf2", "8")));
+ Assert.assertEquals(2, map.size());
+ Assert.assertEquals("12", map.get(new Column("cf2", "7")).toString());
+ Assert.assertEquals("13", map.get(new Column("cf2", "8")).toString());
+
+ map = ttx.get(Bytes.of("r6"), ImmutableSet.of(new Column("cf2", "7"), new Column("cf2", "8")));
+ Assert.assertEquals(0, map.size());
+
+ ttx.set(Bytes.of("r6"), new Column("cf2", "7"), Bytes.of("3"));
+ Assert.assertEquals(MockTransactionBase.toRCVM("r6,cf2:7,3"), tt.setData);
+ tt.setData.clear();
+
+ Map<Bytes, Map<Column, Bytes>> map2 =
+ ttx.get(ImmutableSet.of(Bytes.of("r1"), Bytes.of("r2")),
+ ImmutableSet.of(new Column("cf1", "cq1"), new Column("cf2", "8")));
+ Assert.assertEquals(MockTransactionBase.toRCVM("r1,cf1:cq1,v1", "r2,cf2:8,13"), map2);
+
+ ttx.delete(Bytes.of("r6"), new Column("cf2", "7"));
+ Assert.assertEquals(MockTransactionBase.toRCM("r6,cf2:7"), tt.deletes);
+ tt.deletes.clear();
+
+ ttx.setWeakNotification(Bytes.of("r6"), new Column("cf2", "8"));
+ Assert.assertEquals(MockTransactionBase.toRCM("r6,cf2:8"), tt.weakNotifications);
+ tt.weakNotifications.clear();
+
+ }
+}