blob: de195e7dd689cdc5469f979e078eaf4ce2c4a303 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.util;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
/**
* Keeps track of added long values. Collapses ranges of numbers using a
* Sequence representation. Used to keep track of received message ids to find
* out if a message is a duplicate or if there are any missing messages.
*
* @author chirino
*/
public class SequenceSet extends LinkedNodeList<Sequence> {

    /**
     * Reads and writes {@link SequenceSet} instances.
     * <p>
     * The payload is an int holding the number of sequences, followed by one
     * record per sequence: a boolean flag, then two longs (first and last)
     * when the flag is true, or a single long when the flag is false.
     */
    public static class Marshaller implements org.apache.kahadb.util.Marshaller<SequenceSet> {

        public static final Marshaller INSTANCE = new Marshaller();

        /**
         * Rebuilds a set from a payload in the format produced by
         * {@link #writePayload(SequenceSet, DataOutput)}.
         *
         * @param in the input to read from
         * @return a new set holding the sequences in the order they were read
         * @throws IOException if reading from the input fails
         */
        public SequenceSet readPayload(DataInput in) throws IOException {
            SequenceSet value = new SequenceSet();
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                Sequence sequence;
                if (in.readBoolean()) {
                    // Flag set: a range stored as two longs, first then last.
                    long first = in.readLong();
                    long last = in.readLong();
                    sequence = new Sequence(first, last);
                } else {
                    // Flag clear: a single value stored as one long.
                    sequence = new Sequence(in.readLong());
                }
                // Sequences are appended as-is; no merging is attempted here.
                value.addLast(sequence);
            }
            return value;
        }

        /**
         * Writes the number of sequences followed by each sequence, head to
         * tail. A sequence whose range is greater than one is written as
         * {@code true, first, last}; any other sequence is written as
         * {@code false, first}.
         *
         * @param value the set to write
         * @param out the output to write to
         * @throws IOException if writing to the output fails
         */
        public void writePayload(SequenceSet value, DataOutput out) throws IOException {
            out.writeInt(value.size());
            for (Sequence sequence = value.getHead(); sequence != null; sequence = sequence.getNext()) {
                boolean isRange = sequence.range() > 1;
                out.writeBoolean(isRange);
                out.writeLong(sequence.first);
                if (isRange) {
                    out.writeLong(sequence.last);
                }
            }
        }

        /**
         * @return always -1 (presumably signalling that the marshalled size
         *         is not fixed; confirm against the Marshaller contract)
         */
        public int getFixedSize() {
            return -1;
        }

        /**
         * Creates an independent copy of the given set. Each sequence node is
         * duplicated and appended in order, so the cost depends on the number
         * of sequences and not on how many values each range spans.
         *
         * @param value the set to copy
         * @return a new set with copies of every sequence in {@code value}
         */
        public SequenceSet deepCopy(SequenceSet value) {
            SequenceSet rc = new SequenceSet();
            for (Sequence sequence = value.getHead(); sequence != null; sequence = sequence.getNext()) {
                // The source list is already ordered, so the copy can simply be
                // appended instead of re-adding every individual value.
                rc.addLast(new Sequence(sequence.first, sequence.last));
            }
            return rc;
        }

        /**
         * @return always true; see {@link #deepCopy(SequenceSet)}
         */
        public boolean isDeepCopySupported() {
            return true;
        }
    }

    /**
     * Adds every value from {@code value.first} to {@code value.last}
     * (inclusive) to this set. Nothing is added when first is greater than
     * last. The passed sequence object itself is not linked into this list.
     *
     * @param value the range of values to add
     */
    public void add(Sequence value) {
        // TODO we can probably optimize this a bit
        for (long i = value.first; i <= value.last; i++) {
            add(i);
            if (i == Long.MAX_VALUE) {
                // Stop before i++ wraps around to Long.MIN_VALUE.
                break;
            }
        }
    }

    /**
     * Adds a single value to the set, growing or joining neighbouring
     * sequences where possible.
     *
     * @param value
     *            the value to add to the list
     * @return false if the value was a duplicate.
     */
    public boolean add(long value) {
        if (isEmpty()) {
            addFirst(new Sequence(value));
            return true;
        }

        Sequence sequence = getHead();
        while (sequence != null) {

            if (sequence.isAdjacentToLast(value)) {
                // Grow the sequence at its tail end...
                sequence.last = value;
                // ...which might connect it to the next sequence.
                Sequence next = sequence.getNext();
                if (next != null && next.isAdjacentToFirst(value)) {
                    // The two sequences now touch, so join them.
                    sequence.last = next.last;
                    next.unlink();
                }
                return true;
            }

            if (sequence.isAdjacentToFirst(value)) {
                // Grow the sequence at its front end...
                sequence.first = value;
                // ...which might connect it to the previous sequence.
                Sequence prev = sequence.getPrevious();
                if (prev != null && prev.isAdjacentToLast(value)) {
                    // The two sequences now touch, so join them.
                    sequence.first = prev.first;
                    prev.unlink();
                }
                return true;
            }

            // Did the value land before this sequence?
            if (value < sequence.first) {
                // Then insert a new entry before this sequence item.
                sequence.linkBefore(new Sequence(value));
                return true;
            }

            // Did the value land within the sequence? Then it's a duplicate.
            if (sequence.contains(value)) {
                return false;
            }

            sequence = sequence.getNext();
        }

        // The value lies past every existing sequence, so append it.
        addLast(new Sequence(value));
        return true;
    }

    /**
     * Removes and returns the first element from this list.
     *
     * @return the first element from this list.
     * @throws NoSuchElementException if this list is empty.
     */
    public long removeFirst() {
        if (isEmpty()) {
            throw new NoSuchElementException();
        }
        Sequence rc = removeFirstSequence(1);
        return rc.first;
    }

    /**
     * Removes and returns the last sequence in this list.
     *
     * @return the unlinked tail sequence, or null if this list is empty.
     */
    public Sequence removeLastSequence() {
        if (isEmpty()) {
            return null;
        }
        Sequence rc = getTail();
        rc.unlink();
        return rc;
    }

    /**
     * Removes and returns the first sequence that is count range large.
     * <p>
     * A sequence whose range equals {@code count} is unlinked and returned
     * as-is. A larger sequence is split: its first {@code count} values are
     * returned as a new sequence and the existing node keeps the remainder.
     *
     * @param count the number of values to take; expected to be at least 1
     * @return a sequence that is count range large, or null if no sequence is that large in the list.
     */
    public Sequence removeFirstSequence(long count) {
        if (isEmpty()) {
            return null;
        }

        Sequence sequence = getHead();
        while (sequence != null) {
            if (sequence.range() == count) {
                sequence.unlink();
                return sequence;
            }
            if (sequence.range() > count) {
                // count values starting at first end at first + count - 1;
                // the remainder then starts at first + count.
                Sequence rc = new Sequence(sequence.first, sequence.first + count - 1);
                sequence.first += count;
                return rc;
            }
            sequence = sequence.getNext();
        }
        return null;
    }

    /**
     * Computes which ids in the inclusive range first..last are not in this
     * set.
     *
     * @param first the lowest id of interest
     * @param last the highest id of interest
     * @return all the id Sequences within the provided range that are missing
     *         from this set.
     * @throws IllegalArgumentException if first is greater than last.
     */
    public List<Sequence> getMissing(long first, long last) {
        List<Sequence> rc = new ArrayList<Sequence>();
        if (first > last) {
            throw new IllegalArgumentException("First cannot be more than last");
        }
        if (isEmpty()) {
            // We are missing all the messages.
            rc.add(new Sequence(first, last));
            return rc;
        }

        Sequence sequence = getHead();
        while (sequence != null && first <= last) {
            if (sequence.contains(first)) {
                if (sequence.last >= last) {
                    // Covered through the end of the requested range. Returning
                    // here also avoids overflowing sequence.last + 1.
                    return rc;
                }
                first = sequence.last + 1;
            } else if (first < sequence.first) {
                if (last < sequence.first) {
                    // The rest of the requested range sits in the gap before
                    // this sequence.
                    rc.add(new Sequence(first, last));
                    return rc;
                }
                // The gap up to this sequence is missing.
                rc.add(new Sequence(first, sequence.first - 1));
                if (sequence.last >= last) {
                    return rc;
                }
                first = sequence.last + 1;
            }
            // Otherwise this sequence lies entirely before first; skip it.
            sequence = sequence.getNext();
        }

        if (first <= last) {
            // Whatever remains after the last sequence is missing too.
            rc.add(new Sequence(first, last));
        }
        return rc;
    }

    /**
     * @return copies of all the Sequences that are in this list, in list order
     */
    public List<Sequence> getReceived() {
        List<Sequence> rc = new ArrayList<Sequence>(size());
        for (Sequence sequence = getHead(); sequence != null; sequence = sequence.getNext()) {
            rc.add(new Sequence(sequence.first, sequence.last));
        }
        return rc;
    }

    /**
     * Checks whether a range is covered by a single sequence of this set.
     * The first sequence that contains {@code first} decides the result.
     *
     * @param first the start of the range
     * @param last the end of the range
     * @return true if a sequence contains {@code first} and also extends to
     *         at least {@code last}; false otherwise, including when the set
     *         is empty.
     */
    public boolean contains(int first, int last) {
        if (isEmpty()) {
            return false;
        }
        for (Sequence sequence = getHead(); sequence != null; sequence = sequence.getNext()) {
            if (sequence.first <= first && first <= sequence.last) {
                return last <= sequence.last;
            }
        }
        return false;
    }
}