blob: 94e883a2bb9136ff86487811160798edc1a8aced [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.common import Types
from pyflink.datastream.connectors.cassandra import CassandraSink, MapperOptions, ConsistencyLevel
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
class CassandraSinkTest(PyFlinkStreamingTestCase):
def test_cassandra_sink(self):
type_info = Types.ROW([Types.STRING(), Types.INT()])
ds = self.env.from_collection([('ab', 1), ('bdc', 2), ('cfgs', 3), ('deeefg', 4)],
type_info=type_info)
cassandra_sink_builder = CassandraSink.add_sink(ds)
cassandra_sink = cassandra_sink_builder\
.set_host('localhost', 9876) \
.set_query('query') \
.enable_ignore_null_fields() \
.set_mapper_options(MapperOptions()
.ttl(1)
.timestamp(100)
.tracing(True)
.if_not_exists(False)
.consistency_level(ConsistencyLevel.ANY)
.save_null_fields(True)) \
.set_max_concurrent_requests(1000) \
.build()
cassandra_sink.name('cassandra_sink').set_parallelism(3)
plan = eval(self.env.get_execution_plan())
self.assertEqual("Sink: cassandra_sink", plan['nodes'][1]['type'])
self.assertEqual(3, plan['nodes'][1]['parallelism'])