blob: 5c91350fa55aa1aa25254a1595321eab11ac442c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.state.spillable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.google.common.base.Preconditions;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.datatorrent.api.Context;
import com.datatorrent.netlet.util.Slice;
/**
* This is an implementation of Guava's ListMultimap which spills data to a {@link SpillableStateStore}.
*
* @since 3.5.0
*/
@DefaultSerializer(FieldSerializer.class)
@InterfaceStability.Evolving
public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
Spillable.SpillableComponent
{
public static final int DEFAULT_BATCH_SIZE = 1000;
public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
private transient boolean isRunning = false;
private transient boolean isInWindow = false;
private int batchSize = DEFAULT_BATCH_SIZE;
@NotNull
private SpillableByteMapImpl<byte[], Integer> map;
private SpillableStateStore store;
private byte[] identifier;
private long bucket;
private Serde<K, Slice> serdeKey;
private Serde<V, Slice> serdeValue;
private SpillableByteArrayListMultimapImpl()
{
// for kryo
}
/**
* Creates a {@link SpillableByteArrayListMultimapImpl}.
* @param store The {@link SpillableStateStore} in which to spill to.
* @param identifier The Id of this {@link SpillableByteArrayListMultimapImpl}.
* @param bucket The Id of the bucket used to store this
* {@link SpillableByteArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
* @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
* @param serdeKey The {@link Serde} to use when serializing and deserializing values.
*/
public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
Serde<K, Slice> serdeKey,
Serde<V, Slice> serdeValue)
{
this.store = Preconditions.checkNotNull(store);
this.identifier = Preconditions.checkNotNull(identifier);
this.bucket = bucket;
this.serdeKey = Preconditions.checkNotNull(serdeKey);
this.serdeValue = Preconditions.checkNotNull(serdeValue);
map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
}
public SpillableStateStore getStore()
{
return store;
}
@Override
public List<V> get(@Nullable K key)
{
return getHelper(key);
}
private SpillableArrayListImpl<V> getHelper(@Nullable K key)
{
SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
if (spillableArrayList == null) {
Slice keySlice = serdeKey.serialize(key);
Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray());
if (size == null) {
return null;
}
Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
spillableArrayList.setSize(size);
}
cache.put(key, spillableArrayList);
return spillableArrayList;
}
@Override
public Set<K> keySet()
{
throw new UnsupportedOperationException();
}
@Override
public Multiset<K> keys()
{
throw new UnsupportedOperationException();
}
@Override
public Collection<V> values()
{
throw new UnsupportedOperationException();
}
@Override
public Collection<Map.Entry<K, V>> entries()
{
throw new UnsupportedOperationException();
}
@Override
public List<V> removeAll(@Nullable Object key)
{
throw new UnsupportedOperationException();
}
@Override
public void clear()
{
throw new UnsupportedOperationException();
}
@Override
public int size()
{
return map.size();
}
@Override
public boolean isEmpty()
{
return map.isEmpty();
}
@Override
public boolean containsKey(@Nullable Object key)
{
return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
SIZE_KEY_SUFFIX).toByteArray());
}
@Override
public boolean containsValue(@Nullable Object value)
{
throw new UnsupportedOperationException();
}
@Override
public boolean containsEntry(@Nullable Object key, @Nullable Object value)
{
throw new UnsupportedOperationException();
}
@Override
public boolean put(@Nullable K key, @Nullable V value)
{
SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
if (spillableArrayList == null) {
Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
cache.put(key, spillableArrayList);
}
spillableArrayList.add(value);
return true;
}
@Override
public boolean remove(@Nullable Object key, @Nullable Object value)
{
throw new UnsupportedOperationException();
}
@Override
public boolean putAll(@Nullable K key, Iterable<? extends V> values)
{
boolean changed = false;
for (V value: values) {
changed |= put(key, value);
}
return changed;
}
@Override
public boolean putAll(Multimap<? extends K, ? extends V> multimap)
{
boolean changed = false;
for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) {
changed |= put(entry.getKey(), entry.getValue());
}
return changed;
}
@Override
public List<V> replaceValues(K key, Iterable<? extends V> values)
{
throw new UnsupportedOperationException();
}
@Override
public Map<K, Collection<V>> asMap()
{
throw new UnsupportedOperationException();
}
@Override
public void setup(Context.OperatorContext context)
{
map.setup(context);
isRunning = true;
}
@Override
public void beginWindow(long windowId)
{
map.beginWindow(windowId);
isInWindow = true;
}
@Override
public void endWindow()
{
isInWindow = false;
for (K key: cache.getChangedKeys()) {
SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
spillableArrayList.endWindow();
Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX).toByteArray(),
spillableArrayList.size());
}
Preconditions.checkState(cache.getRemovedKeys().isEmpty());
cache.endWindow();
map.endWindow();
}
@Override
public void teardown()
{
isRunning = false;
map.teardown();
}
}