blob: ae6b9731d87626247f454569cb15d324ed75b148 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.typeutils
import java.util.{HashMap => JHashMap, Map => JMap}
import org.apache.flink.api.common.typeutils._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
/**
* The [[NullAwareMapSerializer]] is similar to MapSerializer, the only difference is that
* the [[NullAwareMapSerializer]] can handle null keys.
*/
class NullAwareMapSerializer[K, V](
val keySerializer: TypeSerializer[K],
val valueSerializer: TypeSerializer[V])
extends TypeSerializer[JMap[K, V]]{
override def isImmutableType: Boolean = false
override def duplicate(): TypeSerializer[JMap[K, V]] = {
val keySerCopy = keySerializer.duplicate()
val valueSerCopy = valueSerializer.duplicate()
new NullAwareMapSerializer(keySerCopy, valueSerCopy)
}
override def createInstance(): JMap[K, V] = new JHashMap[K, V]()
override def copy(from: JMap[K, V]): JMap[K, V] = {
val newMap = createInstance()
val fromIter = from.entrySet().iterator()
while (fromIter.hasNext) {
val entry = fromIter.next()
val key = entry.getKey
val value = entry.getValue
val newKey = if (key == null) {
null.asInstanceOf[K]
} else {
keySerializer.copy(key)
}
val newValue = if (value == null) {
null.asInstanceOf[V]
} else {
valueSerializer.copy(value)
}
newMap.put(newKey, newValue)
}
newMap
}
override def copy(from: JMap[K, V], reuse: JMap[K, V]): JMap[K, V] = copy(from)
override def getLength: Int = -1 // var length
override def serialize(map: JMap[K, V], target: DataOutputView): Unit = {
target.writeInt(map.size())
val iter = map.entrySet().iterator()
while (iter.hasNext) {
val entry = iter.next()
if (entry.getKey == null) {
target.writeBoolean(true)
} else {
target.writeBoolean(false)
keySerializer.serialize(entry.getKey, target)
}
if (entry.getValue == null) {
target.writeBoolean(true)
} else {
target.writeBoolean(false)
valueSerializer.serialize(entry.getValue, target)
}
}
}
override def deserialize(source: DataInputView): JMap[K, V] = {
val size = source.readInt()
val map = createInstance()
for (i <- 0 until size) {
val keyIsNull = source.readBoolean()
val key = if (keyIsNull) {
null.asInstanceOf[K]
} else {
keySerializer.deserialize(source)
}
val valueIsNull = source.readBoolean()
val value = if (valueIsNull) {
null.asInstanceOf[V]
} else {
valueSerializer.deserialize(source)
}
map.put(key, value)
}
map
}
override def deserialize(
reuse: JMap[K, V],
source: DataInputView): JMap[K, V] = deserialize(source)
override def copy(source: DataInputView, target: DataOutputView): Unit = {
val size = source.readInt()
target.writeInt(size)
for (i <- 0 until size) {
val keyIsNull = source.readBoolean()
target.writeBoolean(keyIsNull)
if (!keyIsNull) {
keySerializer.copy(source, target)
}
val valueIsNull = source.readBoolean()
target.writeBoolean(valueIsNull)
if (!valueIsNull) {
valueSerializer.copy(source, target)
}
}
}
override def canEqual(obj: scala.Any): Boolean = {
obj != null && getClass == obj.getClass
}
override def toString: String = {
"NullAwareMapSerializer{keySerializer=" + keySerializer +
", valueSerializer=" + valueSerializer + "}"
}
override def equals(o: Any): Boolean = {
if (this eq o.asInstanceOf[AnyRef]) return true
if (o == null || (getClass ne o.getClass)) return false
val that = o.asInstanceOf[NullAwareMapSerializer[K, V]]
this.keySerializer == that.keySerializer && this.valueSerializer == that.valueSerializer
}
override def hashCode: Int = {
var result = keySerializer.hashCode
result = 31 * result + valueSerializer.hashCode
result
}
override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
new NullAwareMapSerializerConfigSnapshot(keySerializer, valueSerializer)
}
override def ensureCompatibility(configSnapshot: TypeSerializerConfigSnapshot)
: CompatibilityResult[JMap[K, V]] = {
configSnapshot match {
case snapshot: NullAwareMapSerializerConfigSnapshot[_, _] =>
val prevConfigs = snapshot.getNestedSerializersAndConfigs
val keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
prevConfigs.get(0).f0,
classOf[UnloadableDummyTypeSerializer[_]],
prevConfigs.get(0).f1,
keySerializer)
val valueCompatResult = CompatibilityUtil.resolveCompatibilityResult(
prevConfigs.get(1).f0,
classOf[UnloadableDummyTypeSerializer[_]],
prevConfigs.get(1).f1,
valueSerializer)
if (!keyCompatResult.isRequiresMigration && !valueCompatResult.isRequiresMigration) {
CompatibilityResult.compatible[JMap[K, V]]
} else if (keyCompatResult.getConvertDeserializer != null &&
valueCompatResult.getConvertDeserializer != null) {
CompatibilityResult
.requiresMigration(new NullAwareMapSerializer[K, V](
new TypeDeserializerAdapter[K](keyCompatResult.getConvertDeserializer),
new TypeDeserializerAdapter[V](valueCompatResult.getConvertDeserializer)))
} else {
CompatibilityResult.requiresMigration[JMap[K, V]]
}
case _ => CompatibilityResult.requiresMigration[JMap[K, V]]
}
}
}