blob: 380911e56625c2890136a68799a19d5f86052b5d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
import javax.annotation.Nullable;
import java.io.IOException;
import static org.apache.flink.util.Preconditions.checkState;
/**
* Point-in-time configuration of a {@link GenericArraySerializer}.
*
* @param <C> The component type.
*
* @deprecated this is deprecated and no longer used by the {@link GenericArraySerializer}.
* It has been replaced by {@link GenericArraySerializerSnapshot}.
*/
@Internal
@Deprecated
public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerializerSnapshot<C[]> {
private static final int CURRENT_VERSION = 2;
/** The class of the components of the serializer's array type. */
@Nullable
private Class<C> componentClass;
/** Snapshot handling for the component serializer snapshot. */
@Nullable
private NestedSerializersSnapshotDelegate nestedSnapshot;
/**
* Constructor for read instantiation.
*/
@SuppressWarnings("unused")
public GenericArraySerializerConfigSnapshot() {}
/**
* Constructor to create the snapshot for writing.
*/
public GenericArraySerializerConfigSnapshot(GenericArraySerializer<C> serializer) {
this.componentClass = serializer.getComponentClass();
this.nestedSnapshot = new NestedSerializersSnapshotDelegate(serializer.getComponentSerializer());
}
// ------------------------------------------------------------------------
@Override
public int getCurrentVersion() {
return CURRENT_VERSION;
}
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
checkState(componentClass != null && nestedSnapshot != null);
out.writeUTF(componentClass.getName());
nestedSnapshot.writeNestedSerializerSnapshots(out);
}
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) throws IOException {
switch (readVersion) {
case 1:
readV1(in, classLoader);
break;
case 2:
readV2(in, classLoader);
break;
default:
throw new IllegalArgumentException("Unrecognized version: " + readVersion);
}
}
private void readV1(DataInputView in, ClassLoader classLoader) throws IOException {
nestedSnapshot = NestedSerializersSnapshotDelegate.legacyReadNestedSerializerSnapshots(in, classLoader);
try (DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
componentClass = InstantiationUtil.deserializeObject(inViewWrapper, classLoader);
}
catch (ClassNotFoundException e) {
throw new IOException("Could not find requested element class in classpath.", e);
}
}
private void readV2(DataInputView in, ClassLoader classLoader) throws IOException {
componentClass = InstantiationUtil.resolveClassByName(in, classLoader);
nestedSnapshot = NestedSerializersSnapshotDelegate.readNestedSerializerSnapshots(in, classLoader);
}
@Override
public GenericArraySerializer<C> restoreSerializer() {
checkState(componentClass != null && nestedSnapshot != null);
return new GenericArraySerializer<>(componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
}
@Override
public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
checkState(nestedSnapshot != null);
if (!(newSerializer instanceof GenericArraySerializer)) {
return TypeSerializerSchemaCompatibility.incompatible();
}
// delegate to the new snapshot class
return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
newSerializer,
new GenericArraySerializerSnapshot<>(componentClass),
nestedSnapshot.getNestedSerializerSnapshots());
}
}