################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import unittest

from pyflink.common import SimpleStringSchema, Types
from pyflink.datastream.connectors.kinesis import PartitionKeyGenerator, FlinkKinesisConsumer, \
    KinesisStreamsSink, KinesisFirehoseSink
from pyflink.testing.test_case_utils import PyFlinkUTTestCase
from pyflink.util.java_utils import get_field_value


@unittest.skip("Disabled because the Kinesis connectors need to release a new version for 2.0")
class FlinkKinesisTest(PyFlinkUTTestCase):
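    """Tests for the PyFlink Kinesis connector wrappers: FlinkKinesisConsumer,
    KinesisStreamsSink and KinesisFirehoseSink. The tests only build job graphs and
    inspect the generated execution plan; no AWS endpoint is contacted."""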

    def test_kinesis_source(self):
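        """Creates a FlinkKinesisConsumer source, adds it to the environment, and checks
        the execution plan node type and the stream name on the wrapped Java consumer."""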
        consumer_config = {
            'aws.region': 'us-east-1',
            'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
            'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
            'flink.stream.initpos': 'LATEST'
        }

        kinesis_source = FlinkKinesisConsumer("stream-1", SimpleStringSchema(), consumer_config)

        ds = self.env.add_source(source_func=kinesis_source, source_name="kinesis source")
        ds.print()

        plan = eval(self.env.get_execution_plan())

        self.assertEqual('Source: kinesis source', plan['nodes'][0]['type'])
        self.assertEqual(
            get_field_value(kinesis_source.get_java_function(), 'streams')[0], 'stream-1')

    def test_kinesis_streams_sink(self):
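        """Builds a KinesisStreamsSink with explicit batching and buffering limits, attaches
        it to a collection source, and checks the plan node type plus the failOnError and
        streamName fields of the underlying Java sink."""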
        sink_properties = {
            'aws.region': 'us-east-1',
            'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
        }

        ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3), ('deeefg', 4)],
                                      type_info=Types.ROW([Types.STRING(), Types.INT()]))

        kinesis_streams_sink = KinesisStreamsSink.builder() \
            .set_kinesis_client_properties(sink_properties) \
            .set_serialization_schema(SimpleStringSchema()) \
            .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
            .set_stream_name("stream-1") \
            .set_fail_on_error(False) \
            .set_max_batch_size(500) \
            .set_max_in_flight_requests(50) \
            .set_max_buffered_requests(10000) \
            .set_max_batch_size_in_bytes(5 * 1024 * 1024) \
            .set_max_time_in_buffer_ms(5000) \
            .set_max_record_size_in_bytes(1 * 1024 * 1024) \
            .build()

        ds.sink_to(kinesis_streams_sink).name('kinesis streams sink')

        plan = eval(self.env.get_execution_plan())

        self.assertEqual('kinesis streams sink: Writer', plan['nodes'][1]['type'])
        self.assertEqual(get_field_value(kinesis_streams_sink.get_java_function(), 'failOnError'),
                         False)
        self.assertEqual(
            get_field_value(kinesis_streams_sink.get_java_function(), 'streamName'), 'stream-1')

    def test_kinesis_firehose_sink(self):
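        """Builds a KinesisFirehoseSink with the same batching and buffering limits and
        checks the plan node type plus the failOnError and deliveryStreamName fields of
        the underlying Java sink."""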
        sink_properties = {
            'aws.region': 'eu-west-1',
            'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
            'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
        }

        ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3), ('deeefg', 4)],
                                      type_info=Types.ROW([Types.STRING(), Types.INT()]))

        kinesis_firehose_sink = KinesisFirehoseSink.builder() \
            .set_firehose_client_properties(sink_properties) \
            .set_serialization_schema(SimpleStringSchema()) \
            .set_delivery_stream_name('stream-1') \
            .set_fail_on_error(False) \
            .set_max_batch_size(500) \
            .set_max_in_flight_requests(50) \
            .set_max_buffered_requests(10000) \
            .set_max_batch_size_in_bytes(5 * 1024 * 1024) \
            .set_max_time_in_buffer_ms(5000) \
            .set_max_record_size_in_bytes(1 * 1024 * 1024) \
            .build()

        ds.sink_to(kinesis_firehose_sink).name('kinesis firehose sink')

        plan = eval(self.env.get_execution_plan())

        self.assertEqual('kinesis firehose sink: Writer', plan['nodes'][1]['type'])
        self.assertEqual(get_field_value(kinesis_firehose_sink.get_java_function(), 'failOnError'),
                         False)
        self.assertEqual(
            get_field_value(kinesis_firehose_sink.get_java_function(), 'deliveryStreamName'),
            'stream-1')