blob: c168f7728085dfdf1dd6b364f8e2618d0a342779 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
/** A utility class that converts Spark values to Iceberg's internal representation. */
public class SparkValueConverter {

  private SparkValueConverter() {}

  /**
   * Converts a Spark {@link Row} into an Iceberg {@link Record} shaped by the given schema.
   *
   * @param schema the Iceberg schema whose fields correspond positionally to the row's columns
   * @param row the Spark row to convert; may be null
   * @return the converted record, or null if {@code row} is null
   */
  public static Record convert(Schema schema, Row row) {
    return convert(schema.asStruct(), row);
  }

  /**
   * Converts a single Spark value into Iceberg's internal representation for the given type.
   *
   * <p>Nested values are converted recursively: structs become {@link Record}s, lists become
   * {@link List}s and maps become insertion-ordered {@link Map}s. Dates are converted with {@code
   * DateTimeUtils.anyToDays}, timestamps with {@code DateTimeUtils.anyToMicros}, and binary values
   * are wrapped in a {@link ByteBuffer}. Other supported primitives are returned unchanged.
   *
   * @param type the Iceberg type of the value
   * @param object the Spark value; may be null
   * @return the converted value, or null if {@code object} is null
   * @throws UnsupportedOperationException if {@code type} is not a supported type
   */
  public static Object convert(Type type, Object object) {
    if (object == null) {
      return null;
    }

    switch (type.typeId()) {
      case STRUCT:
        return convert(type.asStructType(), (Row) object);
      case LIST:
        return convertList(type.asListType(), (List<?>) object);
      case MAP:
        return convertMap(type.asMapType(), (Map<?, ?>) object);
      case DATE:
        // Spark yields java.time.LocalDate when spark.sql.datetime.java8API.enabled is true and
        // java.sql.Date otherwise; anyToDays is used so that both forms are handled.
        return DateTimeUtils.anyToDays(object);
      case TIMESTAMP:
        // Spark yields java.time.Instant when spark.sql.datetime.java8API.enabled is true and
        // java.sql.Timestamp otherwise; anyToMicros is used so that both forms are handled.
        return DateTimeUtils.anyToMicros(object);
      case BINARY:
        return ByteBuffer.wrap((byte[]) object);
      case INTEGER:
        // Normalize any Number (e.g. a Byte or Short) to an Integer.
        return ((Number) object).intValue();
      case BOOLEAN:
      case LONG:
      case FLOAT:
      case DOUBLE:
      case DECIMAL:
      case STRING:
      case FIXED:
        return object;
      default:
        throw new UnsupportedOperationException("Not a supported type: " + type);
    }
  }

  /** Converts every element of a Spark list using the list type's element type. */
  private static List<Object> convertList(Types.ListType listType, List<?> list) {
    Type elementType = listType.elementType();
    List<Object> converted = Lists.newArrayListWithExpectedSize(list.size());
    for (Object element : list) {
      converted.add(convert(elementType, element));
    }
    return converted;
  }

  /** Converts every key and value of a Spark map, preserving the source map's iteration order. */
  private static Map<Object, Object> convertMap(Types.MapType mapType, Map<?, ?> map) {
    Type keyType = mapType.keyType();
    Type valueType = mapType.valueType();
    Map<Object, Object> converted = Maps.newLinkedHashMap();
    for (Map.Entry<?, ?> entry : map.entrySet()) {
      converted.put(convert(keyType, entry.getKey()), convert(valueType, entry.getValue()));
    }
    return converted;
  }

  /**
   * Converts a Spark row into a {@link GenericRecord} for the given struct type.
   *
   * @param struct the Iceberg struct type; its fields correspond positionally to the row's columns
   * @param row the Spark row; may be null
   * @return the converted record, or null if {@code row} is null
   */
  private static Record convert(Types.StructType struct, Row row) {
    if (row == null) {
      return null;
    }

    Record record = GenericRecord.create(struct);
    List<Types.NestedField> fields = struct.fields();
    for (int i = 0; i < fields.size(); i += 1) {
      record.set(i, convertField(fields.get(i).type(), row, i));
    }
    return record;
  }

  /**
   * Reads position {@code pos} of a Spark row with the accessor matching the Iceberg field type
   * and converts the result.
   */
  private static Object convertField(Type fieldType, Row row, int pos) {
    // Check for null before calling a typed accessor: Row.getList can throw a
    // NullPointerException for a null field instead of returning null (SPARK-37654).
    if (row.isNullAt(pos)) {
      return null;
    }

    switch (fieldType.typeId()) {
      case STRUCT:
        return convert(fieldType.asStructType(), row.getStruct(pos));
      case LIST:
        return convertList(fieldType.asListType(), row.getList(pos));
      case MAP:
        return convertMap(fieldType.asMapType(), row.getJavaMap(pos));
      default:
        return convert(fieldType, row.get(pos));
    }
  }

  /**
   * Converts a value from Iceberg's internal representation into the Java object Spark uses for
   * the corresponding SQL type.
   *
   * <p>Dates are converted with {@code DateTimeUtils.daysToLocalDate}. Timestamps are converted
   * with {@code DateTimeUtils.microsToInstant} when the type adjusts to UTC, and with {@code
   * DateTimeUtils.microsToLocalDateTime} otherwise. Binary buffers are copied into byte arrays.
   * Other supported primitives are returned unchanged.
   *
   * @param type the Iceberg type of the value
   * @param object the Iceberg value; may be null
   * @return the Spark value, or null if {@code object} is null
   * @throws UnsupportedOperationException if {@code type} is a struct, list or map, or is
   *     otherwise unsupported
   */
  public static Object convertToSpark(Type type, Object object) {
    if (object == null) {
      return null;
    }

    switch (type.typeId()) {
      case STRUCT:
      case LIST:
      case MAP:
        throw new UnsupportedOperationException("Complex types currently not supported");
      case DATE:
        return DateTimeUtils.daysToLocalDate((int) object);
      case TIMESTAMP:
        Types.TimestampType ts = (Types.TimestampType) type;
        if (ts.shouldAdjustToUTC()) {
          return DateTimeUtils.microsToInstant((long) object);
        } else {
          return DateTimeUtils.microsToLocalDateTime((long) object);
        }
      case BINARY:
        return ByteBuffers.toByteArray((ByteBuffer) object);
      case INTEGER:
      case BOOLEAN:
      case LONG:
      case FLOAT:
      case DOUBLE:
      case DECIMAL:
      case STRING:
      case FIXED:
        return object;
      default:
        throw new UnsupportedOperationException("Not a supported type: " + type);
    }
  }
}