blob: 5e0c2b997e1c4269acfe2de7462c876cb62b5aae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.integration_test.topology.streamlet_with_keyby_count_and_reduce;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.heron.api.Config;
import org.apache.heron.integration_test.common.AbstractTestTopology;
import org.apache.heron.integration_test.core.TestTopologyBuilder;
import org.apache.heron.streamlet.Builder;
import org.apache.heron.streamlet.SerializableFunction;
import org.apache.heron.streamlet.Streamlet;
import org.apache.heron.streamlet.StreamletReducers;
import org.apache.heron.streamlet.impl.BuilderImpl;
class StreamletWithKeybyCountAndReduce extends AbstractTestTopology {
private static final String MONTHS = "january - february - march - april - may - june"
+ " - july - august - september - october - november - december";
private static final Set<String> SPRING_MONTHS =
new HashSet<>(Arrays.asList("march", "april", "may"));
private static final Set<String> SUMMER_MONTHS =
new HashSet<>(Arrays.asList("june", "july", "august"));
private static final Set<String> FALL_MONTHS =
new HashSet<>(Arrays.asList("september", "october", "november"));
private static final Set<String> WINTER_MONTHS =
new HashSet<>(Arrays.asList("december", "january", "february"));
private static Set<String> incomingMonths = new HashSet<>();
StreamletWithKeybyCountAndReduce(String[] args) throws MalformedURLException {
super(args);
}
@Override
protected TestTopologyBuilder buildTopology(TestTopologyBuilder testTopologyBuilder) {
Builder streamletBuilder = Builder.newBuilder();
Streamlet<String> monthStreamlet = streamletBuilder
.newSource(() -> MONTHS)
.setName("months-text")
.flatMap((String m) -> Arrays.asList(m.split(" - ")))
.setName("months")
// Make sure each month is emitted only once
.filter((month) -> incomingMonths.add(month.toLowerCase()))
.setName("unique-months");
SerializableFunction<String, String> getSeason = month -> {
if (SPRING_MONTHS.contains(month)) {
return "spring";
} else if (SUMMER_MONTHS.contains(month)) {
return "summer";
} else if (FALL_MONTHS.contains(month)) {
return "fall";
} else if (WINTER_MONTHS.contains(month)) {
return "winter";
} else {
return "really?";
}
};
SerializableFunction<String, Integer> getNumberOfDays = month -> {
switch (month) {
case "january":
return 31;
case "february":
return 28; // Dont use this code in real projects
case "march":
return 31;
case "april":
return 30;
case "may":
return 31;
case "june":
return 30;
case "july":
return 31;
case "august":
return 31;
case "september":
return 30;
case "october":
return 31;
case "november":
return 30;
case "december":
return 31;
default:
return -1; // Shouldn't be here
}
};
// Count months per season
monthStreamlet
.keyBy(getSeason, getNumberOfDays)
.setName("key-by-season")
.countByKey(x -> x.getKey())
.setName("key-by-and-count")
.map(x -> String.format("%s: %d months", x.getKey(), x.getValue()))
.setName("to-string");
// Sum days per season
monthStreamlet
.<String, Integer>reduceByKey(getSeason, getNumberOfDays, StreamletReducers::sum)
.setName("sum-by-season")
.map(x -> String.format("%s: %d days", x.getKey(), x.getValue()))
.setName("to-string-2");
BuilderImpl streamletBuilderImpl = (BuilderImpl) streamletBuilder;
TestTopologyBuilder topology =
(TestTopologyBuilder) streamletBuilderImpl.build(testTopologyBuilder);
return topology;
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
StreamletWithKeybyCountAndReduce topology =
new StreamletWithKeybyCountAndReduce(args);
topology.submit(conf);
}
}