MicroBatchIBackingMap: avoid store timeouts on multiput and multiget
diff --git a/src/jvm/storm/trident/state/map/MicroBatchIBackingMap.java b/src/jvm/storm/trident/state/map/MicroBatchIBackingMap.java
new file mode 100644
index 0000000..2f356b1
--- /dev/null
+++ b/src/jvm/storm/trident/state/map/MicroBatchIBackingMap.java
@@ -0,0 +1,68 @@
+package storm.trident.state.map;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class MicroBatchIBackingMap<T> implements IBackingMap<T> {
+    IBackingMap<T> _delegate;
+    Options _options;
+
+
+    public static class Options implements Serializable {
+        public int maxMultiGetBatchSize = 0; // 0 means delegate batch size = trident batch size.
+        public int maxMultiPutBatchSize = 0;
+    }
+
+    public MicroBatchIBackingMap(final Options options, final IBackingMap<T> delegate) {
+        _options = options;
+        _delegate = delegate;
+        assert options.maxMultiPutBatchSize >= 0;
+        assert options.maxMultiGetBatchSize >= 0;
+    }
+
+    @Override
+    public void multiPut(final List<List<Object>> keys, final List<T> values) {
+        int thisBatchSize;
+        if(_options.maxMultiPutBatchSize == 0) { thisBatchSize = keys.size(); }
+        else { thisBatchSize = _options.maxMultiPutBatchSize; }
+
+        LinkedList<List<Object>> keysTodo = new LinkedList<List<Object>>(keys);
+        LinkedList<T> valuesTodo = new LinkedList<T>(values);
+
+        while(!keysTodo.isEmpty()) {
+            List<List<Object>> keysBatch = new ArrayList<List<Object>>(thisBatchSize);
+            List<T> valuesBatch = new ArrayList<T>(thisBatchSize);
+            for(int i=0; i<thisBatchSize && !keysTodo.isEmpty(); i++) {
+                keysBatch.add(keysTodo.removeFirst());
+                valuesBatch.add(valuesTodo.removeFirst());
+            }
+
+            _delegate.multiPut(keysBatch, valuesBatch);
+        }
+    }
+
+    @Override
+    public List<T> multiGet(final List<List<Object>> keys) {
+        int thisBatchSize;
+        if(_options.maxMultiGetBatchSize == 0) { thisBatchSize = keys.size(); }
+        else { thisBatchSize = _options.maxMultiGetBatchSize; }
+
+        LinkedList<List<Object>> keysTodo = new LinkedList<List<Object>>(keys);
+
+        List<T> ret = new ArrayList<T>(keys.size());
+
+        while(!keysTodo.isEmpty()) {
+            List<List<Object>> keysBatch = new ArrayList<List<Object>>(thisBatchSize);
+            for(int i=0; i<thisBatchSize && !keysTodo.isEmpty(); i++) {
+                keysBatch.add(keysTodo.removeFirst());
+            }
+
+            List<T> retSubset = _delegate.multiGet(keysBatch);
+            ret.addAll(retSubset);
+        }
+
+        return ret;
+    }
+}