blob: fa20eec1a4c9740d0ec405ad5ee81db71a152191 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.appdata.query.serde;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Map;
import java.util.Set;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.appdata.schemas.Message;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* This is a utility class which simplifies deserializing AppData messages. The factory is configured by specifying the {@link Message} Classes
* that messages will be deserialized into. The {@link MessageDeserializerFactory} knows the correct type of a Message, its deserializer, and its
* validator by looking up annotations specified on the {@link Message} class. An example of a {@link Message} class that can be deserialized by
* {@link MessageDeserializerFactory} is below.
* <br/>
* <br/>
* <pre>
* <code>
* {@literal @}MessageType(type="MyMessageType")
* {@literal @}MessageDeserializerInfo(clazz=MyAppDataMessageDeserializer.class)
* {@literal @}MessageValidatorInfo(clazz=MyAppDataMessageValidator.class)
* public class MyAppDataMessage extends Message
* {
* ...
* }
* </code>
* </pre>
* @since 3.0.0
*/
public class MessageDeserializerFactory
{
  /**
   * A map from message type to the corresponding {@link Message} class.
   */
  private final Map<String, Class<? extends Message>> typeToClass = Maps.newHashMap();
  /**
   * A map from message type to the deserializer instance created for that type.
   */
  private final Map<String, CustomMessageDeserializer> typeToCustomQueryBuilder = Maps.newHashMap();
  /**
   * A map from message type to the validator instance created for that type.
   */
  private final Map<String, CustomMessageValidator> typeToCustomQueryValidator = Maps.newHashMap();
  /**
   * A map from Message classes to the context object handed to their deserializer and validator.
   */
  private final Map<Class<? extends Message>, Object> deserializationContext = Maps.newHashMap();

  /**
   * Creates a {@link MessageDeserializerFactory} to deserialize the messages corresponding to the specified classes.
   * The array is only read, never stored or written to, which is why the constructor is marked
   * {@code @SafeVarargs}.
   * @param schemas The classes to deserialize Messages into. Must be non-empty, contain no nulls and no duplicates.
   * @throws IllegalArgumentException if no classes are given, a class is given twice, a required annotation
   * is missing, or two classes declare the same message type.
   */
  @SafeVarargs
  public MessageDeserializerFactory(Class<? extends Message>... schemas)
  {
    setClasses(schemas);
  }

  /**
   * This sets the context object to use when deserializing the specified class of messages.
   * @param clazz The Class of messages that a context is being set for.
   * @param context The context to use when deserializing messages corresponding to the specified class.
   */
  public void setContext(Class<? extends Message> clazz, Object context)
  {
    deserializationContext.put(clazz, context);
  }

  /**
   * This is a helper method which validates the classes that data will be deserialized into and registers,
   * for each of them, its message type, deserializer and validator as declared by the
   * {@link MessageType}, {@link MessageDeserializerInfo} and {@link MessageValidatorInfo} annotations.
   * @param schemas The classes that data will be deserialized into.
   */
  private void setClasses(Class<? extends Message>[] schemas)
  {
    Preconditions.checkNotNull(schemas, "Provided schemas cannot be null");
    Preconditions.checkArgument(schemas.length != 0, "No schemas provided.");

    Set<Class<? extends Message>> clazzes = Sets.newHashSet();

    for (Class<? extends Message> schema : schemas) {
      Preconditions.checkNotNull(schema, "Provided schema cannot be null");
      Preconditions.checkArgument(!clazzes.contains(schema), "Schema %s was passed twice.", schema);
      clazzes.add(schema);

      String schemaType = null;
      Class<? extends CustomMessageDeserializer> cqd = null;
      Class<? extends CustomMessageValidator> cqv = null;

      // Collect the three required annotations; any other annotation on the class is ignored.
      for (Annotation an : schema.getAnnotations()) {
        if (an instanceof MessageType) {
          if (schemaType != null) {
            throw new IllegalArgumentException("Cannot specify the " + MessageType.class +
                " annotation twice on the class: " + schema);
          }

          schemaType = ((MessageType)an).type();
          LOG.debug("Detected schemaType for {} is {}", schema, schemaType);
        } else if (an instanceof MessageDeserializerInfo) {
          if (cqd != null) {
            throw new IllegalArgumentException("Cannot specify the " + MessageDeserializerInfo.class +
                " annotation twice on the class: " + schema);
          }

          cqd = ((MessageDeserializerInfo)an).clazz();
        } else if (an instanceof MessageValidatorInfo) {
          if (cqv != null) {
            // The offending class is included so the message matches the two checks above.
            throw new IllegalArgumentException("Cannot specify the " + MessageValidatorInfo.class +
                " annotation twice on the class: " + schema);
          }

          cqv = ((MessageValidatorInfo)an).clazz();
        }
      }

      if (schemaType == null) {
        throw new IllegalArgumentException("No " + MessageType.class + " annotation found on class: " + schema);
      }

      if (cqd == null) {
        throw new IllegalArgumentException("No " + MessageDeserializerInfo.class + " annotation found on class: " +
            schema);
      }

      if (cqv == null) {
        throw new IllegalArgumentException(
            "No " + MessageValidatorInfo.class + " annotation found on class: " + schema);
      }

      // Look up (rather than overwrite) any earlier registration so a conflict never clobbers the map.
      Class<? extends Message> prevSchema = typeToClass.get(schemaType);
      LOG.debug("prevSchema {}:", prevSchema);

      if (prevSchema != null) {
        throw new IllegalArgumentException("Cannot have the " +
            schemaType + " schemaType defined on multiple classes: " + schema + ", " + prevSchema);
      }

      CustomMessageDeserializer cqdI;
      CustomMessageValidator cqvI;

      try {
        cqdI = cqd.newInstance();
        cqvI = cqv.newInstance();
      } catch (InstantiationException | IllegalAccessException ex) {
        throw new RuntimeException(ex);
      }

      // Register everything for this type only once both helper instances exist.
      typeToClass.put(schemaType, schema);
      typeToCustomQueryBuilder.put(schemaType, cqdI);
      typeToCustomQueryValidator.put(schemaType, cqvI);
    }
  }

  /**
   * This is the method that is called to deserialize a given json string into a Message object of
   * the appropriate type. The type is read from the {@link Message#FIELD_TYPE} field of the json and
   * selects the deserializer, validator and context to use.
   * @param json The json to deserialize.
   * @return The deserialized Message with its type set, or null if the deserializer produced no message
   * or the message failed validation.
   * @throws IOException if the json cannot be parsed, has no type field, or its type has no registered
   * deserializer.
   */
  public Message deserialize(String json) throws IOException
  {
    String type;

    try {
      JSONObject jsonObject = new JSONObject(json);
      type = jsonObject.getString(Message.FIELD_TYPE);
    } catch (JSONException e) {
      throw new IOException(e);
    }

    CustomMessageDeserializer cqb = typeToCustomQueryBuilder.get(type);

    if (cqb == null) {
      throw new IOException("The query type " +
          type +
          " does not have a corresponding deserializer.");
    }

    CustomMessageValidator cqv = typeToCustomQueryValidator.get(type);
    Class<? extends Message> clazz = typeToClass.get(type);
    // The context may be null when setContext was never called for this class.
    Object context = deserializationContext.get(clazz);
    Message data = cqb.deserialize(json, clazz, context);

    LOG.debug("{}", data);

    // A missing message, a missing validator, or a failed validation all yield null.
    if (data == null || cqv == null || !cqv.validate(data, context)) {
      return null;
    }

    data.setType(type);
    return data;
  }

  private static final Logger LOG = LoggerFactory.getLogger(MessageDeserializerFactory.class);
}