blob: 832d79904c7fa3ff854baf9570ec31bc5c399d24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.asyncprocessing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.LinkedList;
/**
 * Epoch manager segments inputs into distinct epochs, marked by the arrival of non-records (e.g.
 * watermark, record attributes). Records are assigned to a unique epoch based on their arrival.
 * Records within an epoch are allowed to be processed in parallel, while the non-record of an
 * epoch can only be executed when all records in this epoch have finished.
 *
 * <p>All methods must be invoked within the task thread; this class performs no synchronization.
 *
 * <p>For more details please refer to FLIP-425.
 */
public class EpochManager {
    private static final Logger LOG = LoggerFactory.getLogger(EpochManager.class);

    /**
     * This enum defines whether parallel execution between epochs is allowed. We should keep this
     * internal and away from API module for now, until we could see the concrete need for {@link
     * #PARALLEL_BETWEEN_EPOCH} from average users.
     */
    public enum ParallelMode {
        /**
         * Subsequent epochs must wait until the previous epoch is completed before they can start.
         */
        SERIAL_BETWEEN_EPOCH,
        /**
         * Subsequent epochs can begin execution even if the previous epoch has not yet completed.
         * Usually performs better than {@link #SERIAL_BETWEEN_EPOCH}.
         */
        PARALLEL_BETWEEN_EPOCH
    }

    /**
     * The reference to the {@link AsyncExecutionController}, used for {@link
     * ParallelMode#SERIAL_BETWEEN_EPOCH}. Can be null when testing.
     */
    @Nullable final AsyncExecutionController<?> asyncExecutionController;

    /** The number of epochs that have arrived. */
    long epochNum;

    /** The output queue to hold ongoing epochs. */
    LinkedList<Epoch> outputQueue;

    /** Current active epoch, only one active epoch at the same time. */
    Epoch activeEpoch;

    /**
     * Set while {@link #tryFinishInQueue()} is draining the output queue. An epoch action may call
     * back into this manager; the flag makes such a nested call a no-op so that only the outermost
     * invocation removes epochs from the queue.
     */
    private boolean finishingInQueue;

    /**
     * Creates an epoch manager with one initial, open epoch.
     *
     * @param aec the controller used to drain in-flight records in {@link
     *     ParallelMode#SERIAL_BETWEEN_EPOCH}; may be null when testing.
     */
    public EpochManager(@Nullable AsyncExecutionController<?> aec) {
        this.epochNum = 0;
        this.outputQueue = new LinkedList<>();
        this.asyncExecutionController = aec;
        // init an empty epoch, the epoch action will be updated when non-record is received.
        this.activeEpoch = new Epoch(epochNum++);
    }

    /**
     * Add a record to the current epoch and return the current open epoch, the epoch will be
     * associated with the {@link RecordContext} of this record. Must be invoked within task thread.
     *
     * @return the current open epoch.
     */
    public Epoch onRecord() {
        activeEpoch.ongoingRecordCount++;
        return activeEpoch;
    }

    /**
     * Add a non-record to the current epoch, close current epoch and open a new epoch. Must be
     * invoked within task thread.
     *
     * @param action the action associated with this non-record.
     * @param parallelMode the parallel mode for this epoch.
     */
    public void onNonRecord(Runnable action, ParallelMode parallelMode) {
        LOG.trace(
                "on NonRecord, old epoch: {}, outputQueue size: {}",
                activeEpoch,
                outputQueue.size());
        switchActiveEpoch(action);
        // The controller is documented as nullable (tests); without it there is nothing to drain.
        if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH
                && asyncExecutionController != null) {
            asyncExecutionController.drainInflightRecords(0);
        }
    }

    /**
     * Complete one record in the specific epoch. Must be invoked within task thread.
     *
     * @param epoch the specific epoch
     */
    public void completeOneRecord(Epoch epoch) {
        if (--epoch.ongoingRecordCount == 0) {
            tryFinishInQueue();
        }
    }

    /**
     * Finishes and removes epochs from the front of the output queue for as long as the front
     * epoch is closed and has no ongoing records.
     *
     * <p>Finishing an epoch runs its action, which may call back into this manager. Such a nested
     * invocation returns immediately: the outermost loop re-inspects the queue head after every
     * action, so epochs that became finishable in the meantime are still handled, in order, and
     * each epoch is removed exactly once by the frame that finished it.
     */
    private void tryFinishInQueue() {
        if (finishingInQueue) {
            return;
        }
        finishingInQueue = true;
        try {
            Epoch head;
            // If one epoch has been closed before and all records in
            // this epoch have finished, the epoch will be removed from the output queue.
            while ((head = outputQueue.peek()) != null && head.tryFinish()) {
                LOG.trace("Finish epoch: {}, outputQueue size: {}", head, outputQueue.size());
                outputQueue.pop();
            }
        } finally {
            // Reset even if an epoch action throws, so later calls are not blocked forever.
            finishingInQueue = false;
        }
    }

    /**
     * Closes the active epoch with the given action, enqueues it for output, opens a fresh active
     * epoch and tries to finish whatever is finishable at the front of the queue.
     */
    private void switchActiveEpoch(Runnable action) {
        activeEpoch.close(action);
        outputQueue.offer(activeEpoch);
        this.activeEpoch = new Epoch(epochNum++);
        tryFinishInQueue();
    }

    /** The status of an epoch, see Fig.6 in FLIP-425 for details. */
    enum EpochStatus {
        /**
         * The subsequent non-record input has not arrived. So arriving records will be collected
         * into current epoch.
         */
        OPEN,
        /**
         * The records belonging to this epoch are settled since the following non-record input has
         * arrived; newly arriving records will be collected into the next epoch.
         */
        CLOSED,
        /**
         * One epoch can only be finished when it meets the following three conditions. 1. The
         * records of this epoch have finished execution. 2. The epoch is closed. 3. The epoch is in
         * the front of outputQueue.
         */
        FINISHED
    }

    /**
     * All inputs are segmented into distinct epochs, marked by the arrival of non-record inputs.
     * Records are assigned to a unique epoch based on their arrival.
     */
    public static class Epoch {
        /** The id of this epoch for easy debugging. */
        long id;

        /** The number of records that are still ongoing in this epoch. */
        int ongoingRecordCount;

        /** The action associated with non-record of this epoch(e.g. advance watermark). */
        @Nullable Runnable action;

        /** The lifecycle status of this epoch. */
        EpochStatus status;

        /**
         * Creates an open epoch with no ongoing records and no action.
         *
         * @param id the id of this epoch.
         */
        public Epoch(long id) {
            this.id = id;
            this.ongoingRecordCount = 0;
            this.status = EpochStatus.OPEN;
            this.action = null;
        }

        /**
         * Try to finish this epoch. If the epoch is closed and has no ongoing records, it is marked
         * as finished and its action (if any) is run. The status is updated before the action runs,
         * so the action is executed at most once.
         *
         * @return whether this epoch has been finished.
         */
        boolean tryFinish() {
            if (this.status == EpochStatus.FINISHED) {
                return true;
            }
            if (ongoingRecordCount == 0 && this.status == EpochStatus.CLOSED) {
                this.status = EpochStatus.FINISHED;
                if (action != null) {
                    action.run();
                }
                return true;
            }
            return false;
        }

        /**
         * Close this epoch.
         *
         * @param action the action to run once this epoch finishes.
         */
        void close(Runnable action) {
            this.action = action;
            this.status = EpochStatus.CLOSED;
        }

        @Override
        public String toString() {
            return String.format(
                    "Epoch{id=%d, ongoingRecord=%d, status=%s}", id, ongoingRecordCount, status);
        }
    }
}