blob: 3667052cca470e1084b2d6c50637110c5962430a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jwplayer.sqe.trident.spout.kafka;
import com.jwplayer.sqe.trident.StreamMetadata;
import org.apache.storm.trident.operation.TridentCollector;
import java.util.*;
/**
 * A {@link TridentCollector} decorator that drops tuples whose offset has already been emitted,
 * based on per pid/partition high watermarks.
 *
 * <p>For every tuple whose key (the {@code byte[]} at {@code keyIndex}) is exactly 20 bytes long,
 * the key is parsed with {@code StreamMetadata.parseBytes} and the tuple is forwarded only if its
 * offset is strictly greater than the high watermark recorded for its
 * {@code getPidAndPartitionAsHex()} value. Tuples with a {@code null} key or a key of any other
 * length are always forwarded.
 *
 * <p>High watermarks travel between batches in the metadata map under {@link #HIGH_WATERMARKS}, as
 * a map from the pid/partition hex string to a two-entry map holding {@link #WATERMARK} (the last
 * emitted offset) and {@link #TIMESTAMP} (epoch seconds of the last update).
 *
 * <p>NOTE(review): state is kept in plain {@code HashMap}s with no synchronization; presumably each
 * instance is confined to a single emitter thread — confirm.
 */
public class FilteredTridentCollector implements TridentCollector {
    public static final String HIGH_WATERMARKS = "highWatermarks";
    public static final String TIMESTAMP = "timestamp";
    public static final String WATERMARK = "watermark";

    /** Only keys of exactly this length are treated as serialized {@code StreamMetadata}. */
    private static final int METADATA_KEY_LENGTH = 20;

    private final TridentCollector collector;
    /** pid/partition hex string -> {WATERMARK: last emitted offset, TIMESTAMP: epoch seconds}. */
    private final Map<String, Map<String, Long>> hwmMetadata;
    private final int keyIndex;
    private final long hwmTtl;

    /**
     * Creates a filtering collector.
     *
     * @param collector    downstream collector that receives the tuples which pass the filter
     * @param keyIndex     index, within each emitted value list, of the {@code byte[]} message key
     * @param hwmTtl       number of seconds after its last update that a high watermark is
     *                     discarded by {@link #resolveMetadata(Map)}
     * @param lastMetadata metadata returned by an earlier {@link #resolveMetadata(Map)} call
     *                     (presumably the previous batch's); may be {@code null}. Its
     *                     {@link #HIGH_WATERMARKS} entry is deep-copied so this instance never
     *                     mutates the caller's map.
     */
    public FilteredTridentCollector(TridentCollector collector, int keyIndex, long hwmTtl, Map lastMetadata) {
        this.collector = collector;
        this.keyIndex = keyIndex;
        this.hwmTtl = hwmTtl;
        this.hwmMetadata = copyHighWatermarks(lastMetadata == null ? null : lastMetadata.get(HIGH_WATERMARKS));
    }

    /**
     * Forwards {@code values} unless its key identifies an offset at or below the recorded high
     * watermark for the same pid/partition. Forwarding a parseable tuple records its offset as the
     * new high watermark, stamped with the current time in epoch seconds.
     *
     * @param values tuple values; the element at {@code keyIndex} must be a {@code byte[]} or
     *               {@code null}
     * @throws ClassCastException if the element at {@code keyIndex} is neither {@code null} nor a
     *                            {@code byte[]}
     */
    @Override
    public void emit(List<Object> values) {
        byte[] key = (byte[]) values.get(keyIndex);
        if (key == null || key.length != METADATA_KEY_LENGTH) {
            // Keys that are not serialized StreamMetadata cannot be deduplicated, so pass them
            // through unfiltered. These can be replays.
            collector.emit(values);
            return;
        }

        StreamMetadata streamMetadata = StreamMetadata.parseBytes(key);
        String pidAndPartition = streamMetadata.getPidAndPartitionAsHex();
        long watermark = streamMetadata.offset;

        Map<String, Long> highWatermark = hwmMetadata.get(pidAndPartition);
        if (highWatermark != null && watermark <= highWatermark.get(WATERMARK)) {
            // This offset (or a later one) was already emitted for this pid/partition: drop it.
            return;
        }

        collector.emit(values);
        // Record only after emit returns, so a failing emit does not advance the watermark.
        hwmMetadata.put(pidAndPartition, newHighWatermark(watermark, nowSeconds()));
    }

    /** Delegates error reporting to the wrapped collector. */
    @Override
    public void reportError(Throwable throwable) {
        collector.reportError(throwable);
    }

    /**
     * Merges this collector's high watermarks into the emitter-generated metadata.
     *
     * <p>Watermarks whose last update is more than {@code hwmTtl} seconds old are first removed from
     * this instance's state. A copy of the remaining watermarks is then stored under
     * {@link #HIGH_WATERMARKS}, so later calls to {@link #emit(List)} on this instance cannot alter
     * the returned metadata.
     *
     * @param newMetadata metadata generated by the emitter; modified in place
     * @return {@code newMetadata}
     */
    @SuppressWarnings("unchecked")
    public Map resolveMetadata(Map newMetadata) {
        long now = nowSeconds();
        Iterator<Map.Entry<String, Map<String, Long>>> iter = hwmMetadata.entrySet().iterator();
        while (iter.hasNext()) {
            // Both the stored timestamp and hwmTtl are in seconds.
            if (now - iter.next().getValue().get(TIMESTAMP) > hwmTtl) {
                iter.remove();
            }
        }
        newMetadata.put(HIGH_WATERMARKS, copyHighWatermarks(hwmMetadata));
        return newMetadata;
    }

    /**
     * Deep-copies a high-watermark map into freshly allocated mutable maps.
     *
     * <p>Entries are skipped if any of the following holds: the key is not a String, the value is
     * not a map, or the value lacks a numeric watermark or timestamp. Skipped entries are treated as
     * unseen rather than causing an NPE later. Numeric values are normalized to {@code Long}.
     */
    private static Map<String, Map<String, Long>> copyHighWatermarks(Object raw) {
        Map<String, Map<String, Long>> copy = new HashMap<String, Map<String, Long>>();
        if (!(raw instanceof Map)) {
            return copy;
        }
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) raw).entrySet()) {
            if (!(entry.getKey() instanceof String) || !(entry.getValue() instanceof Map)) {
                continue;
            }
            Map<?, ?> stored = (Map<?, ?>) entry.getValue();
            Long watermark = toLong(stored.get(WATERMARK));
            Long timestamp = toLong(stored.get(TIMESTAMP));
            if (watermark == null || timestamp == null) {
                continue;
            }
            copy.put((String) entry.getKey(), newHighWatermark(watermark, timestamp));
        }
        return copy;
    }

    /** Builds a mutable {WATERMARK, TIMESTAMP} entry. */
    private static Map<String, Long> newHighWatermark(long watermark, long timestamp) {
        Map<String, Long> highWatermark = new HashMap<String, Long>(2);
        highWatermark.put(WATERMARK, watermark);
        highWatermark.put(TIMESTAMP, timestamp);
        return highWatermark;
    }

    /** Returns {@code value} as a {@code Long} if it is a {@link Number}, otherwise {@code null}. */
    private static Long toLong(Object value) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return null;
    }

    /** Current wall-clock time in whole epoch seconds. */
    private static long nowSeconds() {
        return System.currentTimeMillis() / 1000L;
    }
}