blob: 0cd99c5a03ec583f292507b861382d300608e5d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.datastreamer;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.stream.StreamReceiver;
import org.jetbrains.annotations.Nullable;
/**
* Bundled factory for cache updaters.
*/
public class DataStreamerCacheUpdaters {
/** */
private static final StreamReceiver INDIVIDUAL = new Individual();
/** */
private static final StreamReceiver BATCHED = new Batched();
/** */
private static final StreamReceiver BATCHED_SORTED = new BatchedSorted();
/**
* Updates cache using independent {@link IgniteCache#put(Object, Object)}and
* {@link IgniteCache#remove(Object)} operations. Thus it is safe from deadlocks but performance
* is not the best.
*
* @return Single updater.
*/
public static <K, V> StreamReceiver<K, V> individual() {
return INDIVIDUAL;
}
/**
* Updates cache using batched methods {@link IgniteCache#putAll(Map)}and
* {@link IgniteCache#removeAll()}. Can cause deadlocks if the same keys are getting
* updated concurrently. Performance is generally better than in {@link #individual()}.
*
* @return Batched updater.
*/
public static <K, V> StreamReceiver<K, V> batched() {
return BATCHED;
}
/**
* Updates cache using batched methods {@link IgniteCache#putAll(Map)} and
* {@link IgniteCache#removeAll(Set)}. Keys are sorted in natural order and if all updates
* use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
*
* @return Batched sorted updater.
*/
public static <K extends Comparable<?>, V> StreamReceiver<K, V> batchedSorted() {
return BATCHED_SORTED;
}
/**
* Updates cache.
*
* @param cache Cache.
* @param rmvCol Keys to remove.
* @param putMap Entries to put.
* @throws IgniteException If failed.
*/
protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
Map<K, V> putMap) {
assert rmvCol != null || putMap != null;
// Here we assume that there are no key duplicates, so the following calls are valid.
if (rmvCol != null)
cache.removeAll(rmvCol);
if (putMap != null)
cache.putAll(putMap);
}
/**
* Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
*/
private static class Individual<K, V> implements StreamReceiver<K, V>, InternalUpdater {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
@Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
assert cache != null;
assert !F.isEmpty(entries);
for (Map.Entry<K, V> entry : entries) {
K key = entry.getKey();
assert key != null;
V val = entry.getValue();
if (val == null)
cache.remove(key);
else
cache.put(key, val);
}
}
}
/**
* Batched updater. Updates cache using batch operations thus is dead lock prone.
*/
private static class Batched<K, V> implements StreamReceiver<K, V>, InternalUpdater {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
@Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
assert cache != null;
assert !F.isEmpty(entries);
Map<K, V> putAll = null;
Set<K> rmvAll = null;
for (Map.Entry<K, V> entry : entries) {
K key = entry.getKey();
assert key != null;
V val = entry.getValue();
if (val == null) {
if (rmvAll == null)
rmvAll = new HashSet<>();
rmvAll.add(key);
}
else {
if (putAll == null)
putAll = new HashMap<>();
putAll.put(key, val);
}
}
updateAll(cache, rmvAll, putAll);
}
}
/**
* Batched updater. Updates cache using batch operations thus is dead lock prone.
*/
private static class BatchedSorted<K, V> implements StreamReceiver<K, V> {
/** */
private static final long serialVersionUID = 0L;
/** {@inheritDoc} */
@Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
assert cache != null;
assert !F.isEmpty(entries);
Map<K, V> putAll = null;
Set<K> rmvAll = null;
for (Map.Entry<K, V> entry : entries) {
K key = entry.getKey();
assert key instanceof Comparable;
V val = entry.getValue();
if (val == null) {
if (rmvAll == null)
rmvAll = new TreeSet<>();
rmvAll.add(key);
}
else {
if (putAll == null)
putAll = new TreeMap<>();
putAll.put(key, val);
}
}
updateAll(cache, rmvAll, putAll);
}
}
/**
* Marker interface for updaters which do not need to unwrap cache objects.
*/
public static interface InternalUpdater {
// No-op.
}
}