#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Integration test for Python cross-language pipelines for Java KafkaIO."""
from __future__ import absolute_import

import contextlib
import logging
import os
import socket
import subprocess
import sys
import time
import typing
import unittest

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka
from apache_beam.metrics import Metrics
from apache_beam.testing.test_pipeline import TestPipeline


class CrossLanguageKafkaIO(object):
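  """Builds write and read pipeline segments that exercise the Java KafkaIO
  expansion through WriteToKafka and ReadFromKafka.

  A minimal usage sketch (broker address and topic are placeholder values):

    p = beam.Pipeline()
    CrossLanguageKafkaIO('localhost:9092', 'my_topic').run_xlang_kafkaio(p)
  """
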
  def __init__(self, bootstrap_servers, topic, expansion_service=None):
    self.bootstrap_servers = bootstrap_servers
    self.topic = topic
    self.expansion_service = expansion_service
    self.sum_counter = Metrics.counter('source', 'elements_sum')

  def build_write_pipeline(self, pipeline):
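    """Writes 1000 small integer payloads, keyed by the empty byte string, to
    the configured Kafka topic."""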
    _ = (
        pipeline
        | 'Impulse' >> beam.Impulse()
        | 'Generate' >> beam.FlatMap(lambda x: range(1000))  # pylint: disable=range-builtin-not-iterating
        | 'Reshuffle' >> beam.Reshuffle()
        | 'MakeKV' >> beam.Map(
            lambda x: (b'', str(x).encode())).with_output_types(
                typing.Tuple[bytes, bytes])
        | 'WriteToKafka' >> WriteToKafka(
            producer_config={'bootstrap.servers': self.bootstrap_servers},
            topic=self.topic,
            expansion_service=self.expansion_service))

  def build_read_pipeline(self, pipeline):
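    """Reads records back from the topic, decodes the integer payloads, and
    adds their sum to the `elements_sum` counter."""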
    _ = (
        pipeline
        | 'ReadFromKafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': self.bootstrap_servers,
                'auto.offset.reset': 'earliest'
            },
            topics=[self.topic],
            expansion_service=self.expansion_service)
        | 'Windowing' >> beam.WindowInto(
            beam.window.FixedWindows(300),
            trigger=beam.transforms.trigger.AfterProcessingTime(60),
            accumulation_mode=beam.transforms.trigger.AccumulationMode.
            DISCARDING)
        | 'DecodingValue' >> beam.Map(lambda elem: int(elem[1].decode()))
        | 'CombineGlobally' >> beam.CombineGlobally(sum).without_defaults()
        | 'SetSumCounter' >> beam.Map(self.sum_counter.inc))

  def run_xlang_kafkaio(self, pipeline):
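    """Builds both the write and the read segments into the given pipeline
    and runs it."""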
    self.build_write_pipeline(pipeline)
    self.build_read_pipeline(pipeline)
    pipeline.run(False)


@unittest.skipUnless(
    os.environ.get('LOCAL_KAFKA_JAR'),
    "LOCAL_KAFKA_JAR environment variable is not set.")
class CrossLanguageKafkaIOTest(unittest.TestCase):
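  """Runs the cross-language Kafka write pipeline against a throwaway local
  Kafka broker started from LOCAL_KAFKA_JAR."""
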
  def get_platform_localhost(self):
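    """Returns a broker hostname that is also reachable from inside Docker:
    Docker Desktop on macOS exposes the host as 'host.docker.internal', while
    plain 'localhost' works elsewhere."""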
    if sys.platform == 'darwin':
      return 'host.docker.internal'
    else:
      return 'localhost'

  def get_open_port(self):
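    """Binds an ephemeral port, closes the socket, and returns the port
    number; the port is only likely, not guaranteed, to still be free when
    it is reused."""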
    s = None
    try:
      s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except:  # pylint: disable=bare-except
      # Above call will fail for nodes that only support IPv6.
      s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    s.bind(('localhost', 0))
    s.listen(1)
    port = s.getsockname()[1]
    s.close()
    return port

  @contextlib.contextmanager
  def local_kafka_service(self, local_kafka_jar_file):
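    """Starts a single-node Kafka (plus ZooKeeper) from the given jar on two
    free ports, yields the Kafka port, and kills the process on exit. The
    sleep below gives the broker a moment to start accepting connections."""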
    kafka_port = str(self.get_open_port())
    zookeeper_port = str(self.get_open_port())
    kafka_server = None
    try:
      kafka_server = subprocess.Popen(
          ['java', '-jar', local_kafka_jar_file, kafka_port, zookeeper_port])
      time.sleep(3)
      yield kafka_port
    finally:
      if kafka_server:
        kafka_server.kill()

  def test_kafkaio_write(self):
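    """Starts a local Kafka service and runs the write pipeline against it."""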
    local_kafka_jar = os.environ.get('LOCAL_KAFKA_JAR')
    with self.local_kafka_service(local_kafka_jar) as kafka_port:
      p = TestPipeline()
      p.not_use_test_runner_api = True
      xlang_kafkaio = CrossLanguageKafkaIO(
          '%s:%s' % (self.get_platform_localhost(), kafka_port),
          'xlang_kafkaio_test')
      xlang_kafkaio.build_write_pipeline(p)
      job = p.run()
      job.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()