blob: e98fb8ed4ae4bd3f627de287b4f22f5149652ada [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.api.serialization.customserializers
import com.fasterxml.jackson.core.{JsonParser, JsonProcessingException}
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonNode}
import com.fasterxml.jackson.databind.jsontype.TypeDeserializer
import org.apache.wayang.api.MultiContext
import org.apache.wayang.api.serialization.SerializationUtils.mapper
import org.apache.wayang.core.api.Configuration
import org.apache.wayang.java.Java
import org.apache.wayang.postgres.Postgres
import org.apache.wayang.spark.Spark
import org.apache.wayang.sqlite3.Sqlite3
import java.io.IOException
/**
 * Jackson deserializer that rebuilds a [[MultiContext]] from its JSON representation.
 *
 * The JSON object is read field by field:
 *  - `configuration` (required) is deserialized into a [[Configuration]] and used to
 *    construct the context;
 *  - `sink` (optional) is deserialized into an `Option[MultiContext.UnarySink]` and
 *    re-applied through the matching `with...Sink` call;
 *  - `plugins` (optional) is a list of plugin class names; each name that matches one of
 *    the known plugins is registered on the context, unknown names are reported and skipped.
 */
class MultiContextDeserializer extends JsonDeserializer[MultiContext] {

  /** Ignores the supplied type deserializer and delegates to `deserialize(p, ctxt)`. */
  override def deserializeWithType(p: JsonParser, ctxt: DeserializationContext, typeDeserializer: TypeDeserializer): AnyRef = {
    this.deserialize(p, ctxt)
  }

  /**
   * Reads one JSON object from `jp` and turns it into a [[MultiContext]].
   *
   * @param jp   parser positioned at the serialized multi context
   * @param ctxt Jackson deserialization context (not used by this implementation)
   * @return the reconstructed multi context
   * @throws JsonProcessingException if the document is empty or has no `configuration` field
   */
  @throws[IOException]
  @throws[JsonProcessingException]
  override def deserialize(jp: JsonParser, ctxt: DeserializationContext): MultiContext = {
    // Deserialize each field of MultiContext separately
    val root: JsonNode = jp.getCodec.readTree(jp)
    val configuration: Configuration = readConfiguration(root, jp)
    val sink: Option[MultiContext.UnarySink] = readSink(root, jp)
    val pluginNames: List[String] = readPluginNames(root, jp)

    //
    // Create the whole deserialized multi context
    //
    // 1. Add configuration
    val multiContext = new MultiContext(configuration)

    // 2. Add sink
    // NOTE(review): the results of the with...Sink calls are discarded, so this relies on
    // them updating `multiContext` in place — confirm against MultiContext.
    sink match {
      case Some(MultiContext.TextFileSink(url)) =>
        println(s"It's a TextFileSink with url: $url")
        multiContext.withTextFileSink(url)
      case Some(MultiContext.ObjectFileSink(url)) =>
        println(s"It's an ObjectFileSink with url: $url")
        multiContext.withObjectFileSink(url)
      case None =>
        println("No sink defined")
      case _ =>
        println("Unknown sink type")
    }

    // 3. Add plugins
    registerPlugins(multiContext, pluginNames)

    multiContext
  }

  /**
   * Extracts the mandatory `configuration` field.
   *
   * `JsonNode.get` returns `null` for an absent field, so the lookup is wrapped in `Option`
   * and a missing field (or an empty document) is reported as a mapping error rather than
   * surfacing as a `NullPointerException`.
   */
  private def readConfiguration(root: JsonNode, jp: JsonParser): Configuration = {
    val configurationNode: JsonNode = Option(root)
      .flatMap(r => Option(r.get("configuration")))
      .getOrElse(
        throw com.fasterxml.jackson.databind.JsonMappingException.from(
          jp,
          "Cannot deserialize MultiContext: required field 'configuration' is missing"
        )
      )
    val configurationParser: JsonParser = configurationNode.traverse(jp.getCodec)
    val configuration: Configuration = mapper.readValue(configurationParser, classOf[Configuration])
    configuration
  }

  /** Extracts the `sink` field; an absent field is treated as "no sink". */
  private def readSink(root: JsonNode, jp: JsonParser): Option[MultiContext.UnarySink] =
    Option(root.get("sink")) match {
      case Some(sinkNode) =>
        val sinkParser: JsonParser = sinkNode.traverse(jp.getCodec)
        val sink: Option[MultiContext.UnarySink] =
          mapper.readValue(sinkParser, new TypeReference[Option[MultiContext.UnarySink]]() {})
        sink
      case None =>
        None
    }

  /** Extracts the `plugins` field; an absent (or null) list is treated as empty. */
  private def readPluginNames(root: JsonNode, jp: JsonParser): List[String] =
    Option(root.get("plugins")) match {
      case Some(pluginsNode) =>
        val pluginsParser: JsonParser = pluginsNode.traverse(jp.getCodec)
        val plugins: List[String] = mapper.readValue(pluginsParser, new TypeReference[List[String]]() {})
        Option(plugins).getOrElse(Nil)
      case None =>
        Nil
    }

  /**
   * Registers, for every name in `pluginNames`, the known plugin whose class name equals it.
   * Names that match none of the known plugins are reported on stdout and skipped.
   */
  private def registerPlugins(multiContext: MultiContext, pluginNames: List[String]): Unit = {
    // TODO: Add all plugins
    val javaPluginName = Java.basicPlugin().getClass.getName
    val sparkPluginName = Spark.basicPlugin().getClass.getName
    val postgresPluginName = Postgres.plugin().getClass.getName
    // val flinkPluginName = Flink.basicPlugin().getClass.getName
    val sqlite3PluginName = Sqlite3.plugin().getClass.getName

    // Backticks make these equality matches against the names computed above.
    pluginNames.foreach {
      case `javaPluginName` => multiContext.register(Java.basicPlugin())
      case `sparkPluginName` => multiContext.register(Spark.basicPlugin())
      case `postgresPluginName` => multiContext.register(Postgres.plugin())
      // case `flinkPluginName` => multiContext.register(Flink.basicPlugin())
      case `sqlite3PluginName` => multiContext.register(Sqlite3.plugin())
      case unknown => println(s"Unknown plugin detected: $unknown")
    }
  }
}