blob: a72f728f7b405dfe37e7d48559b3275cf0b1df35 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.alert.engine.sorter;
import org.apache.eagle.alert.engine.PartitionedEventCollector;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.eagle.common.DateTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* TODO: Make sure thread-safe.
* TODO: Leverage Off-Heap Memory to persist append-only events collection.
*/
public abstract class BaseStreamWindow implements StreamWindow {
private final long endTime;
private final long startTime;
private final long margin;
private final AtomicBoolean expired;
private final long createdTime;
private static final Logger LOG = LoggerFactory.getLogger(BaseStreamWindow.class);
private PartitionedEventCollector collector;
private final AtomicLong lastFlushedStreamTime;
private final AtomicLong lastFlushedSystemTime;
public BaseStreamWindow(long startTime, long endTime, long marginTime) {
if (startTime >= endTime) {
throw new IllegalArgumentException("startTime: " + startTime + " >= endTime: " + endTime + ", expected: startTime < endTime");
}
if (marginTime > endTime - startTime) {
throw new IllegalArgumentException("marginTime: " + marginTime + " > endTime: " + endTime + " - startTime " + startTime + ", expected: marginTime < endTime - startTime");
}
this.startTime = startTime;
this.endTime = endTime;
this.margin = marginTime;
this.expired = new AtomicBoolean(false);
this.createdTime = System.currentTimeMillis();
this.lastFlushedStreamTime = new AtomicLong(0);
this.lastFlushedSystemTime = new AtomicLong(this.createdTime);
}
@Override
public void register(PartitionedEventCollector collector) {
if (this.collector != null) {
throw new IllegalArgumentException("Duplicated collector error");
}
this.collector = collector;
}
@Override
public long createdTime() {
return createdTime;
}
public long startTime() {
return this.startTime;
}
@Override
public long rejectTime() {
return this.lastFlushedStreamTime.get();
}
@Override
public long margin() {
return this.margin;
}
public long endTime() {
return this.endTime;
}
public boolean accept(final long eventTime) {
return !expired() && eventTime >= startTime && eventTime < endTime
&& eventTime >= lastFlushedStreamTime.get(); // dropped
}
public boolean expired() {
return expired.get();
}
@Override
public boolean alive() {
return !expired.get();
}
/**
* Expire when
* 1) If stream time >= endTime + marginTime, then flush and expire
* 2) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime >= endTime, then flush and expire.
* 3) If systemTime - flushedTime > endTime - startTime + marginTime && streamTime < endTime, then flush but not expire.
* 4) else do nothing
*
* @param clock stream time clock
* @param globalSystemTime system time clock
*/
@Override
public synchronized void onTick(StreamTimeClock clock, long globalSystemTime) {
if (!expired()) {
if (clock.getTime() >= endTime + margin) {
LOG.info("Expiring {} at stream time:{}, latency:{}, window: {}", clock.getStreamId(),
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this);
lastFlushedStreamTime.set(clock.getTime());
lastFlushedSystemTime.set(globalSystemTime);
flush();
expired.set(true);
} else if (globalSystemTime - lastFlushedSystemTime.get() >= endTime + margin - startTime && size() > 0) {
LOG.info("Flushing {} at system time: {}, stream time: {}, latency: {}, window: {}", clock.getStreamId(),
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(globalSystemTime),
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(clock.getTime()), globalSystemTime - lastFlushedSystemTime.get(), this);
lastFlushedStreamTime.set(clock.getTime());
lastFlushedSystemTime.set(globalSystemTime);
flush();
if (lastFlushedStreamTime.get() >= this.endTime) {
expired.set(true);
}
}
} else {
LOG.warn("Window has already expired, should not tick: {}", this.toString());
}
}
public void close() {
flush();
expired.set(true);
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(startTime).append(endTime).append(margin).build();
}
@Override
public boolean equals(Object obj) {
if (obj != null && obj instanceof BaseStreamWindow) {
BaseStreamWindow another = (BaseStreamWindow) obj;
return another.startTime == this.startTime && another.endTime == this.endTime && another.margin == this.margin;
}
return false;
}
@Override
public void flush() {
if (this.collector == null) {
throw new NullPointerException("Collector is not given before window flush");
}
this.flush(collector);
}
/**
* @param collector PartitionedEventCollector.
* @return max timestamp.
*/
protected abstract void flush(PartitionedEventCollector collector);
@Override
public String toString() {
return String.format("StreamWindow[period=[%s,%s), margin=%s ms, size=%s, reject=%s]",
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime),
DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.endTime),
this.margin,
size(),
this.rejectTime() == 0 ? DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.startTime) : DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.rejectTime())
);
}
}