blob: 5c99e4c307d7cd69d5950a79a9f64c39cde5f4b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.delayed;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@Slf4j
public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {
protected final PersistentDispatcherMultipleConsumers dispatcher;
// Reference to the shared (per-broker) timer for delayed delivery
protected final Timer timer;
// Current timeout or null if not set
protected Timeout timeout;
// Timestamp at which the timeout is currently set
private long currentTimeoutTarget;
// Last time the TimerTask was triggered for this class
private long lastTickRun;
protected long tickTimeMillis;
protected final Clock clock;
private final boolean isDelayedDeliveryDeliverAtTimeStrict;
public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
}
public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this.dispatcher = dispatcher;
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.clock = clock;
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
}
/**
* When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow for early delivery by as much as the
* {@link #tickTimeMillis} because it is a slight optimization to let messages skip going back into the delay
* tracker for a brief amount of time when we're already trying to dispatch to the consumer.
*
* When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the current time to determine when messages
* can be delivered. As a consequence, there are two delays that will affect delivery. The first is the
* {@link #tickTimeMillis} and the second is the {@link Timer}'s granularity.
*
* @return the cutoff time to determine whether a message is ready to deliver to the consumer
*/
protected long getCutoffTime() {
return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis;
}
public void resetTickTime(long tickTime) {
if (this.tickTimeMillis != tickTime) {
this.tickTimeMillis = tickTime;
}
}
protected void updateTimer() {
if (getNumberOfDelayedMessages() == 0) {
if (timeout != null) {
currentTimeoutTarget = -1;
timeout.cancel();
timeout = null;
}
return;
}
long timestamp = nextDeliveryTime();
if (timestamp == currentTimeoutTarget) {
// The timer is already set to the correct target time
return;
}
if (timeout != null) {
timeout.cancel();
}
long now = clock.millis();
long delayMillis = timestamp - now;
if (delayMillis < 0) {
// There are messages that are already ready to be delivered. If
// the dispatcher is not getting them is because the consumer is
// either not connected or slow.
// We don't need to keep retriggering the timer. When the consumer
// catches up, the dispatcher will do the readMoreEntries() and
// get these messages
return;
}
// Compute the earliest time that we schedule the timer to run.
long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);
if (log.isDebugEnabled()) {
log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis);
}
// Even though we may delay longer than this timestamp because of the tick delay, we still track the
// current timeout with reference to the next message's timestamp.
currentTimeoutTarget = timestamp;
timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
}
@Override
public void run(Timeout timeout) throws Exception {
if (log.isDebugEnabled()) {
log.debug("[{}] Timer triggered", dispatcher.getName());
}
if (timeout == null || timeout.isCancelled()) {
return;
}
synchronized (dispatcher) {
lastTickRun = clock.millis();
currentTimeoutTarget = -1;
this.timeout = null;
dispatcher.readMoreEntries();
}
}
@Override
public void close() {
if (timeout != null) {
timeout.cancel();
timeout = null;
}
}
protected abstract long nextDeliveryTime();
}