| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.streaming.util.serialization; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.api.common.ExecutionConfig; |
| import org.apache.flink.api.common.typeinfo.TypeInformation; |
| import org.apache.flink.api.common.typeutils.TypeSerializer; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.api.java.typeutils.TupleTypeInfo; |
| import org.apache.flink.api.java.typeutils.TypeExtractor; |
| import org.apache.flink.core.memory.DataInputDeserializer; |
| import org.apache.flink.core.memory.DataOutputSerializer; |
| |
| import java.io.IOException; |
| |
| /** |
| * A serialization and deserialization schema for Key Value Pairs that uses Flink's serialization stack to |
| * transform typed from and to byte arrays. |
| * |
| * @param <K> The key type to be serialized. |
| * @param <V> The value type to be serialized. |
| */ |
| @Internal |
| public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> { |
| |
| private static final long serialVersionUID = -5359448468131559102L; |
| |
| /** The serializer for the key. */ |
| private final TypeSerializer<K> keySerializer; |
| |
| /** The serializer for the value. */ |
| private final TypeSerializer<V> valueSerializer; |
| |
| /** reusable input deserialization buffer. */ |
| private final DataInputDeserializer inputDeserializer; |
| |
| /** reusable output serialization buffer for the key. */ |
| private transient DataOutputSerializer keyOutputSerializer; |
| |
| /** reusable output serialization buffer for the value. */ |
| private transient DataOutputSerializer valueOutputSerializer; |
| |
| /** The type information, to be returned by {@link #getProducedType()}. It is |
| * transient, because it is not serializable. Note that this means that the type information |
| * is not available at runtime, but only prior to the first serialization / deserialization */ |
| private final transient TypeInformation<Tuple2<K, V>> typeInfo; |
| |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Creates a new de-/serialization schema for the given types. |
| * |
| * @param keyTypeInfo The type information for the key type de-/serialized by this schema. |
| * @param valueTypeInfo The type information for the value type de-/serialized by this schema. |
| * @param ec The execution config, which is used to parametrize the type serializers. |
| */ |
| public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) { |
| this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo); |
| this.keySerializer = keyTypeInfo.createSerializer(ec); |
| this.valueSerializer = valueTypeInfo.createSerializer(ec); |
| this.inputDeserializer = new DataInputDeserializer(); |
| } |
| |
| /** |
| * Creates a new de-/serialization schema for the given types. This constructor accepts the types |
| * as classes and internally constructs the type information from the classes. |
| * |
| * <p>If the types are parametrized and cannot be fully defined via classes, use the constructor |
| * that accepts {@link TypeInformation} instead. |
| * |
| * @param keyClass The class of the key de-/serialized by this schema. |
| * @param valueClass The class of the value de-/serialized by this schema. |
| * @param config The execution config, which is used to parametrize the type serializers. |
| */ |
| public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) { |
| this(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass), config); |
| } |
| |
| // ------------------------------------------------------------------------ |
| |
| @Override |
| public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { |
| K key = null; |
| V value = null; |
| |
| if (messageKey != null) { |
| inputDeserializer.setBuffer(messageKey, 0, messageKey.length); |
| key = keySerializer.deserialize(inputDeserializer); |
| } |
| if (message != null) { |
| inputDeserializer.setBuffer(message, 0, message.length); |
| value = valueSerializer.deserialize(inputDeserializer); |
| } |
| return new Tuple2<>(key, value); |
| } |
| |
| /** |
| * This schema never considers an element to signal end-of-stream, so this method returns always false. |
| * @param nextElement The element to test for the end-of-stream signal. |
| * @return Returns false. |
| */ |
| @Override |
| public boolean isEndOfStream(Tuple2<K, V> nextElement) { |
| return false; |
| } |
| |
| @Override |
| public byte[] serializeKey(Tuple2<K, V> element) { |
| if (element.f0 == null) { |
| return null; |
| } else { |
| // key is not null. serialize it: |
| if (keyOutputSerializer == null) { |
| keyOutputSerializer = new DataOutputSerializer(16); |
| } |
| try { |
| keySerializer.serialize(element.f0, keyOutputSerializer); |
| } |
| catch (IOException e) { |
| throw new RuntimeException("Unable to serialize record", e); |
| } |
| // check if key byte array size changed |
| byte[] res = keyOutputSerializer.getByteArray(); |
| if (res.length != keyOutputSerializer.length()) { |
| byte[] n = new byte[keyOutputSerializer.length()]; |
| System.arraycopy(res, 0, n, 0, keyOutputSerializer.length()); |
| res = n; |
| } |
| keyOutputSerializer.clear(); |
| return res; |
| } |
| } |
| |
| @Override |
| public byte[] serializeValue(Tuple2<K, V> element) { |
| // if the value is null, its serialized value is null as well. |
| if (element.f1 == null) { |
| return null; |
| } |
| |
| if (valueOutputSerializer == null) { |
| valueOutputSerializer = new DataOutputSerializer(16); |
| } |
| |
| try { |
| valueSerializer.serialize(element.f1, valueOutputSerializer); |
| } |
| catch (IOException e) { |
| throw new RuntimeException("Unable to serialize record", e); |
| } |
| |
| byte[] res = valueOutputSerializer.getByteArray(); |
| if (res.length != valueOutputSerializer.length()) { |
| byte[] n = new byte[valueOutputSerializer.length()]; |
| System.arraycopy(res, 0, n, 0, valueOutputSerializer.length()); |
| res = n; |
| } |
| valueOutputSerializer.clear(); |
| return res; |
| } |
| |
| @Override |
| public String getTargetTopic(Tuple2<K, V> element) { |
| return null; // we are never overriding the topic |
| } |
| |
| @Override |
| public TypeInformation<Tuple2<K, V>> getProducedType() { |
| if (typeInfo != null) { |
| return typeInfo; |
| } |
| else { |
| throw new IllegalStateException( |
| "The type information is not available after this class has been serialized and distributed."); |
| } |
| } |
| } |