| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.samza.example; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.samza.application.StreamApplication; |
| import org.apache.samza.application.descriptors.StreamApplicationDescriptor; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.context.Context; |
| import org.apache.samza.example.models.PageViewEvent; |
| import org.apache.samza.operators.KV; |
| import org.apache.samza.operators.MessageStream; |
| import org.apache.samza.operators.OutputStream; |
| import org.apache.samza.operators.functions.FlatMapFunction; |
| import org.apache.samza.runtime.ApplicationRunner; |
| import org.apache.samza.runtime.ApplicationRunners; |
| import org.apache.samza.serializers.JsonSerdeV2; |
| import org.apache.samza.serializers.KVSerde; |
| import org.apache.samza.serializers.StringSerde; |
| import org.apache.samza.storage.kv.KeyValueStore; |
| import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor; |
| import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor; |
| import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor; |
| import org.apache.samza.util.CommandLine; |
| |
| |
| /** |
| * Example code using {@link KeyValueStore} to implement event-time window |
| */ |
| public class KeyValueStoreExample implements StreamApplication { |
| |
| // local execution mode |
| public static void main(String[] args) { |
| CommandLine cmdLine = new CommandLine(); |
| Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); |
| ApplicationRunner runner = ApplicationRunners.getApplicationRunner(new KeyValueStoreExample(), config); |
| |
| runner.run(); |
| runner.waitForFinish(); |
| } |
| |
| @Override |
| public void describe(StreamApplicationDescriptor appDescriptor) { |
| KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); |
| |
| KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor = |
| trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)); |
| |
| KafkaOutputDescriptor<KV<String, StatsOutput>> outputStreamDescriptor = |
| trackingSystem.getOutputDescriptor("pageViewEventPerMember", |
| KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class))); |
| |
| appDescriptor.withDefaultSystem(trackingSystem); |
| MessageStream<PageViewEvent> pageViewEvents = appDescriptor.getInputStream(inputStreamDescriptor); |
| OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = appDescriptor.getOutputStream(outputStreamDescriptor); |
| |
| pageViewEvents |
| .partitionBy(pve -> pve.getMemberId(), pve -> pve, |
| KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy") |
| .map(KV::getValue) |
| .flatMap(new MyStatsCounter()) |
| .map(stats -> KV.of(stats.memberId, stats)) |
| .sendTo(pageViewEventPerMember); |
| } |
| |
| static class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> { |
| private final int timeoutMs = 10 * 60 * 1000; |
| |
| KeyValueStore<String, StatsWindowState> statsStore; |
| |
| class StatsWindowState { |
| int lastCount = 0; |
| long timeAtLastOutput = 0; |
| int newCount = 0; |
| } |
| |
| @Override |
| public Collection<StatsOutput> apply(PageViewEvent message) { |
| List<StatsOutput> outputStats = new ArrayList<>(); |
| long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getTimestamp()) / 5) * 5; |
| String wndKey = String.format("%s-%d", message.getMemberId(), wndTimestamp); |
| StatsWindowState curState = this.statsStore.get(wndKey); |
| if (curState == null) { |
| curState = new StatsWindowState(); |
| } |
| curState.newCount++; |
| long curTimeMs = System.currentTimeMillis(); |
| if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) { |
| curState.timeAtLastOutput = curTimeMs; |
| curState.lastCount += curState.newCount; |
| curState.newCount = 0; |
| outputStats.add(new StatsOutput(message.getMemberId(), wndTimestamp, curState.lastCount)); |
| } |
| // update counter w/o generating output |
| this.statsStore.put(wndKey, curState); |
| return outputStats; |
| } |
| |
| @Override |
| public void init(Context context) { |
| this.statsStore = |
| (KeyValueStore<String, StatsWindowState>) context.getTaskContext().getStore("my-stats-wnd-store"); |
| } |
| } |
| |
| static class StatsOutput { |
| private String memberId; |
| private long timestamp; |
| private Integer count; |
| |
| StatsOutput(String key, long timestamp, Integer count) { |
| this.memberId = key; |
| this.timestamp = timestamp; |
| this.count = count; |
| } |
| } |
| } |