blob: 5caa5878d28e6ae5916271b5a05492d98dce1bfc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.trident.fluent;
import backtype.storm.tuple.Fields;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.operation.Aggregator;
import storm.trident.operation.CombinerAggregator;
import storm.trident.operation.Function;
import storm.trident.operation.ReducerAggregator;
import storm.trident.operation.impl.GroupedAggregator;
import storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
import storm.trident.state.QueryFunction;
import storm.trident.state.StateFactory;
import storm.trident.state.StateSpec;
import storm.trident.state.map.MapCombinerAggStateUpdater;
import storm.trident.state.map.MapReducerAggStateUpdater;
import storm.trident.util.TridentUtils;
public class GroupedStream implements IAggregatableStream, GlobalAggregationScheme<GroupedStream> {
Fields _groupFields;
Stream _stream;
public GroupedStream(Stream stream, Fields groupFields) {
_groupFields = groupFields;
_stream = stream;
}
public GroupedStream name(String name) {
return new GroupedStream(_stream.name(name), _groupFields);
}
public ChainedAggregatorDeclarer chainedAgg() {
return new ChainedAggregatorDeclarer(this, this);
}
public Stream aggregate(Aggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
return new ChainedAggregatorDeclarer(this, this).aggregate(inputFields, agg, functionFields).chainEnd();
}
public Stream aggregate(CombinerAggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
public Stream aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
return new ChainedAggregatorDeclarer(this, this).aggregate(inputFields, agg, functionFields).chainEnd();
}
public Stream aggregate(ReducerAggregator agg, Fields functionFields) {
return aggregate(null, agg, functionFields);
}
public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
return new ChainedAggregatorDeclarer(this, this).aggregate(inputFields, agg, functionFields).chainEnd();
}
public TridentState persistentAggregate(StateFactory stateFactory, CombinerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
}
public TridentState persistentAggregate(StateSpec spec, CombinerAggregator agg, Fields functionFields) {
return persistentAggregate(spec, null, agg, functionFields);
}
public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
}
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
return aggregate(inputFields, agg, functionFields).partitionPersist(spec, TridentUtils.fieldsUnion(_groupFields, functionFields),
new MapCombinerAggStateUpdater(agg, _groupFields, functionFields), TridentUtils.fieldsConcat(_groupFields, functionFields));
}
public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
}
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
return _stream.partitionBy(_groupFields).partitionPersist(spec, TridentUtils.fieldsUnion(_groupFields, inputFields),
new MapReducerAggStateUpdater(agg, _groupFields, inputFields), TridentUtils.fieldsConcat(_groupFields, functionFields));
}
public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {
return _stream.partitionBy(_groupFields).stateQuery(state, inputFields, function, functionFields);
}
public TridentState persistentAggregate(StateFactory stateFactory, ReducerAggregator agg, Fields functionFields) {
return persistentAggregate(new StateSpec(stateFactory), agg, functionFields);
}
public TridentState persistentAggregate(StateSpec spec, ReducerAggregator agg, Fields functionFields) {
return persistentAggregate(spec, null, agg, functionFields);
}
public Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields) {
return stateQuery(state, null, function, functionFields);
}
@Override
public IAggregatableStream each(Fields inputFields, Function function, Fields functionFields) {
Stream s = _stream.each(inputFields, function, functionFields);
return new GroupedStream(s, _groupFields);
}
@Override
public IAggregatableStream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
Aggregator groupedAgg = new GroupedAggregator(agg, _groupFields, inputFields, functionFields.size());
Fields allInFields = TridentUtils.fieldsUnion(_groupFields, inputFields);
Fields allOutFields = TridentUtils.fieldsConcat(_groupFields, functionFields);
Stream s = _stream.partitionAggregate(allInFields, groupedAgg, allOutFields);
return new GroupedStream(s, _groupFields);
}
@Override
public IAggregatableStream aggPartition(GroupedStream s) {
return new GroupedStream(s._stream.partitionBy(_groupFields), _groupFields);
}
@Override
public Stream toStream() {
return _stream;
}
@Override
public Fields getOutputFields() {
return _stream.getOutputFields();
}
public Fields getGroupFields() {
return _groupFields;
}
@Override
public BatchToPartition singleEmitPartitioner() {
return null;
}
}