blob: c5e7f938897610330225362b73fef14c505e6f7a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.trident.windowing;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.Aggregator;
import org.apache.storm.trident.planner.ProcessorContext;
import org.apache.storm.trident.planner.TridentProcessor;
import org.apache.storm.trident.planner.processor.FreshCollector;
import org.apache.storm.trident.planner.processor.TridentContext;
import org.apache.storm.trident.spout.IBatchID;
import org.apache.storm.trident.tuple.ConsList;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.tuple.TridentTupleView;
import org.apache.storm.trident.windowing.config.WindowConfig;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@code TridentProcessor} implementation for windowing operations on trident stream.
*/
public class WindowTridentProcessor implements TridentProcessor {
public static final String TRIGGER_INPROCESS_PREFIX = "tip" + WindowsStore.KEY_SEPARATOR;
public static final String TRIGGER_PREFIX = "tr" + WindowsStore.KEY_SEPARATOR;
public static final String TRIGGER_COUNT_PREFIX = "tc" + WindowsStore.KEY_SEPARATOR;
public static final String TRIGGER_FIELD_NAME = "_task_info";
public static final long DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT = 100L;
private static final Logger LOG = LoggerFactory.getLogger(WindowTridentProcessor.class);
private final String windowId;
private final Fields inputFields;
private final Aggregator aggregator;
private final boolean storeTuplesInStore;
private String windowTriggerInprocessId;
private WindowConfig windowConfig;
private WindowsStoreFactory windowStoreFactory;
private WindowsStore windowStore;
private TopologyContext topologyContext;
private FreshCollector collector;
private TridentTupleView.ProjectionFactory projection;
private TridentContext tridentContext;
private ITridentWindowManager tridentWindowManager;
private String windowTaskId;
public WindowTridentProcessor(WindowConfig windowConfig, String uniqueWindowId, WindowsStoreFactory windowStoreFactory,
Fields inputFields, Aggregator aggregator, boolean storeTuplesInStore) {
this.windowConfig = windowConfig;
this.windowId = uniqueWindowId;
this.windowStoreFactory = windowStoreFactory;
this.inputFields = inputFields;
this.aggregator = aggregator;
this.storeTuplesInStore = storeTuplesInStore;
}
public static String getWindowTriggerInprocessIdPrefix(String windowTaskId) {
return TRIGGER_INPROCESS_PREFIX + windowTaskId;
}
public static String getWindowTriggerTaskPrefix(String windowTaskId) {
return TRIGGER_PREFIX + windowTaskId;
}
public static Object getBatchTxnId(Object batchId) {
if (batchId instanceof IBatchID) {
return ((IBatchID) batchId).getId();
}
return null;
}
static boolean retriedAttempt(Object batchId) {
if (batchId instanceof IBatchID) {
return ((IBatchID) batchId).getAttemptId() > 0;
}
return false;
}
public static String generateWindowTriggerKey(String windowTaskId, int triggerId) {
return TRIGGER_PREFIX + windowTaskId + triggerId;
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, TridentContext tridentContext) {
this.topologyContext = context;
List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
if (parents.size() != 1) {
throw new RuntimeException("Aggregation related operation can only have one parent");
}
this.tridentContext = tridentContext;
collector = new FreshCollector(tridentContext);
projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);
windowStore = windowStoreFactory.create(topoConf);
windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);
Long maxTuplesCacheSize = getWindowTuplesCacheSize(topoConf);
tridentWindowManager = storeTuplesInStore
? new StoreBasedTridentWindowManager(windowConfig,
windowTaskId,
windowStore,
aggregator,
tridentContext.getDelegateCollector(),
maxTuplesCacheSize,
inputFields)
: new InMemoryTridentWindowManager(windowConfig,
windowTaskId,
windowStore,
aggregator,
tridentContext.getDelegateCollector());
tridentWindowManager.prepare();
}
private Long getWindowTuplesCacheSize(Map<String, Object> conf) {
if (conf.containsKey(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT)) {
return ((Number) conf.get(Config.TOPOLOGY_TRIDENT_WINDOWING_INMEMORY_CACHE_LIMIT)).longValue();
}
return DEFAULT_INMEMORY_TUPLE_CACHE_LIMIT;
}
@Override
public void cleanup() {
LOG.info("shutting down window manager");
try {
tridentWindowManager.shutdown();
} catch (Exception ex) {
LOG.error("Error occurred while cleaning up window processor", ex);
throw ex;
}
}
@Override
public void startBatch(ProcessorContext processorContext) {
// initialize state for batch
processorContext.state[tridentContext.getStateIndex()] = new ArrayList<TridentTuple>();
}
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
// add tuple to the batch state
Object state = processorContext.state[tridentContext.getStateIndex()];
((List<TridentTuple>) state).add(projection.create(tuple));
}
@Override
public void flush() {
// NO-OP
}
@Override
public void finishBatch(ProcessorContext processorContext) {
Object batchId = processorContext.batchId;
Object batchTxnId = getBatchTxnId(batchId);
LOG.debug("Received finishBatch of : [{}] ", batchId);
// get all the tuples in a batch and add it to trident-window-manager
List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
tridentWindowManager.addTuplesBatch(batchId, tuples);
List<Integer> pendingTriggerIds = null;
List<String> triggerKeys = new ArrayList<>();
Iterable<Object> triggerValues = null;
if (retriedAttempt(batchId)) {
pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
if (pendingTriggerIds != null) {
for (Integer pendingTriggerId : pendingTriggerIds) {
triggerKeys.add(triggerKey(pendingTriggerId));
}
triggerValues = windowStore.get(triggerKeys);
}
}
// if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
if (triggerValues == null) {
pendingTriggerIds = new ArrayList<>();
Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
try {
Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
List<Object> values = new ArrayList<>();
StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
while (pendingTriggersIter.hasNext()) {
triggerResult = pendingTriggersIter.next();
for (List<Object> aggregatedResult : triggerResult.result) {
String triggerKey = triggerKey(triggerResult.id);
triggerKeys.add(triggerKey);
values.add(aggregatedResult);
pendingTriggerIds.add(triggerResult.id);
}
pendingTriggersIter.remove();
}
triggerValues = values;
} finally {
// store inprocess triggers of a batch in store for batch retries for any failures
if (!pendingTriggerIds.isEmpty()) {
windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
}
}
}
collector.setContext(processorContext);
int i = 0;
for (Object resultValue : triggerValues) {
collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List<Object>) resultValue));
}
collector.setContext(null);
}
private String inprocessTriggerKey(Object batchTxnId) {
return windowTriggerInprocessId + batchTxnId;
}
@Override
public TridentTuple.Factory getOutputFactory() {
return collector.getOutputFactory();
}
public String triggerKey(int triggerId) {
return generateWindowTriggerKey(windowTaskId, triggerId);
}
public static class TriggerInfo implements Serializable {
public final String windowTaskId;
public final int triggerId;
public TriggerInfo(String windowTaskId, int triggerId) {
this.windowTaskId = windowTaskId;
this.triggerId = triggerId;
}
public String generateTriggerKey() {
return generateWindowTriggerKey(windowTaskId, triggerId);
}
@Override
public String toString() {
return "TriggerInfo{"
+ "windowTaskId='" + windowTaskId + '\''
+ ", triggerId=" + triggerId
+ '}';
}
}
}