| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.api.java.typeutils.runtime; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; |
| import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; |
| import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; |
| import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; |
| import org.apache.flink.core.memory.DataInputDeserializer; |
| import org.apache.flink.core.memory.DataInputView; |
| import org.apache.flink.core.memory.DataOutputSerializer; |
| import org.apache.flink.core.memory.DataOutputView; |
| |
| import javax.annotation.Nonnull; |
| |
| import java.io.IOException; |
| |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| import static org.apache.flink.util.Preconditions.checkState; |
| |
| /** |
| * Serializer wrapper to add support of {@code null} value serialization. |
| * |
| * <p>If the target serializer does not support {@code null} values of its type, |
| * you can use this class to wrap this serializer. |
| * This is a generic treatment of {@code null} value serialization |
| * which comes with the cost of additional byte in the final serialized value. |
| * The {@code NullableSerializer} will intercept {@code null} value serialization case |
| * and prepend the target serialized value with a boolean flag marking whether it is {@code null} or not. |
| * <pre> {@code |
| * TypeSerializer<T> originalSerializer = ...; |
| * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrap(originalSerializer); |
| * // or |
| * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer); |
| * }}</pre> |
| * |
| * @param <T> type to serialize |
| */ |
| public class NullableSerializer<T> extends TypeSerializer<T> { |
| private static final long serialVersionUID = 3335569358214720033L; |
| private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; |
| |
| @Nonnull |
| private final TypeSerializer<T> originalSerializer; |
| private final byte[] padding; |
| |
| private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) { |
| this(originalSerializer, createPadding(originalSerializer.getLength(), padNullValueIfFixedLen)); |
| } |
| |
| private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, byte[] padding) { |
| this.originalSerializer = originalSerializer; |
| this.padding = padding; |
| } |
| |
| private static byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) { |
| boolean padNullValue = originalSerializerLength > 0 && padNullValueIfFixedLen; |
| return padNullValue ? new byte[originalSerializerLength] : EMPTY_BYTE_ARRAY; |
| } |
| |
| /** |
| * This method tries to serialize {@code null} value with the {@code originalSerializer} |
| * and wraps it in case of {@link NullPointerException}, otherwise it returns the {@code originalSerializer}. |
| * |
| * @param originalSerializer serializer to wrap and add {@code null} support |
| * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer |
| * @return serializer which supports {@code null} values |
| */ |
| public static <T> TypeSerializer<T> wrapIfNullIsNotSupported( |
| @Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) { |
| return checkIfNullSupported(originalSerializer) ? |
| originalSerializer : wrap(originalSerializer, padNullValueIfFixedLen); |
| } |
| |
| /** |
| * This method checks if {@code serializer} supports {@code null} value. |
| * |
| * @param serializer serializer to check |
| */ |
| public static <T> boolean checkIfNullSupported(@Nonnull TypeSerializer<T> serializer) { |
| int length = serializer.getLength() > 0 ? serializer.getLength() : 1; |
| DataOutputSerializer dos = new DataOutputSerializer(length); |
| try { |
| serializer.serialize(null, dos); |
| } |
| catch (IOException | RuntimeException e) { |
| return false; |
| } |
| checkArgument( |
| serializer.getLength() < 0 || serializer.getLength() == dos.getCopyOfBuffer().length, |
| "The serialized form of the null value should have the same length " + |
| "as any other if the length is fixed in the serializer"); |
| DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer()); |
| try { |
| checkArgument(serializer.deserialize(dis) == null); |
| } |
| catch (IOException e) { |
| throw new RuntimeException( |
| String.format("Unexpected failure to deserialize just serialized null value with %s", |
| serializer.getClass().getName()), e); |
| } |
| checkArgument( |
| serializer.copy(null) == null, |
| "Serializer %s has to be able properly copy null value if it can serialize it", |
| serializer.getClass().getName()); |
| return true; |
| } |
| |
| private boolean padNullValue() { |
| return padding.length > 0; |
| } |
| |
| private int nullPaddingLength() { |
| return padding.length; |
| } |
| |
| private TypeSerializer<T> originalSerializer() { |
| return originalSerializer; |
| } |
| |
| /** |
| * This method wraps the {@code originalSerializer} with the {@code NullableSerializer} if not already wrapped. |
| * |
| * @param originalSerializer serializer to wrap and add {@code null} support |
| * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer |
| * @return wrapped serializer which supports {@code null} values |
| */ |
| public static <T> TypeSerializer<T> wrap( |
| @Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) { |
| return originalSerializer instanceof NullableSerializer ? |
| originalSerializer : new NullableSerializer<>(originalSerializer, padNullValueIfFixedLen); |
| } |
| |
| @Override |
| public boolean isImmutableType() { |
| return originalSerializer.isImmutableType(); |
| } |
| |
| @Override |
| public TypeSerializer<T> duplicate() { |
| TypeSerializer<T> duplicateOriginalSerializer = originalSerializer.duplicate(); |
| return duplicateOriginalSerializer == originalSerializer ? |
| this : new NullableSerializer<>(originalSerializer.duplicate(), padNullValue()); |
| } |
| |
| @Override |
| public T createInstance() { |
| return originalSerializer.createInstance(); |
| } |
| |
| @Override |
| public T copy(T from) { |
| return from == null ? null : originalSerializer.copy(from); |
| } |
| |
| @Override |
| public T copy(T from, T reuse) { |
| return from == null ? null : |
| (reuse == null ? originalSerializer.copy(from) : originalSerializer.copy(from, reuse)); |
| } |
| |
| @Override |
| public int getLength() { |
| return padNullValue() ? 1 + padding.length : -1; |
| } |
| |
| @Override |
| public void serialize(T record, DataOutputView target) throws IOException { |
| if (record == null) { |
| target.writeBoolean(true); |
| target.write(padding); |
| } |
| else { |
| target.writeBoolean(false); |
| originalSerializer.serialize(record, target); |
| } |
| } |
| |
| @Override |
| public T deserialize(DataInputView source) throws IOException { |
| boolean isNull = deserializeNull(source); |
| return isNull ? null : originalSerializer.deserialize(source); |
| } |
| |
| @Override |
| public T deserialize(T reuse, DataInputView source) throws IOException { |
| boolean isNull = deserializeNull(source); |
| return isNull ? null : (reuse == null ? |
| originalSerializer.deserialize(source) : originalSerializer.deserialize(reuse, source)); |
| } |
| |
| private boolean deserializeNull(DataInputView source) throws IOException { |
| boolean isNull = source.readBoolean(); |
| if (isNull) { |
| source.skipBytesToRead(padding.length); |
| } |
| return isNull; |
| } |
| |
| @Override |
| public void copy(DataInputView source, DataOutputView target) throws IOException { |
| boolean isNull = source.readBoolean(); |
| target.writeBoolean(isNull); |
| if (isNull) { |
| target.write(padding); |
| } |
| else { |
| originalSerializer.copy(source, target); |
| } |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| return obj == this || |
| (obj != null && obj.getClass() == getClass() && |
| originalSerializer.equals(((NullableSerializer) obj).originalSerializer)); |
| } |
| |
| @Override |
| public int hashCode() { |
| return originalSerializer.hashCode(); |
| } |
| |
| @Override |
| public TypeSerializerSnapshot<T> snapshotConfiguration() { |
| return new NullableSerializerSnapshot<>(this); |
| } |
| |
| |
| /** |
| * Configuration snapshot for serializers of nullable types, containing the |
| * configuration snapshot of its original serializer. |
| * |
| * @deprecated this snapshot class is no longer in use, and is maintained only for |
| * backwards compatibility purposes. It is fully replaced |
| * by {@link NullableSerializerSnapshot}. |
| */ |
| @Deprecated |
| @Internal |
| public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<T> { |
| private static final int VERSION = 1; |
| |
| /** |
| * This empty nullary constructor is required for deserializing the configuration. |
| */ |
| public NullableSerializerConfigSnapshot() { |
| } |
| |
| NullableSerializerConfigSnapshot(TypeSerializer<T> originalSerializer) { |
| super(originalSerializer); |
| } |
| |
| @Override |
| public int getVersion() { |
| return VERSION; |
| } |
| |
| @Override |
| public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) { |
| NullableSerializer<T> previousSerializer = (NullableSerializer<T>) restoreSerializer(); |
| NullableSerializerSnapshot<T> newCompositeSnapshot = new NullableSerializerSnapshot<>(previousSerializer.nullPaddingLength()); |
| |
| return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( |
| newSerializer, |
| newCompositeSnapshot, |
| getSingleNestedSerializerAndConfig().f1 |
| ); |
| } |
| } |
| |
| /** |
| * Snapshot for serializers of nullable types, containing the |
| * snapshot of its original serializer. |
| */ |
| @SuppressWarnings({"unchecked", "WeakerAccess"}) |
| public static class NullableSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<T, NullableSerializer<T>> { |
| |
| private static final int VERSION = 2; |
| private int nullPaddingLength; |
| |
| @SuppressWarnings("unused") |
| public NullableSerializerSnapshot() { |
| super(NullableSerializer.class); |
| } |
| |
| public NullableSerializerSnapshot(NullableSerializer<T> serializerInstance) { |
| super(serializerInstance); |
| this.nullPaddingLength = serializerInstance.nullPaddingLength(); |
| } |
| |
| private NullableSerializerSnapshot(int nullPaddingLength) { |
| super(NullableSerializer.class); |
| checkArgument(nullPaddingLength >= 0, |
| "Computed NULL padding can not be negative. %s", |
| nullPaddingLength); |
| |
| this.nullPaddingLength = nullPaddingLength; |
| } |
| |
| @Override |
| protected int getCurrentOuterSnapshotVersion() { |
| return VERSION; |
| } |
| |
| @Override |
| protected TypeSerializer<?>[] getNestedSerializers(NullableSerializer<T> outerSerializer) { |
| return new TypeSerializer[]{outerSerializer.originalSerializer()}; |
| } |
| |
| @Override |
| protected NullableSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) { |
| checkState(nullPaddingLength >= 0, |
| "Negative padding size after serializer construction: %s", |
| nullPaddingLength); |
| |
| final byte[] padding = (nullPaddingLength == 0) ? EMPTY_BYTE_ARRAY : new byte[nullPaddingLength]; |
| TypeSerializer<T> nestedSerializer = (TypeSerializer<T>) nestedSerializers[0]; |
| return new NullableSerializer<>(nestedSerializer, padding); |
| } |
| |
| @Override |
| protected void writeOuterSnapshot(DataOutputView out) throws IOException { |
| out.writeInt(nullPaddingLength); |
| } |
| |
| @Override |
| protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { |
| nullPaddingLength = in.readInt(); |
| } |
| |
| @Override |
| protected boolean isOuterSnapshotCompatible(NullableSerializer<T> newSerializer) { |
| return nullPaddingLength == newSerializer.nullPaddingLength(); |
| } |
| } |
| |
| } |