blob: da5b14046df54bd7fb0ade777e0fa6809fd8f91d [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.state.spillable;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.datatorrent.api.Context;
import com.datatorrent.netlet.util.Slice;
* A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
* @param <T> The type of object stored in the {@link SpillableArrayListImpl}.
* @since 3.5.0
public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T>, Spillable.SpillableComponent
public static final int DEFAULT_BATCH_SIZE = 1000;
private int batchSize = DEFAULT_BATCH_SIZE;
private long bucketId;
private byte[] prefix;
private SpillableStateStore store;
private Serde<T, Slice> serde;
private SpillableByteMapImpl<Integer, List<T>> map;
private boolean sizeCached = false;
private int size;
private int numBatches;
private SpillableArrayListImpl()
//for kryo
public SpillableStateStore getStore()
return store;
* Creates a {@link SpillableArrayListImpl}.
* @param bucketId The Id of the bucket used to store this
* {@link SpillableArrayListImpl} in the provided {@link SpillableStateStore}.
* @param prefix The Id of this {@link SpillableArrayListImpl}.
* @param store The {@link SpillableStateStore} in which to spill to.
* @param serde The {@link Serde} to use when serializing and deserializing data.
public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
@NotNull SpillableStateStore store,
@NotNull Serde<T, Slice> serde)
this.bucketId = bucketId;
this.prefix = Preconditions.checkNotNull(prefix); = Preconditions.checkNotNull(store);
this.serde = Preconditions.checkNotNull(serde);
map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), new SerdeListSlice(serde));
* Creates a {@link SpillableArrayListImpl}.
* @param bucketId The Id of the bucket used to store this
* {@link SpillableArrayListImpl} in the provided {@link SpillableStateStore}.
* @param prefix The Id of this {@link SpillableArrayListImpl}.
* @param store The {@link SpillableStateStore} in which to spill to.
* @param serde The {@link Serde} to use when serializing and deserializing data.
* @param batchSize When spilled to a {@link SpillableStateStore} data is stored in a batch. This determines the
* number of elements a batch will contain when it's spilled. Having small batches will increase
* the number of keys stored by your {@link SpillableStateStore} but will improve random reads and
* writes. Increasing the batch size will improve sequential read and write speed.
public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
@NotNull SpillableStateStore store,
@NotNull Serde<T, Slice> serde,
int batchSize)
this(bucketId, prefix, store, serde);
Preconditions.checkArgument(this.batchSize > 0);
this.batchSize = batchSize;
public void setSize(int size)
Preconditions.checkArgument(size >= 0);
this.size = size;
public int size()
return size;
public boolean isEmpty()
return size == 0;
public boolean contains(Object o)
throw new UnsupportedOperationException();
public Iterator<T> iterator()
throw new UnsupportedOperationException();
public Object[] toArray()
throw new UnsupportedOperationException();
public <T1> T1[] toArray(T1[] t1s)
throw new UnsupportedOperationException();
public boolean add(T t)
Preconditions.checkArgument((size() + 1) > 0);
int batchIndex = (size / batchSize);
List<T> batch = null;
if (batchIndex == numBatches) {
batch = Lists.newArrayListWithCapacity(batchSize);
} else {
batch = map.get(batchIndex);
map.put(batchIndex, batch);
return true;
public boolean remove(Object o)
throw new UnsupportedOperationException();
public boolean containsAll(Collection<?> collection)
throw new UnsupportedOperationException();
public boolean addAll(Collection<? extends T> collection)
for (T element: collection) {
return true;
public boolean addAll(int i, Collection<? extends T> collection)
throw new UnsupportedOperationException();
public boolean removeAll(Collection<?> collection)
throw new UnsupportedOperationException();
public boolean retainAll(Collection<?> collection)
throw new UnsupportedOperationException();
public void clear()
throw new UnsupportedOperationException();
public T get(int i)
if (!(i < size)) {
throw new IndexOutOfBoundsException();
int batchIndex = i / batchSize;
int batchOffset = i % batchSize;
List<T> batch = map.get(batchIndex);
return batch.get(batchOffset);
public T set(int i, T t)
if (!(i < size)) {
throw new IndexOutOfBoundsException();
int batchIndex = i / batchSize;
int batchOffset = i % batchSize;
List<T> batch = map.get(batchIndex);
T old = batch.get(batchOffset);
batch.set(batchOffset, t);
map.put(batchIndex, batch);
return old;
public void add(int i, T t)
throw new UnsupportedOperationException();
public T remove(int i)
throw new UnsupportedOperationException();
public int indexOf(Object o)
throw new UnsupportedOperationException();
public int lastIndexOf(Object o)
throw new UnsupportedOperationException();
public ListIterator<T> listIterator()
throw new UnsupportedOperationException();
public ListIterator<T> listIterator(int i)
throw new UnsupportedOperationException();
public List<T> subList(int i, int i1)
throw new UnsupportedOperationException();
public void setup(Context.OperatorContext context)
public void beginWindow(long windowId)
public void endWindow()
public void teardown()