blob: c227ed71752046a3b4ae598452acd461582f3302 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.state.spillable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
import org.apache.apex.malhar.lib.utils.serde.SerdePairSlice;
import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.google.common.base.Preconditions;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.datatorrent.api.Context;
import com.datatorrent.netlet.util.Slice;
/**
 * This is an implementation of Guava's {@code SetMultimap} which spills data to a {@link SpillableStateStore}.
 * <p>
 * The values for each key are kept in a {@link SpillableSetImpl} whose entries are stored in the store under the
 * prefix {@code identifier + serialized key}. Per-key metadata (the set's size and its head value) is kept in a
 * {@link SpillableByteMapImpl} under the serialized key followed by {@link #META_KEY_SUFFIX}.
 * </p>
 *
 * @since 3.5.0
 */
@DefaultSerializer(FieldSerializer.class)
@InterfaceStability.Evolving
public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMultimap<K, V>,
    Spillable.SpillableComponent
{
  public static final int DEFAULT_BATCH_SIZE = 1000;

  /**
   * Suffix appended to a serialized key to form the key under which that key's (size, head) metadata is stored.
   */
  public static final byte[] META_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};

  // Sets accessed in this operator; keys reported by getChangedKeys() have their metadata written in endWindow().
  private transient WindowBoundedMapCache<K, SpillableSetImpl<V>> cache = new WindowBoundedMapCache<>();

  // Per-key metadata: left = size of the key's set, right = head value of the key's set.
  @NotNull
  private SpillableByteMapImpl<Slice, Pair<Integer, V>> map;

  private SpillableStateStore store;
  private byte[] identifier;
  private long bucket;
  private Serde<K, Slice> serdeKey;
  private Serde<V, Slice> serdeValue;

  // Sets cleared by removeAll() during the current window; their pending changes are flushed in endWindow().
  private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>();

  private SpillableSetMultimapImpl()
  {
    // for kryo
  }

  /**
   * Creates a {@link SpillableSetMultimapImpl}.
   * @param store The {@link SpillableStateStore} in which to spill to.
   * @param identifier The Id of this {@link SpillableSetMultimapImpl}.
   * @param bucket The Id of the bucket used to store this
   * {@link SpillableSetMultimapImpl} in the provided {@link SpillableStateStore}.
   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
   * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
   * @throws NullPointerException if {@code store}, {@code identifier}, {@code serdeKey} or {@code serdeValue}
   * is null.
   */
  public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
      Serde<K, Slice> serdeKey,
      Serde<V, Slice> serdeValue)
  {
    this.store = Preconditions.checkNotNull(store);
    this.identifier = Preconditions.checkNotNull(identifier);
    this.bucket = bucket;
    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
  }

  public SpillableStateStore getStore()
  {
    return store;
  }

  /**
   * Returns the spillable set of values for the given key.
   * <p>
   * Unlike the Guava {@code Multimap} contract, this returns {@code null} when no metadata has ever been stored for
   * the key. A key whose values were removed with {@link #removeAll(Object)} yields an empty set.
   * </p>
   *
   * @param key the key to look up.
   * @return the set of values for {@code key}, or {@code null} if the key is unknown.
   */
  @Override
  public Set<V> get(@NotNull K key)
  {
    return getHelper(key);
  }

  /**
   * Looks up the set for a key, first in the cache and then via the stored metadata, and (re)registers it in the
   * cache. Re-registering also covers cache hits, because the caller may mutate the returned set.
   */
  private SpillableSetImpl<V> getHelper(@NotNull K key)
  {
    SpillableSetImpl<V> spillableSet = cache.get(key);
    if (spillableSet == null) {
      Slice keySlice = serdeKey.serialize(key);
      Pair<Integer, V> meta = map.get(SliceUtils.concatenate(keySlice, META_KEY_SUFFIX));
      if (meta == null) {
        return null;
      }
      // Rebuild the set view over the values already stored under identifier + serialized key.
      Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
      spillableSet.setSize(meta.getLeft());
      spillableSet.setHead(meta.getRight());
    }
    cache.put(key, spillableSet);
    return spillableSet;
  }

  @Override
  public Set<K> keySet()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Multiset<K> keys()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Collection<V> values()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Set<Map.Entry<K, V>> entries()
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Removes all values associated with the given key.
   * <p>
   * Note that this always returns null (rather than the removed values) because the set is no longer valid after
   * this call. The key's metadata is kept with a size of 0 and the set's current head.
   * </p>
   *
   * @param key the key whose values should be removed.
   * @return null
   */
  @Override
  @SuppressWarnings("unchecked")
  public Set<V> removeAll(@NotNull Object key)
  {
    SpillableSetImpl<V> spillableSet = getHelper((K)key);
    if (spillableSet != null) {
      cache.remove((K)key);
      Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
      map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead()));
      spillableSet.clear();
      // The cleared set is no longer in the cache, so track it separately to flush its removals in endWindow().
      removedSets.add(spillableSet);
    }
    return null;
  }

  @Override
  public void clear()
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Returns the size of the underlying metadata map.
   */
  @Override
  public int size()
  {
    // TODO: This is actually wrong since in a Multimap, size() should return the number of entries, not the number of distinct keys
    return map.size();
  }

  /**
   * Returns whether the underlying metadata map is empty.
   */
  @Override
  public boolean isEmpty()
  {
    // NOTE(review): removeAll() keeps a (0, head) metadata entry, so this may return false even when every set is
    // empty -- confirm whether that is intended.
    return map.isEmpty();
  }

  /**
   * Returns whether the given key currently has at least one value.
   *
   * @param key the key to check.
   * @return {@code true} if the key's set is non-empty.
   */
  @Override
  @SuppressWarnings("unchecked")
  public boolean containsKey(Object key)
  {
    if (cache.contains((K)key)) {
      // A cached set may be empty (e.g. fetched after removeAll(), or emptied through the returned view); treat it
      // the same way as the stored-metadata path below.
      return cache.get((K)key).size() > 0;
    }
    Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
    Pair<Integer, V> meta = map.get(keySlice);
    return meta != null && meta.getLeft() > 0;
  }

  @Override
  public boolean containsValue(@NotNull Object value)
  {
    throw new UnsupportedOperationException();
  }

  /**
   * Returns whether the given value is present in the set for the given key.
   */
  @Override
  @SuppressWarnings("unchecked")
  public boolean containsEntry(Object key, Object value)
  {
    Set<V> set = get((K)key);
    if (set == null) {
      return false;
    } else {
      return set.contains(value);
    }
  }

  /**
   * Adds a value to the set for the given key, creating the set if the key is unknown.
   *
   * @return {@code true} if the set changed.
   */
  @Override
  public boolean put(K key, V value)
  {
    SpillableSetImpl<V> spillableSet = getHelper(key);
    if (spillableSet == null) {
      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
      cache.put(key, spillableSet);
    }
    return spillableSet.add(value);
  }

  /**
   * Removes a single value from the set for the given key.
   *
   * @return {@code true} if the value was present and removed.
   */
  @Override
  @SuppressWarnings("unchecked")
  public boolean remove(@NotNull Object key, @NotNull Object value)
  {
    Set<V> set = get((K)key);
    if (set == null) {
      return false;
    } else {
      return set.remove(value);
    }
  }

  @Override
  public boolean putAll(@Nullable K key, Iterable<? extends V> values)
  {
    boolean changed = false;
    for (V value: values) {
      changed |= put(key, value);
    }
    return changed;
  }

  @Override
  public boolean putAll(Multimap<? extends K, ? extends V> multimap)
  {
    boolean changed = false;
    for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) {
      changed |= put(entry.getKey(), entry.getValue());
    }
    return changed;
  }

  @Override
  public Set<V> replaceValues(K key, Iterable<? extends V> values)
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public Map<K, Collection<V>> asMap()
  {
    throw new UnsupportedOperationException();
  }

  @Override
  public void setup(Context.OperatorContext context)
  {
    map.setup(context);
  }

  @Override
  public void beginWindow(long windowId)
  {
    map.beginWindow(windowId);
  }

  /**
   * Flushes every set changed in this window, records its (size, head) metadata, flushes sets cleared by
   * removeAll(), and then ends the window on the cache and the metadata map.
   */
  @Override
  public void endWindow()
  {
    for (K key: cache.getChangedKeys()) {
      SpillableSetImpl<V> spillableSet = cache.get(key);
      spillableSet.endWindow();
      map.put(SliceUtils.concatenate(serdeKey.serialize(key), META_KEY_SUFFIX),
          new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
    }
    for (SpillableSetImpl<V> removedSet : removedSets) {
      removedSet.endWindow();
    }
    // The removed sets have been flushed; drop them so they are not flushed again in every later window and do not
    // accumulate in memory for the lifetime of the operator.
    removedSets.clear();
    cache.endWindow();
    map.endWindow();
  }

  @Override
  public void teardown()
  {
    map.teardown();
  }
}