| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.kafka.queue; |
| |
| import org.slf4j.Logger; |
| |
| import java.util.OptionalLong; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.function.Function; |
| |
| |
| public interface EventQueue extends AutoCloseable { |
| interface Event { |
| /** |
| * Run the event. |
| */ |
| void run() throws Exception; |
| |
| /** |
| * Handle an exception that was either generated by running the event, or by the |
| * event queue's inability to run the event. |
| * |
| * @param e The exception. This will be a TimeoutException if the event hit |
| * its deadline before it could be scheduled. |
| * It will be a RejectedExecutionException if the event could not be |
| * scheduled because the event queue has already been closed. |
| * Otherwise, it will be whatever exception was thrown by run(). |
| */ |
| default void handleException(Throwable e) {} |
| } |
| |
| abstract class FailureLoggingEvent implements Event { |
| private final Logger log; |
| |
| public FailureLoggingEvent(Logger log) { |
| this.log = log; |
| } |
| |
| @Override |
| public void handleException(Throwable e) { |
| if (e instanceof RejectedExecutionException) { |
| log.info("Not processing {} because the event queue is closed.", this); |
| } else { |
| log.error("Unexpected error handling {}", this, e); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return this.getClass().getSimpleName(); |
| } |
| } |
| |
| class NoDeadlineFunction implements Function<OptionalLong, OptionalLong> { |
| public static final NoDeadlineFunction INSTANCE = new NoDeadlineFunction(); |
| |
| @Override |
| public OptionalLong apply(OptionalLong ignored) { |
| return OptionalLong.empty(); |
| } |
| } |
| |
| class DeadlineFunction implements Function<OptionalLong, OptionalLong> { |
| private final long deadlineNs; |
| |
| public DeadlineFunction(long deadlineNs) { |
| this.deadlineNs = deadlineNs; |
| } |
| |
| @Override |
| public OptionalLong apply(OptionalLong ignored) { |
| return OptionalLong.of(deadlineNs); |
| } |
| } |
| |
| class EarliestDeadlineFunction implements Function<OptionalLong, OptionalLong> { |
| private final long newDeadlineNs; |
| |
| public EarliestDeadlineFunction(long newDeadlineNs) { |
| this.newDeadlineNs = newDeadlineNs; |
| } |
| |
| @Override |
| public OptionalLong apply(OptionalLong prevDeadlineNs) { |
| if (prevDeadlineNs.isEmpty()) { |
| return OptionalLong.of(newDeadlineNs); |
| } else if (prevDeadlineNs.getAsLong() < newDeadlineNs) { |
| return prevDeadlineNs; |
| } else { |
| return OptionalLong.of(newDeadlineNs); |
| } |
| } |
| } |
| |
| class LatestDeadlineFunction implements Function<OptionalLong, OptionalLong> { |
| private final long newDeadlineNs; |
| |
| public LatestDeadlineFunction(long newDeadlineNs) { |
| this.newDeadlineNs = newDeadlineNs; |
| } |
| |
| @Override |
| public OptionalLong apply(OptionalLong prevDeadlineNs) { |
| if (prevDeadlineNs.isEmpty()) { |
| return OptionalLong.of(newDeadlineNs); |
| } else if (prevDeadlineNs.getAsLong() > newDeadlineNs) { |
| return prevDeadlineNs; |
| } else { |
| return OptionalLong.of(newDeadlineNs); |
| } |
| } |
| } |
| |
| class VoidEvent implements Event { |
| public static final VoidEvent INSTANCE = new VoidEvent(); |
| |
| @Override |
| public void run() throws Exception { |
| } |
| } |
| |
| /** |
| * Add an element to the front of the queue. |
| * |
| * @param event The mandatory event to prepend. |
| */ |
| default void prepend(Event event) { |
| enqueue(EventInsertionType.PREPEND, null, NoDeadlineFunction.INSTANCE, event); |
| } |
| |
| /** |
| * Add an element to the end of the queue. |
| * |
| * @param event The event to append. |
| */ |
| default void append(Event event) { |
| enqueue(EventInsertionType.APPEND, null, NoDeadlineFunction.INSTANCE, event); |
| } |
| |
| /** |
| * Add an event to the end of the queue. |
| * |
| * @param deadlineNs The deadline for starting the event, in monotonic |
| * nanoseconds. If the event has not started by this |
| * deadline, handleException is called with a |
| * {@link org.apache.kafka.common.errors.TimeoutException}, |
| * and the event is cancelled. |
| * @param event The event to append. |
| */ |
| default void appendWithDeadline(long deadlineNs, Event event) { |
| enqueue(EventInsertionType.APPEND, null, new DeadlineFunction(deadlineNs), event); |
| } |
| |
| /** |
| * Schedule an event to be run at a specific time. |
| * |
| * @param tag If this is non-null, the unique tag to use for this |
| * event. If an event with this tag already exists, it |
| * will be cancelled. |
| * @param deadlineNsCalculator A function which takes as an argument the existing |
| * deadline for the event with this tag (or empty if the |
| * event has no tag, or if there is none such), and |
| * produces the deadline to use for this event. |
| * Once the deadline has arrived, the event will be |
| * run. Events whose deadlines are only a few nanoseconds |
| * apart may be executed in any order. |
| * @param event The event to schedule. |
| */ |
| default void scheduleDeferred(String tag, |
| Function<OptionalLong, OptionalLong> deadlineNsCalculator, |
| Event event) { |
| enqueue(EventInsertionType.DEFERRED, tag, deadlineNsCalculator, event); |
| } |
| |
| /** |
| * Cancel a deferred event. |
| * |
| * @param tag The unique tag for the event to be cancelled. Must be |
| * non-null. If the event with the tag has not been |
| * scheduled, this call will be ignored. |
| */ |
| void cancelDeferred(String tag); |
| |
| enum EventInsertionType { |
| PREPEND, |
| APPEND, |
| DEFERRED |
| } |
| |
| /** |
| * Add an event to the queue. |
| * |
| * @param insertionType How to insert the event. |
| * PREPEND means insert the event as the first thing |
| * to run. APPEND means insert the event as the last |
| * thing to run. DEFERRED means insert the event to |
| * run after a delay. |
| * @param tag If this is non-null, the unique tag to use for |
| * this event. If an event with this tag already |
| * exists, it will be cancelled. |
| * @param deadlineNsCalculator If this is non-null, it is a function which takes |
| * as an argument the existing deadline for the |
| * event with this tag (or null if the event has no |
| * tag, or if there is none such), and produces the |
| * deadline to use for this event (or empty to use |
| * none.) Events whose deadlines are only a few |
| * nanoseconds apart may be executed in any order. |
| * @param event The event to enqueue. |
| */ |
| void enqueue(EventInsertionType insertionType, |
| String tag, |
| Function<OptionalLong, OptionalLong> deadlineNsCalculator, |
| Event event); |
| |
| /** |
| * Asynchronously shut down the event queue. |
| * <br> |
| * No new events will be accepted, and the queue thread will exit after running the existing events. |
| * Deferred events will receive TimeoutExceptions. |
| * |
| * @param source The source of the shutdown. |
| */ |
| void beginShutdown(String source); |
| |
| /** |
| * @return The number of pending and running events. If this is 0, there is no running event and |
| * no events queued. |
| */ |
| int size(); |
| |
| /** |
| * @return True if there are no pending or running events. |
| */ |
| default boolean isEmpty() { |
| return size() == 0; |
| } |
| |
| /** |
| * This method is used during unit tests where MockTime is in use. |
| * It is used to alert the queue that the mock time has changed. |
| */ |
| default void wakeup() { } |
| |
| /** |
| * Synchronously close the event queue and wait for any threads to be joined. |
| */ |
| void close() throws InterruptedException; |
| } |