| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.apex.malhar.lib.appdata.query.serde; |
| |
| import java.io.IOException; |
| import java.lang.annotation.Annotation; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.codehaus.jettison.json.JSONException; |
| import org.codehaus.jettison.json.JSONObject; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.apex.malhar.lib.appdata.schemas.Message; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * This is a utility class which simplifies deserializing AppData messages. The factory is configured by specifying the {@link Message} Classes |
| * that messages will be deserialized into. The {@link MessageDeserializerFactory} knows the correct type of a Message, its deserializer, and its |
| * validator by looking up annotations specified on the {@link Message} class. An example of a {@link Message} class that can be deserialized by |
| * {@link MessageDeserializerFactory} is below. |
| * <br/> |
| * <br/> |
| * <pre> |
| * <code> |
| * {@literal @}MessageType(type="MyMessageType") |
| * {@literal @}MessageDeserializerInfo(clazz=MyAppDataMessageDeserializer.class) |
| * {@literal @}MessageValidatorInfo(clazz=MyAppDataMessageValidator.class) |
| * public class MyAppDataMessage extends Message |
| * { |
| * ... |
| * } |
| * </code> |
| * </pre> |
| * @since 3.0.0 |
| */ |
| public class MessageDeserializerFactory |
| { |
| /** |
| * A map from message type to the corresponding class. |
| */ |
| private final Map<String, Class<? extends Message>> typeToClass = Maps.newHashMap(); |
| /** |
| * A map from query types to their corresponding deserializers. |
| */ |
| private final Map<String, CustomMessageDeserializer> typeToCustomQueryBuilder = Maps.newHashMap(); |
| /** |
| * A map from query types to their corresponding validators. |
| */ |
| private final Map<String, CustomMessageValidator> typeToCustomQueryValidator = Maps.newHashMap(); |
| /** |
| * A map from Message classes to their corresponding contexts. |
| */ |
| private final Map<Class<? extends Message>, Object> deserializationContext = Maps.newHashMap(); |
| |
| /** |
| * Creates a {@link MessageDeserializerFactory} to deserialize the messages corresponding to the specified classes. |
| * @param schemas The classes to deserialize Messages into. |
| */ |
| public MessageDeserializerFactory(Class<? extends Message>... schemas) |
| { |
| setClasses(schemas); |
| } |
| |
| /** |
| * This sets the context object to use when deserializing the specified class of messages. |
| * @param clazz The Class of messages that a context a being set for. |
| * @param context The context to use when deserializing messages corresponding to the specified class. |
| */ |
| public void setContext(Class<? extends Message> clazz, Object context) |
| { |
| deserializationContext.put(clazz, context); |
| } |
| |
| /** |
| * This is a helper method which validates setting the desired class that data will be deserialized into. |
| * @param schemas The classes that data will be deserialized into. |
| */ |
| private void setClasses(Class<? extends Message>[] schemas) |
| { |
| Preconditions.checkArgument(schemas.length != 0, "No schemas provided."); |
| |
| Set<Class<? extends Message>> clazzes = Sets.newHashSet(); |
| |
| for (Class<? extends Message> schema : schemas) { |
| Preconditions.checkNotNull(schema, "Provided schema cannot be null"); |
| Preconditions.checkArgument(!clazzes.contains(schema), "Schema %s was passed twice.", schema); |
| clazzes.add(schema); |
| |
| Annotation[] ans = schema.getAnnotations(); |
| |
| String schemaType = null; |
| Class<? extends CustomMessageDeserializer> cqd = null; |
| Class<? extends CustomMessageValidator> cqv = null; |
| |
| for (Annotation an : ans) { |
| if (an instanceof MessageType) { |
| if (schemaType != null) { |
| throw new IllegalArgumentException("Cannot specify the " + MessageType.class + |
| " annotation twice on the class: " + schema); |
| } |
| |
| schemaType = ((MessageType)an).type(); |
| |
| LOG.debug("Detected schemaType for {} is {}", schema, schemaType); |
| } else if (an instanceof MessageDeserializerInfo) { |
| if (cqd != null) { |
| throw new IllegalArgumentException("Cannot specify the " + MessageDeserializerInfo.class + |
| " annotation twice on the class: " + schema); |
| } |
| |
| cqd = ((MessageDeserializerInfo)an).clazz(); |
| } else if (an instanceof MessageValidatorInfo) { |
| if (cqv != null) { |
| throw new IllegalArgumentException("Cannot specify the " + MessageValidatorInfo.class + |
| " annotation twice on the class: "); |
| } |
| |
| cqv = ((MessageValidatorInfo)an).clazz(); |
| } |
| } |
| |
| if (schemaType == null) { |
| throw new IllegalArgumentException("No " + MessageType.class + " annotation found on class: " + schema); |
| } |
| |
| if (cqd == null) { |
| throw new IllegalArgumentException("No " + MessageDeserializerInfo.class + " annotation found on class: " + |
| schema); |
| } |
| |
| if (cqv == null) { |
| throw new IllegalArgumentException( |
| "No " + MessageValidatorInfo.class + " annotation found on class: " + schema); |
| } |
| |
| Class<? extends Message> prevSchema = typeToClass.put(schemaType, schema); |
| LOG.debug("prevSchema {}:", prevSchema); |
| |
| if (prevSchema != null) { |
| throw new IllegalArgumentException("Cannot have the " + |
| schemaType + " schemaType defined on multiple classes: " + schema + ", " + prevSchema); |
| } |
| |
| try { |
| CustomMessageDeserializer cqdI = cqd.newInstance(); |
| CustomMessageValidator cqvI = cqv.newInstance(); |
| typeToCustomQueryBuilder.put(schemaType, cqdI); |
| typeToCustomQueryValidator.put(schemaType, cqvI); |
| } catch (InstantiationException | IllegalAccessException ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| } |
| |
| /** |
| * This is the method that is called to deserialize a given json string into a Message object of |
| * the appropriate type. |
| * @param json The json to deserialize. |
| * @return The deserialized Message. |
| * @throws IOException |
| */ |
| public Message deserialize(String json) throws IOException |
| { |
| String type; |
| |
| try { |
| JSONObject jsonObject = new JSONObject(json); |
| type = jsonObject.getString(Message.FIELD_TYPE); |
| } catch (JSONException e) { |
| throw new IOException(e); |
| } |
| |
| CustomMessageDeserializer cqb = typeToCustomQueryBuilder.get(type); |
| |
| if (cqb == null) { |
| throw new IOException("The query type " + |
| type + |
| " does not have a corresponding deserializer."); |
| } |
| |
| CustomMessageValidator cqv = typeToCustomQueryValidator.get(type); |
| Object context = deserializationContext.get(typeToClass.get(type)); |
| Message data = cqb.deserialize(json, typeToClass.get(type), context); |
| |
| LOG.debug("{}", data); |
| |
| if (data == null || !(cqv != null && cqv.validate(data, context))) { |
| return null; |
| } |
| |
| data.setType(type); |
| return data; |
| } |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MessageDeserializerFactory.class); |
| } |