# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
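"""Arrow cross-implementation integration test harness.

Generates JSON files describing Arrow schemas and record batches, then
drives each enabled implementation's command-line tools to produce and
consume the binary IPC file and stream formats, validating the results
against the original JSON.
"""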
import argparse
import binascii
import contextlib
import errno
import glob
import gzip
import itertools
import json
import os
import random
import string
import subprocess
import sys
import tempfile
import traceback
import uuid
from collections import OrderedDict

import numpy as np
import six
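# Repository root, two path components above this file
# (the script lives in <arrow root>/integration/).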
ARROW_HOME = os.path.abspath(__file__).rsplit("/", 2)[0]
# Control for flakiness
np.random.seed(12345)
def load_version_from_pom():
import xml.etree.ElementTree as ET
tree = ET.parse(os.path.join(ARROW_HOME, 'java', 'pom.xml'))
tag_pattern = '{http://maven.apache.org/POM/4.0.0}version'
    version_tag = tree.getroot().findall(tag_pattern)[0]
return version_tag.text
def guid():
return uuid.uuid4().hex
# from pandas
RANDS_CHARS = np.array(list(string.ascii_letters + string.digits),
dtype=(np.str_, 1))
def rands(nchars):
"""
Generate one random byte string.
See `rands_array` if you want to create an array of random strings.
"""
return ''.join(np.random.choice(RANDS_CHARS, nchars))
def tobytes(o):
if isinstance(o, six.text_type):
return o.encode('utf8')
return o
def frombytes(o):
if isinstance(o, six.binary_type):
return o.decode('utf8')
return o
# from the merge_arrow_pr.py script
def run_cmd(cmd):
if isinstance(cmd, six.string_types):
cmd = cmd.split(' ')
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
# this avoids hiding the stdout / stderr of failed processes
print('Command failed: %s' % ' '.join(cmd))
print('With output:')
print('--------------')
print(frombytes(e.output))
print('--------------')
raise e
return frombytes(output)
# ----------------------------------------------------------------------
# Data generation
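#
# Each DataType subclass below models one Arrow type: get_json() emits the
# field's schema in the JSON integration format, and generate_column()
# produces random test data as a Column whose _get_buffers() lists the
# type's buffers (e.g. VALIDITY, OFFSET, DATA) in JSON form.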
class DataType(object):
def __init__(self, name, nullable=True):
self.name = name
self.nullable = nullable
def get_json(self):
return OrderedDict([
('name', self.name),
('type', self._get_type()),
('nullable', self.nullable),
('children', self._get_children())
])
def _make_is_valid(self, size):
if self.nullable:
return np.random.randint(0, 2, size=size)
else:
return np.ones(size)
class Column(object):
def __init__(self, name, count):
self.name = name
self.count = count
def __len__(self):
return self.count
def _get_children(self):
return []
def _get_buffers(self):
return []
def get_json(self):
entries = [
('name', self.name),
('count', self.count)
]
buffers = self._get_buffers()
entries.extend(buffers)
children = self._get_children()
if len(children) > 0:
entries.append(('children', children))
return OrderedDict(entries)
class PrimitiveType(DataType):
def _get_children(self):
return []
class PrimitiveColumn(Column):
def __init__(self, name, count, is_valid, values):
super(PrimitiveColumn, self).__init__(name, count)
self.is_valid = is_valid
self.values = values
def _encode_value(self, x):
return x
def _get_buffers(self):
return [
('VALIDITY', [int(v) for v in self.is_valid]),
            ('DATA', [self._encode_value(x) for x in self.values])
]
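# Default bounds for generated integer data; note that
# ~TEST_INT_MAX == -2**31, the int32 minimum.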
TEST_INT_MAX = 2 ** 31 - 1
TEST_INT_MIN = ~TEST_INT_MAX
class IntegerType(PrimitiveType):
def __init__(self, name, is_signed, bit_width, nullable=True,
min_value=TEST_INT_MIN,
max_value=TEST_INT_MAX):
super(IntegerType, self).__init__(name, nullable=nullable)
self.is_signed = is_signed
self.bit_width = bit_width
self.min_value = min_value
self.max_value = max_value
def _get_generated_data_bounds(self):
if self.is_signed:
signed_iinfo = np.iinfo('int' + str(self.bit_width))
min_value, max_value = signed_iinfo.min, signed_iinfo.max
else:
unsigned_iinfo = np.iinfo('uint' + str(self.bit_width))
min_value, max_value = 0, unsigned_iinfo.max
lower_bound = max(min_value, self.min_value)
upper_bound = min(max_value, self.max_value)
return lower_bound, upper_bound
def _get_type(self):
return OrderedDict([
('name', 'int'),
('isSigned', self.is_signed),
('bitWidth', self.bit_width)
])
def generate_column(self, size, name=None):
lower_bound, upper_bound = self._get_generated_data_bounds()
return self.generate_range(size, lower_bound, upper_bound, name=name)
def generate_range(self, size, lower, upper, name=None):
values = [int(x) for x in
np.random.randint(lower, upper, size=size)]
is_valid = self._make_is_valid(size)
if name is None:
name = self.name
return PrimitiveColumn(name, size, is_valid, values)
class DateType(IntegerType):
DAY = 0
MILLISECOND = 1
# 1/1/1 to 12/31/9999
_ranges = {
DAY: [-719162, 2932896],
MILLISECOND: [-62135596800000, 253402214400000]
}
def __init__(self, name, unit, nullable=True):
bit_width = 32 if unit == self.DAY else 64
min_value, max_value = self._ranges[unit]
super(DateType, self).__init__(
name, True, bit_width, nullable=nullable,
min_value=min_value, max_value=max_value
)
self.unit = unit
def _get_type(self):
return OrderedDict([
('name', 'date'),
('unit', 'DAY' if self.unit == self.DAY else 'MILLISECOND')
])
TIMEUNIT_NAMES = {
's': 'SECOND',
'ms': 'MILLISECOND',
'us': 'MICROSECOND',
'ns': 'NANOSECOND'
}
class TimeType(IntegerType):
BIT_WIDTHS = {
's': 32,
'ms': 32,
'us': 64,
'ns': 64
}
_ranges = {
's': [0, 86400],
'ms': [0, 86400000],
'us': [0, 86400000000],
'ns': [0, 86400000000000]
}
def __init__(self, name, unit='s', nullable=True):
min_val, max_val = self._ranges[unit]
super(TimeType, self).__init__(name, True, self.BIT_WIDTHS[unit],
nullable=nullable,
min_value=min_val,
max_value=max_val)
self.unit = unit
def _get_type(self):
return OrderedDict([
('name', 'time'),
('unit', TIMEUNIT_NAMES[self.unit]),
('bitWidth', self.bit_width)
])
class TimestampType(IntegerType):
# 1/1/1 to 12/31/9999
_ranges = {
's': [-62135596800, 253402214400],
'ms': [-62135596800000, 253402214400000],
'us': [-62135596800000000, 253402214400000000],
# Physical range for int64, ~584 years and change
'ns': [np.iinfo('int64').min, np.iinfo('int64').max]
}
def __init__(self, name, unit='s', tz=None, nullable=True):
min_val, max_val = self._ranges[unit]
super(TimestampType, self).__init__(name, True, 64, nullable=nullable,
min_value=min_val,
max_value=max_val)
self.unit = unit
self.tz = tz
def _get_type(self):
fields = [
('name', 'timestamp'),
('unit', TIMEUNIT_NAMES[self.unit])
]
if self.tz is not None:
fields.append(('timezone', self.tz))
return OrderedDict(fields)
class DurationIntervalType(IntegerType):
def __init__(self, name, unit='s', nullable=True):
        min_val, max_val = np.iinfo('int64').min, np.iinfo('int64').max
super(DurationIntervalType, self).__init__(
name, True, 64, nullable=nullable,
min_value=min_val,
max_value=max_val)
self.unit = unit
def _get_type(self):
fields = [
('name', 'duration'),
('unit', TIMEUNIT_NAMES[self.unit])
]
return OrderedDict(fields)
class YearMonthIntervalType(IntegerType):
def __init__(self, name, nullable=True):
min_val, max_val = [-10000*12, 10000*12] # +/- 10000 years.
super(YearMonthIntervalType, self).__init__(
name, True, 32, nullable=nullable,
min_value=min_val,
max_value=max_val)
def _get_type(self):
fields = [
('name', 'interval'),
('unit', 'YEAR_MONTH'),
]
return OrderedDict(fields)
class DayTimeIntervalType(PrimitiveType):
def __init__(self, name, nullable=True):
        super(DayTimeIntervalType, self).__init__(name, nullable=nullable)
@property
def numpy_type(self):
return object
def _get_type(self):
return OrderedDict([
('name', 'interval'),
('unit', 'DAY_TIME'),
])
def generate_column(self, size, name=None):
min_day_value, max_day_value = -10000*366, 10000*366
values = [{'days': random.randint(min_day_value, max_day_value),
'milliseconds': random.randint(-86400000, +86400000)}
for _ in range(size)]
is_valid = self._make_is_valid(size)
if name is None:
name = self.name
return PrimitiveColumn(name, size, is_valid, values)
class FloatingPointType(PrimitiveType):
def __init__(self, name, bit_width, nullable=True):
super(FloatingPointType, self).__init__(name, nullable=nullable)
self.bit_width = bit_width
self.precision = {
16: 'HALF',
32: 'SINGLE',
64: 'DOUBLE'
}[self.bit_width]
@property
def numpy_type(self):
return 'float' + str(self.bit_width)
def _get_type(self):
return OrderedDict([
('name', 'floatingpoint'),
('precision', self.precision)
])
def generate_column(self, size, name=None):
values = np.random.randn(size) * 1000
values = np.round(values, 3)
is_valid = self._make_is_valid(size)
if name is None:
name = self.name
return PrimitiveColumn(name, size, is_valid, values)
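# Largest signed value that fits in i bytes (i = 1..16), keyed by decimal
# precision; see decimal_range_from_precision() below.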
DECIMAL_PRECISION_TO_VALUE = {
key: (1 << (8 * i - 1)) - 1 for i, key in enumerate(
[1, 3, 5, 7, 10, 12, 15, 17, 19, 22, 24, 27, 29, 32, 34, 36],
start=1,
)
}
def decimal_range_from_precision(precision):
assert 1 <= precision <= 38
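    # Fall back to the nearest smaller precision with an entry; the lower
    # bound mirrors the upper bound, since ~max_value == -max_value - 1.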
try:
max_value = DECIMAL_PRECISION_TO_VALUE[precision]
except KeyError:
return decimal_range_from_precision(precision - 1)
else:
return ~max_value, max_value
class DecimalType(PrimitiveType):
def __init__(self, name, precision, scale, bit_width=128, nullable=True):
        super(DecimalType, self).__init__(name, nullable=nullable)
self.precision = precision
self.scale = scale
self.bit_width = bit_width
@property
def numpy_type(self):
return object
def _get_type(self):
return OrderedDict([
('name', 'decimal'),
('precision', self.precision),
('scale', self.scale),
])
def generate_column(self, size, name=None):
min_value, max_value = decimal_range_from_precision(self.precision)
values = [random.randint(min_value, max_value) for _ in range(size)]
is_valid = self._make_is_valid(size)
if name is None:
name = self.name
return DecimalColumn(name, size, is_valid, values, self.bit_width)
class DecimalColumn(PrimitiveColumn):
def __init__(self, name, count, is_valid, values, bit_width=128):
super(DecimalColumn, self).__init__(name, count, is_valid, values)
self.bit_width = bit_width
def _encode_value(self, x):
return str(x)
class BooleanType(PrimitiveType):
bit_width = 1
def _get_type(self):
return OrderedDict([('name', 'bool')])
@property
def numpy_type(self):
return 'bool'
def generate_column(self, size, name=None):
values = list(map(bool, np.random.randint(0, 2, size=size)))
is_valid = self._make_is_valid(size)
if name is None:
name = self.name
return PrimitiveColumn(name, size, is_valid, values)
class BinaryType(PrimitiveType):
@property
def numpy_type(self):
return object
@property
def column_class(self):
return BinaryColumn
def _get_type(self):
return OrderedDict([('name', 'binary')])
def generate_column(self, size, name=None):
K = 7
is_valid = self._make_is_valid(size)
values = []
for i in range(size):
if is_valid[i]:
                draw = (np.random.randint(0, 255, size=K)
                        .astype(np.uint8)
                        .tobytes())
values.append(draw)
else:
values.append(b"")
if name is None:
name = self.name
return self.column_class(name, size, is_valid, values)
class FixedSizeBinaryType(PrimitiveType):
def __init__(self, name, byte_width, nullable=True):
super(FixedSizeBinaryType, self).__init__(name, nullable=nullable)
self.byte_width = byte_width
@property
def numpy_type(self):
return object
@property
def column_class(self):
return FixedSizeBinaryColumn
def _get_type(self):
return OrderedDict([('name', 'fixedsizebinary'),
('byteWidth', self.byte_width)])
def _get_type_layout(self):
return OrderedDict([
('vectors',
[OrderedDict([('type', 'VALIDITY'),
('typeBitWidth', 1)]),
OrderedDict([('type', 'DATA'),
('typeBitWidth', self.byte_width)])])])
def generate_column(self, size, name=None):
is_valid = self._make_is_valid(size)
values = []
for i in range(size):
            draw = (np.random.randint(0, 255, size=self.byte_width)
                    .astype(np.uint8)
                    .tobytes())
values.append(draw)
if name is None:
name = self.name
return self.column_class(name, size, is_valid, values)
class StringType(BinaryType):
@property
def column_class(self):
return StringColumn
def _get_type(self):
return OrderedDict([('name', 'utf8')])
def generate_column(self, size, name=None):
K = 7
is_valid = self._make_is_valid(size)
values = []
for i in range(size):
if is_valid[i]:
values.append(tobytes(rands(K)))
else:
values.append(b"")
if name is None:
name = self.name
return self.column_class(name, size, is_valid, values)
class JsonSchema(object):
def __init__(self, fields):
self.fields = fields
def get_json(self):
return OrderedDict([
('fields', [field.get_json() for field in self.fields])
])
class BinaryColumn(PrimitiveColumn):
def _encode_value(self, x):
return frombytes(binascii.hexlify(x).upper())
def _get_buffers(self):
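        # Variable-length layout: offsets[i + 1] - offsets[i] is the byte
        # length of value i; null slots contribute a zero-length entry.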
offset = 0
offsets = [0]
data = []
for i, v in enumerate(self.values):
if self.is_valid[i]:
offset += len(v)
else:
v = b""
offsets.append(offset)
data.append(self._encode_value(v))
return [
('VALIDITY', [int(x) for x in self.is_valid]),
('OFFSET', offsets),
('DATA', data)
]
class FixedSizeBinaryColumn(PrimitiveColumn):
def _encode_value(self, x):
return ''.join('{:02x}'.format(c).upper() for c in x)
def _get_buffers(self):
data = []
for i, v in enumerate(self.values):
data.append(self._encode_value(v))
return [
('VALIDITY', [int(x) for x in self.is_valid]),
('DATA', data)
]
class StringColumn(BinaryColumn):
def _encode_value(self, x):
return frombytes(x)
class ListType(DataType):
def __init__(self, name, value_type, nullable=True):
super(ListType, self).__init__(name, nullable=nullable)
self.value_type = value_type
def _get_type(self):
return OrderedDict([
('name', 'list')
])
def _get_children(self):
return [self.value_type.get_json()]
def generate_column(self, size, name=None):
MAX_LIST_SIZE = 4
is_valid = self._make_is_valid(size)
list_sizes = np.random.randint(0, MAX_LIST_SIZE + 1, size=size)
offsets = [0]
offset = 0
for i in range(size):
if is_valid[i]:
offset += int(list_sizes[i])
offsets.append(offset)
        # The final offset is the total number of elements in the child array
values = self.value_type.generate_column(offset)
if name is None:
name = self.name
return ListColumn(name, size, is_valid, offsets, values)
class ListColumn(Column):
def __init__(self, name, count, is_valid, offsets, values):
super(ListColumn, self).__init__(name, count)
self.is_valid = is_valid
self.offsets = offsets
self.values = values
def _get_buffers(self):
return [
('VALIDITY', [int(v) for v in self.is_valid]),
('OFFSET', list(self.offsets))
]
def _get_children(self):
return [self.values.get_json()]
class MapType(DataType):
def __init__(self, name, key_type, item_type, nullable=True,
keysSorted=False):
super(MapType, self).__init__(name, nullable=nullable)
assert not key_type.nullable
self.key_type = key_type
self.item_type = item_type
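        # Arrow maps are lists of non-nullable {key, item} structs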
self.pair_type = StructType('entries', [key_type, item_type], False)
self.keysSorted = keysSorted
def _get_type(self):
return OrderedDict([
('name', 'map'),
('keysSorted', self.keysSorted)
])
def _get_children(self):
return [self.pair_type.get_json()]
def generate_column(self, size, name=None):
MAX_MAP_SIZE = 4
is_valid = self._make_is_valid(size)
map_sizes = np.random.randint(0, MAX_MAP_SIZE + 1, size=size)
offsets = [0]
offset = 0
for i in range(size):
if is_valid[i]:
offset += int(map_sizes[i])
offsets.append(offset)
        # The final offset is the total number of elements in the child array
pairs = self.pair_type.generate_column(offset)
if name is None:
name = self.name
return MapColumn(name, size, is_valid, offsets, pairs)
class MapColumn(Column):
def __init__(self, name, count, is_valid, offsets, pairs):
super(MapColumn, self).__init__(name, count)
self.is_valid = is_valid
self.offsets = offsets
self.pairs = pairs
def _get_buffers(self):
return [
('VALIDITY', [int(v) for v in self.is_valid]),
('OFFSET', list(self.offsets))
]
def _get_children(self):
return [self.pairs.get_json()]
class FixedSizeListType(DataType):
def __init__(self, name, value_type, list_size, nullable=True):
super(FixedSizeListType, self).__init__(name, nullable=nullable)
self.value_type = value_type
self.list_size = list_size
def _get_type(self):
return OrderedDict([
('name', 'fixedsizelist'),
('listSize', self.list_size)
])
def _get_children(self):
return [self.value_type.get_json()]
def generate_column(self, size, name=None):
is_valid = self._make_is_valid(size)
values = self.value_type.generate_column(size * self.list_size)
if name is None:
name = self.name
return FixedSizeListColumn(name, size, is_valid, values)
class FixedSizeListColumn(Column):
def __init__(self, name, count, is_valid, values):
super(FixedSizeListColumn, self).__init__(name, count)
self.is_valid = is_valid
self.values = values
def _get_buffers(self):
return [
('VALIDITY', [int(v) for v in self.is_valid])
]
def _get_children(self):
return [self.values.get_json()]
class StructType(DataType):
def __init__(self, name, field_types, nullable=True):
super(StructType, self).__init__(name, nullable=nullable)
self.field_types = field_types
def _get_type(self):
return OrderedDict([
('name', 'struct')
])
def _get_children(self):
return [type_.get_json() for type_ in self.field_types]
def generate_column(self, size, name=None):
is_valid = self._make_is_valid(size)
field_values = [type_.generate_column(size)
for type_ in self.field_types]
if name is None:
name = self.name
return StructColumn(name, size, is_valid, field_values)
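# A dictionary batch: `field` describes the dictionary value type, and
# `values` holds the dictionary values themselves, serialized by get_json()
# as a one-column record batch keyed by `id`.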
class Dictionary(object):
def __init__(self, id_, field, values, ordered=False):
self.id_ = id_
self.field = field
self.values = values
self.ordered = ordered
def __len__(self):
return len(self.values)
def get_json(self):
dummy_batch = JsonRecordBatch(len(self.values), [self.values])
return OrderedDict([
('id', self.id_),
('data', dummy_batch.get_json())
])
class DictionaryType(DataType):
def __init__(self, name, index_type, dictionary, nullable=True):
super(DictionaryType, self).__init__(name, nullable=nullable)
assert isinstance(index_type, IntegerType)
assert isinstance(dictionary, Dictionary)
self.index_type = index_type
self.dictionary = dictionary
def get_json(self):
dict_field = self.dictionary.field
return OrderedDict([
('name', self.name),
('type', dict_field._get_type()),
('nullable', self.nullable),
('children', dict_field._get_children()),
('dictionary', OrderedDict([
('id', self.dictionary.id_),
('indexType', self.index_type._get_type()),
('isOrdered', self.dictionary.ordered)
]))
])
def generate_column(self, size, name=None):
if name is None:
name = self.name
return self.index_type.generate_range(size, 0, len(self.dictionary),
name=name)
class StructColumn(Column):
def __init__(self, name, count, is_valid, field_values):
super(StructColumn, self).__init__(name, count)
self.is_valid = is_valid
self.field_values = field_values
def _get_buffers(self):
return [
('VALIDITY', [int(v) for v in self.is_valid])
]
def _get_children(self):
return [field.get_json() for field in self.field_values]
class JsonRecordBatch(object):
def __init__(self, count, columns):
self.count = count
self.columns = columns
def get_json(self):
return OrderedDict([
('count', self.count),
('columns', [col.get_json() for col in self.columns])
])
# SKIP categories
SKIP_ARROW = 'arrow'
SKIP_FLIGHT = 'flight'
class JsonFile(object):
def __init__(self, name, schema, batches, dictionaries=None,
skip=None, path=None):
self.name = name
self.schema = schema
self.dictionaries = dictionaries or []
self.batches = batches
self.skip = set()
self.path = path
if skip:
self.skip.update(skip)
def get_json(self):
entries = [
('schema', self.schema.get_json())
]
if len(self.dictionaries) > 0:
entries.append(('dictionaries',
[dictionary.get_json()
for dictionary in self.dictionaries]))
entries.append(('batches', [batch.get_json()
for batch in self.batches]))
return OrderedDict(entries)
def write(self, path):
with open(path, 'wb') as f:
f.write(json.dumps(self.get_json(), indent=2).encode('utf-8'))
self.path = path
def skip_category(self, category):
"""Skip this test for the given category.
Category should be SKIP_ARROW or SKIP_FLIGHT.
"""
self.skip.add(category)
return self
def get_field(name, type_, nullable=True):
if type_ == 'binary':
return BinaryType(name, nullable=nullable)
elif type_ == 'utf8':
return StringType(name, nullable=nullable)
elif type_.startswith('fixedsizebinary_'):
byte_width = int(type_.split('_')[1])
return FixedSizeBinaryType(name,
byte_width=byte_width,
nullable=nullable)
dtype = np.dtype(type_)
if dtype.kind in ('i', 'u'):
return IntegerType(name, dtype.kind == 'i', dtype.itemsize * 8,
nullable=nullable)
elif dtype.kind == 'f':
return FloatingPointType(name, dtype.itemsize * 8,
nullable=nullable)
elif dtype.kind == 'b':
return BooleanType(name, nullable=nullable)
else:
raise TypeError(dtype)
def _generate_file(name, fields, batch_sizes, dictionaries=None, skip=None):
schema = JsonSchema(fields)
batches = []
for size in batch_sizes:
columns = []
for field in fields:
col = field.generate_column(size)
columns.append(col)
batches.append(JsonRecordBatch(size, columns))
return JsonFile(name, schema, batches, dictionaries, skip=skip)
def generate_primitive_case(batch_sizes, name='primitive'):
types = ['bool', 'int8', 'int16', 'int32', 'int64',
'uint8', 'uint16', 'uint32', 'uint64',
'float32', 'float64', 'binary', 'utf8',
'fixedsizebinary_19', 'fixedsizebinary_120']
fields = []
for type_ in types:
fields.append(get_field(type_ + "_nullable", type_, True))
fields.append(get_field(type_ + "_nonnullable", type_, False))
return _generate_file(name, fields, batch_sizes)
def generate_decimal_case():
fields = [
DecimalType(name='f{}'.format(i), precision=precision, scale=2)
for i, precision in enumerate(range(3, 39))
]
possible_batch_sizes = 7, 10
batch_sizes = [possible_batch_sizes[i % 2] for i in range(len(fields))]
skip = set()
skip.add('Go') # TODO(ARROW-3676)
return _generate_file('decimal', fields, batch_sizes, skip=skip)
def generate_datetime_case():
fields = [
DateType('f0', DateType.DAY),
DateType('f1', DateType.MILLISECOND),
TimeType('f2', 's'),
TimeType('f3', 'ms'),
TimeType('f4', 'us'),
TimeType('f5', 'ns'),
TimestampType('f6', 's'),
TimestampType('f7', 'ms'),
TimestampType('f8', 'us'),
TimestampType('f9', 'ns'),
TimestampType('f10', 'ms', tz=None),
TimestampType('f11', 's', tz='UTC'),
TimestampType('f12', 'ms', tz='US/Eastern'),
TimestampType('f13', 'us', tz='Europe/Paris'),
TimestampType('f14', 'ns', tz='US/Pacific'),
]
batch_sizes = [7, 10]
return _generate_file("datetime", fields, batch_sizes)
def generate_interval_case():
fields = [
DurationIntervalType('f1', 's'),
DurationIntervalType('f2', 'ms'),
DurationIntervalType('f3', 'us'),
DurationIntervalType('f4', 'ns'),
YearMonthIntervalType('f5'),
DayTimeIntervalType('f6'),
]
batch_sizes = [7, 10]
skip = set()
skip.add('JS') # TODO(ARROW-5239)
return _generate_file("interval", fields, batch_sizes, skip=skip)
def generate_map_case():
# TODO(bkietz): separated from nested_case so it can be
# independently skipped, consolidate after JS supports map
fields = [
MapType('map_nullable', get_field('key', 'utf8', False),
get_field('value', 'int32')),
]
batch_sizes = [7, 10]
skip = set()
skip.add('Go') # TODO(ARROW-3679)
return _generate_file("map", fields, batch_sizes, skip=skip)
def generate_nested_case():
fields = [
ListType('list_nullable', get_field('item', 'int32')),
FixedSizeListType('fixedsizelist_nullable',
get_field('item', 'int32'), 4),
StructType('struct_nullable', [get_field('f1', 'int32'),
get_field('f2', 'utf8')]),
# TODO(wesm): this causes segfault
# ListType('list_nonnullable', get_field('item', 'int32'), False),
]
batch_sizes = [7, 10]
return _generate_file("nested", fields, batch_sizes)
def generate_dictionary_case():
    dict_type0 = StringType('dictionary0')
    dict_type1 = StringType('dictionary1')
dict_type2 = get_field('dictionary2', 'int64')
dict0 = Dictionary(0, dict_type0,
dict_type0.generate_column(10, name='DICT0'))
dict1 = Dictionary(1, dict_type1,
dict_type1.generate_column(5, name='DICT1'))
dict2 = Dictionary(2, dict_type2,
dict_type2.generate_column(50, name='DICT2'))
fields = [
DictionaryType('dict0', get_field('', 'int8'), dict0),
DictionaryType('dict1', get_field('', 'int32'), dict1),
DictionaryType('dict2', get_field('', 'int16'), dict2)
]
skip = set()
skip.add('Go') # TODO(ARROW-3039)
batch_sizes = [7, 10]
return _generate_file("dictionary", fields, batch_sizes,
dictionaries=[dict0, dict1, dict2],
skip=skip)
def generate_nested_dictionary_case():
str_type = StringType('str')
dict0 = Dictionary(0, str_type, str_type.generate_column(10, name='DICT0'))
list_type = ListType(
'list',
DictionaryType('str_dict', get_field('', 'int8'), dict0))
dict1 = Dictionary(1,
list_type,
list_type.generate_column(30, name='DICT1'))
struct_type = StructType('struct', [
DictionaryType('str_dict_a', get_field('', 'int8'), dict0),
DictionaryType('str_dict_b', get_field('', 'int8'), dict0)
])
dict2 = Dictionary(2,
struct_type,
struct_type.generate_column(30, name='DICT2'))
fields = [
DictionaryType('list_dict', get_field('', 'int8'), dict1),
DictionaryType('struct_dict', get_field('', 'int8'), dict2)
]
batch_sizes = [10, 13]
return _generate_file("nested_dictionary", fields, batch_sizes,
dictionaries=[dict0, dict1, dict2])
def get_generated_json_files(tempdir=None, flight=False):
tempdir = tempdir or tempfile.mkdtemp()
file_objs = [
generate_primitive_case([], name='primitive_no_batches'),
generate_primitive_case([17, 20], name='primitive'),
generate_primitive_case([0, 0, 0], name='primitive_zerolength'),
generate_decimal_case(),
generate_datetime_case(),
generate_interval_case(),
generate_map_case(),
generate_nested_case(),
generate_dictionary_case(),
generate_nested_dictionary_case().skip_category(SKIP_ARROW)
.skip_category(SKIP_FLIGHT),
]
if flight:
file_objs.append(generate_primitive_case([24 * 1024],
name='large_batch'))
    generated_files = []
    for file_obj in file_objs:
        out_path = os.path.join(tempdir, 'generated_' +
                                file_obj.name + '.json')
        file_obj.write(out_path)
        generated_files.append(file_obj)
    return generated_files
# ----------------------------------------------------------------------
# Testing harness
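#
# IntegrationRunner drives every (producer, consumer) pair drawn from the
# enabled Tester implementations: the producer converts each JSON test file
# to the Arrow file and stream formats, and the consumer validates the
# results against the original JSON.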
class IntegrationRunner(object):
def __init__(self, json_files, testers, args):
self.json_files = json_files
self.testers = testers
self.temp_dir = args.tempdir or tempfile.mkdtemp()
self.debug = args.debug
self.stop_on_error = args.stop_on_error
self.gold_dirs = args.gold_dirs
def run(self):
failures = []
for producer, consumer in itertools.product(
filter(lambda t: t.PRODUCER, self.testers),
filter(lambda t: t.CONSUMER, self.testers)):
for failure in self._compare_implementations(
producer, consumer, self._produce_consume, self.json_files):
failures.append(failure)
if self.gold_dirs:
for gold_dir, consumer in itertools.product(
self.gold_dirs,
filter(lambda t: t.CONSUMER, self.testers)):
print('\n\n\n\n')
print('******************************************************')
print('Tests against golden files in {}'.format(gold_dir))
print('******************************************************')
def run_gold(producer, consumer, test_case):
self._run_gold(gold_dir, producer, consumer, test_case)
for failure in self._compare_implementations(
consumer, consumer, run_gold, self._gold_tests(gold_dir)):
failures.append(failure)
return failures
def run_flight(self):
failures = []
servers = filter(lambda t: t.FLIGHT_SERVER, self.testers)
clients = filter(lambda t: (t.FLIGHT_CLIENT and t.CONSUMER),
self.testers)
for server, client in itertools.product(servers, clients):
for failure in self._compare_flight_implementations(server,
client):
failures.append(failure)
return failures
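    # Decompress each gzipped gold file into the temp dir, pairing it with
    # the skip set of the generated file of the same name, if one exists.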
def _gold_tests(self, gold_dir):
prefix = os.path.basename(os.path.normpath(gold_dir))
SUFFIX = ".json.gz"
golds = [jf for jf in os.listdir(gold_dir) if jf.endswith(SUFFIX)]
for json_path in golds:
name = json_path[json_path.index('_')+1: -len(SUFFIX)]
base_name = prefix + "_" + name + ".gold.json"
out_path = os.path.join(self.temp_dir, base_name)
with gzip.open(os.path.join(gold_dir, json_path)) as i:
with open(out_path, "wb") as out:
out.write(i.read())
try:
skip = next(f for f in self.json_files
if f.name == name).skip
except StopIteration:
skip = set()
yield JsonFile(name, None, None, skip=skip, path=out_path)
def _compare_implementations(
self, producer, consumer, run_binaries, test_cases):
print('##########################################################')
print(
'{0} producing, {1} consuming'.format(producer.name, consumer.name)
)
print('##########################################################')
for test_case in test_cases:
json_path = test_case.path
print('==========================================================')
print('Testing file {0}'.format(json_path))
print('==========================================================')
            if producer.name in test_case.skip:
                print('-- Skipping test because producer {0} does '
                      'not support it'.format(producer.name))
                continue
            if consumer.name in test_case.skip:
                print('-- Skipping test because consumer {0} does '
                      'not support it'.format(consumer.name))
                continue
if SKIP_ARROW in test_case.skip:
print('-- Skipping test')
continue
try:
run_binaries(producer, consumer, test_case)
except Exception:
traceback.print_exc()
yield (test_case, producer, consumer, sys.exc_info())
if self.stop_on_error:
break
else:
continue
def _produce_consume(self, producer, consumer, test_case):
# Make the random access file
json_path = test_case.path
file_id = guid()[:8]
name = os.path.splitext(os.path.basename(json_path))[0]
producer_file_path = os.path.join(self.temp_dir, file_id + '_' +
name + '.json_as_file')
producer_stream_path = os.path.join(self.temp_dir, file_id + '_' +
name + '.producer_file_as_stream')
consumer_file_path = os.path.join(self.temp_dir, file_id + '_' +
name + '.consumer_stream_as_file')
print('-- Creating binary inputs')
producer.json_to_file(json_path, producer_file_path)
# Validate the file
print('-- Validating file')
consumer.validate(json_path, producer_file_path)
print('-- Validating stream')
producer.file_to_stream(producer_file_path, producer_stream_path)
consumer.stream_to_file(producer_stream_path, consumer_file_path)
consumer.validate(json_path, consumer_file_path)
def _run_gold(self, gold_dir, producer, consumer, test_case):
json_path = test_case.path
# Validate the file
print('-- Validating file')
producer_file_path = os.path.join(
gold_dir, "generated_" + test_case.name + ".arrow_file")
consumer.validate(json_path, producer_file_path)
print('-- Validating stream')
consumer_stream_path = os.path.join(
gold_dir, "generated_" + test_case.name + ".stream")
file_id = guid()[:8]
name = os.path.splitext(os.path.basename(json_path))[0]
consumer_file_path = os.path.join(self.temp_dir, file_id + '_' +
name + '.consumer_stream_as_file')
consumer.stream_to_file(consumer_stream_path, consumer_file_path)
consumer.validate(json_path, consumer_file_path)
def _compare_flight_implementations(self, producer, consumer):
print('##########################################################')
print(
'{0} serving, {1} requesting'.format(producer.name, consumer.name)
)
print('##########################################################')
for test_case in self.json_files:
json_path = test_case.path
print('=' * 58)
print('Testing file {0}'.format(json_path))
print('=' * 58)
            if ('Java' in (producer.name, consumer.name) and
                    "map" in test_case.name):
                print('TODO(ARROW-1279): Enable map tests for Java and JS '
                      'once Java supports them and JS\'s are unbroken')
                continue
if SKIP_FLIGHT in test_case.skip:
print('-- Skipping test')
continue
try:
with producer.flight_server():
# Have the client upload the file, then download and
# compare
consumer.flight_request(producer.FLIGHT_PORT, json_path)
except Exception:
traceback.print_exc()
yield (test_case, producer, consumer, sys.exc_info())
continue
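# Each Tester wraps one implementation's command-line integration tools;
# the class attributes advertise which roles (producer, consumer, Flight
# server or client) that implementation can play.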
class Tester(object):
PRODUCER = False
CONSUMER = False
FLIGHT_SERVER = False
FLIGHT_CLIENT = False
FLIGHT_PORT = 31337
def __init__(self, args):
self.args = args
self.debug = args.debug
def run_shell_command(self, cmd):
cmd = ' '.join(cmd)
if self.debug:
print(cmd)
subprocess.check_call(cmd, shell=True)
def json_to_file(self, json_path, arrow_path):
raise NotImplementedError
def stream_to_file(self, stream_path, file_path):
raise NotImplementedError
def file_to_stream(self, file_path, stream_path):
raise NotImplementedError
def validate(self, json_path, arrow_path):
raise NotImplementedError
def flight_server(self):
raise NotImplementedError
def flight_request(self, port, json_path):
raise NotImplementedError
class JavaTester(Tester):
PRODUCER = True
CONSUMER = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True
FLIGHT_PORT = 31338
JAVA_OPTS = ['-Dio.netty.tryReflectionSetAccessible=true']
_arrow_version = load_version_from_pom()
ARROW_TOOLS_JAR = os.environ.get(
'ARROW_JAVA_INTEGRATION_JAR',
os.path.join(ARROW_HOME,
'java/tools/target/arrow-tools-{}-'
'jar-with-dependencies.jar'.format(_arrow_version)))
ARROW_FLIGHT_JAR = os.environ.get(
'ARROW_FLIGHT_JAVA_INTEGRATION_JAR',
os.path.join(ARROW_HOME,
'java/flight/target/arrow-flight-{}-'
'jar-with-dependencies.jar'.format(_arrow_version)))
ARROW_FLIGHT_SERVER = ('org.apache.arrow.flight.example.integration.'
'IntegrationTestServer')
ARROW_FLIGHT_CLIENT = ('org.apache.arrow.flight.example.integration.'
'IntegrationTestClient')
name = 'Java'
def _run(self, arrow_path=None, json_path=None, command='VALIDATE'):
cmd = ['java'] + self.JAVA_OPTS + \
['-cp', self.ARROW_TOOLS_JAR, 'org.apache.arrow.tools.Integration']
if arrow_path is not None:
cmd.extend(['-a', arrow_path])
if json_path is not None:
cmd.extend(['-j', json_path])
cmd.extend(['-c', command])
if self.debug:
print(' '.join(cmd))
run_cmd(cmd)
def validate(self, json_path, arrow_path):
return self._run(arrow_path, json_path, 'VALIDATE')
def json_to_file(self, json_path, arrow_path):
return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
def stream_to_file(self, stream_path, file_path):
cmd = ['java'] + self.JAVA_OPTS + \
['-cp', self.ARROW_TOOLS_JAR,
'org.apache.arrow.tools.StreamToFile', stream_path, file_path]
if self.debug:
print(' '.join(cmd))
run_cmd(cmd)
def file_to_stream(self, file_path, stream_path):
cmd = ['java'] + self.JAVA_OPTS + \
['-cp', self.ARROW_TOOLS_JAR,
'org.apache.arrow.tools.FileToStream', file_path, stream_path]
if self.debug:
print(' '.join(cmd))
run_cmd(cmd)
def flight_request(self, port, json_path):
cmd = ['java'] + self.JAVA_OPTS + \
['-cp', self.ARROW_FLIGHT_JAR, self.ARROW_FLIGHT_CLIENT,
'-port', str(port), '-j', json_path]
if self.debug:
print(' '.join(cmd))
run_cmd(cmd)
@contextlib.contextmanager
def flight_server(self):
cmd = ['java'] + self.JAVA_OPTS + \
['-cp', self.ARROW_FLIGHT_JAR, self.ARROW_FLIGHT_SERVER,
'-port', str(self.FLIGHT_PORT)]
if self.debug:
print(' '.join(cmd))
server = subprocess.Popen(cmd, stdout=subprocess.PIPE)
try:
output = server.stdout.readline().decode()
if not output.startswith("Server listening on localhost"):
raise RuntimeError(
"Flight-Java server did not start properly, output: " +
output)
yield
finally:
server.kill()
server.wait(5)
class CPPTester(Tester):
PRODUCER = True
CONSUMER = True
FLIGHT_SERVER = True
FLIGHT_CLIENT = True
EXE_PATH = os.environ.get(
'ARROW_CPP_EXE_PATH',
os.path.join(ARROW_HOME, 'cpp/build/debug'))
CPP_INTEGRATION_EXE = os.path.join(EXE_PATH, 'arrow-json-integration-test')
STREAM_TO_FILE = os.path.join(EXE_PATH, 'arrow-stream-to-file')
FILE_TO_STREAM = os.path.join(EXE_PATH, 'arrow-file-to-stream')
FLIGHT_PORT = 31337
FLIGHT_SERVER_CMD = [
os.path.join(EXE_PATH, 'flight-test-integration-server'),
"-port", str(FLIGHT_PORT)]
FLIGHT_CLIENT_CMD = [
os.path.join(EXE_PATH, 'flight-test-integration-client'),
"-host", "localhost"]
name = 'C++'
def _run(self, arrow_path=None, json_path=None, command='VALIDATE'):
cmd = [self.CPP_INTEGRATION_EXE, '--integration']
if arrow_path is not None:
cmd.append('--arrow=' + arrow_path)
if json_path is not None:
cmd.append('--json=' + json_path)
cmd.append('--mode=' + command)
if self.debug:
print(' '.join(cmd))
run_cmd(cmd)
def validate(self, json_path, arrow_path):
return self._run(arrow_path, json_path, 'VALIDATE')
def json_to_file(self, json_path, arrow_path):
return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
def stream_to_file(self, stream_path, file_path):
cmd = ['cat', stream_path, '|', self.STREAM_TO_FILE, '>', file_path]
self.run_shell_command(cmd)
def file_to_stream(self, file_path, stream_path):
cmd = [self.FILE_TO_STREAM, file_path, '>', stream_path]
self.run_shell_command(cmd)
@contextlib.contextmanager
def flight_server(self):
if self.debug:
print(' '.join(self.FLIGHT_SERVER_CMD))
server = subprocess.Popen(self.FLIGHT_SERVER_CMD,
stdout=subprocess.PIPE)
try:
output = server.stdout.readline().decode()
if not output.startswith("Server listening on localhost"):
raise RuntimeError(
"Flight-C++ server did not start properly, output: " +
output)
yield
finally:
server.kill()
server.wait(5)
def flight_request(self, port, json_path):
cmd = self.FLIGHT_CLIENT_CMD + [
'-port=' + str(port),
'-path=' + json_path,
]
if self.debug:
print(' '.join(cmd))
run_cmd(cmd)
class JSTester(Tester):
PRODUCER = True
CONSUMER = True
EXE_PATH = os.path.join(ARROW_HOME, 'js/bin')
VALIDATE = os.path.join(EXE_PATH, 'integration.js')
JSON_TO_ARROW = os.path.join(EXE_PATH, 'json-to-arrow.js')
STREAM_TO_FILE = os.path.join(EXE_PATH, 'stream-to-file.js')
FILE_TO_STREAM = os.path.join(EXE_PATH, 'file-to-stream.js')
name = 'JS'
def _run(self, exe_cmd, arrow_path=None, json_path=None,
command='VALIDATE'):
cmd = [exe_cmd]
if arrow_path is not None:
cmd.extend(['-a', arrow_path])
if json_path is not None:
cmd.extend(['-j', json_path])
cmd.extend(['--mode', command])
if self.debug:
print(' '.join(cmd))
run_cmd(cmd)
def validate(self, json_path, arrow_path):
return self._run(self.VALIDATE, arrow_path, json_path, 'VALIDATE')
def json_to_file(self, json_path, arrow_path):
cmd = ['node',
'--no-warnings', self.JSON_TO_ARROW,
'-a', arrow_path,
'-j', json_path]
self.run_shell_command(cmd)
def stream_to_file(self, stream_path, file_path):
cmd = ['cat', stream_path, '|',
'node', '--no-warnings', self.STREAM_TO_FILE, '>',
file_path]
self.run_shell_command(cmd)
def file_to_stream(self, file_path, stream_path):
cmd = ['cat', file_path, '|',
'node', '--no-warnings', self.FILE_TO_STREAM, '>',
stream_path]
self.run_shell_command(cmd)
class GoTester(Tester):
PRODUCER = True
CONSUMER = True
# FIXME(sbinet): revisit for Go modules
    GOPATH = os.getenv('GOPATH', os.path.expanduser('~/go'))
GOBIN = os.environ.get('GOBIN', os.path.join(GOPATH, 'bin'))
GO_INTEGRATION_EXE = os.path.join(GOBIN, 'arrow-json-integration-test')
STREAM_TO_FILE = os.path.join(GOBIN, 'arrow-stream-to-file')
FILE_TO_STREAM = os.path.join(GOBIN, 'arrow-file-to-stream')
name = 'Go'
def _run(self, arrow_path=None, json_path=None, command='VALIDATE'):
cmd = [self.GO_INTEGRATION_EXE]
if arrow_path is not None:
cmd.extend(['-arrow', arrow_path])
if json_path is not None:
cmd.extend(['-json', json_path])
cmd.extend(['-mode', command])
if self.debug:
print(' '.join(cmd))
run_cmd(cmd)
def validate(self, json_path, arrow_path):
return self._run(arrow_path, json_path, 'VALIDATE')
def json_to_file(self, json_path, arrow_path):
return self._run(arrow_path, json_path, 'JSON_TO_ARROW')
def stream_to_file(self, stream_path, file_path):
cmd = ['cat', stream_path, '|', self.STREAM_TO_FILE, '>', file_path]
self.run_shell_command(cmd)
def file_to_stream(self, file_path, stream_path):
cmd = [self.FILE_TO_STREAM, file_path, '>', stream_path]
self.run_shell_command(cmd)
def get_static_json_files():
glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json')
return [JsonFile(name=os.path.basename(p), path=p, skip=set(),
schema=None, batches=None)
for p in glob.glob(glob_pattern)]
def run_all_tests(args):
testers = []
if args.enable_cpp:
testers.append(CPPTester(args))
if args.enable_java:
testers.append(JavaTester(args))
if args.enable_js:
testers.append(JSTester(args))
if args.enable_go:
testers.append(GoTester(args))
static_json_files = get_static_json_files()
generated_json_files = get_generated_json_files(tempdir=args.tempdir,
flight=args.run_flight)
json_files = static_json_files + generated_json_files
runner = IntegrationRunner(json_files, testers, args)
failures = []
failures.extend(runner.run())
if args.run_flight:
failures.extend(runner.run_flight())
fail_count = 0
if failures:
print("################# FAILURES #################")
for test_case, producer, consumer, exc_info in failures:
fail_count += 1
print("FAILED TEST:", end=" ")
print(test_case.name, producer.name, "producing, ",
consumer.name, "consuming")
if exc_info:
traceback.print_exception(*exc_info)
print()
print(fail_count, "failures")
if fail_count > 0:
sys.exit(1)
def write_js_test_json(directory):
generate_map_case().write(os.path.join(directory, 'map.json'))
generate_nested_case().write(os.path.join(directory, 'nested.json'))
generate_decimal_case().write(os.path.join(directory, 'decimal.json'))
generate_datetime_case().write(os.path.join(directory, 'datetime.json'))
(generate_dictionary_case()
.write(os.path.join(directory, 'dictionary.json')))
(generate_primitive_case([])
.write(os.path.join(directory, 'primitive_no_batches.json')))
(generate_primitive_case([7, 10])
.write(os.path.join(directory, 'primitive.json')))
(generate_primitive_case([0, 0, 0])
.write(os.path.join(directory, 'primitive-empty.json')))
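# Example invocations, assuming this script is saved as integration_test.py
# and the relevant implementations are built:
#   python integration_test.py --debug
#   python integration_test.py --enable-go=0 --run_flight
#   python integration_test.py --write_generated_json=/tmp/arrow-json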
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Arrow integration test CLI')
parser.add_argument('--enable-c++', dest='enable_cpp',
action='store', type=int, default=1,
help='Include C++ in integration tests')
parser.add_argument('--enable-java', dest='enable_java',
action='store', type=int, default=1,
help='Include Java in integration tests')
parser.add_argument('--enable-js', dest='enable_js',
action='store', type=int, default=1,
help='Include JavaScript in integration tests')
parser.add_argument('--enable-go', dest='enable_go',
action='store', type=int, default=1,
help='Include Go in integration tests')
parser.add_argument('--write_generated_json', dest='generated_json_path',
action='store', default=False,
help='Generate test JSON')
parser.add_argument('--run_flight', dest='run_flight',
action='store_true', default=False,
help='Run Flight integration tests')
parser.add_argument('--debug', dest='debug', action='store_true',
default=False,
help='Run executables in debug mode as relevant')
parser.add_argument('--tempdir', dest='tempdir',
default=tempfile.mkdtemp(),
help=('Directory to use for writing '
'integration test temporary files'))
parser.add_argument('-x', '--stop-on-error', dest='stop_on_error',
action='store_true', default=False,
help='Stop on first error')
parser.add_argument('--gold_dirs', action='append',
help="gold integration test file paths")
args = parser.parse_args()
if args.generated_json_path:
try:
os.makedirs(args.generated_json_path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
write_js_test_json(args.generated_json_path)
else:
run_all_tests(args)