blob: 6c592901ddbe9ecdeb989aa24dbed8da6ffb86ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.metastorage.watch;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.metastorage.client.EntryEvent;
import org.apache.ignite.metastorage.client.WatchEvent;
import org.apache.ignite.metastorage.client.WatchListener;
import org.jetbrains.annotations.NotNull;
/**
 * Aggregates multiple watches into one aggregated watch.
 * This approach is needed to provide the following additional guarantees to the watching mechanism:
 * <ul>
 *     <li>watch events will be processed sequentially;</li>
 *     <li>watch events will be resolved in the order of watch registration.</li>
 * </ul>
 */
public class WatchAggregator {
    /**
     * Registered watches keyed by their internal id, kept in insertion order.
     * The map must be synchronized because it is changed from a WatchListener in a separate thread.
     */
    private final Map<Long, Watch> watches = Collections.synchronizedMap(new LinkedHashMap<>());

    /** Simple auto increment id for internal watches. */
    private final AtomicLong idCntr = new AtomicLong(0);

    /**
     * Adds a new watch with a simple exact-key criterion.
     *
     * @param key Key for watching.
     * @param lsnr Listener which will be executed on watch event.
     * @return Id of the registered watch. Can be used to cancel the watch later.
     */
    public long add(ByteArray key, WatchListener lsnr) {
        return register(new KeyCriterion.ExactCriterion(key), lsnr);
    }

    /**
     * Adds a new watch with a filter by key prefix.
     *
     * @param key Prefix for key.
     * @param lsnr Listener which will be executed on watch event.
     * @return Id of the registered watch. Can be used to cancel the watch later.
     */
    public long addPrefix(ByteArray key, WatchListener lsnr) {
        return register(KeyCriterion.RangeCriterion.fromPrefixKey(key), lsnr);
    }

    /**
     * Adds a new watch with a filter by collection of keys.
     *
     * @param keys Collection of keys to listen.
     * @param lsnr Listener which will be executed on watch event.
     * @return Id of the registered watch. Can be used to cancel the watch later.
     */
    public long add(Collection<ByteArray> keys, WatchListener lsnr) {
        return register(new KeyCriterion.CollectionCriterion(keys), lsnr);
    }

    /**
     * Adds a new watch with a filter by key range.
     *
     * @param from Start key of range to listen.
     * @param to End key of range (exclusively).
     * @param lsnr Listener which will be executed on watch event.
     * @return Id of the registered watch. Can be used to cancel the watch later.
     */
    public long add(ByteArray from, ByteArray to, WatchListener lsnr) {
        return register(new KeyCriterion.RangeCriterion(from, to), lsnr);
    }

    /**
     * Cancels a watch by id.
     *
     * @param id Id of the watch to cancel.
     */
    public void cancel(long id) {
        watches.remove(id);
    }

    /**
     * Cancels multiple watches by ids.
     *
     * @param ids Ids of the watches to cancel.
     */
    public void cancelAll(Collection<Long> ids) {
        watches.keySet().removeAll(ids);
    }

    /**
     * Produces a watch with the aggregated key criterion and a general watch listener dispatcher.
     *
     * @param revision Start revision to listen events from.
     * @param saveRevisionAct Action to commit keys-revision pair to persistent store for processed keys.
     * @return Aggregated watch, or an empty optional if no watches are registered.
     */
    public Optional<AggregatedWatch> watch(
        long revision,
        BiConsumer<Collection<IgniteBiTuple<ByteArray, byte[]>>, Long> saveRevisionAct
    ) {
        // The emptiness check, the criteria union and the listener snapshot must all observe
        // the same state of the map, so they are done under a single lock.
        synchronized (watches) {
            if (watches.isEmpty())
                return Optional.empty();

            return Optional.of(new AggregatedWatch(inferGeneralCriteria(), revision, watchListener(saveRevisionAct)));
        }
    }

    /**
     * Creates a watch from the given criterion and listener and stores it under a freshly generated id.
     *
     * @param criterion Key criterion of the new watch.
     * @param lsnr Listener which will be executed on watch event.
     * @return Id of the registered watch.
     */
    private long register(KeyCriterion criterion, WatchListener lsnr) {
        var watch = new Watch(criterion, lsnr);

        var id = idCntr.incrementAndGet();

        watches.put(id, watch);

        return id;
    }

    /**
     * Returns general criterion, which overlays all aggregated criteria.
     * Must be called with at least one registered watch and while holding the monitor of {@code watches},
     * because it iterates over the synchronized map.
     *
     * @return Aggregated criterion.
     */
    private KeyCriterion inferGeneralCriteria() {
        return watches.values().stream()
            .map(Watch::keyCriterion)
            .reduce(KeyCriterion::union)
            .orElseThrow();
    }

    /**
     * Produces the watch listener, which will dispatch events to appropriate watches.
     * Must be called while holding the monitor of {@code watches}, because it copies the synchronized map.
     *
     * @param storeRevision Action to commit keys-revision pair to persistent store for processed keys.
     * @return Watch listener, which will dispatch events to appropriate watches.
     */
    private WatchListener watchListener(BiConsumer<Collection<IgniteBiTuple<ByteArray, byte[]>>, Long> storeRevision) {
        // Copy watches to separate collection, because all changes on the WatchAggregator watches
        // shouldn't be propagated to listener watches immediately.
        // WatchAggregator will be redeployed with new watches if needed instead.
        final LinkedHashMap<Long, Watch> cpWatches = new LinkedHashMap<>(watches);

        return new WatchListener() {
            /**
             * Dispatches the event to every watch of the snapshot whose criterion matches at least one
             * entry event, then passes all new entries and the last seen revision to {@code storeRevision}.
             *
             * @param evt Aggregated watch event.
             * @return Always {@code true}.
             */
            @Override public boolean onUpdate(@NotNull WatchEvent evt) {
                var watchIt = cpWatches.entrySet().iterator();

                Collection<Long> toCancel = new ArrayList<>();

                while (watchIt.hasNext()) {
                    Map.Entry<Long, WatchAggregator.Watch> entry = watchIt.next();

                    WatchAggregator.Watch watch = entry.getValue();

                    // Select only those entry events that this particular watch is subscribed to.
                    var filteredEvts = new ArrayList<EntryEvent>();

                    for (EntryEvent entryEvt : evt.entryEvents()) {
                        if (watch.keyCriterion().contains(entryEvt.oldEntry().key()))
                            filteredEvts.add(entryEvt);
                    }

                    // A listener returning false asks to stop watching: drop it from the snapshot
                    // right away and remember it for removal from the global map.
                    if (!filteredEvts.isEmpty() && !watch.listener().onUpdate(new WatchEvent(filteredEvts))) {
                        watchIt.remove();

                        toCancel.add(entry.getKey());
                    }
                }

                // Cancel finished watches from the global watch map
                // to prevent finished watches from redeploy.
                if (!toCancel.isEmpty())
                    cancelAll(toCancel);

                // The revision of the last entry event is reported; it stays 0 if the event has no entries.
                var revision = 0L;

                var entries = new ArrayList<IgniteBiTuple<ByteArray, byte[]>>();

                for (EntryEvent entryEvt : evt.entryEvents()) {
                    revision = entryEvt.newEntry().revision();

                    entries.add(new IgniteBiTuple<>(entryEvt.newEntry().key(), entryEvt.newEntry().value()));
                }

                storeRevision.accept(entries, revision);

                return true;
            }

            /**
             * Propagates the error to the listeners of all currently registered watches.
             *
             * @param e Error to propagate.
             */
            @Override public void onError(@NotNull Throwable e) {
                // Take a snapshot under the lock and notify outside of it: listeners are foreign code
                // and must not run while the map monitor is held, and a listener that adds or cancels
                // a watch from its callback must not break the iteration.
                Collection<WatchAggregator.Watch> current;

                synchronized (watches) {
                    current = new ArrayList<>(watches.values());
                }

                for (WatchAggregator.Watch w : current)
                    w.listener().onError(e);
            }
        };
    }

    /**
     * (key criterion, watch listener) container.
     */
    private static class Watch {
        /** Key criterion. */
        private final KeyCriterion keyCriterion;

        /** Watch listener. */
        private final WatchListener lsnr;

        /**
         * Creates the watch.
         *
         * @param keyCriterion Key criterion.
         * @param lsnr Watch listener.
         */
        private Watch(KeyCriterion keyCriterion, WatchListener lsnr) {
            this.keyCriterion = keyCriterion;
            this.lsnr = lsnr;
        }

        /**
         * @return Key criterion.
         */
        public KeyCriterion keyCriterion() {
            return keyCriterion;
        }

        /**
         * @return Watch listener.
         */
        public WatchListener listener() {
            return lsnr;
        }
    }
}