blob: dbdacf1ffd98274ac95483f999ddca30f94adc48 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.test.operator;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.config.Config;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.test.operator.data.AdClick;
import org.apache.samza.test.operator.data.PageView;
import org.apache.samza.test.operator.data.UserPageAdClick;
/**
* A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count.
*/
public class RepartitionJoinWindowApp implements StreamApplication {
public static final String SYSTEM = "kafka";
public static final String INPUT_TOPIC_1_CONFIG_KEY = "inputTopic1";
public static final String INPUT_TOPIC_2_CONFIG_KEY = "inputTopic2";
public static final String OUTPUT_TOPIC_CONFIG_KEY = "outputTopic";
private final List<String> intermediateStreamIds = new ArrayList<>();
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
// offset.default = oldest required for tests since checkpoint topic is empty on start and messages are published
// before the application is run
Config config = appDescriptor.getConfig();
String inputTopic1 = config.get(INPUT_TOPIC_1_CONFIG_KEY);
String inputTopic2 = config.get(INPUT_TOPIC_2_CONFIG_KEY);
String outputTopic = config.get(OUTPUT_TOPIC_CONFIG_KEY);
KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
KafkaInputDescriptor<PageView> id1 = ksd.getInputDescriptor(inputTopic1, new JsonSerdeV2<>(PageView.class));
KafkaInputDescriptor<AdClick> id2 = ksd.getInputDescriptor(inputTopic2, new JsonSerdeV2<>(AdClick.class));
MessageStream<PageView> pageViews = appDescriptor.getInputStream(id1);
MessageStream<AdClick> adClicks = appDescriptor.getInputStream(id2);
MessageStream<KV<String, PageView>> pageViewsRepartitionedByViewId = pageViews
.partitionBy(PageView::getViewId, pv -> pv,
new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)), "pageViewsByViewId");
MessageStream<PageView> pageViewsRepartitionedByViewIdValueONly = pageViewsRepartitionedByViewId.map(KV::getValue);
MessageStream<KV<String, AdClick>> adClicksRepartitionedByViewId = adClicks
.partitionBy(AdClick::getViewId, ac -> ac,
new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(AdClick.class)), "adClicksByViewId");
MessageStream<AdClick> adClicksRepartitionedByViewIdValueOnly = adClicksRepartitionedByViewId.map(KV::getValue);
MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewIdValueONly
.join(adClicksRepartitionedByViewIdValueOnly, new UserPageViewAdClicksJoiner(),
new StringSerde(), new JsonSerdeV2<>(PageView.class), new JsonSerdeV2<>(AdClick.class),
Duration.ofMinutes(1), "pageViewAdClickJoin");
MessageStream<KV<String, UserPageAdClick>> userPageAdClicksByUserId = userPageAdClicks
.partitionBy(UserPageAdClick::getUserId, upac -> upac,
KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userPageAdClicksByUserId");
userPageAdClicksByUserId.map(KV::getValue)
.window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3),
new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userAdClickWindow")
.map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
.sink((message, messageCollector, taskCoordinator) -> {
taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
messageCollector.send(
new OutgoingMessageEnvelope(
new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
});
intermediateStreamIds.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamId());
intermediateStreamIds.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamId());
intermediateStreamIds.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamId());
}
List<String> getIntermediateStreamIds() {
return intermediateStreamIds;
}
private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> {
@Override
public UserPageAdClick apply(PageView pv, AdClick ac) {
return new UserPageAdClick(pv.getUserId(), pv.getPageId(), ac.getAdId());
}
@Override
public String getFirstKey(PageView pv) {
return pv.getViewId();
}
@Override
public String getSecondKey(AdClick ac) {
return ac.getViewId();
}
}
}