blob: d8b558e6ca62def94da3f26357b21db82c3b1c5b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.file.src;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * A serializer for the {@link PendingSplitsCheckpoint}.
 *
 * <p>Binary layout of version 1 (all integers are little endian):
 *
 * <pre>
 *   int    magic number (0xDEADBEEF)
 *   int    version of the split serializer
 *   int    number of splits
 *   int    number of already processed paths
 *   per split: int length, followed by that many bytes produced by the split serializer
 *   per path:  int length, followed by that many bytes of the UTF-8 encoded path string
 * </pre>
 *
 * @param <T> the type of splits contained in the checkpoint
 */
@PublicEvolving
public final class PendingSplitsCheckpointSerializer<T extends FileSourceSplit>
        implements SimpleVersionedSerializer<PendingSplitsCheckpoint<T>> {

    private static final int VERSION = 1;

    private static final int VERSION_1_MAGIC_NUMBER = 0xDEADBEEF;

    /** Four ints: magic, version of split serializer, count splits, count paths. */
    private static final int HEADER_LENGTH = 16;

    /** Size in bytes of the length field that precedes every serialized split or path. */
    private static final int LENGTH_FIELD_SIZE = 4;

    private final SimpleVersionedSerializer<T> splitSerializer;

    /**
     * Creates a serializer that delegates the (de)serialization of the individual splits to the
     * given split serializer.
     *
     * @param splitSerializer the serializer for the splits, must not be null
     */
    public PendingSplitsCheckpointSerializer(SimpleVersionedSerializer<T> splitSerializer) {
        this.splitSerializer = checkNotNull(splitSerializer);
    }

    // ------------------------------------------------------------------------

    @Override
    public int getVersion() {
        return VERSION;
    }

    /**
     * Serializes the checkpoint into the version 1 binary layout described on this class.
     *
     * <p>The result is stored in the checkpoint's {@code serializedFormCache} and returned
     * directly from there on repeated calls, so the returned array is shared with the cache.
     *
     * @param checkpoint the checkpoint to serialize; must be exactly of class {@link
     *     PendingSplitsCheckpoint}, not a subclass
     * @return the serialized bytes
     * @throws IOException if a split cannot be serialized, or if the serialized form would not
     *     fit into a single byte array
     */
    @Override
    public byte[] serialize(PendingSplitsCheckpoint<T> checkpoint) throws IOException {
        checkArgument(
                checkpoint.getClass() == PendingSplitsCheckpoint.class,
                "Cannot serialize subclasses of PendingSplitsCheckpoint");

        // optimization: the checkpoint lazily caches its own serialized form
        if (checkpoint.serializedFormCache != null) {
            return checkpoint.serializedFormCache;
        }

        final SimpleVersionedSerializer<T> splitSerializer = this.splitSerializer; // stack cache
        final Collection<T> splits = checkpoint.getSplits();
        final Collection<Path> processedPaths = checkpoint.getAlreadyProcessedPaths();

        final ArrayList<byte[]> serializedSplits = new ArrayList<>(splits.size());
        final ArrayList<byte[]> serializedPaths = new ArrayList<>(processedPaths.size());

        // accumulate in a long so that an oversized checkpoint is detected instead of
        // silently overflowing the int and failing later with an unrelated exception
        long totalLen = HEADER_LENGTH;

        for (T split : splits) {
            final byte[] serSplit = splitSerializer.serialize(split);
            serializedSplits.add(serSplit);
            totalLen += (long) serSplit.length + LENGTH_FIELD_SIZE;
        }

        for (Path path : processedPaths) {
            final byte[] serPath = path.toString().getBytes(StandardCharsets.UTF_8);
            serializedPaths.add(serPath);
            totalLen += (long) serPath.length + LENGTH_FIELD_SIZE;
        }

        if (totalLen > Integer.MAX_VALUE) {
            throw new IOException(
                    "Serialized PendingSplitsCheckpoint is too large: "
                            + totalLen
                            + " bytes do not fit into a byte array");
        }

        final byte[] result = new byte[(int) totalLen];
        final ByteBuffer byteBuffer = ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);

        byteBuffer.putInt(VERSION_1_MAGIC_NUMBER);
        byteBuffer.putInt(splitSerializer.getVersion());
        byteBuffer.putInt(serializedSplits.size());
        byteBuffer.putInt(serializedPaths.size());

        for (byte[] splitBytes : serializedSplits) {
            byteBuffer.putInt(splitBytes.length);
            byteBuffer.put(splitBytes);
        }

        for (byte[] pathBytes : serializedPaths) {
            byteBuffer.putInt(pathBytes.length);
            byteBuffer.put(pathBytes);
        }

        assert byteBuffer.remaining() == 0;

        // optimization: cache the serialized form, so we avoid the byte work during repeated
        // serialization
        checkpoint.serializedFormCache = result;

        return result;
    }

    /**
     * Deserializes a checkpoint that was written with the given version of this serializer.
     *
     * @param version the version of this serializer that produced the bytes
     * @param serialized the serialized checkpoint
     * @return the deserialized checkpoint
     * @throws IOException if the version is unknown, or if the bytes are truncated or corrupt
     */
    @Override
    public PendingSplitsCheckpoint<T> deserialize(int version, byte[] serialized)
            throws IOException {
        if (version == 1) {
            return deserializeV1(serialized);
        }
        throw new IOException("Unknown version: " + version);
    }

    /**
     * Reads the version 1 layout. All counts and lengths found in the data are validated against
     * the number of bytes actually available, so that truncated or corrupted input is reported
     * as an {@link IOException} rather than as an unchecked buffer or allocation failure. Bytes
     * following the last entry are ignored.
     */
    private PendingSplitsCheckpoint<T> deserializeV1(byte[] serialized) throws IOException {
        if (serialized.length < HEADER_LENGTH) {
            throw new IOException(
                    "Corrupt PendingSplitsCheckpoint: expected at least "
                            + HEADER_LENGTH
                            + " header bytes, found "
                            + serialized.length);
        }

        final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);

        final int magic = bb.getInt();
        if (magic != VERSION_1_MAGIC_NUMBER) {
            throw new IOException(
                    String.format(
                            "Invalid magic number for PendingSplitsCheckpoint. "
                                    + "Expected: %X , found %X",
                            VERSION_1_MAGIC_NUMBER, magic));
        }

        final int splitSerializerVersion = bb.getInt();
        final int numSplits = bb.getInt();
        final int numPaths = bb.getInt();

        if (numSplits < 0 || numPaths < 0) {
            throw new IOException(
                    "Corrupt PendingSplitsCheckpoint: negative element count (splits: "
                            + numSplits
                            + ", paths: "
                            + numPaths
                            + ")");
        }

        // every entry occupies at least its length field, so the counts cannot legitimately
        // exceed what the remaining bytes can hold; checking this up front also keeps the
        // list pre-allocation below bounded by the input size
        if ((long) numSplits + numPaths > bb.remaining() / LENGTH_FIELD_SIZE) {
            throw new IOException(
                    "Corrupt PendingSplitsCheckpoint: element counts (splits: "
                            + numSplits
                            + ", paths: "
                            + numPaths
                            + ") exceed the "
                            + bb.remaining()
                            + " remaining bytes");
        }

        final SimpleVersionedSerializer<T> splitSerializer = this.splitSerializer; // stack cache
        final ArrayList<T> splits = new ArrayList<>(numSplits);
        final ArrayList<Path> paths = new ArrayList<>(numPaths);

        for (int remaining = numSplits; remaining > 0; remaining--) {
            final byte[] bytes = readLengthPrefixed(bb, "split");
            final T split = splitSerializer.deserialize(splitSerializerVersion, bytes);
            splits.add(split);
        }

        for (int remaining = numPaths; remaining > 0; remaining--) {
            final byte[] bytes = readLengthPrefixed(bb, "path");
            final Path path = new Path(new String(bytes, StandardCharsets.UTF_8));
            paths.add(path);
        }

        return PendingSplitsCheckpoint.reusingCollection(splits, paths);
    }

    /**
     * Reads one entry consisting of an int length field followed by that many bytes.
     *
     * @param bb the buffer positioned at the length field of the entry
     * @param description what kind of entry is read, used only in error messages
     * @return the bytes of the entry, without the length field
     * @throws IOException if the length is negative or exceeds the bytes left in the buffer
     */
    private static byte[] readLengthPrefixed(ByteBuffer bb, String description)
            throws IOException {
        if (bb.remaining() < LENGTH_FIELD_SIZE) {
            throw new IOException(
                    "Corrupt PendingSplitsCheckpoint: missing length field for " + description);
        }

        final int length = bb.getInt();
        if (length < 0 || length > bb.remaining()) {
            throw new IOException(
                    "Corrupt PendingSplitsCheckpoint: invalid "
                            + description
                            + " length "
                            + length
                            + ", remaining bytes: "
                            + bb.remaining());
        }

        final byte[] bytes = new byte[length];
        bb.get(bytes);
        return bytes;
    }
}