blob: 4b45d72ae5611a203908810bc559f889b9a0a034 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.query.continuous;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.Nullable;
/**
 * Per-partition buffer that accumulates continuous query events and releases them to the
 * listener strictly ordered by partition update counter.
 *
 * <p>Events whose counters fall inside the current {@link Batch} window are collected there
 * and emitted once the contiguous prefix of the window is complete. Events with counters
 * beyond the window are parked in {@link #pending} until the window advances. On backup
 * nodes events are queued in {@link #backupQ} until acknowledged via
 * {@link #cleanupBackupQueue(Long)}.
 *
 * <p>Thread-safety: {@link #curBatch} is advanced via CAS; per-window state is guarded by
 * synchronizing on the {@link Batch} instance. The {@link #RETRY} sentinel signals that a
 * batch was flushed concurrently and the caller must re-resolve the current batch.
 */
public class CacheContinuousQueryEventBuffer {
    /** Number of consecutive update counters covered by one batch window (default 1000). */
    private static final int BUF_SIZE =
        IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 1000);

    /**
     * Sentinel returned by {@link Batch#processEntry0} when the batch was already flushed
     * (its entries array was handed off); tells {@link #process0} to retry with a fresh batch.
     */
    private static final Object RETRY = new Object();

    /** Partition number this buffer serves. */
    protected final int part;

    /** Current batch window; {@code null} until lazily created by {@link #initBatch}. */
    private AtomicReference<Batch> curBatch = new AtomicReference<>();

    /** Entries buffered on a backup node, kept until the primary acknowledges their counters. */
    private FastSizeDeque<CacheContinuousQueryEntry> backupQ = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());

    /** Entries whose counters are ahead of the current batch window, keyed by update counter. */
    private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();

    /**
     * @param part Partition number.
     */
    CacheContinuousQueryEventBuffer(int part) {
        this.part = part;
    }

    /**
     * Drops backup-queue entries that have been acknowledged.
     *
     * @param updateCntr Acknowledged counter; entries with counter {@code <=} this are removed.
     */
    void cleanupBackupQueue(Long updateCntr) {
        Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();

        while (it.hasNext()) {
            CacheContinuousQueryEntry backupEntry = it.next();

            if (backupEntry.updateCounter() <= updateCntr)
                it.remove();
        }
    }

    /**
     * Drains all buffered state (backup queue, current batch window, pending entries) into a
     * single counter-ordered collection, e.g. to ship as a backup queue on partition exchange.
     *
     * @return Buffered entries sorted by update counter, or {@code null} if nothing is buffered.
     */
    @Nullable Collection<CacheContinuousQueryEntry> flushOnExchange() {
        TreeMap<Long, CacheContinuousQueryEntry> ret = null;

        // Snapshot the size first, then poll at most that many entries so a concurrent
        // producer cannot keep this loop running indefinitely.
        int size = backupQ.sizex();

        if (size > 0) {
            ret = new TreeMap<>();

            for (int i = 0; i < size; i++) {
                CacheContinuousQueryEntry e = backupQ.pollFirst();

                if (e != null)
                    ret.put(e.updateCounter(), e);
                else
                    break;
            }
        }

        Batch batch = curBatch.get();

        if (batch != null)
            ret = batch.flushCurrentEntries(ret);

        if (!pending.isEmpty()) {
            if (ret == null)
                ret = new TreeMap<>();

            for (CacheContinuousQueryEntry e : pending.values())
                ret.put(e.updateCounter(), e);
        }

        return ret != null ? ret.values() : null;
    }

    /**
     * Supplies the partition counter used to position the first batch window.
     * Base implementation returns 0; presumably overridden by subclasses — a return of
     * {@code -1} makes {@link #initBatch} report "no batch available" (TODO confirm contract).
     *
     * @param backup {@code True} if backup context.
     * @return Initial partition counter.
     */
    protected long currentPartitionCounter(boolean backup) {
        return 0;
    }

    /**
     * For test purpose only.
     *
     * @return Current number of filtered events accumulated in the current batch.
     */
    long currentFiltered() {
        Batch batch = curBatch.get();

        return batch != null ? batch.filtered : 0;
    }

    /**
     * @param e Entry to process.
     * @param backup Backup entry flag.
     * @return Collected entries to pass to listener (single entry or entries list).
     */
    @Nullable Object processEntry(CacheContinuousQueryEntry e, boolean backup) {
        return process0(e.updateCounter(), e, backup);
    }

    /**
     * Routes an entry by its counter: counters below the current window are passed through
     * immediately (or queued on backup), counters inside the window go to the batch, and
     * counters ahead of the window are parked in {@link #pending}.
     *
     * @param cntr Entry counter.
     * @param entry Entry.
     * @param backup Backup entry flag.
     * @return Collected entries ready for the listener, or {@code null}.
     */
    private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) {
        assert cntr >= 0 : cntr;

        Batch batch;

        Object res = null;

        for (;;) {
            batch = initBatch(entry.topologyVersion(), backup);

            if (batch == null || cntr < batch.startCntr) {
                // Counter is below the window (or no window exists): the entry is already
                // "in order" — queue it on a backup, otherwise hand it straight to the caller.
                if (backup) {
                    backupQ.add(entry);

                    return null;
                }

                return entry;
            }

            if (cntr <= batch.endCntr) {
                res = batch.processEntry0(null, cntr, entry, backup);

                // Batch was flushed concurrently — re-resolve the current batch and retry.
                if (res == RETRY)
                    continue;
            }
            else
                pending.put(cntr, entry);

            break;
        }

        Batch batch0 = curBatch.get();

        // If the window advanced while we were processing, drain pending entries that now
        // fall inside (or below) the new window; repeat until the batch stops changing.
        if (batch0 != batch) {
            do {
                batch = batch0;

                res = processPending(res, batch, backup);

                batch0 = initBatch(entry.topologyVersion(), backup);
            }
            while (batch != batch0);
        }

        return res;
    }

    /**
     * Lazily creates the first batch window via CAS, starting at {@code currentPartitionCounter + 1}.
     *
     * @param topVer Current event topology version.
     * @param backup {@code True} if backup entry.
     * @return Current batch, or {@code null} if the partition counter is unavailable ({@code -1}).
     */
    private Batch initBatch(AffinityTopologyVersion topVer, boolean backup) {
        Batch batch = curBatch.get();

        if (batch != null)
            return batch;

        for (;;) {
            long curCntr = currentPartitionCounter(backup);

            if (curCntr == -1)
                return null;

            batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer);

            if (curBatch.compareAndSet(null, batch))
                return batch;

            // Lost the CAS race — use the batch installed by the winner.
            batch = curBatch.get();

            if (batch != null)
                return batch;
        }
    }

    /**
     * Moves pending entries whose counters are at or below the given batch's window end into
     * the result: counters below the window pass through directly, counters inside go through
     * the batch's in-order processing.
     *
     * @param res Current result.
     * @param batch Current batch.
     * @param backup Backup entry flag.
     * @return New result.
     */
    @Nullable private Object processPending(@Nullable Object res, Batch batch, boolean backup) {
        if (pending.floorKey(batch.endCntr) != null) {
            for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr, true).entrySet()) {
                long cntr = p.getKey();

                assert cntr <= batch.endCntr;

                // remove() may fail if a concurrent drain already claimed this entry —
                // only the thread that removes it gets to process it.
                if (pending.remove(p.getKey()) != null) {
                    if (cntr < batch.startCntr)
                        res = addResult(res, p.getValue(), backup);
                    else
                        res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
                }
            }
        }

        return res;
    }

    /**
     * Accumulates an in-order entry into the result: a single entry while there is one,
     * upgraded to a list on the second. On a backup node the entry goes to the backup queue
     * instead (only expected while the result is empty — see assertion).
     *
     * @param res Current result.
     * @param entry Entry to add.
     * @param backup Backup entry flag.
     * @return Updated result.
     */
    @Nullable private Object addResult(@Nullable Object res, CacheContinuousQueryEntry entry, boolean backup) {
        if (res == null) {
            if (backup)
                backupQ.add(entry);
            else
                res = entry;
        }
        else {
            assert !backup;

            List<CacheContinuousQueryEntry> resList;

            if (res instanceof CacheContinuousQueryEntry) {
                resList = new ArrayList<>();

                resList.add((CacheContinuousQueryEntry)res);
            }
            else {
                assert res instanceof List : res;

                resList = (List<CacheContinuousQueryEntry>)res;
            }

            resList.add(entry);

            res = resList;
        }

        return res;
    }

    /**
     * Fixed window of {@link #BUF_SIZE} consecutive update counters
     * [{@code startCntr}, {@code endCntr}]. Entries are released in counter order as the
     * contiguous prefix of the window fills; runs of filtered events are collapsed into a
     * filtered-count carried on the next non-filtered entry. All mutable state is guarded
     * by synchronizing on the batch instance.
     */
    private class Batch {
        /** Number of filtered events accumulated since the last emitted (non-filtered) entry. */
        private long filtered;

        /** First update counter of this window (inclusive). */
        private final long startCntr;

        /** Last update counter of this window (inclusive): {@code startCntr + BUF_SIZE - 1}. */
        private final long endCntr;

        /** Index of the last slot processed in order; -1 while nothing has been emitted. */
        private int lastProc = -1;

        /** Window slots indexed by {@code cntr - startCntr}; nulled out when the batch is flushed. */
        private CacheContinuousQueryEntry[] entries;

        /** Topology version the window was created on. */
        private final AffinityTopologyVersion topVer;

        /**
         * @param startCntr Start counter.
         * @param filtered Number of filtered events before this batch.
         * @param entries Entries array (reused from the previous batch on rollover).
         * @param topVer Current event topology version.
         */
        Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
            assert startCntr >= 0;
            assert filtered >= 0;

            this.startCntr = startCntr;
            this.filtered = filtered;
            this.entries = entries;
            this.topVer = topVer;

            endCntr = startCntr + BUF_SIZE - 1;
        }

        /**
         * Drains the window's buffered entries into the given map, collapsing runs of
         * filtered events into synthetic filtered-marker entries. Does not invalidate the
         * window's entries array.
         *
         * @param res Current entries.
         * @return Entries to send as part of backup queue.
         */
        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
            if (entries == null)
                return res;

            long filtered = this.filtered;
            long cntr = startCntr;

            for (int i = 0; i < entries.length; i++) {
                CacheContinuousQueryEntry e = entries[i];

                CacheContinuousQueryEntry flushEntry = null;

                if (e == null) {
                    // Gap in the window: flush any accumulated filtered run as a marker
                    // positioned at the last counter before the gap.
                    if (filtered != 0) {
                        flushEntry = filteredEntry(cntr - 1, filtered - 1);

                        filtered = 0;
                    }
                }
                else {
                    if (e.isFiltered())
                        filtered++;
                    else {
                        // Copy the entry so the flushed instance can carry the filtered
                        // count without mutating the buffered one.
                        flushEntry = new CacheContinuousQueryEntry(e.cacheId(),
                            e.eventType(),
                            e.key(),
                            e.value(),
                            e.oldValue(),
                            e.isKeepBinary(),
                            e.partition(),
                            e.updateCounter(),
                            e.topologyVersion(),
                            e.flags());

                        flushEntry.filteredCount(filtered);

                        filtered = 0;
                    }
                }

                if (flushEntry != null) {
                    if (res == null)
                        res = new TreeMap<>();

                    res.put(flushEntry.updateCounter(), flushEntry);
                }

                cntr++;
            }

            // Trailing filtered run past the last slot.
            if (filtered != 0L) {
                if (res == null)
                    res = new TreeMap<>();

                CacheContinuousQueryEntry flushEntry = filteredEntry(cntr - 1, filtered - 1);

                res.put(flushEntry.updateCounter(), flushEntry);
            }

            return res;
        }

        /**
         * Creates a synthetic marker entry representing a run of filtered events.
         *
         * @param cntr Entry counter.
         * @param filtered Number of entries filtered before this entry.
         * @return Entry.
         */
        private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) {
            CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0,
                null,
                null,
                null,
                null,
                false,
                part,
                cntr,
                topVer,
                (byte)0);

            e.markFiltered();

            e.filteredCount(filtered);

            return e;
        }

        /**
         * Places an entry into its window slot and, if it extends the contiguous processed
         * prefix, releases all newly in-order entries. When the last slot completes, installs
         * the next batch (reusing the cleared entries array) and invalidates this one.
         *
         * @param res Current result.
         * @param cntr Entry counter (must lie within [startCntr, endCntr]).
         * @param entry Entry.
         * @param backup Backup entry flag.
         * @return New result, or {@link #RETRY} if this batch was already flushed.
         */
        @Nullable private Object processEntry0(
            @Nullable Object res,
            long cntr,
            CacheContinuousQueryEntry entry,
            boolean backup) {
            int pos = (int)(cntr - startCntr);

            synchronized (this) {
                // Batch already rolled over — caller must retry against the new batch.
                if (entries == null)
                    return RETRY;

                entry = entry.copyWithDataReset();

                entries[pos] = entry;

                int next = lastProc + 1;

                // Only if this entry fills the next expected slot can we advance the
                // in-order prefix; otherwise it just waits in its slot.
                if (next == pos) {
                    for (int i = next; i < entries.length; i++) {
                        CacheContinuousQueryEntry entry0 = entries[i];

                        if (entry0 != null) {
                            if (!entry0.isFiltered()) {
                                // Attach the count of filtered events preceding this one.
                                entry0.filteredCount(filtered);

                                filtered = 0;

                                res = addResult(res, entry0, backup);
                            }
                            else
                                filtered++;

                            pos = i;
                        }
                        else
                            break;
                    }

                    lastProc = pos;

                    // Window complete: reuse the array for the next window and publish it.
                    if (pos == entries.length - 1) {
                        Arrays.fill(entries, null);

                        Batch nextBatch = new Batch(this.startCntr + BUF_SIZE,
                            filtered,
                            entries,
                            entry.topologyVersion());

                        // Null out to make any concurrent caller observe RETRY.
                        entries = null;

                        assert curBatch.get() == this;

                        curBatch.set(nextBatch);
                    }
                }
                else
                    return res;
            }

            return res;
        }
    }
}