// blob: f07d3f171d8d0b5c4c7816039f48f0a25f0f2199 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.trident.windowing;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.trident.operation.Aggregator;
import org.apache.storm.trident.spout.IBatchID;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.trident.tuple.TridentTupleView;
import org.apache.storm.trident.windowing.config.WindowConfig;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This window manager uses {@code WindowsStore} for storing tuples and other trigger-related information. It keeps an in-memory cache of
 * up to {@code maxCachedTuplesSize} tuples so that those tuples can be returned without reading them from the store.
 */
public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> {
private static final Logger LOG = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class);

/** Leading part of every store key under which this manager saves a tuple. */
private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR;

/** {@link #TUPLE_PREFIX} followed by the window task id; tuple keys of this task start with it. */
private final String windowTupleTaskId;

/** Rebuilds {@code TridentTuple} instances from the value lists read back from the store. */
private final TridentTupleView.FreshOutputFactory freshOutputFactory;

/** Fields selected from each incoming tuple before it is written to the store. */
private final Fields inputFields;

/** Cache limit given at construction time; {@code null} means tuples are always kept in memory. */
private final Long maxCachedTuplesSize;

/**
 * Counter compared against {@link #maxCachedTuplesSize}. It is incremented for every tuple handed to the window manager by
 * {@code addToWindowManager} and, when a limit is configured, decreased by the number of expired tuples.
 */
private final AtomicLong currentCachedTuplesSize = new AtomicLong();
/**
 * Creates a window manager that persists tuples in the given store.
 *
 * @param windowConfig       window configuration, passed to the parent class
 * @param windowTaskId       id of this window task, passed to the parent class and used to build the tuple key prefix
 * @param windowStore        store passed to the parent class
 * @param aggregator         aggregator passed to the parent class
 * @param delegateCollector  collector passed to the parent class
 * @param maxTuplesCacheSize limit used when deciding whether a tuple is kept in memory; {@code null} disables the limit
 * @param inputFields        fields selected from incoming tuples and used to rebuild tuples read from the store
 */
public StoreBasedTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator,
                                      BatchOutputCollector delegateCollector, Long maxTuplesCacheSize, Fields inputFields) {
    super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector);

    // key prefix shared by all tuples stored for this task
    this.windowTupleTaskId = TUPLE_PREFIX + windowTaskId;

    this.inputFields = inputFields;
    this.freshOutputFactory = new TridentTupleView.FreshOutputFactory(inputFields);

    this.maxCachedTuplesSize = maxTuplesCacheSize;
}
/**
 * Restores this task's state from the windows store.
 *
 * <p>Every key in the store is visited once. Keys starting with {@code windowTupleTaskId} are re-added to the window manager as tuples
 * (through the {@code TridentBatchTuple} constructor that takes no tuple instance). Keys with the trigger prefix are collected as
 * candidate pending triggers, and keys with the in-process trigger prefix identify triggers that were attempted earlier. A candidate
 * trigger is added to {@code pendingTriggers} only if its id is not among the attempted ones, since those are retried as part of batch
 * retries.
 *
 * <p>NOTE(review): restored tuples are not counted in {@code currentCachedTuplesSize}, although {@code onTuplesExpired} decrements the
 * counter for every expired tuple when a cache limit is set; confirm whether that is intended.
 *
 * <p>NOTE(review): trigger keys are matched with the values returned by {@code windowStore.get(triggerKeys)} purely by position; this
 * assumes the store returns exactly one value per key, in key order. TODO confirm against the store implementation.
 */
@Override
protected void initialize() {
    String windowTriggerInprocessId = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(windowTaskId);
    String windowTriggerTaskId = WindowTridentProcessor.getWindowTriggerTaskPrefix(windowTaskId);

    List<String> attemptedTriggerKeys = new ArrayList<>();
    List<String> triggerKeys = new ArrayList<>();

    Iterable<String> allEntriesIterable = windowStore.getAllKeys();
    for (String key : allEntriesIterable) {
        if (key.startsWith(windowTupleTaskId)) {
            int tupleIndexValue = lastPart(key);
            String batchId = secondLastPart(key);
            // Keep everything up to and including the last separator: tupleKey() appends the tuple index to the effective batch
            // id, so only the full prefix reproduces the key under which the tuple is actually stored.
            String effectiveBatchId = key.substring(0, key.lastIndexOf(WindowsStore.KEY_SEPARATOR) + 1);
            LOG.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue);
            windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndexValue));
        } else if (key.startsWith(windowTriggerTaskId)) {
            triggerKeys.add(key);
            LOG.debug("Received trigger with key [{}]", key);
        } else if (key.startsWith(windowTriggerInprocessId)) {
            attemptedTriggerKeys.add(key);
            LOG.debug("Received earlier unsuccessful trigger [{}] from windows store", key);
        }
    }

    // these triggers will be retried as part of batch retries
    Set<Integer> triggersToBeIgnored = new HashSet<>();
    for (Object attemptedTrigger : windowStore.get(attemptedTriggerKeys)) {
        @SuppressWarnings("unchecked") // values stored under in-process trigger keys are read as lists of trigger ids
        List<Integer> attemptedTriggerIds = (List<Integer>) attemptedTrigger;
        triggersToBeIgnored.addAll(attemptedTriggerIds);
    }

    // register every stored trigger that is not going to be retried
    int i = 0;
    for (Object triggerObject : windowStore.get(triggerKeys)) {
        int id = lastPart(triggerKeys.get(i++));
        if (!triggersToBeIgnored.contains(id)) {
            LOG.info("Adding pending trigger value [{}]", triggerObject);
            @SuppressWarnings("unchecked") // values stored under trigger keys are read as lists of tuple value lists
            List<List<Object>> triggerValue = (List<List<Object>>) triggerObject;
            pendingTriggers.add(new TriggerResult(id, triggerValue));
        }
    }
}
/**
 * Parses the integer that follows the last key separator of {@code key}.
 *
 * @param key store key to parse
 * @return the text after the last separator, parsed as an {@code int}
 * @throws IllegalArgumentException if {@code key} contains no separator
 * @throws NumberFormatException    if the text after the last separator is not an integer
 */
private int lastPart(String key) {
    int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
    if (lastSepIndex < 0) {
        // report the offending key, in the same form as secondLastPart does
        throw new IllegalArgumentException("key " + key + " does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
    }
    return Integer.parseInt(key.substring(lastSepIndex + 1));
}
/**
 * Extracts the segment that sits between the last two key separators of {@code key}.
 *
 * @param key store key to parse
 * @return the text between the second-last and the last separator
 * @throws IllegalArgumentException if {@code key} contains fewer than two separators
 */
private String secondLastPart(String key) {
    final int end = key.lastIndexOf(WindowsStore.KEY_SEPARATOR);
    if (end < 0) {
        throw new IllegalArgumentException("key " + key + " does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'");
    }

    // search only the part in front of the last separator
    final String head = key.substring(0, end);
    final int previousSep = head.lastIndexOf(WindowsStore.KEY_SEPARATOR);
    if (previousSep < 0) {
        throw new IllegalArgumentException("key " + key + " does not have second key separator '" + WindowsStore.KEY_SEPARATOR + "'");
    }

    final int start = previousSep + 1;
    return key.substring(start, end);
}
/**
 * Stores all tuples of a batch and then hands them to the window manager.
 *
 * <p>Each tuple is saved under the batch key followed by its position in {@code tuples}; only the configured input fields are stored.
 * The batch key is the same for every tuple, so it is computed once up front. As a consequence an invalid {@code batchId} is rejected
 * even when {@code tuples} is empty.
 *
 * @param batchId id of the batch; must be an {@code IBatchID}
 * @param tuples  tuples of the batch, in batch order
 * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
 */
@Override
public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) {
    LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId);

    String key = keyOf(batchId);
    int tupleCount = tuples.size();

    List<WindowsStore.Entry> entries = new ArrayList<>(tupleCount);
    for (int i = 0; i < tupleCount; i++) {
        TridentTuple tridentTuple = tuples.get(i);
        entries.add(new WindowsStore.Entry(key + i, tridentTuple.select(inputFields)));
    }

    // tuples should be available in store before they are added to window manager
    windowStore.putAll(entries);

    for (int i = 0; i < tupleCount; i++) {
        addToWindowManager(i, key, tuples.get(i));
    }
}
/**
 * Adds one tuple to the window manager, keeping the tuple instance only while the cache counter is below the configured limit.
 *
 * @param tupleIndex       position of the tuple within its batch
 * @param effectiveBatchId key prefix of the batch the tuple belongs to
 * @param tridentTuple     the tuple itself; dropped (replaced by {@code null}) when the cache limit has been reached
 */
private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) {
    // the limit is checked before the counter is bumped for this tuple
    final boolean keepInMemory = maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize;
    final TridentTuple cachedTuple = keepInMemory ? tridentTuple : null;

    // the counter grows for every added tuple, whether or not it was kept in memory
    currentCachedTuplesSize.incrementAndGet();

    final TridentBatchTuple windowEntry = new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, cachedTuple);
    windowManager.add(windowEntry);
}
/**
 * Returns the string form of the id carried by the given batch id.
 *
 * @param batchId batch id; must be an {@code IBatchID}
 * @return {@code getId().toString()} of the batch id
 * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID} (this includes {@code null})
 */
public String getBatchTxnId(Object batchId) {
    if (batchId instanceof IBatchID) {
        final IBatchID typedBatchId = (IBatchID) batchId;
        return typedBatchId.getId().toString();
    }
    throw new IllegalArgumentException("argument should be an IBatchId instance");
}
/**
 * Builds the store key prefix for a batch: the task's tuple prefix, the batch transaction id, and a trailing key separator.
 *
 * @param batchId batch id; must be an {@code IBatchID}
 * @return the key prefix to which a tuple index is appended
 * @throws IllegalArgumentException if {@code batchId} is not an {@code IBatchID}
 */
public String keyOf(Object batchId) {
    final String txnId = getBatchTxnId(batchId);
    return windowTupleTaskId + txnId + WindowsStore.KEY_SEPARATOR;
}
/**
 * Resolves window entries to tuples.
 *
 * <p>Entries that still hold their tuple contribute it directly. For the remaining entries the store keys are collected and fetched
 * with a single {@code windowStore.get} call, and the returned values are turned back into tuples. In the result, the tuples taken
 * from memory come first, followed by the ones read from the store.
 *
 * @param tridentBatchTuples window entries to resolve
 * @return the resolved tuples
 */
@Override
public List<TridentTuple> getTridentTuples(List<TridentBatchTuple> tridentBatchTuples) {
    final List<TridentTuple> resolved = new ArrayList<>();
    final List<String> missingKeys = new ArrayList<>();

    for (TridentBatchTuple entry : tridentBatchTuples) {
        final TridentTuple inMemory = collectTridentTupleOrKey(entry, missingKeys);
        if (inMemory == null) {
            // nothing held in memory; its key has been recorded for the store lookup below
            continue;
        }
        resolved.add(inMemory);
    }

    if (missingKeys.isEmpty()) {
        return resolved;
    }

    for (Object storedValues : windowStore.get(missingKeys)) {
        resolved.add(freshOutputFactory.create((List<Object>) storedValues));
    }
    return resolved;
}
/**
 * Returns the tuple held by the given window entry, or records the entry's store key when it holds none.
 *
 * @param tridentBatchTuple window entry to inspect
 * @param keys              list that receives the store key when the entry holds no tuple
 * @return the held tuple, or {@code null} if the key was appended to {@code keys} instead
 */
public TridentTuple collectTridentTupleOrKey(TridentBatchTuple tridentBatchTuple, List<String> keys) {
    if (tridentBatchTuple.tridentTuple == null) {
        keys.add(tupleKey(tridentBatchTuple));
        return null;
    }
    return tridentBatchTuple.tridentTuple;
}
/**
 * Handles expired window entries: lowers the cache counter (only when a cache limit is configured) and removes the entries' keys from
 * the store.
 *
 * @param expiredTuples entries that have left the window
 */
@Override
public void onTuplesExpired(List<TridentBatchTuple> expiredTuples) {
    final int expiredCount = expiredTuples.size();
    if (maxCachedTuplesSize != null) {
        currentCachedTuplesSize.addAndGet(-expiredCount);
    }

    final List<String> staleKeys = new ArrayList<>();
    for (TridentBatchTuple expired : expiredTuples) {
        final String staleKey = tupleKey(expired);
        staleKeys.add(staleKey);
    }
    windowStore.removeAll(staleKeys);
}
/**
 * Builds the store key of a window entry by appending its tuple index to its effective batch id.
 *
 * @param tridentBatchTuple window entry
 * @return effective batch id immediately followed by the tuple index
 */
private String tupleKey(TridentBatchTuple tridentBatchTuple) {
    final StringBuilder storeKey = new StringBuilder();
    storeKey.append(tridentBatchTuple.effectiveBatchId);
    storeKey.append(tridentBatchTuple.tupleIndex);
    return storeKey.toString();
}
}