blob: 0ccae0f95abe5423dae441407bc1d9e2b3b1da84 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
def create_concat_record_reader(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Build a Python ConcatRecordReader from a Java ConcatRecordReader.

    Reflectively extracts the private ``queue`` field (the Java reader's
    queue of reader suppliers) and hands it, with the converter, to the
    Python-side implementation.
    """
    from pypaimon.pynative.reader.concat_record_reader import ConcatRecordReader

    field = j_reader.getClass().getDeclaredField("queue")
    field.setAccessible(True)
    return ConcatRecordReader(converter, field.get(j_reader))
def create_data_file_record_reader(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Build a Python DataFileRecordReader from its Java counterpart.

    Reflectively reads the Java reader's private ``reader`` (the wrapped
    inner reader, recursively converted), ``indexMapping`` and
    ``partitionInfo`` fields.
    """
    from pypaimon.pynative.reader.data_file_record_reader import DataFileRecordReader

    cls = j_reader.getClass()

    def _read_private(name):
        # Reflective access to a private field of the Java reader.
        field = cls.getDeclaredField(name)
        field.setAccessible(True)
        return field.get(j_reader)

    wrapped = converter.convert_java_reader(_read_private("reader"))
    index_mapping = _read_private("indexMapping")
    partition_info = convert_partition_info(_read_private("partitionInfo"))
    return DataFileRecordReader(wrapped, index_mapping, partition_info)
def create_filter_reader(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Unwrap a Java filtering reader and rebuild it on the Python side.

    The inner reader is pulled from the anonymous class' captured
    ``val$thisReader`` field and recursively converted.

    NOTE(review): the Python-side FilterRecordReader is only applied when
    ``primary_keys`` is present; otherwise the wrapped reader is returned
    unfiltered — confirm this matches the Java-side behavior.
    """
    from pypaimon.pynative.reader.filter_record_reader import FilterRecordReader

    field = j_reader.getClass().getDeclaredField("val$thisReader")
    field.setAccessible(True)
    wrapped = converter.convert_java_reader(field.get(j_reader))
    if primary_keys is None:
        return wrapped
    return FilterRecordReader(wrapped, predicate)
def create_pyarrow_reader_for_parquet(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Build a PyArrow-backed Parquet reader from the Java reader.

    Reflectively resolves the batch size from the reader's enclosing
    factory, the file path from the underlying input file, and the list
    of primitive field names (when present).
    """
    from pypaimon.pynative.reader.pyarrow_dataset_reader import PyArrowDatasetReader

    def _field_of(obj, name):
        # Reflective access to a private field of a Java object.
        field = obj.getClass().getDeclaredField(name)
        field.setAccessible(True)
        return field.get(obj)

    # batchSize lives on the enclosing factory (the inner class' this$0 reference).
    j_factory = _field_of(j_reader, "this$0")
    batch_size = _field_of(j_factory, "batchSize")

    # Resolve the URI of the file being read via the wrapped file reader.
    j_file_reader = _field_of(j_reader, "reader")
    file_path = _field_of(j_file_reader, "file").getPath().toUri().toString()

    fields = _field_of(j_reader, "fields")
    if fields is not None:
        fields = [str(item.getDescriptor().getPrimitiveType().getName()) for item in fields]

    return PyArrowDatasetReader('parquet', file_path, batch_size, projection,
                                predicate, primary_keys, fields)
def create_pyarrow_reader_for_orc(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Build a PyArrow-backed ORC reader from the Java reader.

    The file path is resolved reflectively via the wrapped ``orcReader``'s
    private ``path`` field.
    """
    from pypaimon.pynative.reader.pyarrow_dataset_reader import PyArrowDatasetReader

    reader_field = j_reader.getClass().getDeclaredField("orcReader")
    reader_field.setAccessible(True)
    j_orc_reader = reader_field.get(j_reader)

    path_field = j_orc_reader.getClass().getDeclaredField("path")
    path_field.setAccessible(True)
    file_path = path_field.get(j_orc_reader).toUri().toString()

    # TODO: Temporarily hard-coded to 1024 as we cannot reflectively obtain this value yet
    return PyArrowDatasetReader('orc', file_path, 1024, projection, predicate, primary_keys, None)
def create_avro_format_reader(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Build a Python AvroFormatReader from the Java Avro reader.

    Reflectively reads the private ``filePath`` field to locate the file.
    """
    from pypaimon.pynative.reader.avro_format_reader import AvroFormatReader

    field = j_reader.getClass().getDeclaredField("filePath")
    field.setAccessible(True)
    file_path = field.get(j_reader).toUri().toString()

    # TODO: Temporarily hard-coded to 1024 as we cannot reflectively obtain this value yet
    return AvroFormatReader(file_path, 1024, None)
def create_key_value_unwrap_reader(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Build a Python KeyValueUnwrapReader around the converted inner reader.

    The inner reader is captured by the Java anonymous class as
    ``val$reader`` and is extracted reflectively.
    """
    from pypaimon.pynative.reader.key_value_unwrap_reader import KeyValueUnwrapReader

    field = j_reader.getClass().getDeclaredField("val$reader")
    field.setAccessible(True)
    return KeyValueUnwrapReader(converter.convert_java_reader(field.get(j_reader)))
def create_transform_reader(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Unwrap a Java transforming reader and convert only its inner reader.

    The transform itself is not yet ported to Python, so the converted
    inner reader is returned as-is.
    """
    field = j_reader.getClass().getDeclaredField("val$thisReader")
    field.setAccessible(True)
    # TODO: implement projectKey and projectOuter
    return converter.convert_java_reader(field.get(j_reader))
def create_drop_delete_reader(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Build a Python DropDeleteReader around the converted inner reader.

    The inner reader is extracted reflectively from the private ``reader``
    field of the Java object.
    """
    from pypaimon.pynative.reader.drop_delete_reader import DropDeleteReader

    field = j_reader.getClass().getDeclaredField("reader")
    field.setAccessible(True)
    return DropDeleteReader(converter.convert_java_reader(field.get(j_reader)))
def create_sort_merge_reader_minhep(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Build a Python SortMergeReader from the min-heap Java variant.

    The child readers are pulled from the private ``nextBatchReaders``
    collection and each is recursively converted.
    """
    from pypaimon.pynative.reader.sort_merge_reader import SortMergeReader

    field = j_reader.getClass().getDeclaredField("nextBatchReaders")
    field.setAccessible(True)
    readers = [converter.convert_java_reader(child) for child in field.get(j_reader)]
    return SortMergeReader(readers, primary_keys, partition_keys)
def create_sort_merge_reader_loser_tree(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Build a Python SortMergeReader from the loser-tree Java variant.

    Navigates reflectively from the reader to its ``loserTree``, then to
    the tree's ``leaves``, converting each leaf's ``reader`` field.
    """
    from pypaimon.pynative.reader.sort_merge_reader import SortMergeReader

    def _field_of(obj, name):
        # Reflective access to a private field of a Java object.
        field = obj.getClass().getDeclaredField(name)
        field.setAccessible(True)
        return field.get(obj)

    leaves = _field_of(_field_of(j_reader, "loserTree"), "leaves")
    readers = [converter.convert_java_reader(_field_of(leaf, "reader")) for leaf in leaves]
    return SortMergeReader(readers, primary_keys, partition_keys)
def create_key_value_wrap_record_reader(j_reader, converter, predicate, projection, primary_keys, partition_keys):
    """Build a Python KeyValueWrapReader from its Java counterpart.

    Reflectively reads the wrapped inner reader and the ``level`` from the
    Java reader, then the key arity from its serializer and the value
    arity from the serializer's reused value row.
    """
    from pypaimon.pynative.reader.key_value_wrap_reader import KeyValueWrapReader

    def _field_of(obj, name):
        # Reflective access to a private field of a Java object.
        field = obj.getClass().getDeclaredField(name)
        field.setAccessible(True)
        return field.get(obj)

    wrapped = converter.convert_java_reader(_field_of(j_reader, "reader"))
    level = _field_of(j_reader, "level")

    j_serializer = _field_of(j_reader, "serializer")
    key_arity = _field_of(j_serializer, "keyArity")
    # Value arity is taken from the serializer's reused value row's arity field.
    value_arity = _field_of(_field_of(j_serializer, "reusedValue"), "arity")

    return KeyValueWrapReader(wrapped, level, key_arity, value_arity)
def convert_partition_info(j_partition_info):
    """Convert a Java PartitionInfo object into its Python counterpart.

    Reflectively reads the private ``map`` (index mapping), ``partition``
    (a BinaryRow holding partition values) and ``partitionType`` (the row
    type describing those values) fields, decodes each partition value to
    a plain Python object, and wraps the result in a Python PartitionInfo.

    Returns None when ``j_partition_info`` is None.
    """
    if j_partition_info is None:
        return None

    cls = j_partition_info.getClass()

    def _field_of(name):
        # Reflective access to a private field of the Java PartitionInfo.
        field = cls.getDeclaredField(name)
        field.setAccessible(True)
        return field.get(j_partition_info)

    j_mapping = _field_of("map")
    mapping = list(j_mapping) if j_mapping is not None else []

    j_binary_row = _field_of("partition")
    j_partition_type = _field_of("partitionType")

    partition_values = []
    if j_binary_row is not None and j_partition_type is not None:
        for i in range(j_binary_row.getFieldCount()):
            partition_values.append(
                _read_partition_value(j_binary_row, j_partition_type, i))

    from pypaimon.pynative.reader.data_file_record_reader import PartitionInfo
    return PartitionInfo(mapping, partition_values)


def _read_partition_value(j_binary_row, j_partition_type, i):
    """Decode field *i* of the partition BinaryRow into a Python value.

    Dispatches on the field's type-root name; unknown types fall back to a
    best-effort string conversion, yielding None on failure.
    """
    if j_binary_row.isNullAt(i):
        return None
    type_info = j_partition_type.getTypeAt(i).getTypeRoot().toString()
    if "INTEGER" in type_info:
        return j_binary_row.getInt(i)
    if "BIGINT" in type_info:
        return j_binary_row.getLong(i)
    if "VARCHAR" in type_info or "CHAR" in type_info:
        binary_string = j_binary_row.getString(i)
        return str(binary_string) if binary_string is not None else None
    if "BOOLEAN" in type_info:
        return j_binary_row.getBoolean(i)
    if "DOUBLE" in type_info:
        return j_binary_row.getDouble(i)
    if "FLOAT" in type_info:
        return j_binary_row.getFloat(i)
    if "DATE" in type_info:
        return j_binary_row.getInt(i)  # Date stored as int
    if "TIMESTAMP" in type_info:
        timestamp = j_binary_row.getTimestamp(i, 3)  # precision=3 for millis
        return timestamp.getMillisecond() if timestamp is not None else None
    try:
        return str(j_binary_row.getString(i) or "")
    except Exception:
        # Was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt; narrow it to Exception.
        return None