| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from collections import OrderedDict |
| import argparse |
| import binascii |
| import contextlib |
| import glob |
| import gzip |
| import itertools |
| import json |
| import os |
| import random |
| import six |
| import string |
| import subprocess |
| import sys |
| import tempfile |
| import traceback |
| import uuid |
| import errno |
| |
| import numpy as np |
| |
| |
# Root of the Arrow checkout: two path components above this script.
ARROW_HOME = os.path.abspath(__file__).rsplit("/", 2)[0]

# Control for flakiness
# Fixed seed so generated integration data is reproducible across runs.
np.random.seed(12345)
| |
| |
def load_version_from_pom():
    """Read the Arrow version string from the Java build's pom.xml."""
    import xml.etree.ElementTree as ET
    pom_path = os.path.join(ARROW_HOME, 'java', 'pom.xml')
    root = ET.parse(pom_path).getroot()
    # Tags are namespaced in Maven POM files.
    tag_pattern = '{http://maven.apache.org/POM/4.0.0}version'
    return root.findall(tag_pattern)[0].text
| |
| |
def guid():
    """Return a random 32-character lowercase hex identifier."""
    return '{:032x}'.format(uuid.uuid4().int)
| |
| |
# from pandas: alphanumeric alphabet as an array of 1-character strings.
RANDS_CHARS = np.array(list(string.ascii_letters + string.digits),
                       dtype=(np.str_, 1))


def rands(nchars):
    """
    Generate one random byte string.

    See `rands_array` if you want to create an array of random strings.

    """
    picked = np.random.choice(RANDS_CHARS, nchars)
    return ''.join(picked)
| |
| |
def tobytes(o):
    """Encode text to UTF-8 bytes; anything else passes through unchanged."""
    return o.encode('utf8') if isinstance(o, six.text_type) else o
| |
| |
def frombytes(o):
    """Decode UTF-8 bytes to text; anything else passes through unchanged."""
    return o.decode('utf8') if isinstance(o, six.binary_type) else o
| |
| |
# from the merge_arrow_pr.py script
def run_cmd(cmd):
    """Run *cmd* (a string or argv list) and return its output as text.

    On failure, the child's combined stdout/stderr is echoed before the
    CalledProcessError propagates, so failing commands are not silent.
    """
    if isinstance(cmd, six.string_types):
        cmd = cmd.split(' ')

    try:
        raw = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as exc:
        # this avoids hiding the stdout / stderr of failed processes
        print('Command failed: %s' % ' '.join(cmd))
        print('With output:')
        print('--------------')
        print(frombytes(exc.output))
        print('--------------')
        raise exc

    return frombytes(raw)
| |
| # ---------------------------------------------------------------------- |
| # Data generation |
| |
| |
class DataType(object):
    """Base class for logical types in the JSON integration format."""

    def __init__(self, name, nullable=True):
        self.name = name
        self.nullable = nullable

    def get_json(self):
        """Serialize this field definition as an ordered mapping."""
        out = OrderedDict()
        out['name'] = self.name
        out['type'] = self._get_type()
        out['nullable'] = self.nullable
        out['children'] = self._get_children()
        return out

    def _make_is_valid(self, size):
        # Non-nullable fields are all-valid; nullable fields get a random
        # 0/1 mask.
        if not self.nullable:
            return np.ones(size)
        return np.random.randint(0, 2, size=size)
| |
| |
class Column(object):
    """Base class for generated column data: a name plus a row count."""

    def __init__(self, name, count):
        self.name = name
        self.count = count

    def __len__(self):
        return self.count

    def _get_children(self):
        return []

    def _get_buffers(self):
        return []

    def get_json(self):
        """Serialize name, count, buffers and (if any) children, in order."""
        items = [('name', self.name), ('count', self.count)]
        items += self._get_buffers()

        kids = self._get_children()
        if kids:
            items.append(('children', kids))

        return OrderedDict(items)
| |
| |
class PrimitiveType(DataType):
    """A logical type with no child fields."""

    def _get_children(self):
        return []
| |
| |
class PrimitiveColumn(Column):
    """A flat column: a validity bitmap plus one values buffer."""

    def __init__(self, name, count, is_valid, values):
        super(PrimitiveColumn, self).__init__(name, count)
        self.is_valid = is_valid
        self.values = values

    def _encode_value(self, x):
        # Subclasses override this to map values to JSON representations.
        return x

    def _get_buffers(self):
        validity = [int(flag) for flag in self.is_valid]
        data = [self._encode_value(v) for v in self.values]
        return [('VALIDITY', validity), ('DATA', data)]
| |
| |
# Default bounds for generated integer data: the int32 range.
# Note ~x == -x - 1, so TEST_INT_MIN is -2**31.
TEST_INT_MAX = 2 ** 31 - 1
TEST_INT_MIN = ~TEST_INT_MAX
| |
| |
class IntegerType(PrimitiveType):
    """Fixed-width signed or unsigned integers with configurable bounds."""

    def __init__(self, name, is_signed, bit_width, nullable=True,
                 min_value=TEST_INT_MIN,
                 max_value=TEST_INT_MAX):
        super(IntegerType, self).__init__(name, nullable=nullable)
        self.is_signed = is_signed
        self.bit_width = bit_width
        self.min_value = min_value
        self.max_value = max_value

    def _get_generated_data_bounds(self):
        # Clamp the requested bounds to what physically fits in the type.
        prefix = 'int' if self.is_signed else 'uint'
        info = np.iinfo(prefix + str(self.bit_width))
        type_min = info.min if self.is_signed else 0
        type_max = info.max
        return max(type_min, self.min_value), min(type_max, self.max_value)

    def _get_type(self):
        return OrderedDict([
            ('name', 'int'),
            ('isSigned', self.is_signed),
            ('bitWidth', self.bit_width)
        ])

    def generate_column(self, size, name=None):
        lo, hi = self._get_generated_data_bounds()
        return self.generate_range(size, lo, hi, name=name)

    def generate_range(self, size, lower, upper, name=None):
        """Draw `size` integers uniformly from [lower, upper)."""
        drawn = np.random.randint(lower, upper, size=size)
        values = [int(v) for v in drawn]

        mask = self._make_is_valid(size)

        return PrimitiveColumn(self.name if name is None else name,
                               size, mask, values)
| |
| |
class DateType(IntegerType):
    """Calendar dates: days (32-bit) or milliseconds (64-bit) since epoch."""

    DAY = 0
    MILLISECOND = 1

    # 1/1/1 to 12/31/9999
    _ranges = {
        DAY: [-719162, 2932896],
        MILLISECOND: [-62135596800000, 253402214400000]
    }

    def __init__(self, name, unit, nullable=True):
        lo, hi = self._ranges[unit]
        super(DateType, self).__init__(
            name, True, 32 if unit == self.DAY else 64, nullable=nullable,
            min_value=lo, max_value=hi
        )
        self.unit = unit

    def _get_type(self):
        unit_name = 'DAY' if self.unit == self.DAY else 'MILLISECOND'
        return OrderedDict([('name', 'date'), ('unit', unit_name)])
| |
| |
# Maps short time-unit codes to the unit names used in the Arrow JSON format.
TIMEUNIT_NAMES = {
    's': 'SECOND',
    'ms': 'MILLISECOND',
    'us': 'MICROSECOND',
    'ns': 'NANOSECOND'
}
| |
| |
class TimeType(IntegerType):
    """Time of day: 32-bit for s/ms units, 64-bit for us/ns units."""

    BIT_WIDTHS = {
        's': 32,
        'ms': 32,
        'us': 64,
        'ns': 64
    }

    # Valid values span exactly one day, starting at midnight.
    _ranges = {
        's': [0, 86400],
        'ms': [0, 86400000],
        'us': [0, 86400000000],
        'ns': [0, 86400000000000]
    }

    def __init__(self, name, unit='s', nullable=True):
        lo, hi = self._ranges[unit]
        super(TimeType, self).__init__(name, True, self.BIT_WIDTHS[unit],
                                       nullable=nullable,
                                       min_value=lo,
                                       max_value=hi)
        self.unit = unit

    def _get_type(self):
        return OrderedDict([
            ('name', 'time'),
            ('unit', TIMEUNIT_NAMES[self.unit]),
            ('bitWidth', self.bit_width)
        ])
| |
| |
class TimestampType(IntegerType):
    """64-bit timestamps counted in the given unit, optionally zoned."""

    # 1/1/1 to 12/31/9999
    _ranges = {
        's': [-62135596800, 253402214400],
        'ms': [-62135596800000, 253402214400000],
        'us': [-62135596800000000, 253402214400000000],

        # Physical range for int64, ~584 years and change
        'ns': [np.iinfo('int64').min, np.iinfo('int64').max]
    }

    def __init__(self, name, unit='s', tz=None, nullable=True):
        lo, hi = self._ranges[unit]
        super(TimestampType, self).__init__(name, True, 64, nullable=nullable,
                                            min_value=lo,
                                            max_value=hi)
        self.unit = unit
        self.tz = tz

    def _get_type(self):
        entries = [
            ('name', 'timestamp'),
            ('unit', TIMEUNIT_NAMES[self.unit])
        ]

        # The timezone entry is omitted entirely for zoneless timestamps.
        if self.tz is not None:
            entries.append(('timezone', self.tz))

        return OrderedDict(entries)
| |
| |
class DurationIntervalType(IntegerType):
    """Durations stored as 64-bit counts of the given time unit."""

    def __init__(self, name, unit='s', nullable=True):
        i64 = np.iinfo('int64')
        super(DurationIntervalType, self).__init__(
            name, True, 64, nullable=nullable,
            min_value=i64.min,
            max_value=i64.max)
        self.unit = unit

    def _get_type(self):
        return OrderedDict([
            ('name', 'duration'),
            ('unit', TIMEUNIT_NAMES[self.unit])
        ])
| |
| |
class YearMonthIntervalType(IntegerType):
    """Calendar intervals expressed as a whole number of months."""

    def __init__(self, name, nullable=True):
        months_in_10k_years = 10000 * 12  # +/- 10000 years.
        super(YearMonthIntervalType, self).__init__(
            name, True, 32, nullable=nullable,
            min_value=-months_in_10k_years,
            max_value=months_in_10k_years)

    def _get_type(self):
        return OrderedDict([
            ('name', 'interval'),
            ('unit', 'YEAR_MONTH'),
        ])
| |
| |
class DayTimeIntervalType(PrimitiveType):
    """Calendar intervals stored as a (days, milliseconds) pair."""

    def __init__(self, name, nullable=True):
        # BUG FIX: previously hard-coded nullable=True here, silently
        # ignoring the caller's nullable argument.
        super(DayTimeIntervalType, self).__init__(name, nullable=nullable)

    @property
    def numpy_type(self):
        return object

    def _get_type(self):

        return OrderedDict([
            ('name', 'interval'),
            ('unit', 'DAY_TIME'),
        ])

    def generate_column(self, size, name=None):
        # Days bounded at +/- 10000 years; milliseconds at +/- one day.
        min_day_value, max_day_value = -10000*366, 10000*366
        values = [{'days': random.randint(min_day_value, max_day_value),
                   'milliseconds': random.randint(-86400000, +86400000)}
                  for _ in range(size)]

        is_valid = self._make_is_valid(size)
        if name is None:
            name = self.name
        return PrimitiveColumn(name, size, is_valid, values)
| |
| |
class FloatingPointType(PrimitiveType):
    """IEEE floating point numbers of 16, 32 or 64 bits."""

    _PRECISIONS = {
        16: 'HALF',
        32: 'SINGLE',
        64: 'DOUBLE'
    }

    def __init__(self, name, bit_width, nullable=True):
        super(FloatingPointType, self).__init__(name, nullable=nullable)

        self.bit_width = bit_width
        self.precision = self._PRECISIONS[bit_width]

    @property
    def numpy_type(self):
        return 'float' + str(self.bit_width)

    def _get_type(self):
        return OrderedDict([
            ('name', 'floatingpoint'),
            ('precision', self.precision)
        ])

    def generate_column(self, size, name=None):
        # Scale and round draws so the serialized values stay compact.
        values = np.round(np.random.randn(size) * 1000, 3)

        is_valid = self._make_is_valid(size)
        return PrimitiveColumn(self.name if name is None else name,
                               size, is_valid, values)
| |
| |
# Largest positive value that fits in `nbytes` signed bytes, keyed by the
# decimal precision that requires that many bytes.
DECIMAL_PRECISION_TO_VALUE = {
    precision: (1 << (8 * nbytes - 1)) - 1
    for nbytes, precision in enumerate(
        [1, 3, 5, 7, 10, 12, 15, 17, 19, 22, 24, 27, 29, 32, 34, 36],
        start=1,
    )
}


def decimal_range_from_precision(precision):
    """Return the (min, max) generated integer range for *precision* digits.

    Precisions absent from the lookup table fall back to the next smaller
    precision that is present.
    """
    assert 1 <= precision <= 38
    while precision not in DECIMAL_PRECISION_TO_VALUE:
        precision -= 1
    max_value = DECIMAL_PRECISION_TO_VALUE[precision]
    return ~max_value, max_value
| |
| |
class DecimalType(PrimitiveType):
    """Fixed-point decimals with the given precision and scale."""

    def __init__(self, name, precision, scale, bit_width=128, nullable=True):
        # BUG FIX: previously hard-coded nullable=True here, silently
        # ignoring the caller's nullable argument.
        super(DecimalType, self).__init__(name, nullable=nullable)
        self.precision = precision
        self.scale = scale
        self.bit_width = bit_width

    @property
    def numpy_type(self):
        return object

    def _get_type(self):
        return OrderedDict([
            ('name', 'decimal'),
            ('precision', self.precision),
            ('scale', self.scale),
        ])

    def generate_column(self, size, name=None):
        min_value, max_value = decimal_range_from_precision(self.precision)
        values = [random.randint(min_value, max_value) for _ in range(size)]

        is_valid = self._make_is_valid(size)
        if name is None:
            name = self.name
        return DecimalColumn(name, size, is_valid, values, self.bit_width)
| |
| |
class DecimalColumn(PrimitiveColumn):
    """Decimal column; values are serialized as decimal strings."""

    def __init__(self, name, count, is_valid, values, bit_width=128):
        super(DecimalColumn, self).__init__(name, count, is_valid, values)
        self.bit_width = bit_width

    def _encode_value(self, x):
        # JSON numbers cannot hold 128-bit integers, so emit strings.
        return str(x)
| |
| |
class BooleanType(PrimitiveType):
    """Boolean values."""

    bit_width = 1

    def _get_type(self):
        return OrderedDict([('name', 'bool')])

    @property
    def numpy_type(self):
        return 'bool'

    def generate_column(self, size, name=None):
        draws = np.random.randint(0, 2, size=size)
        values = [bool(v) for v in draws]

        is_valid = self._make_is_valid(size)
        return PrimitiveColumn(self.name if name is None else name,
                               size, is_valid, values)
| |
| |
class BinaryType(PrimitiveType):
    """Variable-length binary values."""

    @property
    def numpy_type(self):
        return object

    @property
    def column_class(self):
        # Subclasses (e.g. StringType) override to use their own column.
        return BinaryColumn

    def _get_type(self):
        return OrderedDict([('name', 'binary')])

    def generate_column(self, size, name=None):
        K = 7  # every generated value is 7 bytes long
        is_valid = self._make_is_valid(size)
        values = []

        for i in range(size):
            if is_valid[i]:
                # BUG FIX: ndarray.tostring() is deprecated and removed in
                # numpy 2.0; tobytes() is the exact drop-in replacement.
                # NOTE(review): randint's upper bound is exclusive, so byte
                # value 255 never appears; preserved as-is to keep generated
                # data stable — confirm whether 256 was intended.
                draw = (np.random.randint(0, 255, size=K)
                        .astype(np.uint8)
                        .tobytes())
                values.append(draw)
            else:
                values.append(b"")

        if name is None:
            name = self.name
        return self.column_class(name, size, is_valid, values)
| |
| |
class FixedSizeBinaryType(PrimitiveType):
    """Binary values that are all exactly byte_width bytes long."""

    def __init__(self, name, byte_width, nullable=True):
        super(FixedSizeBinaryType, self).__init__(name, nullable=nullable)
        self.byte_width = byte_width

    @property
    def numpy_type(self):
        return object

    @property
    def column_class(self):
        return FixedSizeBinaryColumn

    def _get_type(self):
        return OrderedDict([('name', 'fixedsizebinary'),
                            ('byteWidth', self.byte_width)])

    def _get_type_layout(self):
        return OrderedDict([
            ('vectors',
             [OrderedDict([('type', 'VALIDITY'),
                           ('typeBitWidth', 1)]),
              OrderedDict([('type', 'DATA'),
                           ('typeBitWidth', self.byte_width)])])])

    def generate_column(self, size, name=None):
        is_valid = self._make_is_valid(size)
        values = []

        # Data is generated for every slot, valid or not, since the buffer
        # is fixed-width.
        for i in range(size):
            # BUG FIX: ndarray.tostring() is deprecated and removed in
            # numpy 2.0; tobytes() is the exact drop-in replacement.
            draw = (np.random.randint(0, 255, size=self.byte_width)
                    .astype(np.uint8)
                    .tobytes())
            values.append(draw)

        if name is None:
            name = self.name
        return self.column_class(name, size, is_valid, values)
| |
| |
class StringType(BinaryType):
    """Variable-length UTF-8 strings."""

    @property
    def column_class(self):
        return StringColumn

    def _get_type(self):
        return OrderedDict([('name', 'utf8')])

    def generate_column(self, size, name=None):
        K = 7  # every generated string is 7 characters long
        is_valid = self._make_is_valid(size)
        values = [tobytes(rands(K)) if is_valid[i] else b""
                  for i in range(size)]

        return self.column_class(self.name if name is None else name,
                                 size, is_valid, values)
| |
| |
class JsonSchema(object):
    """Top-level schema: an ordered collection of field definitions."""

    def __init__(self, fields):
        self.fields = fields

    def get_json(self):
        field_jsons = [field.get_json() for field in self.fields]
        return OrderedDict([('fields', field_jsons)])
| |
| |
class BinaryColumn(PrimitiveColumn):
    """Variable-length binary column with an offsets buffer."""

    def _encode_value(self, x):
        # Binary payloads serialize as uppercase hex text.
        return frombytes(binascii.hexlify(x).upper())

    def _get_buffers(self):
        offsets = [0]
        data = []

        position = 0
        for valid, value in zip(self.is_valid, self.values):
            if valid:
                position += len(value)
            else:
                # Null slots contribute no bytes and repeat the offset.
                value = b""
            offsets.append(position)
            data.append(self._encode_value(value))

        return [
            ('VALIDITY', [int(x) for x in self.is_valid]),
            ('OFFSET', offsets),
            ('DATA', data)
        ]
| |
| |
class FixedSizeBinaryColumn(PrimitiveColumn):
    """Fixed-width binary column; no offsets buffer is needed."""

    def _encode_value(self, x):
        # Serialize as uppercase hex. Using binascii matches BinaryColumn's
        # encoding and works on Python 2 as well, where iterating bytes
        # yields 1-char strings and '{:02x}'.format(c) would raise.
        return binascii.hexlify(x).decode('ascii').upper()

    def _get_buffers(self):
        return [
            ('VALIDITY', [int(x) for x in self.is_valid]),
            ('DATA', [self._encode_value(v) for v in self.values])
        ]
| |
| |
class StringColumn(BinaryColumn):
    """UTF-8 string column; values are emitted as text rather than hex."""

    def _encode_value(self, x):
        return frombytes(x)
| |
| |
class ListType(DataType):
    """Variable-length lists of a single child value type."""

    def __init__(self, name, value_type, nullable=True):
        super(ListType, self).__init__(name, nullable=nullable)
        self.value_type = value_type

    def _get_type(self):
        return OrderedDict([
            ('name', 'list')
        ])

    def _get_children(self):
        return [self.value_type.get_json()]

    def generate_column(self, size, name=None):
        MAX_LIST_SIZE = 4

        is_valid = self._make_is_valid(size)
        list_sizes = np.random.randint(0, MAX_LIST_SIZE + 1, size=size)

        offsets = [0]
        total = 0
        for valid, sz in zip(is_valid, list_sizes):
            # Null slots contribute no elements and repeat the offset.
            if valid:
                total += int(sz)
            offsets.append(total)

        # `total` is now the number of elements in the child array
        values = self.value_type.generate_column(total)

        return ListColumn(self.name if name is None else name,
                          size, is_valid, offsets, values)
| |
| |
class ListColumn(Column):
    """List column: validity, offsets, and one child values column."""

    def __init__(self, name, count, is_valid, offsets, values):
        super(ListColumn, self).__init__(name, count)
        self.is_valid = is_valid
        self.offsets = offsets
        self.values = values

    def _get_buffers(self):
        validity = [int(flag) for flag in self.is_valid]
        return [
            ('VALIDITY', validity),
            ('OFFSET', list(self.offsets))
        ]

    def _get_children(self):
        return [self.values.get_json()]
| |
| |
class MapType(DataType):
    """Map logical type: a list of non-nullable (key, item) entries."""

    def __init__(self, name, key_type, item_type, nullable=True,
                 keysSorted=False):
        super(MapType, self).__init__(name, nullable=nullable)

        assert not key_type.nullable  # Arrow requires non-nullable map keys
        self.key_type = key_type
        self.item_type = item_type
        self.pair_type = StructType('entries', [key_type, item_type], False)
        self.keysSorted = keysSorted

    def _get_type(self):
        return OrderedDict([
            ('name', 'map'),
            ('keysSorted', self.keysSorted)
        ])

    def _get_children(self):
        return [self.pair_type.get_json()]

    def generate_column(self, size, name=None):
        MAX_MAP_SIZE = 4

        is_valid = self._make_is_valid(size)
        map_sizes = np.random.randint(0, MAX_MAP_SIZE + 1, size=size)

        offsets = [0]
        total = 0
        for valid, sz in zip(is_valid, map_sizes):
            # Null slots contribute no entries and repeat the offset.
            if valid:
                total += int(sz)
            offsets.append(total)

        # `total` is now the number of entries in the child array
        pairs = self.pair_type.generate_column(total)

        return MapColumn(self.name if name is None else name,
                         size, is_valid, offsets, pairs)
| |
| |
class MapColumn(Column):
    """Map column: validity, offsets, and one child entries column."""

    def __init__(self, name, count, is_valid, offsets, pairs):
        super(MapColumn, self).__init__(name, count)
        self.is_valid = is_valid
        self.offsets = offsets
        self.pairs = pairs

    def _get_buffers(self):
        validity = [int(flag) for flag in self.is_valid]
        return [
            ('VALIDITY', validity),
            ('OFFSET', list(self.offsets))
        ]

    def _get_children(self):
        return [self.pairs.get_json()]
| |
| |
class FixedSizeListType(DataType):
    """Lists whose every slot holds exactly list_size child elements."""

    def __init__(self, name, value_type, list_size, nullable=True):
        super(FixedSizeListType, self).__init__(name, nullable=nullable)
        self.value_type = value_type
        self.list_size = list_size

    def _get_type(self):
        return OrderedDict([
            ('name', 'fixedsizelist'),
            ('listSize', self.list_size)
        ])

    def _get_children(self):
        return [self.value_type.get_json()]

    def generate_column(self, size, name=None):
        is_valid = self._make_is_valid(size)
        # The child array always holds size * list_size elements.
        values = self.value_type.generate_column(size * self.list_size)

        return FixedSizeListColumn(self.name if name is None else name,
                                   size, is_valid, values)
| |
| |
class FixedSizeListColumn(Column):
    """Fixed-size list column: validity plus one child values column."""

    def __init__(self, name, count, is_valid, values):
        super(FixedSizeListColumn, self).__init__(name, count)
        self.is_valid = is_valid
        self.values = values

    def _get_buffers(self):
        # No offsets buffer: positions are implied by the fixed list size.
        return [
            ('VALIDITY', [int(flag) for flag in self.is_valid])
        ]

    def _get_children(self):
        return [self.values.get_json()]
| |
| |
class StructType(DataType):
    """A struct of named child fields, one child column per field."""

    def __init__(self, name, field_types, nullable=True):
        super(StructType, self).__init__(name, nullable=nullable)
        self.field_types = field_types

    def _get_type(self):
        return OrderedDict([
            ('name', 'struct')
        ])

    def _get_children(self):
        return [child.get_json() for child in self.field_types]

    def generate_column(self, size, name=None):
        is_valid = self._make_is_valid(size)

        children = [child.generate_column(size)
                    for child in self.field_types]

        return StructColumn(self.name if name is None else name,
                            size, is_valid, children)
| |
| |
class Dictionary(object):
    """A dictionary batch: an id plus the values the indices map onto."""

    def __init__(self, id_, field, values, ordered=False):
        self.id_ = id_
        self.field = field
        self.values = values
        self.ordered = ordered

    def __len__(self):
        return len(self.values)

    def get_json(self):
        # Dictionary values serialize as a one-column record batch.
        batch = JsonRecordBatch(len(self.values), [self.values])
        return OrderedDict([
            ('id', self.id_),
            ('data', batch.get_json())
        ])
| |
| |
class DictionaryType(DataType):
    """A field encoded as integer indices into a shared dictionary."""

    def __init__(self, name, index_type, dictionary, nullable=True):
        super(DictionaryType, self).__init__(name, nullable=nullable)
        assert isinstance(index_type, IntegerType)
        assert isinstance(dictionary, Dictionary)

        self.index_type = index_type
        self.dictionary = dictionary

    def get_json(self):
        # The field describes the dictionary *values* type, plus a
        # 'dictionary' entry that points at the dictionary batch by id.
        dict_field = self.dictionary.field
        index_json = OrderedDict([
            ('id', self.dictionary.id_),
            ('indexType', self.index_type._get_type()),
            ('isOrdered', self.dictionary.ordered)
        ])
        return OrderedDict([
            ('name', self.name),
            ('type', dict_field._get_type()),
            ('nullable', self.nullable),
            ('children', dict_field._get_children()),
            ('dictionary', index_json)
        ])

    def generate_column(self, size, name=None):
        # Generated data is just indices in [0, len(dictionary)).
        return self.index_type.generate_range(
            size, 0, len(self.dictionary),
            name=self.name if name is None else name)
| |
| |
class StructColumn(Column):
    """Struct column: validity plus one child column per field."""

    def __init__(self, name, count, is_valid, field_values):
        super(StructColumn, self).__init__(name, count)
        self.is_valid = is_valid
        self.field_values = field_values

    def _get_buffers(self):
        return [
            ('VALIDITY', [int(flag) for flag in self.is_valid])
        ]

    def _get_children(self):
        return [child.get_json() for child in self.field_values]
| |
| |
class JsonRecordBatch(object):
    """A record batch: a row count plus per-column JSON data."""

    def __init__(self, count, columns):
        self.count = count
        self.columns = columns

    def get_json(self):
        column_jsons = [column.get_json() for column in self.columns]
        return OrderedDict([
            ('count', self.count),
            ('columns', column_jsons)
        ])
| |
| |
# SKIP categories
# Used in JsonFile.skip to mark cases an implementation can't run yet.
SKIP_ARROW = 'arrow'
SKIP_FLIGHT = 'flight'
| |
| |
class JsonFile(object):
    """A generated integration-test JSON file plus its skip metadata."""

    def __init__(self, name, schema, batches, dictionaries=None,
                 skip=None, path=None):
        self.name = name
        self.schema = schema
        self.dictionaries = dictionaries or []
        self.batches = batches
        self.skip = set()
        self.path = path
        if skip:
            self.skip.update(skip)

    def get_json(self):
        document = [
            ('schema', self.schema.get_json())
        ]

        # The dictionaries entry is omitted entirely when there are none.
        if self.dictionaries:
            document.append(('dictionaries',
                             [dictionary.get_json()
                              for dictionary in self.dictionaries]))

        document.append(('batches', [batch.get_json()
                                     for batch in self.batches]))
        return OrderedDict(document)

    def write(self, path):
        with open(path, 'wb') as f:
            f.write(json.dumps(self.get_json(), indent=2).encode('utf-8'))
        self.path = path

    def skip_category(self, category):
        """Skip this test for the given category.

        Category should be SKIP_ARROW or SKIP_FLIGHT.
        """
        self.skip.add(category)
        return self
| |
| |
def get_field(name, type_, nullable=True):
    """Build the logical type object for a dtype-like string *type_*."""
    if type_ == 'binary':
        return BinaryType(name, nullable=nullable)
    if type_ == 'utf8':
        return StringType(name, nullable=nullable)
    if type_.startswith('fixedsizebinary_'):
        # Width is encoded in the type string, e.g. 'fixedsizebinary_19'.
        width = int(type_.split('_')[1])
        return FixedSizeBinaryType(name,
                                   byte_width=width,
                                   nullable=nullable)

    dtype = np.dtype(type_)
    kind = dtype.kind

    if kind == 'i' or kind == 'u':
        return IntegerType(name, kind == 'i', dtype.itemsize * 8,
                           nullable=nullable)
    if kind == 'f':
        return FloatingPointType(name, dtype.itemsize * 8,
                                 nullable=nullable)
    if kind == 'b':
        return BooleanType(name, nullable=nullable)
    raise TypeError(dtype)
| |
| |
def _generate_file(name, fields, batch_sizes, dictionaries=None, skip=None):
    """Assemble a JsonFile with one generated batch per batch size."""
    batches = []
    for size in batch_sizes:
        columns = [field.generate_column(size) for field in fields]
        batches.append(JsonRecordBatch(size, columns))

    return JsonFile(name, JsonSchema(fields), batches, dictionaries,
                    skip=skip)
| |
| |
def generate_primitive_case(batch_sizes, name='primitive'):
    """One nullable and one non-nullable field per primitive type."""
    types = ['bool', 'int8', 'int16', 'int32', 'int64',
             'uint8', 'uint16', 'uint32', 'uint64',
             'float32', 'float64', 'binary', 'utf8',
             'fixedsizebinary_19', 'fixedsizebinary_120']

    fields = []
    for type_ in types:
        for suffix, nullable in (("_nullable", True),
                                 ("_nonnullable", False)):
            fields.append(get_field(type_ + suffix, type_, nullable))

    return _generate_file(name, fields, batch_sizes)
| |
| |
def generate_decimal_case():
    """Decimal columns covering precisions 3 through 38 at scale 2."""
    fields = [
        DecimalType(name='f{}'.format(i), precision=precision, scale=2)
        for i, precision in enumerate(range(3, 39))
    ]

    # Alternate between two batch sizes, one entry per field.
    batch_sizes = [(7, 10)[i % 2] for i in range(len(fields))]

    skip = set()
    skip.add('Go')  # TODO(ARROW-3676)
    return _generate_file('decimal', fields, batch_sizes, skip=skip)
| |
| |
def generate_datetime_case():
    """Date, time and timestamp fields across all units and timezones."""
    fields = [
        DateType('f0', DateType.DAY),
        DateType('f1', DateType.MILLISECOND),
        TimeType('f2', 's'),
        TimeType('f3', 'ms'),
        TimeType('f4', 'us'),
        TimeType('f5', 'ns'),
        TimestampType('f6', 's'),
        TimestampType('f7', 'ms'),
        TimestampType('f8', 'us'),
        TimestampType('f9', 'ns'),
        TimestampType('f10', 'ms', tz=None),
        TimestampType('f11', 's', tz='UTC'),
        TimestampType('f12', 'ms', tz='US/Eastern'),
        TimestampType('f13', 'us', tz='Europe/Paris'),
        TimestampType('f14', 'ns', tz='US/Pacific'),
    ]

    batch_sizes = [7, 10]
    return _generate_file("datetime", fields, batch_sizes)
| |
| |
def generate_interval_case():
    """Duration fields in every unit plus both calendar-interval types."""
    fields = [
        DurationIntervalType('f1', 's'),
        DurationIntervalType('f2', 'ms'),
        DurationIntervalType('f3', 'us'),
        DurationIntervalType('f4', 'ns'),
        YearMonthIntervalType('f5'),
        DayTimeIntervalType('f6'),
    ]

    batch_sizes = [7, 10]
    skip = set()
    skip.add('JS')  # TODO(ARROW-5239)
    return _generate_file("interval", fields, batch_sizes, skip=skip)
| |
| |
def generate_map_case():
    """A single nullable map<utf8, int32> field."""
    # TODO(bkietz): separated from nested_case so it can be
    # independently skipped, consolidate after JS supports map
    fields = [
        MapType('map_nullable', get_field('key', 'utf8', False),
                get_field('value', 'int32')),
    ]

    batch_sizes = [7, 10]
    skip = set()
    skip.add('Go')  # TODO(ARROW-3679)
    return _generate_file("map", fields, batch_sizes, skip=skip)
| |
| |
def generate_nested_case():
    """Nullable list, fixed-size list and struct fields."""
    fields = [
        ListType('list_nullable', get_field('item', 'int32')),
        FixedSizeListType('fixedsizelist_nullable',
                          get_field('item', 'int32'), 4),
        StructType('struct_nullable', [get_field('f1', 'int32'),
                                       get_field('f2', 'utf8')]),

        # TODO(wesm): this causes segfault
        # ListType('list_nonnullable', get_field('item', 'int32'), False),
    ]

    batch_sizes = [7, 10]
    return _generate_file("nested", fields, batch_sizes)
| |
| |
def generate_dictionary_case():
    """Dictionary-encoded fields with string and integer dictionaries."""
    # NOTE(review): dict_type0 and dict_type1 share the field name
    # 'dictionary1' while their dictionary ids differ — presumably
    # intentional to exercise same-named fields; confirm.
    dict_type0 = StringType('dictionary1')
    dict_type1 = StringType('dictionary1')
    dict_type2 = get_field('dictionary2', 'int64')

    dict0 = Dictionary(0, dict_type0,
                       dict_type0.generate_column(10, name='DICT0'))
    dict1 = Dictionary(1, dict_type1,
                       dict_type1.generate_column(5, name='DICT1'))
    dict2 = Dictionary(2, dict_type2,
                       dict_type2.generate_column(50, name='DICT2'))

    fields = [
        DictionaryType('dict0', get_field('', 'int8'), dict0),
        DictionaryType('dict1', get_field('', 'int32'), dict1),
        DictionaryType('dict2', get_field('', 'int16'), dict2)
    ]
    skip = set()
    skip.add('Go')  # TODO(ARROW-3039)
    batch_sizes = [7, 10]
    return _generate_file("dictionary", fields, batch_sizes,
                          dictionaries=[dict0, dict1, dict2],
                          skip=skip)
| |
| |
def generate_nested_dictionary_case():
    """Dictionaries whose values themselves contain dictionary fields."""
    str_type = StringType('str')
    dict0 = Dictionary(0, str_type, str_type.generate_column(10, name='DICT0'))

    # A dictionary of lists of dictionary-encoded strings.
    list_type = ListType(
        'list',
        DictionaryType('str_dict', get_field('', 'int8'), dict0))
    dict1 = Dictionary(1,
                       list_type,
                       list_type.generate_column(30, name='DICT1'))

    # A dictionary of structs with two dictionary-encoded string fields,
    # both sharing dict0.
    struct_type = StructType('struct', [
        DictionaryType('str_dict_a', get_field('', 'int8'), dict0),
        DictionaryType('str_dict_b', get_field('', 'int8'), dict0)
    ])
    dict2 = Dictionary(2,
                       struct_type,
                       struct_type.generate_column(30, name='DICT2'))

    fields = [
        DictionaryType('list_dict', get_field('', 'int8'), dict1),
        DictionaryType('struct_dict', get_field('', 'int8'), dict2)
    ]

    batch_sizes = [10, 13]
    return _generate_file("nested_dictionary", fields, batch_sizes,
                          dictionaries=[dict0, dict1, dict2])
| |
| |
def get_generated_json_files(tempdir=None, flight=False):
    """Write every generated test case into *tempdir* and return the
    JsonFile objects, each with its .path set to the written file.

    When *flight* is true, an extra large-batch case for Flight testing
    is appended.
    """
    # Removed a dead inner `_temp_path` helper that returned None and was
    # never called; also, the old `generated_paths` accumulator actually
    # held JsonFile objects, not paths — the return value is unchanged.
    tempdir = tempdir or tempfile.mkdtemp()

    file_objs = [
        generate_primitive_case([], name='primitive_no_batches'),
        generate_primitive_case([17, 20], name='primitive'),
        generate_primitive_case([0, 0, 0], name='primitive_zerolength'),
        generate_decimal_case(),
        generate_datetime_case(),
        generate_interval_case(),
        generate_map_case(),
        generate_nested_case(),
        generate_dictionary_case(),
        generate_nested_dictionary_case().skip_category(SKIP_ARROW)
                                         .skip_category(SKIP_FLIGHT),
    ]

    if flight:
        file_objs.append(generate_primitive_case([24 * 1024],
                                                 name='large_batch'))

    for file_obj in file_objs:
        out_path = os.path.join(tempdir, 'generated_' +
                                file_obj.name + '.json')
        file_obj.write(out_path)

    return file_objs
| |
| |
| # ---------------------------------------------------------------------- |
| # Testing harness |
| |
| |
| class IntegrationRunner(object): |
| |
    def __init__(self, json_files, testers, args):
        """Hold the test cases, tester implementations and CLI options.

        `args` is the parsed command line; only tempdir, debug,
        stop_on_error and gold_dirs are read here.
        """
        self.json_files = json_files
        self.testers = testers
        # Fall back to a fresh scratch directory when none was given.
        self.temp_dir = args.tempdir or tempfile.mkdtemp()
        self.debug = args.debug
        self.stop_on_error = args.stop_on_error
        self.gold_dirs = args.gold_dirs
| |
    def run(self):
        """Run every producer/consumer pairing over the generated files,
        then (if gold directories were given) run each consumer against
        the golden files; return the accumulated list of failures.
        """
        failures = []

        # Every tester that can produce is paired with every tester that
        # can consume.
        for producer, consumer in itertools.product(
                filter(lambda t: t.PRODUCER, self.testers),
                filter(lambda t: t.CONSUMER, self.testers)):
            for failure in self._compare_implementations(
                    producer, consumer, self._produce_consume, self.json_files):
                failures.append(failure)
        if self.gold_dirs:
            for gold_dir, consumer in itertools.product(
                    self.gold_dirs,
                    filter(lambda t: t.CONSUMER, self.testers)):
                print('\n\n\n\n')
                print('******************************************************')
                print('Tests against golden files in {}'.format(gold_dir))
                print('******************************************************')

                # Closure binds gold_dir; the producer role is unused for
                # gold tests (the consumer is passed for both sides below).
                def run_gold(producer, consumer, test_case):
                    self._run_gold(gold_dir, producer, consumer, test_case)
                for failure in self._compare_implementations(
                        consumer, consumer, run_gold, self._gold_tests(gold_dir)):
                    failures.append(failure)

        return failures
| |
| def run_flight(self): |
| failures = [] |
| servers = filter(lambda t: t.FLIGHT_SERVER, self.testers) |
| clients = filter(lambda t: (t.FLIGHT_CLIENT and t.CONSUMER), |
| self.testers) |
| for server, client in itertools.product(servers, clients): |
| for failure in self._compare_flight_implementations(server, |
| client): |
| failures.append(failure) |
| return failures |
| |
| def _gold_tests(self, gold_dir): |
| prefix = os.path.basename(os.path.normpath(gold_dir)) |
| SUFFIX = ".json.gz" |
| golds = [jf for jf in os.listdir(gold_dir) if jf.endswith(SUFFIX)] |
| for json_path in golds: |
| name = json_path[json_path.index('_')+1: -len(SUFFIX)] |
| base_name = prefix + "_" + name + ".gold.json" |
| out_path = os.path.join(self.temp_dir, base_name) |
| with gzip.open(os.path.join(gold_dir, json_path)) as i: |
| with open(out_path, "wb") as out: |
| out.write(i.read()) |
| |
| try: |
| skip = next(f for f in self.json_files |
| if f.name == name).skip |
| except StopIteration: |
| skip = set() |
| yield JsonFile(name, None, None, skip=skip, path=out_path) |
| |
| def _compare_implementations( |
| self, producer, consumer, run_binaries, test_cases): |
| print('##########################################################') |
| print( |
| '{0} producing, {1} consuming'.format(producer.name, consumer.name) |
| ) |
| print('##########################################################') |
| |
| for test_case in test_cases: |
| json_path = test_case.path |
| print('==========================================================') |
| print('Testing file {0}'.format(json_path)) |
| print('==========================================================') |
| |
| if producer.name in test_case.skip: |
| print('-- Skipping test because producer {0} does ' |
| 'not support'.format(producer.name)) |
| continue |
| |
| if consumer.name in test_case.skip: |
| print('-- Skipping test because consumer {0} does ' |
| 'not support'.format(consumer.name)) |
| continue |
| |
| if SKIP_ARROW in test_case.skip: |
| print('-- Skipping test') |
| continue |
| |
| try: |
| run_binaries(producer, consumer, test_case) |
| except Exception: |
| traceback.print_exc() |
| yield (test_case, producer, consumer, sys.exc_info()) |
| if self.stop_on_error: |
| break |
| else: |
| continue |
| |
| def _produce_consume(self, producer, consumer, test_case): |
| # Make the random access file |
| json_path = test_case.path |
| file_id = guid()[:8] |
| name = os.path.splitext(os.path.basename(json_path))[0] |
| |
| producer_file_path = os.path.join(self.temp_dir, file_id + '_' + |
| name + '.json_as_file') |
| producer_stream_path = os.path.join(self.temp_dir, file_id + '_' + |
| name + '.producer_file_as_stream') |
| consumer_file_path = os.path.join(self.temp_dir, file_id + '_' + |
| name + '.consumer_stream_as_file') |
| |
| print('-- Creating binary inputs') |
| producer.json_to_file(json_path, producer_file_path) |
| |
| # Validate the file |
| print('-- Validating file') |
| consumer.validate(json_path, producer_file_path) |
| |
| print('-- Validating stream') |
| producer.file_to_stream(producer_file_path, producer_stream_path) |
| consumer.stream_to_file(producer_stream_path, consumer_file_path) |
| consumer.validate(json_path, consumer_file_path) |
| |
| def _run_gold(self, gold_dir, producer, consumer, test_case): |
| json_path = test_case.path |
| |
| # Validate the file |
| print('-- Validating file') |
| producer_file_path = os.path.join( |
| gold_dir, "generated_" + test_case.name + ".arrow_file") |
| consumer.validate(json_path, producer_file_path) |
| |
| print('-- Validating stream') |
| consumer_stream_path = os.path.join( |
| gold_dir, "generated_" + test_case.name + ".stream") |
| file_id = guid()[:8] |
| name = os.path.splitext(os.path.basename(json_path))[0] |
| |
| consumer_file_path = os.path.join(self.temp_dir, file_id + '_' + |
| name + '.consumer_stream_as_file') |
| |
| consumer.stream_to_file(consumer_stream_path, consumer_file_path) |
| consumer.validate(json_path, consumer_file_path) |
| |
| def _compare_flight_implementations(self, producer, consumer): |
| print('##########################################################') |
| print( |
| '{0} serving, {1} requesting'.format(producer.name, consumer.name) |
| ) |
| print('##########################################################') |
| |
| for test_case in self.json_files: |
| json_path = test_case.path |
| print('=' * 58) |
| print('Testing file {0}'.format(json_path)) |
| print('=' * 58) |
| |
| if ('Java' in (producer.name, consumer.name) and |
| "map" in test_case.name): |
| print('TODO(ARROW-1279): Enable map tests ' + |
| ' for Java and JS once Java supports them and JS\'' + |
| ' are unbroken') |
| continue |
| |
| if SKIP_FLIGHT in test_case.skip: |
| print('-- Skipping test') |
| continue |
| |
| try: |
| with producer.flight_server(): |
| # Have the client upload the file, then download and |
| # compare |
| consumer.flight_request(producer.FLIGHT_PORT, json_path) |
| except Exception: |
| traceback.print_exc() |
| yield (test_case, producer, consumer, sys.exc_info()) |
| continue |
| |
| |
class Tester(object):
    """Abstract interface to one Arrow implementation under test.

    Subclasses flip the capability flags below and implement whichever
    conversion/validation commands that implementation supports.
    """

    PRODUCER = False
    CONSUMER = False
    FLIGHT_SERVER = False
    FLIGHT_CLIENT = False
    FLIGHT_PORT = 31337

    def __init__(self, args):
        self.args = args
        self.debug = args.debug

    def run_shell_command(self, cmd):
        """Join *cmd* into a single line and execute it via the shell
        (needed for the '|' and '>' tokens some testers use)."""
        command_line = ' '.join(cmd)
        if self.debug:
            print(command_line)
        subprocess.check_call(command_line, shell=True)

    def json_to_file(self, json_path, arrow_path):
        """Produce an Arrow random-access file from a JSON case."""
        raise NotImplementedError

    def stream_to_file(self, stream_path, file_path):
        """Convert an Arrow stream into the Arrow file format."""
        raise NotImplementedError

    def file_to_stream(self, file_path, stream_path):
        """Convert an Arrow file into the Arrow stream format."""
        raise NotImplementedError

    def validate(self, json_path, arrow_path):
        """Check that an Arrow file matches its JSON description."""
        raise NotImplementedError

    def flight_server(self):
        """Context manager running a Flight integration server."""
        raise NotImplementedError

    def flight_request(self, port, json_path):
        """Upload *json_path* to a Flight server and verify round-trip."""
        raise NotImplementedError
| |
| |
class JavaTester(Tester):
    """Java implementation, driven through the arrow-tools and
    arrow-flight "jar-with-dependencies" jars."""

    PRODUCER = True
    CONSUMER = True
    FLIGHT_SERVER = True
    FLIGHT_CLIENT = True

    FLIGHT_PORT = 31338

    JAVA_OPTS = ['-Dio.netty.tryReflectionSetAccessible=true']

    _arrow_version = load_version_from_pom()
    ARROW_TOOLS_JAR = os.environ.get(
        'ARROW_JAVA_INTEGRATION_JAR',
        os.path.join(ARROW_HOME,
                     'java/tools/target/arrow-tools-{}-'
                     'jar-with-dependencies.jar'.format(_arrow_version)))
    ARROW_FLIGHT_JAR = os.environ.get(
        'ARROW_FLIGHT_JAVA_INTEGRATION_JAR',
        os.path.join(ARROW_HOME,
                     'java/flight/target/arrow-flight-{}-'
                     'jar-with-dependencies.jar'.format(_arrow_version)))
    ARROW_FLIGHT_SERVER = ('org.apache.arrow.flight.example.integration.'
                           'IntegrationTestServer')
    ARROW_FLIGHT_CLIENT = ('org.apache.arrow.flight.example.integration.'
                           'IntegrationTestClient')

    name = 'Java'

    def _run(self, arrow_path=None, json_path=None, command='VALIDATE'):
        """Invoke org.apache.arrow.tools.Integration in the given mode."""
        cmd = ['java'] + self.JAVA_OPTS
        cmd += ['-cp', self.ARROW_TOOLS_JAR,
                'org.apache.arrow.tools.Integration']
        if arrow_path is not None:
            cmd += ['-a', arrow_path]
        if json_path is not None:
            cmd += ['-j', json_path]
        cmd += ['-c', command]
        if self.debug:
            print(' '.join(cmd))
        run_cmd(cmd)

    def validate(self, json_path, arrow_path):
        """Check an Arrow file against its JSON description."""
        return self._run(arrow_path, json_path, 'VALIDATE')

    def json_to_file(self, json_path, arrow_path):
        """Produce an Arrow file from a JSON case."""
        return self._run(arrow_path, json_path, 'JSON_TO_ARROW')

    def _run_java_tool(self, main_class, *tool_args):
        """Run one arrow-tools main class with positional arguments."""
        cmd = ['java'] + self.JAVA_OPTS
        cmd += ['-cp', self.ARROW_TOOLS_JAR, main_class]
        cmd += list(tool_args)
        if self.debug:
            print(' '.join(cmd))
        run_cmd(cmd)

    def stream_to_file(self, stream_path, file_path):
        """Convert an Arrow stream into an Arrow file via StreamToFile."""
        self._run_java_tool('org.apache.arrow.tools.StreamToFile',
                            stream_path, file_path)

    def file_to_stream(self, file_path, stream_path):
        """Convert an Arrow file into an Arrow stream via FileToStream."""
        self._run_java_tool('org.apache.arrow.tools.FileToStream',
                            file_path, stream_path)

    def flight_request(self, port, json_path):
        """Run the Java Flight integration client against *port*."""
        cmd = ['java'] + self.JAVA_OPTS
        cmd += ['-cp', self.ARROW_FLIGHT_JAR, self.ARROW_FLIGHT_CLIENT,
                '-port', str(port), '-j', json_path]
        if self.debug:
            print(' '.join(cmd))
        run_cmd(cmd)

    @contextlib.contextmanager
    def flight_server(self):
        """Start the Java Flight server; yields once it reports ready,
        and kills the process on exit."""
        cmd = ['java'] + self.JAVA_OPTS
        cmd += ['-cp', self.ARROW_FLIGHT_JAR, self.ARROW_FLIGHT_SERVER,
                '-port', str(self.FLIGHT_PORT)]
        if self.debug:
            print(' '.join(cmd))
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        try:
            banner = proc.stdout.readline().decode()
            if not banner.startswith("Server listening on localhost"):
                raise RuntimeError(
                    "Flight-Java server did not start properly, output: " +
                    banner)
            yield
        finally:
            proc.kill()
            proc.wait(5)
| |
| |
class CPPTester(Tester):
    """C++ implementation, driven through the binaries in the C++ build
    directory (overridable via ARROW_CPP_EXE_PATH)."""

    PRODUCER = True
    CONSUMER = True
    FLIGHT_SERVER = True
    FLIGHT_CLIENT = True

    EXE_PATH = os.environ.get(
        'ARROW_CPP_EXE_PATH',
        os.path.join(ARROW_HOME, 'cpp/build/debug'))

    CPP_INTEGRATION_EXE = os.path.join(EXE_PATH, 'arrow-json-integration-test')
    STREAM_TO_FILE = os.path.join(EXE_PATH, 'arrow-stream-to-file')
    FILE_TO_STREAM = os.path.join(EXE_PATH, 'arrow-file-to-stream')

    FLIGHT_PORT = 31337

    FLIGHT_SERVER_CMD = [
        os.path.join(EXE_PATH, 'flight-test-integration-server'),
        "-port", str(FLIGHT_PORT)]
    FLIGHT_CLIENT_CMD = [
        os.path.join(EXE_PATH, 'flight-test-integration-client'),
        "-host", "localhost"]

    name = 'C++'

    def _run(self, arrow_path=None, json_path=None, command='VALIDATE'):
        """Invoke arrow-json-integration-test in the given mode."""
        cmd = [self.CPP_INTEGRATION_EXE, '--integration']
        for flag, value in (('--arrow=', arrow_path),
                            ('--json=', json_path)):
            if value is not None:
                cmd.append(flag + value)
        cmd.append('--mode=' + command)
        if self.debug:
            print(' '.join(cmd))
        run_cmd(cmd)

    def validate(self, json_path, arrow_path):
        """Check an Arrow file against its JSON description."""
        return self._run(arrow_path, json_path, 'VALIDATE')

    def json_to_file(self, json_path, arrow_path):
        """Produce an Arrow file from a JSON case."""
        return self._run(arrow_path, json_path, 'JSON_TO_ARROW')

    def stream_to_file(self, stream_path, file_path):
        """Pipe an Arrow stream through arrow-stream-to-file."""
        self.run_shell_command(
            ['cat', stream_path, '|', self.STREAM_TO_FILE, '>', file_path])

    def file_to_stream(self, file_path, stream_path):
        """Redirect arrow-file-to-stream output into *stream_path*."""
        self.run_shell_command(
            [self.FILE_TO_STREAM, file_path, '>', stream_path])

    @contextlib.contextmanager
    def flight_server(self):
        """Start the C++ Flight server; yields once it reports ready,
        and kills the process on exit."""
        if self.debug:
            print(' '.join(self.FLIGHT_SERVER_CMD))
        proc = subprocess.Popen(self.FLIGHT_SERVER_CMD,
                                stdout=subprocess.PIPE)
        try:
            banner = proc.stdout.readline().decode()
            if not banner.startswith("Server listening on localhost"):
                raise RuntimeError(
                    "Flight-C++ server did not start properly, output: " +
                    banner)
            yield
        finally:
            proc.kill()
            proc.wait(5)

    def flight_request(self, port, json_path):
        """Run the C++ Flight integration client against *port*."""
        cmd = self.FLIGHT_CLIENT_CMD + ['-port=' + str(port),
                                        '-path=' + json_path]
        if self.debug:
            print(' '.join(cmd))
        run_cmd(cmd)
| |
| |
class JSTester(Tester):
    """JavaScript implementation, driven through the scripts in js/bin."""

    PRODUCER = True
    CONSUMER = True

    EXE_PATH = os.path.join(ARROW_HOME, 'js/bin')
    VALIDATE = os.path.join(EXE_PATH, 'integration.js')
    JSON_TO_ARROW = os.path.join(EXE_PATH, 'json-to-arrow.js')
    STREAM_TO_FILE = os.path.join(EXE_PATH, 'stream-to-file.js')
    FILE_TO_STREAM = os.path.join(EXE_PATH, 'file-to-stream.js')

    name = 'JS'

    def _run(self, exe_cmd, arrow_path=None, json_path=None,
             command='VALIDATE'):
        """Invoke one of the js/bin scripts with -a/-j/--mode options."""
        cmd = [exe_cmd]
        if arrow_path is not None:
            cmd += ['-a', arrow_path]
        if json_path is not None:
            cmd += ['-j', json_path]
        cmd += ['--mode', command]
        if self.debug:
            print(' '.join(cmd))
        run_cmd(cmd)

    def validate(self, json_path, arrow_path):
        """Check an Arrow file against its JSON description."""
        return self._run(self.VALIDATE, arrow_path, json_path, 'VALIDATE')

    def json_to_file(self, json_path, arrow_path):
        """Produce an Arrow file from a JSON case via node."""
        self.run_shell_command(
            ['node', '--no-warnings', self.JSON_TO_ARROW,
             '-a', arrow_path, '-j', json_path])

    def stream_to_file(self, stream_path, file_path):
        """Pipe an Arrow stream through node into an Arrow file."""
        self.run_shell_command(
            ['cat', stream_path, '|',
             'node', '--no-warnings', self.STREAM_TO_FILE, '>',
             file_path])

    def file_to_stream(self, file_path, stream_path):
        """Pipe an Arrow file through node into an Arrow stream."""
        self.run_shell_command(
            ['cat', file_path, '|',
             'node', '--no-warnings', self.FILE_TO_STREAM, '>',
             stream_path])
| |
| |
class GoTester(Tester):
    """Go implementation, driven through binaries installed in GOBIN."""

    PRODUCER = True
    CONSUMER = True

    # FIXME(sbinet): revisit for Go modules
    GOPATH = os.getenv('GOPATH', '~/go')
    GOBIN = os.environ.get('GOBIN', os.path.join(GOPATH, 'bin'))

    GO_INTEGRATION_EXE = os.path.join(GOBIN, 'arrow-json-integration-test')
    STREAM_TO_FILE = os.path.join(GOBIN, 'arrow-stream-to-file')
    FILE_TO_STREAM = os.path.join(GOBIN, 'arrow-file-to-stream')

    name = 'Go'

    def _run(self, arrow_path=None, json_path=None, command='VALIDATE'):
        """Invoke arrow-json-integration-test in the given mode."""
        cmd = [self.GO_INTEGRATION_EXE]
        if arrow_path is not None:
            cmd += ['-arrow', arrow_path]
        if json_path is not None:
            cmd += ['-json', json_path]
        cmd += ['-mode', command]
        if self.debug:
            print(' '.join(cmd))
        run_cmd(cmd)

    def validate(self, json_path, arrow_path):
        """Check an Arrow file against its JSON description."""
        return self._run(arrow_path, json_path, 'VALIDATE')

    def json_to_file(self, json_path, arrow_path):
        """Produce an Arrow file from a JSON case."""
        return self._run(arrow_path, json_path, 'JSON_TO_ARROW')

    def stream_to_file(self, stream_path, file_path):
        """Pipe an Arrow stream through arrow-stream-to-file."""
        self.run_shell_command(
            ['cat', stream_path, '|', self.STREAM_TO_FILE, '>', file_path])

    def file_to_stream(self, file_path, stream_path):
        """Redirect arrow-file-to-stream output into *stream_path*."""
        self.run_shell_command(
            [self.FILE_TO_STREAM, file_path, '>', stream_path])
| |
| |
def get_static_json_files():
    """Collect the hand-written JSON cases under integration/data."""
    pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json')
    return [
        JsonFile(name=os.path.basename(path), path=path, skip=set(),
                 schema=None, batches=None)
        for path in glob.glob(pattern)
    ]
| |
| |
def run_all_tests(args):
    """Instantiate the enabled testers, run every integration (and
    optionally Flight) combination, print failures, and exit non-zero
    when anything failed."""
    available = [
        (args.enable_cpp, CPPTester),
        (args.enable_java, JavaTester),
        (args.enable_js, JSTester),
        (args.enable_go, GoTester),
    ]
    testers = [cls(args) for enabled, cls in available if enabled]

    # Static (checked-in) cases first, then the freshly generated ones.
    json_files = (get_static_json_files() +
                  get_generated_json_files(tempdir=args.tempdir,
                                           flight=args.run_flight))

    runner = IntegrationRunner(json_files, testers, args)
    failures = list(runner.run())
    if args.run_flight:
        failures.extend(runner.run_flight())

    fail_count = 0
    if failures:
        print("################# FAILURES #################")
        for test_case, producer, consumer, exc_info in failures:
            fail_count += 1
            print("FAILED TEST:", end=" ")
            print(test_case.name, producer.name, "producing, ",
                  consumer.name, "consuming")
            if exc_info:
                traceback.print_exception(*exc_info)
            print()

    print(fail_count, "failures")
    if fail_count > 0:
        sys.exit(1)
| |
| |
def write_js_test_json(directory):
    """Write the fixed set of JSON fixtures consumed by the JS test
    suite into *directory*."""
    cases = [
        ('map.json', generate_map_case),
        ('nested.json', generate_nested_case),
        ('decimal.json', generate_decimal_case),
        ('datetime.json', generate_datetime_case),
        ('dictionary.json', generate_dictionary_case),
        ('primitive_no_batches.json',
         lambda: generate_primitive_case([])),
        ('primitive.json',
         lambda: generate_primitive_case([7, 10])),
        ('primitive-empty.json',
         lambda: generate_primitive_case([0, 0, 0])),
    ]
    for filename, make_case in cases:
        make_case().write(os.path.join(directory, filename))
| |
| |
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Arrow integration test CLI')

    # The four language toggles share the same shape; only the flag,
    # destination and help text differ.
    for flag, dest, help_text in [
            ('--enable-c++', 'enable_cpp',
             'Include C++ in integration tests'),
            ('--enable-java', 'enable_java',
             'Include Java in integration tests'),
            ('--enable-js', 'enable_js',
             'Include JavaScript in integration tests'),
            ('--enable-go', 'enable_go',
             'Include Go in integration tests')]:
        parser.add_argument(flag, dest=dest, action='store', type=int,
                            default=1, help=help_text)

    parser.add_argument('--write_generated_json', dest='generated_json_path',
                        action='store', default=False,
                        help='Generate test JSON')
    parser.add_argument('--run_flight', dest='run_flight',
                        action='store_true', default=False,
                        help='Run Flight integration tests')
    parser.add_argument('--debug', dest='debug', action='store_true',
                        default=False,
                        help='Run executables in debug mode as relevant')
    # NOTE: the default directory is created eagerly at parser-definition
    # time, even when --tempdir is supplied on the command line.
    parser.add_argument('--tempdir', dest='tempdir',
                        default=tempfile.mkdtemp(),
                        help=('Directory to use for writing '
                              'integration test temporary files'))
    parser.add_argument('-x', '--stop-on-error', dest='stop_on_error',
                        action='store_true', default=False,
                        help='Stop on first error')
    parser.add_argument('--gold_dirs', action='append',
                        help="gold integration test file paths")

    args = parser.parse_args()
    if args.generated_json_path:
        # Only emit the JS fixtures, then exit without running tests.
        try:
            os.makedirs(args.generated_json_path)
        except OSError as exc:
            # A pre-existing directory is fine; anything else is fatal.
            if exc.errno != errno.EEXIST:
                raise
        write_js_test_json(args.generated_json_path)
    else:
        run_all_tests(args)