| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.trident.windowing; |
| |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.storm.coordination.BatchOutputCollector; |
| import org.apache.storm.trident.operation.Aggregator; |
| import org.apache.storm.trident.spout.IBatchID; |
| import org.apache.storm.trident.tuple.TridentTuple; |
| import org.apache.storm.trident.tuple.TridentTupleView; |
| import org.apache.storm.trident.windowing.config.WindowConfig; |
| import org.apache.storm.tuple.Fields; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This window manager uses {@code WindowsStore} for storing tuples and other trigger related information. It maintains tuples cache of |
| * {@code maxCachedTuplesSize} without accessing store for getting them. |
| */ |
| public class StoreBasedTridentWindowManager extends AbstractTridentWindowManager<TridentBatchTuple> { |
| private static final Logger LOG = LoggerFactory.getLogger(StoreBasedTridentWindowManager.class); |
| |
| private static final String TUPLE_PREFIX = "tu" + WindowsStore.KEY_SEPARATOR; |
| |
| private final String windowTupleTaskId; |
| private final TridentTupleView.FreshOutputFactory freshOutputFactory; |
| private final Fields inputFields; |
| private Long maxCachedTuplesSize; |
| private AtomicLong currentCachedTuplesSize = new AtomicLong(); |
| |
| public StoreBasedTridentWindowManager(WindowConfig windowConfig, String windowTaskId, WindowsStore windowStore, Aggregator aggregator, |
| BatchOutputCollector delegateCollector, Long maxTuplesCacheSize, Fields inputFields) { |
| super(windowConfig, windowTaskId, windowStore, aggregator, delegateCollector); |
| |
| this.maxCachedTuplesSize = maxTuplesCacheSize; |
| this.inputFields = inputFields; |
| freshOutputFactory = new TridentTupleView.FreshOutputFactory(inputFields); |
| windowTupleTaskId = TUPLE_PREFIX + windowTaskId; |
| } |
| |
| @Override |
| protected void initialize() { |
| |
| // get existing tuples and pending/unsuccessful triggers for this operator-component/task and add them to WindowManager |
| String windowTriggerInprocessId = WindowTridentProcessor.getWindowTriggerInprocessIdPrefix(windowTaskId); |
| String windowTriggerTaskId = WindowTridentProcessor.getWindowTriggerTaskPrefix(windowTaskId); |
| |
| List<String> attemptedTriggerKeys = new ArrayList<>(); |
| List<String> triggerKeys = new ArrayList<>(); |
| |
| Iterable<String> allEntriesIterable = windowStore.getAllKeys(); |
| for (String key : allEntriesIterable) { |
| if (key.startsWith(windowTupleTaskId)) { |
| int tupleIndexValue = lastPart(key); |
| String batchId = secondLastPart(key); |
| LOG.debug("Received tuple with batch [{}] and tuple index [{}]", batchId, tupleIndexValue); |
| windowManager.add(new TridentBatchTuple(batchId, System.currentTimeMillis(), tupleIndexValue)); |
| } else if (key.startsWith(windowTriggerTaskId)) { |
| triggerKeys.add(key); |
| LOG.debug("Received trigger with key [{}]", key); |
| } else if (key.startsWith(windowTriggerInprocessId)) { |
| attemptedTriggerKeys.add(key); |
| LOG.debug("Received earlier unsuccessful trigger [{}] from windows store [{}]", key); |
| } |
| } |
| |
| // these triggers will be retried as part of batch retries |
| Set<Integer> triggersToBeIgnored = new HashSet<>(); |
| Iterable<Object> attemptedTriggers = windowStore.get(attemptedTriggerKeys); |
| for (Object attemptedTrigger : attemptedTriggers) { |
| triggersToBeIgnored.addAll((List<Integer>) attemptedTrigger); |
| } |
| |
| // get trigger values only if they have more than zero |
| Iterable<Object> triggerObjects = windowStore.get(triggerKeys); |
| int i = 0; |
| for (Object triggerObject : triggerObjects) { |
| int id = lastPart(triggerKeys.get(i++)); |
| if (!triggersToBeIgnored.contains(id)) { |
| LOG.info("Adding pending trigger value [{}]", triggerObject); |
| pendingTriggers.add(new TriggerResult(id, (List<List<Object>>) triggerObject)); |
| } |
| } |
| |
| } |
| |
| private int lastPart(String key) { |
| int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR); |
| if (lastSepIndex < 0) { |
| throw new IllegalArgumentException("primaryKey does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'"); |
| } |
| return Integer.parseInt(key.substring(lastSepIndex + 1)); |
| } |
| |
| private String secondLastPart(String key) { |
| int lastSepIndex = key.lastIndexOf(WindowsStore.KEY_SEPARATOR); |
| if (lastSepIndex < 0) { |
| throw new IllegalArgumentException("key " + key + " does not have key separator '" + WindowsStore.KEY_SEPARATOR + "'"); |
| } |
| String trimKey = key.substring(0, lastSepIndex); |
| int secondLastSepIndex = trimKey.lastIndexOf(WindowsStore.KEY_SEPARATOR); |
| if (secondLastSepIndex < 0) { |
| throw new IllegalArgumentException("key " + key + " does not have second key separator '" + WindowsStore.KEY_SEPARATOR + "'"); |
| } |
| |
| return key.substring(secondLastSepIndex + 1, lastSepIndex); |
| } |
| |
| @Override |
| public void addTuplesBatch(Object batchId, List<TridentTuple> tuples) { |
| LOG.debug("Adding tuples to window-manager for batch: [{}]", batchId); |
| List<WindowsStore.Entry> entries = new ArrayList<>(); |
| for (int i = 0; i < tuples.size(); i++) { |
| String key = keyOf(batchId); |
| TridentTuple tridentTuple = tuples.get(i); |
| entries.add(new WindowsStore.Entry(key + i, tridentTuple.select(inputFields))); |
| } |
| |
| // tuples should be available in store before they are added to window manager |
| windowStore.putAll(entries); |
| |
| for (int i = 0; i < tuples.size(); i++) { |
| String key = keyOf(batchId); |
| TridentTuple tridentTuple = tuples.get(i); |
| addToWindowManager(i, key, tridentTuple); |
| } |
| |
| } |
| |
| private void addToWindowManager(int tupleIndex, String effectiveBatchId, TridentTuple tridentTuple) { |
| TridentTuple actualTuple = null; |
| if (maxCachedTuplesSize == null || currentCachedTuplesSize.get() < maxCachedTuplesSize) { |
| actualTuple = tridentTuple; |
| } |
| currentCachedTuplesSize.incrementAndGet(); |
| windowManager.add(new TridentBatchTuple(effectiveBatchId, System.currentTimeMillis(), tupleIndex, actualTuple)); |
| } |
| |
| public String getBatchTxnId(Object batchId) { |
| if (!(batchId instanceof IBatchID)) { |
| throw new IllegalArgumentException("argument should be an IBatchId instance"); |
| } |
| return ((IBatchID) batchId).getId().toString(); |
| } |
| |
| public String keyOf(Object batchId) { |
| return windowTupleTaskId + getBatchTxnId(batchId) + WindowsStore.KEY_SEPARATOR; |
| } |
| |
| @Override |
| public List<TridentTuple> getTridentTuples(List<TridentBatchTuple> tridentBatchTuples) { |
| List<TridentTuple> resultTuples = new ArrayList<>(); |
| List<String> keys = new ArrayList<>(); |
| for (TridentBatchTuple tridentBatchTuple : tridentBatchTuples) { |
| TridentTuple tuple = collectTridentTupleOrKey(tridentBatchTuple, keys); |
| if (tuple != null) { |
| resultTuples.add(tuple); |
| } |
| } |
| |
| if (keys.size() > 0) { |
| Iterable<Object> storedTupleValues = windowStore.get(keys); |
| for (Object storedTupleValue : storedTupleValues) { |
| TridentTuple tridentTuple = freshOutputFactory.create((List<Object>) storedTupleValue); |
| resultTuples.add(tridentTuple); |
| } |
| } |
| |
| return resultTuples; |
| } |
| |
| public TridentTuple collectTridentTupleOrKey(TridentBatchTuple tridentBatchTuple, List<String> keys) { |
| if (tridentBatchTuple.tridentTuple != null) { |
| return tridentBatchTuple.tridentTuple; |
| } |
| keys.add(tupleKey(tridentBatchTuple)); |
| return null; |
| } |
| |
| @Override |
| public void onTuplesExpired(List<TridentBatchTuple> expiredTuples) { |
| if (maxCachedTuplesSize != null) { |
| currentCachedTuplesSize.addAndGet(-expiredTuples.size()); |
| } |
| |
| List<String> keys = new ArrayList<>(); |
| for (TridentBatchTuple expiredTuple : expiredTuples) { |
| keys.add(tupleKey(expiredTuple)); |
| } |
| |
| windowStore.removeAll(keys); |
| } |
| |
| private String tupleKey(TridentBatchTuple tridentBatchTuple) { |
| return tridentBatchTuple.effectiveBatchId + tridentBatchTuple.tupleIndex; |
| } |
| |
| } |