| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.samza.application; |
| |
| import org.apache.samza.annotation.InterfaceStability; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.operators.ContextManager; |
| import org.apache.samza.operators.MessageStream; |
| import org.apache.samza.operators.OutputStream; |
| import org.apache.samza.operators.StreamGraph; |
| import org.apache.samza.operators.functions.InitableFunction; |
| import org.apache.samza.task.StreamTask; |
| import org.apache.samza.task.TaskContext; |
| |
| /** |
| * Describes and initializes the transforms for processing message streams and generating results. |
| * <p> |
| * The following example removes page views older than 1 hour from the input stream: |
| * <pre>{@code |
| * public class PageViewCounter implements StreamApplication { |
| * public void init(StreamGraph graph, Config config) { |
| * MessageStream<PageViewEvent> pageViewEvents = |
| * graph.getInputStream("pageViewEvents", (k, m) -> (PageViewEvent) m); |
| * OutputStream<String, PageViewEvent, PageViewEvent> recentPageViewEvents = |
| * graph.getOutputStream("recentPageViewEvents", m -> m.memberId, m -> m); |
| * |
| * pageViewEvents |
| * .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis()) |
| * .sendTo(recentPageViewEvents); |
| * } |
| * } |
| * }</pre> |
| * <p> |
| * The example above can be run using an ApplicationRunner: |
| * <pre>{@code |
| * public static void main(String[] args) { |
| * CommandLine cmdLine = new CommandLine(); |
| * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args)); |
| * PageViewCounter app = new PageViewCounter(); |
| * LocalApplicationRunner runner = new LocalApplicationRunner(config); |
| * runner.run(app); |
| * runner.waitForFinish(); |
| * } |
| * }</pre> |
| * <p> |
| * Implementation Notes: Currently StreamApplications are wrapped in a {@link StreamTask} during execution. |
| * A new StreamApplication instance will be created and initialized when planning the execution, as well as for each |
| * {@link StreamTask} instance used for processing incoming messages. Execution is synchronous and thread-safe |
| * within each {@link StreamTask}. |
| */ |
| @InterfaceStability.Unstable |
| public interface StreamApplication { |
| |
| /** |
| * Describes and initializes the transforms for processing message streams and generating results. |
| * <p> |
| * The {@link StreamGraph} provides access to input and output streams. Input {@link MessageStream}s can be |
| * transformed into other {@link MessageStream}s or sent to an {@link OutputStream} using the {@link MessageStream} |
| * operators. |
| * <p> |
| * Most operators accept custom functions for doing the transformations. These functions are {@link InitableFunction}s |
| * and are provided the {@link Config} and {@link TaskContext} during their own initialization. The config and the |
| * context can be used, for example, to create custom metrics or access durable state stores. |
| * <p> |
| * A shared context between {@link InitableFunction}s for different operators within a task instance can be set |
| * up by providing a {@link ContextManager} using {@link StreamGraph#withContextManager}. |
| * <p> |
| * Implementation note: currently a new application instance is created and initialized once when planning the |
| * execution and once more for each {@link StreamTask} instance that processes messages, so this method may run |
| * several times within a single job. Implementations should therefore only describe the stream transforms here |
| * and should not rely on this method being invoked exactly once. |
| * |
| * @param graph the {@link StreamGraph} to get input/output streams from and to define the transforms on |
| * @param config the configuration for the application |
| */ |
| void init(StreamGraph graph, Config config); |
| |
| } |