# blob: 48dfe11eeb9dadc96530f3d1dfb1f88a6aee7613 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""End-to-end test for Avro IO's fastavro support.
Writes a configurable number of records to a temporary location with each of
{avro,fastavro}, then reads them back in, joins the two read datasets, and
verifies they have the same elements.
Usage:
DataflowRunner:
python setup.py nosetests --tests apache_beam.examples.fastavro_it_test \
--test-pipeline-options="
--runner=TestDataflowRunner
--project=...
--staging_location=gs://...
--temp_location=gs://...
--output=gs://...
--sdk_location=...
"
DirectRunner:
python setup.py nosetests --tests apache_beam.examples.fastavro_it_test \
--test-pipeline-options="
--output=/tmp
--records=5000
"
"""
from __future__ import absolute_import
from __future__ import division
import json
import logging
import os
import sys
import unittest
import uuid
from fastavro import parse_schema
from nose.plugins.attrib import attr
from apache_beam.io.avroio import ReadAllFromAvro
from apache_beam.io.avroio import WriteToAvro
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_utils import delete_files
from apache_beam.testing.util import BeamAssertException
from apache_beam.transforms.core import Create
from apache_beam.transforms.core import FlatMap
from apache_beam.transforms.core import Map
from apache_beam.transforms.util import CoGroupByKey
# pylint: disable=wrong-import-order, wrong-import-position
try:
from avro.schema import Parse # avro-python3 library for python3
except ImportError:
from avro.schema import parse as Parse # avro library for python2
# pylint: enable=wrong-import-order, wrong-import-position
# Pools the synthetic records cycle through; sized 8 and 7 so that
# label/color pairs only repeat every lcm(8, 7) = 56 indices.
LABELS = ['abc', 'def', 'ghi', 'jkl', 'mno', 'pqr', 'stu', 'vwx']
COLORS = ['RED', 'ORANGE', 'YELLOW', 'GREEN', 'BLUE', 'PURPLE', None]


def record(i):
  """Return a deterministic synthetic user record for index ``i``.

  The label and color are drawn cyclically from the module-level pools,
  so equal indices always produce equal records.
  """
  label = LABELS[i % len(LABELS)]
  color = COLORS[i % len(COLORS)]
  return {
      'label': label,
      'number': i,
      'number_str': str(i),
      'color': color,
  }
@unittest.skipIf(
    sys.version_info[0] >= 3 and
    os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
    'Due to a known issue in avro-python3 package, this '
    'test is skipped until BEAM-6522 is addressed.')
class FastavroIT(unittest.TestCase):
  """End-to-end comparison of the avro and fastavro codecs.

  Writes the same synthetic records once with each codec, reads both
  outputs back, joins them on the record number, and asserts the two
  datasets are element-wise identical.
  """

  # Avro schema matching the dicts produced by ``record``; parsed with
  # fastavro.parse_schema for the fastavro writer and avro's Parse for
  # the avro writer.
  SCHEMA_STRING = '''
    {"namespace": "example.avro",
     "type": "record",
     "name": "User",
     "fields": [
         {"name": "label", "type": "string"},
         {"name": "number", "type": ["int", "null"]},
         {"name": "number_str", "type": ["string", "null"]},
         {"name": "color", "type": ["string", "null"]}
     ]
    }
    '''

  def setUp(self):
    self.test_pipeline = TestPipeline(is_integration_test=True)
    # A fresh UUID per run keeps concurrent test invocations from
    # clobbering each other's output under the shared --output prefix.
    self.uuid = str(uuid.uuid4())
    self.output = '/'.join([
        self.test_pipeline.get_option('output'),
        self.uuid
    ])

  @attr('IT')
  def test_avro_it(self):
    num_records = self.test_pipeline.get_option('records')
    num_records = int(num_records) if num_records else 1000000

    # Seed a `PCollection` with indices that will each be FlatMap'd into
    # `batch_size` records, to avoid having a too-large list in memory at
    # the outset.
    batch_size = self.test_pipeline.get_option('batch-size')
    batch_size = int(batch_size) if batch_size else 10000

    # pylint: disable=range-builtin-not-iterating
    batches = range(int(num_records / batch_size))

    def batch_indices(start):
      # Expand batch index `start` into its `batch_size` record indices.
      # pylint: disable=range-builtin-not-iterating
      return range(start * batch_size, (start + 1) * batch_size)

    # A `PCollection` with `num_records` avro records.
    records_pcoll = \
        self.test_pipeline \
        | 'create-batches' >> Create(batches) \
        | 'expand-batches' >> FlatMap(batch_indices) \
        | 'create-records' >> Map(record)

    fastavro_output = '/'.join([self.output, 'fastavro'])
    avro_output = '/'.join([self.output, 'avro'])

    # pylint: disable=expression-not-assigned
    records_pcoll \
    | 'write_fastavro' >> WriteToAvro(
        fastavro_output,
        parse_schema(json.loads(self.SCHEMA_STRING)),
        use_fastavro=True
    )

    # pylint: disable=expression-not-assigned
    records_pcoll \
    | 'write_avro' >> WriteToAvro(
        avro_output,
        Parse(self.SCHEMA_STRING),
        use_fastavro=False
    )

    result = self.test_pipeline.run()
    result.wait_until_finish()
    assert result.state == PipelineState.DONE

    # Second pipeline: read both outputs back and key each record by its
    # unique 'number' so CoGroupByKey can align them pairwise.
    fastavro_read_pipeline = TestPipeline(is_integration_test=True)

    fastavro_records = \
        fastavro_read_pipeline \
        | 'create-fastavro' >> Create(['%s*' % fastavro_output]) \
        | 'read-fastavro' >> ReadAllFromAvro(use_fastavro=True) \
        | 'key-fastavro' >> Map(lambda rec: (rec['number'], rec))

    avro_records = \
        fastavro_read_pipeline \
        | 'create-avro' >> Create(['%s*' % avro_output]) \
        | 'read-avro' >> ReadAllFromAvro(use_fastavro=False) \
        | 'key-avro' >> Map(lambda rec: (rec['number'], rec))

    def check(elem):
      # elem is (number, {'avro': [...], 'fastavro': [...]}) from
      # CoGroupByKey below.
      v = elem[1]

      def assertEqual(l, r):
        if l != r:
          raise BeamAssertException('Assertion failed: %s == %s' % (l, r))

      # Fix: dict.keys() is a view on Python 3 and never compares equal
      # to a list (and key order is unspecified on Python 2); compare a
      # sorted list instead.
      assertEqual(sorted(v.keys()), ['avro', 'fastavro'])
      avro_values = v['avro']
      fastavro_values = v['fastavro']
      assertEqual(avro_values, fastavro_values)
      # Each 'number' key is unique, so each side contributes exactly one
      # record per key.
      assertEqual(len(avro_values), 1)

    # pylint: disable=expression-not-assigned
    {
        'avro': avro_records,
        'fastavro': fastavro_records
    } \
    | CoGroupByKey() \
    | 'check' >> Map(check)

    self.addCleanup(delete_files, [self.output])

    # Fix: assert the terminal state of the *read* pipeline; the original
    # re-checked the write pipeline's `result`, which was already asserted
    # above, leaving the read pipeline's outcome unverified.
    read_result = fastavro_read_pipeline.run()
    read_result.wait_until_finish()
    assert read_result.state == PipelineState.DONE
if __name__ == '__main__':
  # DEBUG-level logging so pipeline progress is visible when the test is
  # run directly as a script rather than through a test runner.
  logging.getLogger().setLevel(logging.DEBUG)
  unittest.main()