blob: d005eec44092fc94e994b503eb3362fffd264a15 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Row;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Arrays;
import java.util.List;
import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask;
import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask;
import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.writeNullMask;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Serializer for {@link Row}.
*/
@Internal
public final class RowSerializer extends TypeSerializer<Row> {
private static final long serialVersionUID = 1L;
private final TypeSerializer<Object>[] fieldSerializers;
private final int arity;
private transient boolean[] nullMask;
@SuppressWarnings("unchecked")
public RowSerializer(TypeSerializer<?>[] fieldSerializers) {
this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
this.arity = fieldSerializers.length;
this.nullMask = new boolean[fieldSerializers.length];
}
/**
* Returns the serializers for the fields in the row.
*
* @return The serializers for the fields in the row.
*/
public TypeSerializer<?>[] getFieldSerializers() {
return fieldSerializers;
}
@Override
public boolean isImmutableType() {
return false;
}
@Override
public TypeSerializer<Row> duplicate() {
TypeSerializer<?>[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length];
for (int i = 0; i < fieldSerializers.length; i++) {
duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
}
return new RowSerializer(duplicateFieldSerializers);
}
@Override
public Row createInstance() {
return new Row(fieldSerializers.length);
}
@Override
public Row copy(Row from) {
int len = fieldSerializers.length;
if (from.getArity() != len) {
throw new RuntimeException("Row arity of from does not match serializers.");
}
Row result = new Row(len);
for (int i = 0; i < len; i++) {
Object fromField = from.getField(i);
if (fromField != null) {
Object copy = fieldSerializers[i].copy(fromField);
result.setField(i, copy);
} else {
result.setField(i, null);
}
}
return result;
}
@Override
public Row copy(Row from, Row reuse) {
int len = fieldSerializers.length;
// cannot reuse, do a non-reuse copy
if (reuse == null) {
return copy(from);
}
if (from.getArity() != len || reuse.getArity() != len) {
throw new RuntimeException(
"Row arity of reuse or from is incompatible with this RowSerializer.");
}
for (int i = 0; i < len; i++) {
Object fromField = from.getField(i);
if (fromField != null) {
Object reuseField = reuse.getField(i);
if (reuseField != null) {
Object copy = fieldSerializers[i].copy(fromField, reuseField);
reuse.setField(i, copy);
} else {
Object copy = fieldSerializers[i].copy(fromField);
reuse.setField(i, copy);
}
} else {
reuse.setField(i, null);
}
}
return reuse;
}
@Override
public int getLength() {
return -1;
}
public int getArity() {
return arity;
}
@Override
public void serialize(Row record, DataOutputView target) throws IOException {
int len = fieldSerializers.length;
if (record.getArity() != len) {
throw new RuntimeException("Row arity of from does not match serializers.");
}
// write a null mask
writeNullMask(len, record, target);
// serialize non-null fields
for (int i = 0; i < len; i++) {
Object o = record.getField(i);
if (o != null) {
fieldSerializers[i].serialize(o, target);
}
}
}
@Override
public Row deserialize(DataInputView source) throws IOException {
int len = fieldSerializers.length;
Row result = new Row(len);
// read null mask
readIntoNullMask(len, source, nullMask);
for (int i = 0; i < len; i++) {
if (nullMask[i]) {
result.setField(i, null);
} else {
result.setField(i, fieldSerializers[i].deserialize(source));
}
}
return result;
}
@Override
public Row deserialize(Row reuse, DataInputView source) throws IOException {
int len = fieldSerializers.length;
if (reuse.getArity() != len) {
throw new RuntimeException("Row arity of from does not match serializers.");
}
// read null mask
readIntoNullMask(len, source, nullMask);
for (int i = 0; i < len; i++) {
if (nullMask[i]) {
reuse.setField(i, null);
} else {
Object reuseField = reuse.getField(i);
if (reuseField != null) {
reuse.setField(i, fieldSerializers[i].deserialize(reuseField, source));
} else {
reuse.setField(i, fieldSerializers[i].deserialize(source));
}
}
}
return reuse;
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
int len = fieldSerializers.length;
// copy null mask
readIntoAndCopyNullMask(len, source, target, nullMask);
for (int i = 0; i < len; i++) {
if (!nullMask[i]) {
fieldSerializers[i].copy(source, target);
}
}
}
@Override
public boolean equals(Object obj) {
if (canEqual(obj)) {
RowSerializer other = (RowSerializer) obj;
if (this.fieldSerializers.length == other.fieldSerializers.length) {
for (int i = 0; i < this.fieldSerializers.length; i++) {
if (!this.fieldSerializers[i].equals(other.fieldSerializers[i])) {
return false;
}
}
return true;
}
}
return false;
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof RowSerializer;
}
@Override
public int hashCode() {
return Arrays.hashCode(fieldSerializers);
}
// --------------------------------------------------------------------------------------------
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
this.nullMask = new boolean[fieldSerializers.length];
}
// --------------------------------------------------------------------------------------------
// Serializer configuration snapshotting & compatibility
// --------------------------------------------------------------------------------------------
@Override
public RowSerializerConfigSnapshot snapshotConfiguration() {
return new RowSerializerConfigSnapshot(fieldSerializers);
}
@Override
public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof RowSerializerConfigSnapshot) {
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousFieldSerializersAndConfigs =
((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) {
boolean requireMigration = false;
TypeSerializer<?>[] convertDeserializers = new TypeSerializer<?>[fieldSerializers.length];
CompatibilityResult<?> compatResult;
int i = 0;
for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> f : previousFieldSerializersAndConfigs) {
compatResult = CompatibilityUtil.resolveCompatibilityResult(
f.f0,
UnloadableDummyTypeSerializer.class,
f.f1,
fieldSerializers[i]);
if (compatResult.isRequiresMigration()) {
requireMigration = true;
if (compatResult.getConvertDeserializer() == null) {
// one of the field serializers cannot provide a fallback deserializer
return CompatibilityResult.requiresMigration();
} else {
convertDeserializers[i] =
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
}
}
i++;
}
if (requireMigration) {
return CompatibilityResult.requiresMigration(new RowSerializer(convertDeserializers));
} else {
return CompatibilityResult.compatible();
}
}
}
return CompatibilityResult.requiresMigration();
}
public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
private static final int VERSION = 1;
/** This empty nullary constructor is required for deserializing the configuration. */
public RowSerializerConfigSnapshot() {}
public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) {
super(fieldSerializers);
}
@Override
public int getVersion() {
return VERSION;
}
}
}