blob: 402061c519bdd0e14833d7604f7f0e863b8f5e1d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.streams.kafka.processors;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import java.util.Objects;
import org.apache.kafka.streams.processor.Processor;
import org.apache.rya.api.model.VisibilityBindingSet;
import com.google.common.base.Optional;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
* Represents a value that is emitted from a Rya Streams {@link Processor}. We can't just emit a
* {@link VisibilityBindingSet} because some downstream processors require more information about
* which upstream processor is emitting the result in order to do their work.
* </p>
* Currently there are only two types of processors:
* <ul>
* <li>Unary Processor - A processor that only has a single upstream node feeding it input.</li>
* <li>Binary Processor - A processor that has two upstream nodes feeding it input.</li>
* </ul>
* If a processor is emitting to a unary processor, then use {@link #make(UnaryResult)} to create its
* result. If it is emitting to a binary processor, then use {@link #make(BinaryResult)}.
*/
@DefaultAnnotation(NonNull.class)
public class ProcessorResult {
private final ResultType type;
private final Optional<UnaryResult> unary;
private final Optional<BinaryResult> binary;
/**
* Constructs an instance of {@link ProcessorResult}. Private to force users to use the static factory methods.
*
* @param type - Indicates the type of result this object holds. (not null)
* @param unary - The unary result if that is this object's type. (not null)
* @param binary - The binary result if that is this object's type. (not null)
*/
private ProcessorResult(
final ResultType type,
final Optional<UnaryResult> unary,
final Optional<BinaryResult> binary) {
this.type = requireNonNull(type);
this.unary= requireNonNull(unary);
this.binary= requireNonNull(binary);
}
/**
* @return Indicates the type of result this object holds.
*/
public ResultType getType() {
return type;
}
/**
* @return The unary result if that is this object's type.
* @throws IllegalStateException If this object's type is not {@link ResultType#UNARY}.
*/
public UnaryResult getUnary() throws IllegalStateException {
checkState(type == ResultType.UNARY, "The ResultType must be " + ResultType.UNARY + " to invoke this method, " +
"but it is " + type + ".");
return unary.get();
}
/**
* @return The binary result if that is this object's type.
* @throws IllegalStateException If this object's type is not {@link ResultType#BINARY}.
*/
public BinaryResult getBinary() throws IllegalStateException {
checkState(type == ResultType.BINARY, "The ResultType must be " + ResultType.BINARY + " to invoke this method, " +
"but it is " + type + ".");
return binary.get();
}
@Override
public int hashCode() {
return Objects.hash(type, unary, binary);
}
@Override
public boolean equals(final Object o) {
if(o instanceof ProcessorResult) {
final ProcessorResult other = (ProcessorResult) o;
return Objects.equals(type, other.type) &&
Objects.equals(unary, other.unary) &&
Objects.equals(binary, other.binary);
}
return false;
}
/**
* Creates a {@link ProcessorResult} using the supplied value.
*
* @param result - The result that will be held by the created object. (not null)
* @return An object holding the provided result.
*/
public static ProcessorResult make(final UnaryResult result) {
requireNonNull(result);
return new ProcessorResult(ResultType.UNARY, Optional.of(result), Optional.absent());
}
/**
* Creates a {@link ProcessorResult} using the supplied value.
*
* @param result - The result that will be held by the created object. (not null)
* @return An object holding the provided result.
*/
public static ProcessorResult make(final BinaryResult result) {
requireNonNull(result);
return new ProcessorResult(ResultType.BINARY, Optional.absent(), Optional.of(result));
}
/**
* Indicates the type of result held by a {@link ProcessorResult}.
*/
public static enum ResultType {
/**
* The {@link ProcessorResult} holds a {@link UnaryResult}.
*/
UNARY,
/**
* The {@link ProcessorResult} holds a {@link BinaryResult}.
*/
BINARY;
}
/**
* The result of a Rya Streams {@link Processor} whose downstream processor is unary.
*/
@DefaultAnnotation(NonNull.class)
public static final class UnaryResult {
private final VisibilityBindingSet result;
/**
* Constructs an instance of {@link UnaryResult}.
*
* @param result - The binding set that is being emitted to the downstream unary processor. (not null)
*/
public UnaryResult(final VisibilityBindingSet result) {
this.result = requireNonNull(result);
}
/**
* @return The binding set that is being emitted to the downstream unary processor.
*/
public VisibilityBindingSet getResult() {
return result;
}
@Override
public int hashCode() {
return Objects.hash(result);
}
@Override
public boolean equals(final Object o) {
if(o instanceof UnaryResult) {
final UnaryResult other = (UnaryResult) o;
return Objects.equals(result, other.result);
}
return false;
}
}
/**
* The result of a Rya Streams {@link Processor} whose downstream processor is binary.
*/
@DefaultAnnotation(NonNull.class)
public static final class BinaryResult {
private final Side side;
private final VisibilityBindingSet result;
/**
* Constructs an instance of {@link BinaryResult}.
*
* @param side - Which side of the downstream binary processor the result is being emitted to. (not null)
* @param result - The binding set that is being emitted to the downstream binary processor. (not null)
*/
public BinaryResult(final Side side, final VisibilityBindingSet result) {
this.side = requireNonNull(side);
this.result = requireNonNull(result);
}
/**
* @return Which side of the downstream binary processor the result is being emitted to.
*/
public Side getSide() {
return side;
}
/**
* @return The binding set that is being emitted to the downstream binary processor.
*/
public VisibilityBindingSet getResult() {
return result;
}
@Override
public int hashCode() {
return Objects.hash(side, result);
}
@Override
public boolean equals(final Object o) {
if(o instanceof BinaryResult) {
final BinaryResult other = (BinaryResult) o;
return Objects.equals(side, other.side) &&
Objects.equals(result, other.result);
}
return false;
}
/**
* A label that is used by the downstream binary processor to distinguish which upstream processor
* produced the {@link BinaryResult}.
*/
public static enum Side {
/**
* The result is being emitted from the "left" upstream processor.
*/
LEFT,
/**
* The result is being emitted from the "right" upstream processor.
*/
RIGHT;
}
}
}