blob: 0d4029dd6e868e4c85451593c5fcc48163da7e01 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.PublicEvolving;
import java.io.Serializable;
/** Describe the input selection that stream operators want to read records. */
@PublicEvolving
public final class InputSelection implements Serializable {
public static final int NONE_AVAILABLE = -1;
private static final long serialVersionUID = 1L;
/** The {@code InputSelection} instance which indicates to select all inputs. */
public static final InputSelection ALL = new InputSelection(-1);
/** The {@code InputSelection} instance which indicates to select the first input. */
public static final InputSelection FIRST = new Builder().select(1).build();
/** The {@code InputSelection} instance which indicates to select the second input. */
public static final InputSelection SECOND = new Builder().select(2).build();
private final long inputMask;
/** @param inputMask -1 to mark if all inputs are selected. */
private InputSelection(long inputMask) {
this.inputMask = inputMask;
}
public long getInputMask() {
return inputMask;
}
/**
* Tests if the input specified by {@code inputId} is selected.
*
* @param inputId The input id, see the description of {@code inputId} in {@link
* Builder#select(int)}.
* @return {@code true} if the input is selected, {@code false} otherwise.
*/
public boolean isInputSelected(int inputId) {
return (inputMask & (1L << (inputId - 1))) != 0;
}
/**
* Tests if all inputs are selected.
*
* @return {@code true} if the input mask equals -1, {@code false} otherwise.
*/
public boolean areAllInputsSelected() {
return inputMask == -1L;
}
/**
* Fairly select one of the two inputs for reading. When {@code inputMask} includes two inputs
* and both inputs are available, alternately select one of them. Otherwise, select the
* available one of {@code inputMask}, or return {@link InputSelection#NONE_AVAILABLE} to
* indicate no input is selected.
*
* <p>Note that this supports only two inputs for performance reasons.
*
* @param availableInputsMask The mask of all available inputs.
* @param lastReadInputIndex The index of last read input.
* @return the index of the input for reading or {@link InputSelection#NONE_AVAILABLE} (if
* {@code inputMask} is empty or the inputs in {@code inputMask} are unavailable).
*/
public int fairSelectNextIndexOutOf2(int availableInputsMask, int lastReadInputIndex) {
return fairSelectNextIndexOutOf2((int) inputMask, availableInputsMask, lastReadInputIndex);
}
/**
* Fairly select one of the two inputs for reading. When {@code inputMask} includes two inputs
* and both inputs are available, alternately select one of them. Otherwise, select the
* available one of {@code inputMask}, or return {@link InputSelection#NONE_AVAILABLE} to
* indicate no input is selected.
*
* <p>Note that this supports only two inputs for performance reasons.
*
* @param selectionMask The mask of inputs that are selected. Note -1 for this is interpreted as
* all of the 32 inputs are available.
* @param availableInputsMask The mask of all available inputs.
* @param lastReadInputIndex The index of last read input.
* @return the index of the input for reading or {@link InputSelection#NONE_AVAILABLE} (if
* {@code inputMask} is empty or the inputs in {@code inputMask} are unavailable).
*/
public static int fairSelectNextIndexOutOf2(
int selectionMask, int availableInputsMask, int lastReadInputIndex) {
int combineMask = availableInputsMask & selectionMask;
if (combineMask == 3) {
return lastReadInputIndex == 0 ? 1 : 0;
} else if (combineMask >= 0 && combineMask < 3) {
return combineMask - 1;
}
throw new UnsupportedOperationException("Only two inputs are supported.");
}
/**
* Fairly select one of the available inputs for reading.
*
* @param availableInputsMask The mask of all available inputs. Note -1 for this is interpreted
* as all of the 32 inputs are available.
* @param lastReadInputIndex The index of last read input.
* @return the index of the input for reading or {@link InputSelection#NONE_AVAILABLE} (if
* {@code inputMask} is empty or the inputs in {@code inputMask} are unavailable).
*/
public int fairSelectNextIndex(long availableInputsMask, int lastReadInputIndex) {
return fairSelectNextIndex(inputMask, availableInputsMask, lastReadInputIndex);
}
/**
* Fairly select one of the available inputs for reading.
*
* @param inputMask The mask of inputs that are selected. Note -1 for this is interpreted as all
* of the 32 inputs are available.
* @param availableInputsMask The mask of all available inputs. Note -1 for this is interpreted
* as all of the 32 inputs are available.
* @param lastReadInputIndex The index of last read input.
* @return the index of the input for reading or {@link InputSelection#NONE_AVAILABLE} (if
* {@code inputMask} is empty or the inputs in {@code inputMask} are unavailable).
*/
public static int fairSelectNextIndex(
long inputMask, long availableInputsMask, int lastReadInputIndex) {
long combineMask = availableInputsMask & inputMask;
if (combineMask == 0) {
return NONE_AVAILABLE;
}
int nextReadInputIndex = selectFirstBitRightFromNext(combineMask, lastReadInputIndex + 1);
if (nextReadInputIndex >= 0) {
return nextReadInputIndex;
}
return selectFirstBitRightFromNext(combineMask, 0);
}
private static int selectFirstBitRightFromNext(long bits, int next) {
if (next >= 64) {
return NONE_AVAILABLE;
}
for (bits >>>= next; bits != 0 && (bits & 1) != 1; bits >>>= 1, next++) {}
return bits != 0 ? next : NONE_AVAILABLE;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InputSelection that = (InputSelection) o;
return inputMask == that.inputMask;
}
@Override
public String toString() {
return String.valueOf(inputMask);
}
/** Utility class for creating {@link InputSelection}. */
public static final class Builder {
private long inputMask = 0;
/**
* Returns a {@code Builder} that uses the input mask of the specified {@code selection} as
* the initial mask.
*/
public static Builder from(InputSelection selection) {
Builder builder = new Builder();
builder.inputMask = selection.inputMask;
return builder;
}
/**
* Selects an input identified by the given {@code inputId}.
*
* @param inputId the input id numbered starting from 1 to 64, and `1` indicates the first
* input. Specially, `-1` indicates all inputs.
* @return a reference to this object.
*/
public Builder select(int inputId) {
if (inputId > 0 && inputId <= 64) {
inputMask |= 1L << (inputId - 1);
} else if (inputId == -1L) {
inputMask = -1L;
} else {
throw new IllegalArgumentException(
"The inputId must be in the range of 1 to 64, or be -1.");
}
return this;
}
/**
* Build normalized mask, if all inputs were manually selected, inputMask will be normalized
* to -1.
*/
public InputSelection build(int inputCount) {
long allSelectedMask = (1L << inputCount) - 1;
if (inputMask == allSelectedMask) {
inputMask = -1;
} else if (inputMask > allSelectedMask) {
throw new IllegalArgumentException(
String.format(
"inputMask [%d] selects more than expected number of inputs [%d]",
inputMask, inputCount));
}
return build();
}
public InputSelection build() {
return new InputSelection(inputMask);
}
}
}