blob: a35be52d073178d444869e91e15ebf6d411328b6 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.analytics.sensors;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.edgent.function.Predicate;
/**
* Deadtime {@link Predicate}.
* <p>
* {@link #test(Object) test()} returns true on its initial call
* and then false for any calls occurring during the following deadtime period.
* After the end of a deadtime period, the next call to {@code test()}
* returns true and a new deadtime period is begun.
* </p><p>
* The deadtime period may be changed while the org.apache.edgent.org.apache.edgent.topology is running
* via {@link #setPeriod(long, TimeUnit)}.
* </p>
*
* @param <T> tuple type
* @see Filters#deadtime(org.apache.edgent.topology.TStream, long, TimeUnit) Filters.deadtime()
*/
public class Deadtime<T> implements Predicate<T> {
private static final long serialVersionUID = 1L;
private long deadtimePeriodMillis;
private long lastTrueTimeMillis;
private volatile long nextTrueTimeMillis;
/**
* Create a new Deadtime Predicate
* <p>
* Same as {@code Deadtime(0, TimeUnit.SECONDS)}
*/
public Deadtime() {
setPeriod(0, TimeUnit.SECONDS);
}
/**
* Create a new Deadtime Predicate
* <p>
* The first received tuple is always "accepted".
* @param deadtimePeriod see {@link #setPeriod(long, TimeUnit) setPeriod()}
* @param unit {@link TimeUnit} of {@code deadtimePeriod}
*/
public Deadtime(long deadtimePeriod, TimeUnit unit) {
setPeriod(deadtimePeriod, unit);
}
/**
* Set the deadtime period
* <p>
* The end of a currently active deadtime period is shortened or extended
* to match the new deadtime period specification.
* </p><p>
* The deadtime period behavior is subject to the accuracy
* of the system's {@link System#currentTimeMillis()}.
* A period of less than 1ms is equivalent to specifying 0.
* </p>
* @param deadtimePeriod the amount of time for {@code test()}
* to return false after returning true.
* Specify a value of 0 for no deadtime period.
* Must be &gt;= 0.
* @param unit {@link TimeUnit} of {@code deadtimePeriod}
*/
public synchronized void setPeriod(long deadtimePeriod, TimeUnit unit) {
if (deadtimePeriod < 0)
throw new IllegalArgumentException("deadtimePeriod");
Objects.requireNonNull(unit, "unit");
deadtimePeriodMillis = unit.toMillis(deadtimePeriod);
nextTrueTimeMillis = lastTrueTimeMillis + deadtimePeriodMillis;
}
/**
* Test the deadtime predicate.
* @param value ignored
* @return false if in a deadtime period, true otherwise
*/
@Override
public boolean test(T value) {
long now = System.currentTimeMillis();
if (now < nextTrueTimeMillis)
return false;
else synchronized(this) {
lastTrueTimeMillis = now;
nextTrueTimeMillis = now + deadtimePeriodMillis;
return true;
}
}
/**
* Returns a String for development/debug support. Content subject to change.
*/
@Override
public String toString() {
return "nextPass after "+new Date(nextTrueTimeMillis);
}
}