blob: c8e10492b8ef7550ec27465944b9fc27d4536578 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.streams.kafka.processors.aggregation;
import static java.util.Objects.requireNonNull;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.rya.api.function.aggregation.AggregationState;
import org.apache.rya.api.function.aggregation.AggregationStateStore;
import org.apache.rya.api.function.aggregation.AggregationsEvaluator;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.streams.kafka.processors.ProcessorResult;
import org.apache.rya.streams.kafka.processors.ProcessorResult.ResultType;
import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor;
import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
import org.openrdf.query.algebra.Group;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.internal.Lists;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
/**
 * Supplies {@link AggregationProcessor} instances.
 * <p>
 * Every processor handed out by {@link #get()} is configured with the same state store name,
 * the same {@link Group} node, and the same result factory that this supplier was built with.
 */
@DefaultAnnotation(NonNull.class)
public class AggregationProcessorSupplier extends RyaStreamsProcessorSupplier {

    private final String stateStoreName;
    private final Group aggNode;

    /**
     * Constructs an instance of {@link AggregationProcessorSupplier}.
     *
     * @param stateStoreName - The name of the state store the processor will use. (not null)
     * @param aggNode - Defines which aggregations will be performed by the processor. (not null)
     * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
     */
    public AggregationProcessorSupplier(
            final String stateStoreName,
            final Group aggNode,
            final ProcessorResultFactory resultFactory) {
        super(resultFactory);
        this.stateStoreName = requireNonNull(stateStoreName);
        this.aggNode = requireNonNull(aggNode);
    }

    /**
     * Creates a new {@link AggregationProcessor} that uses this supplier's state store name,
     * aggregation node, and result factory.
     *
     * @return A newly constructed {@link AggregationProcessor}.
     */
    @Override
    public Processor<Object, ProcessorResult> get() {
        return new AggregationProcessor(stateStoreName, aggNode, super.getResultFactory());
    }

    /**
     * Evaluates a {@link Group} node that contains a bunch of aggregations. Each aggregation will have a binding
     * within the resulting binding sets that contains the aggregation value.
     *
     * @see AggregationsEvaluator
     */
    @DefaultAnnotation(NonNull.class)
    public static class AggregationProcessor extends RyaStreamsProcessor {
        private static final Logger log = LoggerFactory.getLogger(AggregationProcessor.class);

        private final String stateStoreName;
        private final Group aggNode;

        // These are assigned in init(ProcessorContext), not in the constructor.
        private ProcessorContext context;
        private AggregationStateStore aggStateStore;
        private AggregationsEvaluator evaluator;

        /**
         * Constructs an instance of {@link AggregationProcessor}.
         *
         * @param stateStoreName - The name of the Kafka Streams state store that this processor will use. (not null)
         * @param aggNode - The group by node that configures how the aggregations will be performed. (not null)
         * @param resultFactory - The factory that will format this processor's final results for the downstream
         *   processor. (not null)
         */
        public AggregationProcessor(
                final String stateStoreName,
                final Group aggNode,
                final ProcessorResultFactory resultFactory) {
            super(resultFactory);
            this.stateStoreName = requireNonNull(stateStoreName);
            this.aggNode = requireNonNull(aggNode);
        }

        /**
         * Prepares this processor for use: remembers the context, looks up the state store named at
         * construction time, and builds the {@link AggregationsEvaluator} that performs the aggregations.
         *
         * @param context - The context used to look up the state store and to forward results. (not null)
         */
        @Override
        public void init(final ProcessorContext context) {
            this.context = requireNonNull(context);

            // Sort the group by vars so that they will be written to the state store in the same order every time.
            // A plain ArrayList copy is used so the node's own collection is never reordered.
            final List<String> groupByVars = new ArrayList<>(aggNode.getGroupBindingNames());
            groupByVars.sort(Comparator.naturalOrder());

            // Get a reference to the state store that keeps track of aggregation state.
            // getStateStore(...) is not generically typed, so the cast cannot be checked by the compiler.
            // NOTE(review): assumes the store registered under stateStoreName was built with
            // String keys and AggregationState values - confirm against the topology builder.
            @SuppressWarnings("unchecked")
            final KeyValueStore<String, AggregationState> stateStore =
                    (KeyValueStore<String, AggregationState>) context.getStateStore(stateStoreName);
            aggStateStore = new KeyValueAggregationStateStore(stateStore, groupByVars);

            // Create the aggregation evaluator.
            evaluator = AggregationsEvaluator.make(aggStateStore, aggNode, groupByVars);
        }

        /**
         * Feeds the unary result's binding set into the aggregation evaluator and forwards the binding set
         * the evaluator returns, wrapped by the result factory, under the same key.
         *
         * @param key - The key the value arrived with; it is forwarded unchanged.
         * @param value - The result to aggregate. Its type must be {@link ResultType#UNARY}. (not null)
         * @throws RuntimeException If {@code value} is not a unary result.
         */
        @Override
        public void process(final Object key, final ProcessorResult value) {
            // Aggregations can only be unary.
            if (value.getType() != ResultType.UNARY) {
                throw new RuntimeException("The ProcessorResult to be processed must be Unary.");
            }

            // Read the input binding set once; it is used for both logging and evaluation.
            final VisibilityBindingSet inputBs = value.getUnary().getResult();

            // Log the binding set that has been input.
            log.debug("\nINPUT:\nBinding Set: {}", inputBs);

            // Update the aggregations values.
            final VisibilityBindingSet resultBs = evaluator.update(inputBs);

            // Log the binding set that will be output.
            log.debug("\nOUTPUT:\nBinding Set: {}", resultBs);

            // Forward the updated aggregation binding set to the downstream processors.
            context.forward(key, super.getResultFactory().make(resultBs));
        }

        /**
         * Does nothing; results are forwarded from {@link #process(Object, ProcessorResult)} instead.
         *
         * @param timestamp - Ignored.
         */
        @Override
        public void punctuate(final long timestamp) {
            // Do nothing.
        }

        /**
         * Does nothing.
         */
        @Override
        public void close() {
            // Do nothing.
        }
    }
}