| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.apex.malhar.stream.sample.cookbook; |
| |
| import java.util.List; |
| |
| import org.apache.apex.malhar.lib.window.TriggerOption; |
| import org.apache.apex.malhar.lib.window.Tuple; |
| import org.apache.apex.malhar.lib.window.Window; |
| import org.apache.apex.malhar.lib.window.WindowOption; |
| import org.apache.apex.malhar.lib.window.accumulation.Max; |
| import org.apache.apex.malhar.stream.api.ApexStream; |
| import org.apache.apex.malhar.stream.api.CompositeStreamTransform; |
| import org.apache.apex.malhar.stream.api.WindowedStream; |
| import org.apache.apex.malhar.stream.api.function.Function; |
| import org.apache.apex.malhar.stream.api.impl.StreamFactory; |
| import org.apache.hadoop.conf.Configuration; |
| |
| import static java.sql.Types.DOUBLE; |
| import static java.sql.Types.INTEGER; |
| |
| import com.google.common.collect.Lists; |
| import com.datatorrent.api.DAG; |
| import com.datatorrent.api.StreamingApplication; |
| import com.datatorrent.api.annotation.ApplicationAnnotation; |
| import com.datatorrent.lib.db.jdbc.JdbcFieldInfo; |
| import com.datatorrent.lib.db.jdbc.JdbcPOJOInputOperator; |
| import com.datatorrent.lib.db.jdbc.JdbcPOJOInsertOutputOperator; |
| import com.datatorrent.lib.db.jdbc.JdbcStore; |
| import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; |
| import com.datatorrent.lib.util.FieldInfo; |
| import com.datatorrent.lib.util.KeyValPair; |
| |
| import static org.apache.apex.malhar.stream.api.Option.Options.name; |
| |
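/**
 * Example application, adapted from the Beam MaxPerKeyExamples sample, that reads
 * temperature records from a JDBC input, finds the maximum mean temperature for each
 * month and writes the results to a JDBC output.
 *
 * @since 3.5.0
 */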
| @ApplicationAnnotation(name = "MaxPerKeyExamples") |
| public class MaxPerKeyExamples implements StreamingApplication |
| { |
| |
| /** |
| * A map function to extract the mean temperature from {@link InputPojo}. |
| */ |
| public static class ExtractTempFn implements Function.MapFunction<InputPojo, KeyValPair<Integer, Double>> |
| { |
| @Override |
| public KeyValPair<Integer, Double> f(InputPojo row) |
| { |
| Integer month = row.getMonth(); |
| Double meanTemp = row.getMeanTemp(); |
| return new KeyValPair<Integer, Double>(month, meanTemp); |
| } |
| } |
| |
| |
| /** |
| * A map function to format output to {@link OutputPojo}. |
| */ |
| public static class FormatMaxesFn implements Function.MapFunction<Tuple.WindowedTuple<KeyValPair<Integer, Double>>, OutputPojo> |
| { |
| @Override |
| public OutputPojo f(Tuple.WindowedTuple<KeyValPair<Integer, Double>> input) |
| { |
| OutputPojo row = new OutputPojo(); |
| row.setMonth(input.getValue().getKey()); |
| row.setMeanTemp(input.getValue().getValue()); |
| return row; |
| } |
| } |
| |
| /** |
| * A composite transformation to perform three tasks: |
| * 1. extract the month and its mean temperature from input pojo. |
| * 2. find the maximum mean temperature for every month. |
| * 3. format the result to a output pojo object. |
| */ |
| public static class MaxMeanTemp extends CompositeStreamTransform<WindowedStream<InputPojo>, WindowedStream<OutputPojo>> |
| { |
| @Override |
| public WindowedStream<OutputPojo> compose(WindowedStream<InputPojo> rows) |
| { |
| // InputPojo... => <month, meanTemp> ... |
| WindowedStream<KeyValPair<Integer, Double>> temps = rows.map(new ExtractTempFn(), name("ExtractTempFn")); |
| |
| // month, meanTemp... => <month, max mean temp>... |
| WindowedStream<Tuple.WindowedTuple<KeyValPair<Integer, Double>>> tempMaxes = |
| temps.accumulateByKey(new Max<Double>(), |
| new Function.ToKeyValue<KeyValPair<Integer, Double>, Integer, Double>() |
| { |
| @Override |
| public Tuple<KeyValPair<Integer, Double>> f(KeyValPair<Integer, Double> input) |
| { |
| return new Tuple.WindowedTuple<KeyValPair<Integer, Double>>(Window.GLOBAL_WINDOW, input); |
| } |
| }, name("MaxPerMonth")); |
| |
| // <month, max>... => OutputPojo... |
| WindowedStream<OutputPojo> results = tempMaxes.map(new FormatMaxesFn(), name("FormatMaxesFn")); |
| |
| return results; |
| } |
| } |
| |
| /** |
| * Method to set field info for {@link JdbcPOJOInputOperator}. |
| * @return |
| */ |
| private List<FieldInfo> addInputFieldInfos() |
| { |
| List<FieldInfo> fieldInfos = Lists.newArrayList(); |
| fieldInfos.add(new FieldInfo("MONTH", "month", FieldInfo.SupportType.INTEGER)); |
| fieldInfos.add(new FieldInfo("DAY", "day", FieldInfo.SupportType.INTEGER)); |
| fieldInfos.add(new FieldInfo("YEAR", "year", FieldInfo.SupportType.INTEGER)); |
| fieldInfos.add(new FieldInfo("MEANTEMP", "meanTemp", FieldInfo.SupportType.DOUBLE)); |
| return fieldInfos; |
| } |
| |
| /** |
| * Method to set field info for {@link JdbcPOJOInsertOutputOperator}. |
| * @return |
| */ |
| private List<JdbcFieldInfo> addOutputFieldInfos() |
| { |
| List<JdbcFieldInfo> fieldInfos = Lists.newArrayList(); |
| fieldInfos.add(new JdbcFieldInfo("MONTH", "month", JdbcFieldInfo.SupportType.INTEGER, INTEGER)); |
| fieldInfos.add(new JdbcFieldInfo("MEANTEMP", "meanTemp", JdbcFieldInfo.SupportType.DOUBLE, DOUBLE)); |
| return fieldInfos; |
| } |
| |
| |
| /** |
| * Populate the dag using High-Level API. |
| * @param dag |
| * @param conf |
| */ |
| @Override |
| public void populateDAG(DAG dag, Configuration conf) |
| { |
| JdbcPOJOInputOperator jdbcInput = new JdbcPOJOInputOperator(); |
| jdbcInput.setFieldInfos(addInputFieldInfos()); |
| |
| JdbcStore store = new JdbcStore(); |
| jdbcInput.setStore(store); |
| |
| JdbcPOJOInsertOutputOperator jdbcOutput = new JdbcPOJOInsertOutputOperator(); |
| jdbcOutput.setFieldInfos(addOutputFieldInfos()); |
| JdbcTransactionalStore outputStore = new JdbcTransactionalStore(); |
| jdbcOutput.setStore(outputStore); |
| |
| // Create stream that reads from a Jdbc Input. |
| ApexStream<Object> stream = StreamFactory.fromInput(jdbcInput, jdbcInput.outputPort, name("jdbcInput")) |
| |
| // Apply window and trigger option to the stream. |
| .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) |
| |
| // Because Jdbc Input sends out a stream of Object, need to cast them to InputPojo. |
| .map(new Function.MapFunction<Object, InputPojo>() |
| { |
| @Override |
| public InputPojo f(Object input) |
| { |
| return (InputPojo)input; |
| } |
| }, name("ObjectToInputPojo")) |
| |
| // Plug in the composite transformation to the stream to calculate the maximum temperature for each month. |
| .addCompositeStreams(new MaxMeanTemp()) |
| |
| // Cast the resulted OutputPojo to Object for Jdbc Output to consume. |
| .map(new Function.MapFunction<OutputPojo, Object>() |
| { |
| @Override |
| public Object f(OutputPojo input) |
| { |
| return (Object)input; |
| } |
| }, name("OutputPojoToObject")) |
| |
| // Output the result to Jdbc Output. |
| .endWith(jdbcOutput, jdbcOutput.input, name("jdbcOutput")); |
| |
| stream.populateDag(dag); |
| |
| } |
| } |