blob: 2817ea9f93b3564d2899cd722a2895a06f379b9f [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Integration test for Python cross-language pipelines for Java KinesisIO.
To run the tests against localstack, pass only the regular pipeline options.
To run them against a real AWS account, pass these additional parameters,
e.g.:
python setup.py nosetests \
--tests=apache_beam.io.external.xlang_kinesisio_it_test \
--test-pipeline-options="
--use_real_aws
--aws_kinesis_stream=<STREAM_NAME>
--aws_access_key=<AWS_ACCESS_KEY>
--aws_secret_key=<AWS_SECRET_KEY>
--aws_region=<AWS_REGION>
--runner=FlinkRunner"
"""
# pytype: skip-file
import argparse
import logging
import time
import unittest
import uuid
import apache_beam as beam
from apache_beam.io.kinesis import InitialPositionInStream
from apache_beam.io.kinesis import ReadDataFromKinesis
from apache_beam.io.kinesis import WatermarkPolicy
from apache_beam.io.kinesis import WriteToKinesis
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
try:
import boto3
except ImportError:
boto3 = None
try:
from testcontainers.core.container import DockerContainer
except ImportError:
DockerContainer = None
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
# Tag of the localstack/localstack Docker image started for local runs.
LOCALSTACK_VERSION = '0.11.3'
# Number of records written by the write pipeline and expected back on read.
NUM_RECORDS = 10
# Upper bound handed to the Kinesis read and to the producer connect timeout.
MAX_READ_TIME = 1000 * 60 * 5  # 5min
# Wall-clock time captured once at import; used as the AT_TIMESTAMP start
# position, in seconds for the boto3 helper and in milliseconds for the read.
NOW_SECONDS = time.time()
NOW_MILLIS = 1000 * NOW_SECONDS
REQUEST_RECORDS_LIMIT = 1000
# Per-run unique payload prefix; each record appends its index to it.
RECORD = ('record' + str(uuid.uuid4())).encode()
@unittest.skipUnless(DockerContainer, 'testcontainers is not installed.')
@unittest.skipUnless(boto3, 'boto3 is not installed.')
@unittest.skipUnless(
    TestPipeline().get_pipeline_options().view_as(StandardOptions).runner,
    'Do not run this test on precommit suites.')
class CrossLanguageKinesisIOTest(unittest.TestCase):
  """Round-trip tests for the cross-language Kinesis write/read transforms.

  By default the tests run against a localstack container started in
  ``setUp``; passing ``--use_real_aws`` skips the container and uses the
  AWS settings given in the pipeline options instead.
  """
  @unittest.skipUnless(
      TestPipeline().get_option('aws_kinesis_stream'),
      'Cannot test on real aws without pipeline options provided')
  def test_kinesis_io_roundtrip(self):
    """Writes the records with Beam and reads them back with Beam."""
    # TODO: enable this test for localstack once BEAM-10664 is resolved
    self.run_kinesis_write()
    self.run_kinesis_read()

  @unittest.skipIf(
      TestPipeline().get_option('aws_kinesis_stream'),
      'Do not test on localstack when pipeline options were provided')
  def test_kinesis_write(self):
    """Writes the records with Beam and verifies them through boto3."""
    # TODO: remove this test once BEAM-10664 is resolved
    self.run_kinesis_write()
    records = self.kinesis_helper.read_from_stream(self.aws_kinesis_stream)
    self.assertEqual(sorted(records), sorted(self._expected_records()))

  @staticmethod
  def _expected_records():
    """Returns the payloads the write pipeline is expected to produce."""
    return [RECORD + str(i).encode() for i in range(NUM_RECORDS)]

  def run_kinesis_write(self):
    """Runs a pipeline that writes NUM_RECORDS records to the stream."""
    with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p:
      p.not_use_test_runner_api = True
      _ = (
          p
          | 'Impulse' >> beam.Impulse()
          | 'Generate' >> beam.FlatMap(lambda x: range(NUM_RECORDS))  # pylint: disable=bad-option-value
          | 'Map to bytes' >>
          beam.Map(lambda x: RECORD + str(x).encode()).with_output_types(bytes)
          | 'WriteToKinesis' >> WriteToKinesis(
              stream_name=self.aws_kinesis_stream,
              aws_access_key=self.aws_access_key,
              aws_secret_key=self.aws_secret_key,
              region=self.aws_region,
              service_endpoint=self.aws_service_endpoint,
              verify_certificate=(not self.use_localstack),
              partition_key='1',
              producer_properties=self.producer_properties,
              # Honour --expansion_service; None keeps the transform default.
              expansion_service=self.expansion_service,
          ))

  def run_kinesis_read(self):
    """Runs a pipeline that reads the stream and checks the payloads."""
    records = self._expected_records()
    with TestPipeline(options=PipelineOptions(self.pipeline_args)) as p:
      result = (
          p
          | 'ReadFromKinesis' >> ReadDataFromKinesis(
              stream_name=self.aws_kinesis_stream,
              aws_access_key=self.aws_access_key,
              aws_secret_key=self.aws_secret_key,
              region=self.aws_region,
              service_endpoint=self.aws_service_endpoint,
              verify_certificate=not self.use_localstack,
              max_num_records=NUM_RECORDS,
              max_read_time=MAX_READ_TIME,
              request_records_limit=REQUEST_RECORDS_LIMIT,
              watermark_policy=WatermarkPolicy.ARRIVAL_TIME,
              watermark_idle_duration_threshold=MAX_READ_TIME,
              initial_position_in_stream=InitialPositionInStream.AT_TIMESTAMP,
              initial_timestamp_in_stream=NOW_MILLIS,
              # Honour --expansion_service; None keeps the transform default.
              expansion_service=self.expansion_service,
          ).with_output_types(bytes))
      assert_that(result, equal_to(records))

  def set_localstack(self):
    """Starts the localstack Kinesis container and records its endpoint.

    Sets ``self.localstack`` and ``self.aws_service_endpoint``. The container
    start is attempted up to four times; the last failure is re-raised.
    """
    image = 'localstack/localstack:{}'.format(LOCALSTACK_VERSION)
    self.localstack = (
        DockerContainer(image)
        .with_env('SERVICES', 'kinesis')
        .with_env('KINESIS_PORT', '4568')
        .with_env('USE_SSL', 'true')
        .with_exposed_ports(4568)
        .with_volume_mapping(
            '/var/run/docker.sock', '/var/run/docker.sock', 'rw'))

    # Repeat if ReadTimeout is raised.
    max_attempts = 4
    for attempt in range(max_attempts):
      try:
        self.localstack.start()
        break
      except Exception:  # pylint: disable=broad-except
        if attempt == max_attempts - 1:
          logging.error('Could not initialize localstack container')
          raise

    # unittest skips tearDown when setUp raises but still runs cleanups, so
    # registering the stop here keeps a later setUp failure (for example in
    # stream creation) from leaving the container running.
    self.addCleanup(self._stop_localstack)

    self.aws_service_endpoint = 'https://{}:{}'.format(
        self.localstack.get_container_host_ip(),
        self.localstack.get_exposed_port('4568'),
    )

  def _stop_localstack(self):
    """Stops the localstack container, logging instead of raising."""
    try:
      self.localstack.stop()
    except Exception:  # pylint: disable=broad-except
      logging.error('Could not stop the localstack container')

  def setUp(self):
    """Parses the test options and prepares the stream to test against."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--aws_kinesis_stream',
        default='beam_kinesis_xlang',
        help='Kinesis stream name',
    )
    parser.add_argument(
        '--aws_access_key',
        default='accesskey',
        help='Aws access key',
    )
    parser.add_argument(
        '--aws_secret_key',
        default='secretkey',
        help='Aws secret key',
    )
    parser.add_argument(
        '--aws_region',
        default='us-east-1',
        help='Aws region',
    )
    parser.add_argument(
        '--aws_service_endpoint',
        default=None,
        help='Url to external aws endpoint',
    )
    parser.add_argument(
        '--use_real_aws',
        default=False,
        dest='use_real_aws',
        action='store_true',
        help='Flag whether to use real aws for the tests purpose',
    )
    parser.add_argument(
        '--expansion_service',
        help='Url to externally launched expansion service.',
    )

    pipeline = TestPipeline()
    argv = pipeline.get_full_options_as_args()

    # Options not recognised above are forwarded to the pipelines unchanged.
    known_args, self.pipeline_args = parser.parse_known_args(argv)

    self.aws_kinesis_stream = known_args.aws_kinesis_stream
    self.aws_access_key = known_args.aws_access_key
    self.aws_secret_key = known_args.aws_secret_key
    self.aws_region = known_args.aws_region
    self.aws_service_endpoint = known_args.aws_service_endpoint
    self.use_localstack = not known_args.use_real_aws
    self.expansion_service = known_args.expansion_service
    self.producer_properties = {
        'CollectionMaxCount': str(NUM_RECORDS),
        'ConnectTimeout': str(MAX_READ_TIME),
    }

    if self.use_localstack:
      self.set_localstack()

    # The boto3 helper is pointed at the plain-http form of the endpoint.
    self.kinesis_helper = KinesisHelper(
        self.aws_access_key,
        self.aws_secret_key,
        self.aws_region,
        self.aws_service_endpoint.replace('https', 'http')
        if self.aws_service_endpoint else None,
    )

    if self.use_localstack:
      self.kinesis_helper.create_stream(self.aws_kinesis_stream)

  def tearDown(self):
    """Deletes the localstack stream; the container stop is a cleanup."""
    if self.use_localstack:
      self.kinesis_helper.delete_stream(self.aws_kinesis_stream)
class KinesisHelper:
  """Small boto3 wrapper for creating, reading and deleting a test stream."""
  def __init__(self, access_key, secret_key, region, service_endpoint):
    """Creates the boto3 Kinesis client used by every other method."""
    self.kinesis_client = boto3.client(
        service_name='kinesis',
        region_name=region,
        endpoint_url=service_endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )

  def create_stream(self, stream_name, retries=10, delay=2):
    """Creates a single-shard stream and waits until it is ACTIVE.

    Args:
      stream_name: name of the stream to create.
      retries: maximum number of create attempts (and of status polls).
      delay: seconds to sleep between attempts.

    Raises:
      Exception: whatever the last failed create attempt raised.
      RuntimeError: if the stream does not become ACTIVE in time.
    """
    # localstack could not have initialized in the container yet so repeat
    for attempt in range(retries):
      try:
        self.kinesis_client.create_stream(
            StreamName=stream_name,
            ShardCount=1,
        )
      except Exception:  # pylint: disable=broad-except
        if attempt == retries - 1:
          logging.error('Could not create kinesis stream')
          raise
        # Give the endpoint time to come up instead of retrying at once.
        time.sleep(delay)
      else:
        time.sleep(delay)
        break

    # Wait for the stream to be active
    self.get_first_shard_id(stream_name, retries=retries, delay=delay)

  def delete_stream(self, stream_name):
    """Deletes the stream, including any registered consumers."""
    self.kinesis_client.delete_stream(
        StreamName=stream_name,
        EnforceConsumerDeletion=True,
    )

  def get_first_shard_id(self, stream_name, retries=10, delay=2):
    """Polls the stream until it is ACTIVE and returns its first shard id.

    Args:
      stream_name: name of the stream to describe.
      retries: maximum number of status polls.
      delay: seconds to sleep between polls.

    Returns:
      The ``ShardId`` of the first shard in the stream description.

    Raises:
      RuntimeError: if the stream is still not ACTIVE after the last poll.
    """
    status = None
    for attempt in range(retries):
      stream = self.kinesis_client.describe_stream(StreamName=stream_name)
      status = stream['StreamDescription']['StreamStatus']
      if status == 'ACTIVE':
        return stream['StreamDescription']['Shards'][0]['ShardId']
      # No point in sleeping once there are no polls left.
      if attempt < retries - 1:
        time.sleep(delay)

    logging.error('Could not initialize kinesis stream')
    # A bare ``raise`` here has no active exception to re-raise, so raise an
    # explicit error that names the stream and its last observed status.
    raise RuntimeError(
        'Kinesis stream {!r} did not become ACTIVE (last status: {})'.format(
            stream_name, status))

  def read_from_stream(self, stream_name):
    """Returns the data of up to NUM_RECORDS records from the first shard.

    Reading starts at the timestamp captured in NOW_SECONDS.
    """
    shard_id = self.get_first_shard_id(stream_name)

    shard_iterator = self.kinesis_client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType=InitialPositionInStream.AT_TIMESTAMP,
        Timestamp=str(NOW_SECONDS),
    )

    result = self.kinesis_client.get_records(
        ShardIterator=shard_iterator['ShardIterator'],
        Limit=NUM_RECORDS,
    )

    return [record['Data'] for record in result['Records']]
if __name__ == '__main__':
  # Let INFO-level messages through the root logger, then run the tests.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()