blob: 9f8156774d57cd3f27c1c35ec1fb6c3bb5957547 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.flink.connectors;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import java.util.Properties;
public class Test {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000);
// DataStream<String> stream = env.readTextFile("s3://shausma-nyc-tlc/yellow-trip-data/taxi-trips.json/dropoff_year=2010/part-00000-cdac5fe4-b823-4576-aeb7-7327b077476e.c000.json");
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "eu-west-1");
consumerConfig.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
DataStream<String> stream = env.addSource(new FlinkKinesisConsumer<>("test-in", new SimpleStringSchema(), consumerConfig));
stream.sinkTo(new AmazonKinesisDataStreamSink<>("test-out"));
/*
* Here, you can start creating your execution plan for Flink.
*
* Start with getting some data from the environment, like
* env.readTextFile(textPath);
*
* then, transform the resulting DataStream<String> using operations
* like
* .filter()
* .flatMap()
* .join()
* .coGroup()
*
* and many more.
* Have a look at the programming guide for the Java API:
*
* https://flink.apache.org/docs/latest/apis/streaming/index.html
*
*/
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}