/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.stram.plan.logical;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;

import javax.validation.ValidationException;

import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.apex.api.plugin.DAGSetupPlugin.DAGSetupPluginContext;
import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Attribute.AttributeMap.AttributeInitializer;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.GenericOperator;
import com.datatorrent.api.Module;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.StringCodec;
import com.datatorrent.api.StringCodec.JsonStringCodec;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.ModuleMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
import com.datatorrent.stram.util.ObjectMapperFactory;

import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_CONFIGURE;
import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_POPULATE;
import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_VALIDATE;
import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_CONFIGURE;
import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_POPULATE;
import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_VALIDATE;
import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.SETUP;
import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.TEARDOWN;

/**
 *
 * Builder for the DAG logical representation of operators and streams from properties.<p>
 * <br>
 * Supports reading as name-value pairs from Hadoop {@link Configuration} or opProps file.
 * <br>
 *
 * @since 0.3.2
 */
public class LogicalPlanConfiguration
{

  // Logger shared by this class and its nested helper types.
  private static final Logger LOG = LoggerFactory.getLogger(LogicalPlanConfiguration.class);

  // Gateway property keys. These are built on DT_PREFIX, unlike the stream and operator prefixes below,
  // which are built on APEX_PREFIX.
  public static final String GATEWAY_PREFIX = StreamingApplication.DT_PREFIX + "gateway.";
  public static final String GATEWAY_LISTEN_ADDRESS = GATEWAY_PREFIX + "listenAddress";

  // Prefix of stream property keys, followed by the names of the individual stream settings.
  public static final String STREAM_PREFIX = StreamingApplication.APEX_PREFIX + "stream.";
  public static final String STREAM_SOURCE = "source";
  public static final String STREAM_SINKS = "sinks";
  public static final String STREAM_TEMPLATE = "template";
  public static final String STREAM_LOCALITY = "locality";
  public static final String STREAM_SCHEMA = "schema";

  // Prefix of operator property keys, followed by the names of the individual operator settings.
  public static final String OPERATOR_PREFIX = StreamingApplication.APEX_PREFIX + "operator.";
  public static final String OPERATOR_CLASSNAME = "classname";
  public static final String OPERATOR_TEMPLATE = "template";

  // Names of template matching properties. Judging by the names these hold regular expressions matched against
  // an id, an application name and a class name respectively; the usage is outside this part of the file.
  public static final String TEMPLATE_idRegExp = "matchIdRegExp";
  public static final String TEMPLATE_appNameRegExp = "matchAppNameRegExp";
  public static final String TEMPLATE_classNameRegExp = "matchClassNameRegExp";

  public static final String CLASS = "class";
  // Separator between the tokens of a property key, and its regex-escaped form for use as a split pattern.
  public static final String KEY_SEPARATOR = ".";
  public static final String KEY_SEPARATOR_SPLIT_REGEX = "\\.";

  // ".class": the CLASS token preceded by the key separator.
  private static final String CLASS_SUFFIX = "." + CLASS;

  // WILDCARD is the id given to configurations that are created implicitly (see the addConf calls in
  // ConfElement.addConfs and AttributeParseUtils.processAllConfsForAttribute).
  private static final String WILDCARD = "*";
  // Presumably the regular expression equivalent of WILDCARD - usage is outside this part of the file.
  private static final String WILDCARD_PATTERN = ".*";

  /**
   * Reads the {@code serialVersionUID} of each {@link Context} interface so that the interfaces (and, per the
   * original note, their serial ids) are initialized before the attribute keys below are computed.
   */
  static {
    Object[] serial = new Object[]{Context.DAGContext.serialVersionUID, OperatorContext.serialVersionUID, PortContext.serialVersionUID};
    // The cast makes the array a single log argument. Without it the call binds to the varargs overload,
    // the array is spread over the placeholders and only the first serial id is printed.
    LOG.debug("Initialized attributes {}", (Object)serial);
  }

  // Configuration keys of the application name and gateway connect address attributes. Each key is produced by
  // keyAndDeprecation, which also registers the equivalent DT_PREFIX key as deprecated.
  public static final String KEY_APPLICATION_NAME = keyAndDeprecation(Context.DAGContext.APPLICATION_NAME);
  public static final String KEY_GATEWAY_CONNECT_ADDRESS = keyAndDeprecation(Context.DAGContext.GATEWAY_CONNECT_ADDRESS);

  /**
   * Builds the configuration key of an attribute and registers the legacy spelling of that key as deprecated.
   *
   * @param attr The attribute whose name forms the tail of the key.
   * @return The key made of {@link StreamingApplication#APEX_PREFIX} followed by the attribute name.
   */
  private static String keyAndDeprecation(Attribute<?> attr)
  {
    final String attributeName = attr.getName();
    final String currentKey = StreamingApplication.APEX_PREFIX + attributeName;
    final String legacyKey = StreamingApplication.DT_PREFIX + attributeName;

    // The DT_PREFIX based key is declared deprecated in favour of the APEX_PREFIX based one.
    Configuration.addDeprecation(legacyKey, currentKey);
    return currentKey;
  }

  // Manager of the DAG setup plugins; it is assigned outside this part of the file. Presumably it dispatches the
  // DispatchType events that are statically imported above (SETUP, PRE_POPULATE, ...) - TODO confirm against usage.
  private final DAGSetupPluginManager pluginManager;

  /**
   * The kinds of element that a DT property key can refer to. Every constant carries the token
   * under which it appears in a property key.
   */
  protected enum StramElement
  {
    APPLICATION("application"),
    GATEWAY("gateway"),
    TEMPLATE("template"),
    OPERATOR("operator"),
    STREAM("stream"),
    PORT("port"),
    INPUT_PORT("inputport"),
    OUTPUT_PORT("outputport"),
    ATTR("attr"),
    PROP("prop"),
    CLASS("class"),
    PATH("path"),
    UNIFIER("unifier");

    private final String value;

    /**
     * Creates a {@link StramElement} that is known under the given name.
     *
     * @param value The name of the {@link StramElement}.
     */
    StramElement(String value)
    {
      this.value = value;
    }

    /**
     * Returns the name of this {@link StramElement}.
     *
     * @return The name of this {@link StramElement}.
     */
    public String getValue()
    {
      return value;
    }

    /**
     * Looks up the {@link StramElement} that carries the given name.
     *
     * @param value The name for which a {@link StramElement} is desired.
     * @return The {@link StramElement} with that name, or null when no element has that name.
     */
    public static StramElement fromValue(String value)
    {
      for (StramElement candidate : values()) {
        if (candidate.value.equals(value)) {
          return candidate;
        }
      }

      // No element is known under the given name.
      return null;
    }

  }

  /**
   * This is an enum which represents a type of configuration.
   * <p>
   * The constants form a tree. The constructor arguments of each constant are, in order: the
   * {@link StramElement} that represents the configuration type, the parent {@link ConfElement}, any additional
   * related {@link StramElement}s, and the {@link Context} class whose attributes apply to the configuration type.
   * {@code STRAM} is the root: it has neither a {@link StramElement} nor a parent.
   */
  protected enum ConfElement
  {
    STRAM(null, null, null, null),
    APPLICATION(StramElement.APPLICATION, STRAM, null, DAGContext.class),
    TEMPLATE(StramElement.TEMPLATE, STRAM, null, null),
    GATEWAY(StramElement.GATEWAY, ConfElement.APPLICATION, null, null),
    OPERATOR(StramElement.OPERATOR, ConfElement.APPLICATION, null, OperatorContext.class),
    STREAM(StramElement.STREAM, ConfElement.APPLICATION, null, null),
    // Input and output ports are both handled by the PORT configuration type.
    PORT(StramElement.PORT, ConfElement.OPERATOR, EnumSet.of(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), PortContext.class),
    UNIFIER(StramElement.UNIFIER, ConfElement.PORT, null, null);

    // Lookup from a StramElement (including the additional related elements) to its ConfElement; filled by initialize().
    protected static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap();
    // Lookup from a Context class to the ConfElement that uses it; filled by initialize().
    protected static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap();

    // Runs after the constants and the two maps above have been created.
    static {
      initialize();
    }

    /**
     * (Re)builds the static state of the {@link ConfElement} tree: the child links, the {@link StramElement} to
     * {@link ConfElement} lookup, the ambiguous/child attribute sets and the {@link Context} class to
     * {@link ConfElement} lookup.
     *
     * @throws IllegalStateException If the {@link Context} classes used by the {@link ConfElement}s are not exactly
     * the classes listed in {@link ContextUtils#CONTEXT_CLASSES}.
     */
    protected static void initialize()
    {
      STRAM.setChildren(Sets.newHashSet(APPLICATION, TEMPLATE));
      APPLICATION.setChildren(Sets.newHashSet(GATEWAY, OPERATOR, STREAM));
      OPERATOR.setChildren(Sets.newHashSet(PORT));
      PORT.setChildren(Sets.newHashSet(UNIFIER));

      STRAM_ELEMENT_TO_CONF_ELEMENT.clear();

      //Initialize StramElement to ConfElement
      for (ConfElement confElement: ConfElement.values()) {
        STRAM_ELEMENT_TO_CONF_ELEMENT.put(confElement.getStramElement(), confElement);

        for (StramElement sElement: confElement.getAllRelatedElements()) {
          STRAM_ELEMENT_TO_CONF_ELEMENT.put(sElement, confElement);
        }
      }

      //Initialize attributes
      for (ConfElement confElement: ConfElement.values()) {
        if (confElement.getParent() == null) {
          continue;
        }

        setAmbiguousAttributes(confElement);
      }

      // build context to conf element map
      CONTEXT_TO_CONF_ELEMENT.clear();

      for (ConfElement confElement: ConfElement.values()) {
        Class<? extends Context> contextClass = confElement.getContextClass();

        // Elements without a context class must not be registered: they would all collide on the null key
        // and a lookup with null would silently resolve to whichever of them was registered last.
        if (contextClass != null) {
          CONTEXT_TO_CONF_ELEMENT.put(contextClass, confElement);
        }
      }

      //Check if all the context classes are accounted for
      Set<Class<? extends Context>> confElementContextClasses = Sets.newHashSet(CONTEXT_TO_CONF_ELEMENT.keySet());

      if (!ContextUtils.CONTEXT_CLASSES.equals(confElementContextClasses)) {
        throw new IllegalStateException("All the context classes " + ContextUtils.CONTEXT_CLASSES + " found in "
                                        + Context.class + " are not used by ConfElements " + confElementContextClasses);
      }
    }

    /**
     * Recursively computes, for the given {@link ConfElement} and everything below it, the set of all simple
     * attribute names of the subtree and the subset of those names that is ambiguous. A name is ambiguous when
     * it is ambiguous in a child, or when a child subtree contributes a name that was already seen in the element
     * itself or in a previously visited child subtree. Both sets are stored on the element.
     *
     * @param element The {@link ConfElement} at which to start.
     * @return The set of all simple attribute names found in the element and its descendants.
     */
    public static Set<String> setAmbiguousAttributes(ConfElement element)
    {
      Set<String> ambiguous = Sets.newHashSet();
      // Starts with the element's own attributes and grows with every child subtree that has been visited.
      Set<String> seenSoFar = Sets.newHashSet(element.getContextAttributes());

      for (ConfElement child: element.getChildren()) {
        Set<String> subtreeAttributes = setAmbiguousAttributes(child);

        // Whatever is ambiguous further down stays ambiguous here.
        ambiguous.addAll(child.getAmbiguousAttributes());

        // Names that this subtree shares with what was seen before are ambiguous as well.
        @SuppressWarnings("unchecked")
        Set<String> overlap = Sets.newHashSet(CollectionUtils.intersection(seenSoFar, subtreeAttributes));
        ambiguous.addAll(overlap);

        seenSoFar.addAll(subtreeAttributes);
      }

      element.setAmbiguousAttributes(ambiguous);
      element.setAllChildAttributes(seenSoFar);

      return seenSoFar;
    }

    // The StramElement representing this configuration type (null for the root STRAM constant).
    private final StramElement element;
    // The parent configuration type (null for the root STRAM constant).
    private final ConfElement parent;
    // Child configuration types; assigned through setChildren, which initialize() calls for the non-leaf constants.
    private Set<ConfElement> children = Sets.newHashSet();
    // The element above plus any additional related elements passed to the constructor.
    private final Set<StramElement> allRelatedElements = Sets.newHashSet();
    // The Context class whose attributes apply to this configuration type, or null if there is none.
    private final Class<? extends Context> contextClass;
    // Simple attribute names that are ambiguous in this subtree; computed by setAmbiguousAttributes(ConfElement).
    private Set<String> ambiguousAttributes = Sets.newHashSet();
    // Simple names of the attributes of contextClass; replaced in the constructor.
    private Set<String> contextAttributes = Sets.newHashSet();
    // Simple attribute names of this element and all its descendants; computed by setAmbiguousAttributes(ConfElement).
    private Set<String> allChildAttributes = Sets.newHashSet();

    /**
     * Creates a {@link ConfElement}.
     *
     * @param element The {@link StramElement} that represents this {@link ConfElement}.
     * @param parent The parent {@link ConfElement}.
     * @param additionalRelatedElements Further {@link StramElement}s that are related to this
     * {@link ConfElement}; may be null when there are none.
     * @param contextClass The {@link Context} class holding the attributes used by this {@link ConfElement};
     * may be null when the element has no attributes.
     */
    ConfElement(StramElement element,
        ConfElement parent,
        Set<StramElement> additionalRelatedElements,
        Class<? extends Context> contextClass)
    {
      this.element = element;
      this.parent = parent;
      this.contextClass = contextClass;

      // The related elements are the optional extras plus the element itself.
      if (additionalRelatedElements != null) {
        allRelatedElements.addAll(additionalRelatedElements);
      }

      allRelatedElements.add(element);

      if (contextClass == null) {
        contextAttributes = new HashSet<String>();
      } else {
        contextAttributes = ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass);
      }
    }

    /**
     * Stores the simple names of all attributes of this {@link ConfElement} and its descendants.
     *
     * @param allChildAttributes The attribute names; must not be null.
     */
    private void setAllChildAttributes(Set<String> allChildAttributes)
    {
      this.allChildAttributes = Preconditions.checkNotNull(allChildAttributes);
    }

    /**
     * Gets the simple names of all attributes of this {@link ConfElement} and its descendants, as computed by
     * {@link #setAmbiguousAttributes(ConfElement)}. The set is empty until that computation has been run for
     * this element.
     *
     * @return The internal (mutable) set of simple attribute names.
     */
    public Set<String> getAllChildAttributes()
    {
      return allChildAttributes;
    }

    /**
     * Stores the simple names of the attributes that are ambiguous for this {@link ConfElement}.
     *
     * @param ambiguousAttributes The ambiguous attribute names; must not be null.
     */
    private void setAmbiguousAttributes(Set<String> ambiguousAttributes)
    {
      this.ambiguousAttributes = Preconditions.checkNotNull(ambiguousAttributes);
    }

    /**
     * Gets the simple names of attributes which are specified under multiple configurations which
     * include this configuration or any child configurations.
     *
     * @return The set of ambiguous simple attribute names.
     */
    public Set<String> getAmbiguousAttributes()
    {
      return ambiguousAttributes;
    }

    /**
     * Gets the {@link Context} class that corresponds to this {@link ConfElement}.
     *
     * @return The {@link Context} class that corresponds to this {@link ConfElement}, or null if this
     * {@link ConfElement} was created without one.
     */
    public Class<? extends Context> getContextClass()
    {
      return contextClass;
    }

    /**
     * Gets the {@link StramElement} representing this {@link ConfElement}.
     *
     * @return The {@link StramElement} corresponding to this {@link ConfElement}; null for the root.
     */
    public StramElement getStramElement()
    {
      return element;
    }

    /**
     * Gets the attributes contained in the {@link Context} associated with this {@link ConfElement}.
     *
     * @return A {@link java.util.Set} containing the simple attribute names of all of the attributes
     * contained in the {@link Context} associated with this {@link ConfElement}.
     */
    public Set<String> getContextAttributes()
    {
      return contextAttributes;
    }

    /**
     * Gets the {@link ConfElement} that is the parent of this {@link ConfElement}.
     *
     * @return The {@link ConfElement} that is the parent of this {@link ConfElement}; null for the root.
     */
    public ConfElement getParent()
    {
      return parent;
    }

    /**
     * Sets the child {@link ConfElement}s of this {@link ConfElement}.
     *
     * @param children The child {@link ConfElement}s of this {@link ConfElement}; must not be null.
     */
    private void setChildren(Set<ConfElement> children)
    {
      this.children = Preconditions.checkNotNull(children);
    }

    /**
     * Gets the child {@link ConfElement}s of this {@link ConfElement}.
     *
     * @return The child {@link ConfElement}s of this {@link ConfElement}; empty when none were set.
     */
    public Set<ConfElement> getChildren()
    {
      return children;
    }

    /**
     * Gets all the {@link StramElement}s that are represented by this {@link ConfElement}.
     *
     * @return All the {@link StramElement}s that are represented by this {@link ConfElement}, including the
     * one returned by {@link #getStramElement()}.
     */
    public Set<StramElement> getAllRelatedElements()
    {
      return allRelatedElements;
    }

    /**
     * Gets the {@link StramElement} representing the {@link Conf} type which can be a parent of the {@link Conf} type
     * represented by the given {@link StramElement}.
     *
     * @param conf The {@link StramElement} representing the {@link Conf} type of interest.
     * @return The {@link StramElement} representing the {@link Conf} type which can be a parent of the given
     * {@link Conf} type. This is null when the parent is the root configuration type, which has no
     * {@link StramElement}.
     * @throws IllegalArgumentException If the given {@link StramElement} is not a known configuration type, or if
     * it resolves to the root configuration type, which has no parent.
     */
    public static StramElement getAllowedParentConf(StramElement conf)
    {
      ConfElement confElement = STRAM_ELEMENT_TO_CONF_ELEMENT.get(conf);

      if (confElement == null) {
        throw new IllegalArgumentException(conf + " is not a valid conf element.");
      }

      ConfElement parentConfElement = confElement.getParent();

      // The root has no parent; report that explicitly instead of failing with a bare NullPointerException.
      if (parentConfElement == null) {
        throw new IllegalArgumentException(conf + " is a root conf element and has no parent.");
      }

      return parentConfElement.getStramElement();
    }

    /**
     * Builds the list of {@link StramElement}s leading from the given {@link Conf} type up to the root
     * {@link Conf} type. Both ends are part of the list.
     *
     * @param conf The {@link Conf} type to start from.
     * @return The path from the given {@link Conf} type to the root {@link Conf} type, both included.
     */
    public static List<StramElement> getPathFromChildToRootInclusive(StramElement conf)
    {
      ConfElement current = STRAM_ELEMENT_TO_CONF_ELEMENT.get(conf);

      if (current == null) {
        throw new IllegalArgumentException(conf + " does not represent a valid configuration type.");
      }

      List<StramElement> path = Lists.newArrayList();

      // Walk the parent links; the element without a parent is the root and ends the walk.
      do {
        path.add(current.getStramElement());
        current = current.getParent();
      } while (current != null);

      return path;
    }

    /**
     * Builds the list of {@link StramElement}s leading from the root {@link Conf} type down to the given
     * {@link Conf} type. Both ends are part of the list.
     *
     * @param conf The {@link Conf} type at which the path ends.
     * @return The path from the root {@link Conf} type to the given {@link Conf} type, both included.
     */
    public static List<StramElement> getPathFromRootToChildInclusive(StramElement conf)
    {
      // Same path as child-to-root, read in the opposite direction.
      return Lists.reverse(getPathFromChildToRootInclusive(conf));
    }

    /**
     * Builds the list of {@link StramElement}s leading from a {@link Conf} type up to one of its ancestor
     * {@link Conf} types. Both ends are part of the list.
     *
     * @param child The {@link Conf} type to start from.
     * @param parent The ancestor {@link Conf} type at which the path ends.
     * @return The path from the child {@link Conf} type to the parent {@link Conf} type, both included. When
     * child and parent are the same element the path consists of just that element.
     */
    public static List<StramElement> getPathFromChildToParentInclusive(StramElement child, StramElement parent)
    {
      ConfElement current = STRAM_ELEMENT_TO_CONF_ELEMENT.get(child);

      if (current == null) {
        throw new IllegalArgumentException(child + " does not represent a valid configuration type.");
      }

      List<StramElement> path = Lists.newArrayList();

      if (child == parent) {
        path.add(child);
        return path;
      }

      // Climb towards the root and stop as soon as the requested parent has been added.
      while (current != null) {
        StramElement step = current.getStramElement();
        path.add(step);

        if (step == parent) {
          return path;
        }

        current = current.getParent();
      }

      // The root was passed without meeting the requested parent.
      throw new IllegalArgumentException(parent + " is not a valid parent of " + child);
    }

    /**
     * Builds the list of {@link StramElement}s leading from an ancestor {@link Conf} type down to a
     * {@link Conf} type. Both ends are part of the list.
     *
     * @param child The {@link Conf} type at which the path ends.
     * @param parent The ancestor {@link Conf} type at which the path starts.
     * @return The path from the parent {@link Conf} type to the child {@link Conf} type, both included.
     */
    public static List<StramElement> getPathFromParentToChildInclusive(StramElement child, StramElement parent)
    {
      // Same path as child-to-parent, read in the opposite direction.
      return Lists.reverse(getPathFromChildToParentInclusive(child, parent));
    }

    /**
     * Searches the given {@link ConfElement} first and then, depth first, its children for a
     * {@link ConfElement} whose {@link Context} contains the given simple {@link Attribute} name.
     *
     * @param current The {@link ConfElement} at which the search starts.
     * @param simpleAttributeName The simple {@link Attribute} name to search for.
     * @return The first {@link ConfElement} found that contains the attribute, or null if neither the given
     * {@link ConfElement} nor any of its descendants contains it.
     */
    public static ConfElement findConfElementWithAttribute(ConfElement current, String simpleAttributeName)
    {
      ConfElement match = current.getContextAttributes().contains(simpleAttributeName) ? current : null;

      // Descend only while nothing has been found yet.
      Iterator<ConfElement> remainingChildren = current.getChildren().iterator();

      while (match == null && remainingChildren.hasNext()) {
        match = findConfElementWithAttribute(remainingChildren.next(), simpleAttributeName);
      }

      return match;
    }

    /**
     * Adds the chain of wildcard {@link Conf}s that leads from the given {@link Conf} down to a {@link Conf} of the
     * given type.
     *
     * @param parentConf The {@link Conf} below which the chain is added.
     * @param childConfElement The {@link ConfElement} of the {@link Conf} at the end of the chain.
     * @return The {@link Conf} at the end of the chain, or the given {@link Conf} when nothing had to be added.
     */
    protected static Conf addConfs(Conf parentConf, ConfElement childConfElement)
    {
      //Figure out what configurations need to be added to hold this attribute
      StramElement target = childConfElement.getStramElement();
      StramElement start = parentConf.getConfElement().getStramElement();
      List<StramElement> path = ConfElement.getPathFromParentToChildInclusive(target, start);

      Conf deepestConf = parentConf;

      // The first entry of the path is the parent itself, which already exists.
      for (StramElement pathElement: path.subList(1, path.size())) {
        LOG.debug("Adding conf");
        //Add the configurations we need to hold this attribute
        deepestConf = addConf(pathElement, WILDCARD, deepestConf);
      }

      return deepestConf;
    }

  }

  /**
   * Utility class that holds methods for handling {@link Context} classes.
   * <p>
   * All state is static and is (re)built by {@link #initialize()}.
   */
  @SuppressWarnings("unchecked")
  protected static class ContextUtils
  {
    // Attribute field name -> type argument of the Attribute field; used to detect same-named attributes of different types.
    private static final Map<String, Type> ATTRIBUTES_TO_TYPE = Maps.newHashMap();
    // Context class -> names of the Attribute fields it declares.
    public static final Map<Class<? extends Context>, Set<String>> CONTEXT_CLASS_TO_ATTRIBUTES = Maps.newHashMap();
    // The classes nested in Context that are themselves assignable to Context.
    public static final Set<Class<? extends Context>> CONTEXT_CLASSES = Sets.newHashSet();
    // Context class -> (simple attribute name -> Attribute instance).
    public static final Map<Class<? extends Context>, Map<String, Attribute<?>>> CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE = Maps.newHashMap();

    // Runs after the collections above have been created.
    static {
      initialize();
    }

    /**
     * Rediscovers the {@link Context} classes, i.e. the classes declared inside {@link Context} that are
     * assignable to {@link Context}, and rebuilds the attribute maps for them.
     */
    @VisibleForTesting
    protected static void initialize()
    {
      CONTEXT_CLASSES.clear();

      for (Class<?> nestedClass: Context.class.getDeclaredClasses()) {
        // Only nested types that are contexts themselves are of interest.
        if (Context.class.isAssignableFrom(nestedClass)) {
          CONTEXT_CLASSES.add((Class<? extends Context>)nestedClass);
        }
      }

      buildAttributeMaps(CONTEXT_CLASSES);
    }

    /**
     * Clears and rebuilds the attribute lookups for the given {@link Context} classes.
     * <p>
     * The first pass reflects over the declared fields of each class and records the names of its
     * {@link Attribute} fields together with their type argument. The second pass records the
     * {@link Attribute} instances of each class under their simple names.
     *
     * @param contextClasses The {@link Context} classes to index.
     * @throws ValidationException If two {@link Context} classes declare an attribute of the same name with
     * different types.
     */
    @VisibleForTesting
    protected static void buildAttributeMaps(Set<Class<? extends Context>> contextClasses)
    {
      CONTEXT_CLASS_TO_ATTRIBUTES.clear();
      CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.clear();
      ATTRIBUTES_TO_TYPE.clear();

      // Pass 1: attribute names per context class, with a type consistency check across classes.
      for (Class<? extends Context> contextClass: contextClasses) {
        Set<String> attributeNames = Sets.newHashSet();

        for (Field field: contextClass.getDeclaredFields()) {
          if (Attribute.class.isAssignableFrom(field.getType())) {
            // The type argument of the Attribute field is the type of the attribute value.
            Type valueType = ((ParameterizedType)field.getGenericType()).getActualTypeArguments()[0];
            String fieldName = field.getName();
            attributeNames.add(fieldName);

            Type knownType = ATTRIBUTES_TO_TYPE.get(fieldName);

            if (knownType != null && !knownType.equals(valueType)) {
              throw new ValidationException("The attribute " + fieldName +
                  " is defined with two different types in two different context classes: " +
                  valueType + " and " + knownType + "\n" +
                  "Attributes with the same name are required to have the same type accross all Context classes.");
            }

            ATTRIBUTES_TO_TYPE.put(fieldName, valueType);
          }
        }

        CONTEXT_CLASS_TO_ATTRIBUTES.put(contextClass, attributeNames);
      }

      // Pass 2: attribute instances per context class, keyed by simple name.
      for (Class<? extends Context> contextClass: contextClasses) {
        Map<String, Attribute<?>> attributesBySimpleName = Maps.newHashMap();
        CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.put(contextClass, attributesBySimpleName);

        Set<Attribute<Object>> contextAttributes = AttributeInitializer.getAttributes(contextClass);

        LOG.debug("context class {} and attributes {}", contextClass, contextAttributes);

        for (Attribute<Object> contextAttribute: contextAttributes) {
          attributesBySimpleName.put(AttributeParseUtils.getSimpleName(contextAttribute), contextAttribute);
        }
      }
    }

    /**
     * Not meant to be called: this class only has static members.
     */
    private ContextUtils()
    {
      //Private constructor to prevent instantiation of utility class
    }

    /**
     * Registers an attribute for a {@link Context} class in the lookups of this class, creating the
     * entries of the {@link Context} class when it is not known yet. This method is only used for testing.
     *
     * @param contextClass The {@link Context} class to register the attribute for.
     * @param attribute The attribute to register under its simple name.
     */
    @VisibleForTesting
    protected static void addAttribute(Class<? extends Context> contextClass, Attribute<?> attribute)
    {
      Set<String> knownNames = CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass);

      if (knownNames == null) {
        knownNames = Sets.newHashSet();
        CONTEXT_CLASS_TO_ATTRIBUTES.put(contextClass, knownNames);
      }

      final String simpleName = attribute.getSimpleName();
      knownNames.add(simpleName);

      CONTEXT_CLASSES.add(contextClass);

      Map<String, Attribute<?>> attributesBySimpleName = CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(contextClass);

      if (attributesBySimpleName == null) {
        attributesBySimpleName = Maps.newHashMap();
        CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.put(contextClass, attributesBySimpleName);
      }

      attributesBySimpleName.put(simpleName, attribute);
    }

    /**
     * Removes an attribute of a {@link Context} class from the lookups of this class. Entries of the
     * {@link Context} class that become empty are dropped, and the class itself is dropped from
     * {@link #CONTEXT_CLASSES} when it no longer has any attribute names. This method is only used for testing.
     *
     * @param contextClass The {@link Context} class to remove the attribute from.
     * @param attribute The attribute to remove, identified by its simple name.
     */
    @VisibleForTesting
    protected static void removeAttribute(Class<? extends Context> contextClass, Attribute<?> attribute)
    {
      Set<String> knownNames = CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass);

      if (knownNames != null) {
        knownNames.remove(attribute.getSimpleName());

        if (knownNames.isEmpty()) {
          CONTEXT_CLASS_TO_ATTRIBUTES.remove(contextClass);
        }
      }

      // A context class without attribute names is not tracked any longer.
      if (!CONTEXT_CLASS_TO_ATTRIBUTES.containsKey(contextClass)) {
        CONTEXT_CLASSES.remove(contextClass);
      }

      Map<String, Attribute<?>> attributesBySimpleName = CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(contextClass);

      if (attributesBySimpleName != null) {
        attributesBySimpleName.remove(attribute.getSimpleName());

        if (attributesBySimpleName.isEmpty()) {
          CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.remove(contextClass);
        }
      }
    }

  }

  /**
   * Utility class that holds methods for parsing.
   */
  protected static class AttributeParseUtils
  {
    // The simple names of the attributes of all Context classes; filled by initialize().
    public static final Set<String> ALL_SIMPLE_ATTRIBUTE_NAMES;

    static {
      // The set has to exist before initialize() clears and fills it.
      ALL_SIMPLE_ATTRIBUTE_NAMES = Sets.newHashSet();

      initialize();
    }

    /**
     * Rebuilds {@link #ALL_SIMPLE_ATTRIBUTE_NAMES} from the attribute names that {@link ContextUtils} holds for
     * each {@link Context} class.
     */
    public static void initialize()
    {
      ALL_SIMPLE_ATTRIBUTE_NAMES.clear();

      for (Set<String> attributeNames: ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.values()) {
        ALL_SIMPLE_ATTRIBUTE_NAMES.addAll(attributeNames);
      }
    }

    /**
     * Not meant to be called: the members of this class visible here are all static.
     */
    private AttributeParseUtils()
    {
      //Private constructor to prevent instantiation of utility class
    }

    /**
     * Applies an attribute to the given {@link Conf} and, recursively, to wildcard child {@link Conf}s.
     * The attribute is set on the given {@link Conf} when the {@link Context} of its type declares the
     * attribute. For every child type whose subtree knows the attribute, a wildcard child {@link Conf} is added
     * and processed in the same way.
     *
     * @param conf The parent {@link Conf}.
     * @param attributeName The simple name of the attribute to add.
     * @param attrValue The value of the attribute.
     */
    protected static void processAllConfsForAttribute(Conf conf, String attributeName, String attrValue)
    {
      ConfElement confElement = conf.getConfElement();

      LOG.debug("Current confElement {} and name {}", confElement.getStramElement(), conf.getId());

      if (confElement.getContextAttributes().contains(attributeName)) {
        LOG.debug("Adding attribute");
        Map<String, Attribute<?>> attributesByName = ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confElement.getContextClass());
        @SuppressWarnings("unchecked")
        Attribute<Object> attr = (Attribute<Object>)attributesByName.get(attributeName);
        conf.setAttribute(attr, attrValue);
      }

      for (ConfElement childConfElement: confElement.getChildren()) {
        // Only descend into subtrees that know the attribute.
        if (childConfElement.getAllChildAttributes().contains(attributeName)) {
          Conf childConf = addConf(childConfElement.getStramElement(), WILDCARD, conf);
          processAllConfsForAttribute(childConf, attributeName, attrValue);
        }
      }
    }

    /**
     * Extracts the name of an attribute from the given split keys.
     *
     * @param element The {@link StramElement} corresponding to the key at the given index. It must be either
     * null or {@link StramElement#ATTR}.
     * @param keys The split keys that are being parsed.
     * @param index The index of the key that the parser is currently on.
     * @return The attribute name, which is either fully qualified or simple. When the current key is the
     * {@link StramElement#ATTR} token the name starts at the key after it, otherwise at the current key.
     */
    public static String getAttributeName(StramElement element, String[] keys, int index)
    {
      final boolean onAttrToken = element == StramElement.ATTR;

      if (element != null && !onAttrToken) {
        throw new IllegalArgumentException("The given " + StramElement.class + " must either have a value of null or " + StramElement.ATTR + " but it had a value of " + element);
      }

      // The attr token itself is not part of the attribute name.
      final int nameStart = onAttrToken ? index + 1 : index;

      return getCompleteKey(keys, nameStart);
    }

    /**
     * Tells whether an attribute name is simple, as opposed to being prefixed with the FQCN of the
     * {@link Context} class which contains the attribute. A name is simple when it does not contain the key
     * separator.
     *
     * @param attributeName The attribute name to check.
     * @return True if the attribute name is simple. False otherwise.
     */
    public static boolean isSimpleAttributeName(String attributeName)
    {
      return attributeName.indexOf(KEY_SEPARATOR) < 0;
    }

    /**
     * Gets the {@link Context} class that the given attributeName belongs to.
     * <p>
     * The name is expected to be the canonical name of a {@link Context} class that is nested in another type,
     * followed by the key separator and the simple attribute name.
     *
     * @param attributeName The {@link Attribute} name whose {@link Context} class needs to be
     * discovered.
     * @return The {@link Context} class that the given {@link Attribute} name belongs to.
     * @throws IllegalArgumentException If the attribute name is simple, if its prefix is not the qualified name of
     * a nested class, if the class cannot be loaded, or if the class is not a {@link Context}.
     * @throws ValidationException If the {@link Context} class is not registered in {@link ContextUtils} or does not
     * declare the attribute.
     */
    @SuppressWarnings("unchecked")
    public static Class<? extends Context> getContainingContextClass(String attributeName)
    {
      if (isSimpleAttributeName(attributeName)) {
        throw new IllegalArgumentException("The given attribute name " + attributeName + " is simple.");
      }

      LOG.debug("Attribute Name {}", attributeName);

      int lastSeparator = attributeName.lastIndexOf(KEY_SEPARATOR);
      String contextClassName = attributeName.substring(0, lastSeparator);

      int lastPeriod = contextClassName.lastIndexOf(KEY_SEPARATOR);

      // Without a separator in the prefix there is no enclosing type to nest the context class in. Reject the
      // name here rather than failing on a negative index below.
      if (lastPeriod < 0) {
        throw new IllegalArgumentException("The given attribute name " + attributeName
            + " is not prefixed with the qualified name of a nested context class.");
      }

      // Class.forName needs the binary name, in which a nested class is separated from its enclosing
      // type by '$' instead of '.'.
      StringBuilder sb = new StringBuilder(contextClassName);
      sb.setCharAt(lastPeriod, '$');
      contextClassName = sb.toString();

      Class<? extends Context> contextClass;

      try {
        Class<?> clazz = Class.forName(contextClassName);

        if (Context.class.isAssignableFrom(clazz)) {
          contextClass = (Class<? extends Context>)clazz;
        } else {
          throw new IllegalArgumentException("The provided context class name " + contextClassName + " is not valid.");
        }
      } catch (ClassNotFoundException ex) {
        throw new IllegalArgumentException(ex);
      }

      String simpleAttributeName = getSimpleAttributeName(attributeName);
      Set<String> knownAttributes = ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass);

      // A Context implementation that is not registered has no known attributes at all.
      if (knownAttributes == null || !knownAttributes.contains(simpleAttributeName)) {
        throw new ValidationException(simpleAttributeName + " is not a valid attribute of " + contextClass);
      }

      return contextClass;
    }

    /**
     * Extracts the simple {@link Attribute} name from the given {@link Attribute} name.
     *
     * @param attributeName The attribute name to extract a simple attribute name from.
     * @return The given name if it is already simple, otherwise the part after the last
     * {@code KEY_SEPARATOR}.
     * @throws IllegalArgumentException If the given name ends with {@code KEY_SEPARATOR}.
     */
    public static String getSimpleAttributeName(String attributeName)
    {
      if (!isSimpleAttributeName(attributeName)) {
        if (attributeName.endsWith(KEY_SEPARATOR)) {
          throw new IllegalArgumentException("The given attribute name ends with \"" + KEY_SEPARATOR + "\" so a simple name cannot be extracted.");
        }

        // Keep only what follows the last separator.
        int nameStart = attributeName.lastIndexOf(KEY_SEPARATOR) + 1;
        return attributeName.substring(nameStart);
      }

      // No separator at all: the name is already in its simple form.
      return attributeName;
    }

    /**
     * Returns the simple name of an {@link Attribute}, i.e. its name as produced by
     * {@link #getSimpleAttributeName(String)}.
     *
     * @param attribute The {@link Attribute} of interest.
     * @return The simple name of the given {@link Attribute}.
     */
    public static String getSimpleName(Attribute<?> attribute)
    {
      String declaredName = attribute.name;
      return getSimpleAttributeName(declaredName);
    }

  }

  /**
   * A {@link StringCodec} that converts between objects and their JSON string form using the
   * {@link ObjectMapper} obtained from {@code ObjectMapperFactory.getOperatorValueDeserializer()}.
   */
  public class JSONObject2String implements StringCodec<Object>, Serializable
  {
    private static final long serialVersionUID = -664977453308585878L;

    /**
     * Parses the given JSON string into an object.
     *
     * @param jsonObj The JSON content to parse.
     * @return The object read from the JSON content.
     * @throws RuntimeException If the content cannot be read; the {@link IOException} is kept as the cause.
     */
    @Override
    public Object fromString(String jsonObj)
    {
      LOG.debug("JONString {}", jsonObj);
      try {
        return ObjectMapperFactory.getOperatorValueDeserializer().readValue(jsonObj, Object.class);
      } catch (IOException ioe) {
        throw new RuntimeException("Error parsing json content", ioe);
      }
    }

    /**
     * Writes the given object as a JSON string.
     *
     * @param pojo The object to write.
     * @return The JSON representation of the object.
     * @throws RuntimeException If the object cannot be written; the {@link IOException} is kept as the cause.
     */
    @Override
    public String toString(Object pojo)
    {
      // NOTE(review): writing reuses the mapper returned by getOperatorValueDeserializer() - confirm that a
      // deserializer-configured mapper is intended for serialization.
      try {
        return ObjectMapperFactory.getOperatorValueDeserializer().writeValueAsString(pojo);
      } catch (IOException ioe) {
        throw new RuntimeException("Error writing object as json", ioe);
      }
    }

  }

  /**
   * This is an abstract class representing the configuration applied to an element in the DAG.
   * <p>
   * A {@link Conf} holds its own attributes and properties, a reference to its parent {@link Conf}, and its
   * child {@link Conf}s grouped by {@link StramElement} type and keyed by id.
   */
  private abstract static class Conf
  {
    // Set by getOrAddChild on the Conf that is returned as a child; null until then.
    protected Conf parentConf = null;

    protected final Map<Attribute<Object>, String> attributes = Maps.newHashMap();
    protected final PropertiesWithModifiableDefaults properties = new PropertiesWithModifiableDefaults();

    // Child confs, first keyed by their StramElement type and then by their id.
    protected Map<StramElement, Map<String, ? extends Conf>> children = Maps.newHashMap();

    protected String id;

    public void setId(String id)
    {
      this.id = id;
    }

    public String getId()
    {
      return id;
    }

    public void setParentConf(Conf parentConf)
    {
      this.parentConf = parentConf;
    }

    /**
     * Returns the parent {@link Conf} cast to the type expected by the caller.
     *
     * @param <T> The {@link Conf} type the caller expects the parent to have.
     * @return The parent {@link Conf}, which may be null.
     */
    @SuppressWarnings("unchecked")
    public <T extends Conf> T getParentConf()
    {
      return (T)parentConf;
    }

    /**
     * Gets an ancestor {@link Conf} of this {@link Conf} of the given {@link StramElement} type. This
     * {@link Conf} itself is returned if its own {@link StramElement} type is the requested one.
     *
     * @param <T>             The {@link Conf} Class of the ancestor conf
     * @param ancestorElement The {@link StramElement} representing the type of the ancestor {@link Conf}.
     * @return The ancestor {@link Conf} of the corresponding {@link StramElement} type, or null if no ancestor
     * {@link Conf} with the given {@link StramElement} type exists.
     */
    @SuppressWarnings("unchecked")
    public <T extends Conf> T getAncestorConf(StramElement ancestorElement)
    {
      if (getConfElement().getStramElement() == ancestorElement) {
        return (T)this;
      }
      if (parentConf == null) {
        return null;
      } else {
        return parentConf.getAncestorConf(ancestorElement);
      }
    }

    /**
     * This method retrieves a child {@link Conf} of the given {@link StramElement} type with the given name. If
     * a child {@link Conf} with the given name and {@link StramElement} type doesn't exist, then it is added.
     * The returned child has this {@link Conf} set as its parent.
     * @param <T> The type of the child {@link Conf}.
     * @param id The name of the child {@link Conf}.
     * @param childType The {@link StramElement} representing the type of the child {@link Conf}.
     * @param clazz The {@link java.lang.Class} of the child {@link Conf} to add if a {@link Conf} of the given id
     * and {@link StramElement} type is not present.
     * @return A child {@link Conf} of this {@link Conf} with the given id and {@link StramElement} type, or null
     * if a new child {@link Conf} could not be instantiated.
     */
    public <T extends Conf> T getOrAddChild(String id, StramElement childType, Class<T> clazz)
    {
      @SuppressWarnings("unchecked")
      Map<String, T> elChildren = (Map<String, T>)children.get(childType);
      if (elChildren == null) {
        elChildren = Maps.newHashMap();
        children.put(childType, elChildren);
      }
      T conf = getOrAddConf(elChildren, id, clazz);
      if (conf != null) {
        conf.setParentConf(this);
      }
      return conf;
    }

    public void setAttribute(Attribute<Object> attr, String value)
    {
      attributes.put(attr, value);
    }

    public void setProperty(String name, String value)
    {
      properties.setProperty(name, value);
    }

    public void setDefaultProperties(Properties defaults)
    {
      properties.setDefaultProperties(defaults);
    }

    /**
     * This method returns a list of all the child {@link Conf}s of this {@link Conf} with the matching name
     * and {@link StramElement} type.
     * <p>
     * For a non-null name, each child key is used as a regular expression that must match the whole name, with
     * the {@link LogicalPlanConfiguration#WILDCARD} key standing for {@code WILDCARD_PATTERN}. Children with a
     * null key never match a non-null name. Exact matches are placed at the front of the returned list.
     * @param <T> The types of the child {@link Conf}s.
     * @param name The name of the child {@link Conf}s to return. If the name of the specified child {@link Conf}
     * is null then configurations with the name specified as a {@link LogicalPlanConfiguration#WILDCARD} are matched.
     * @param childType The {@link StramElement} corresponding to the type of a child {@link Conf}.
     * @return The list of child {@link Conf}s with a matching name and {@link StramElement} type.
     */
    public <T extends Conf> List<T> getMatchingChildConf(String name, StramElement childType)
    {
      List<T> childConfs = new ArrayList<>();
      Map<String, T> elChildren = getChildren(childType);
      for (Map.Entry<String, T> entry : elChildren.entrySet()) {
        String key = entry.getKey();
        boolean match = false;
        boolean exact = false;
        // Match WILDCARD to null
        if (name == null) {
          if (key == null) {
            match = true;
            exact = true;
          } else if (key.equals(WILDCARD)) {
            match = true;
          }
        } else if (key != null) {
          // Null keys are possible (see the branch above) and cannot match a non-null name; skipping them
          // here avoids a NullPointerException on key.equals.
          // Also treat WILDCARD as match any character string when running regular express match
          if (key.equals(WILDCARD)) {
            key = WILDCARD_PATTERN;
          }
          if (name.matches(key)) {
            match = true;
          }
          if (name.equals(key)) {
            exact = true;
          }
        }
        // There will be a better match preference order
        if (match) {
          if (!exact) {
            childConfs.add(entry.getValue());
          } else {
            childConfs.add(0, entry.getValue());
          }
        }
      }
      return childConfs;
    }

    /**
     * Returns the {@link Conf} corresponding to the given id from the given map. If a {@link Conf} with the
     * given id is not present in the given map, then a new {@link Conf} of the given class is created through
     * its no-argument constructor and added to the map.
     * @param <T> The type of the {@link Conf}s contained in the map.
     * @param map The map to retrieve a {@link Conf} from or add a {@link Conf} to.
     * @param id The name of the {@link Conf} to retrieve from or add to the given map.
     * @param clazz The {@link java.lang.Class} of the {@link Conf} to add to the given map, if a {@link Conf} with
     * the given name is not present in the given map.
     * @return A {@link Conf} with the given name, contained in the given map, or null if a new {@link Conf}
     * could not be instantiated (the failure is logged).
     */
    protected <T extends Conf> T getOrAddConf(Map<String, T> map, String id, Class<T> clazz)
    {
      T conf = map.get(id);
      if (conf == null) {
        try {
          Constructor<T> declaredConstructor = clazz.getDeclaredConstructor(new Class<?>[]{});
          conf = declaredConstructor.newInstance(new Object[]{});
          conf.setId(id);
          map.put(id, conf);
        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException | NoSuchMethodException |
            SecurityException | InvocationTargetException e) {
          LOG.error("Error instantiating configuration", e);
        }
      }
      return conf;
    }

    /**
     * Looks up an existing child {@link Conf} without creating one.
     *
     * @param <T> The type of the child {@link Conf}.
     * @param id The name of the child {@link Conf}.
     * @param childType The {@link StramElement} representing the type of the child {@link Conf}.
     * @return The child {@link Conf} with the given id and type, or null if there is none.
     */
    public <T extends Conf> T getChild(String id, StramElement childType)
    {
      T conf = null;
      @SuppressWarnings("unchecked")
      Map<String, T> elChildren = (Map<String, T>)children.get(childType);
      if (elChildren != null) {
        conf = elChildren.get(id);
      }
      return conf;
    }

    /**
     * Returns the children of the given {@link StramElement} type keyed by id. An empty map is registered
     * and returned when no children of that type exist yet.
     *
     * @param <T> The type of the child {@link Conf}s.
     * @param childType The {@link StramElement} representing the type of the child {@link Conf}s.
     * @return The non-null map of children of the given type.
     */
    @SuppressWarnings("unchecked")
    public <T extends Conf> Map<String, T> getChildren(StramElement childType)
    {
      // Always return non null so caller will not have to do extra check as expected
      Map<String, T> elChildren = (Map<String, T>)children.get(childType);
      if (elChildren == null) {
        elChildren = Maps.newHashMap();
        children.put(childType, elChildren);
      }
      return elChildren;
    }

    // Override for parsing of custom elements other than attributes and opProps
    // Make this config parse element as the entry point for parsing in future instead of the generic method in
    // parent class
    public void parseElement(StramElement element, String[] keys, int index, String propertyValue)
    {
    }

    /**
     * Checks whether the given {@link StramElement} is one of the elements returned by
     * {@link #getChildElements()}.
     *
     * @param childType The {@link StramElement} to check.
     * @return True if the given element is listed as a child element. False otherwise.
     */
    public boolean isAllowedChild(StramElement childType)
    {
      StramElement[] childElements = getChildElements();
      if (childElements != null) {
        for (StramElement childElement : childElements) {
          if (childType == childElement) {
            return true;
          }
        }
      }
      return false;
    }

    /**
     * Returns {@link StramElement#PROP} when this {@link Conf}'s {@link ConfElement} has no context class and
     * {@link StramElement#PROP} is an allowed child, otherwise null.
     *
     * @return The default child element, or null if there is none.
     */
    public StramElement getDefaultChildElement()
    {
      if ((getConfElement().getContextClass() == null) && isAllowedChild(StramElement.PROP)) {
        return StramElement.PROP;
      }
      return null;
    }

    /**
     * @return True if this {@link Conf} has no default child element. False otherwise.
     */
    public boolean ignoreUnknownChildren()
    {
      return getDefaultChildElement() == null;
    }

    /**
     * @return The {@link StramElement}s that are allowed as children of this {@link Conf}.
     */
    public abstract StramElement[] getChildElements();

    /**
     * @return The {@link ConfElement} describing the type of this {@link Conf}.
     */
    public abstract ConfElement getConfElement();
  }

  /**
   * The {@link Conf} whose {@link ConfElement} is {@link ConfElement#STRAM}. It additionally keeps the
   * application aliases registered by {@link AppConf#parseElement}.
   */
  private static class StramConf extends Conf
  {

    // Maps the value of an application's class or path element to the id of that application's conf.
    private final Map<String, String> appAliases = Maps.newHashMap();

    // Each allowed child element is listed exactly once (TEMPLATE used to appear twice).
    private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.APPLICATION,
        StramElement.GATEWAY, StramElement.TEMPLATE, StramElement.OPERATOR,
        StramElement.PORT, StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM,
        StramElement.ATTR, StramElement.UNIFIER};

    StramConf()
    {
    }

    @Override
    public StramElement[] getChildElements()
    {
      return CHILD_ELEMENTS;
    }

    @Override
    public ConfElement getConfElement()
    {
      return ConfElement.STRAM;
    }
  }

  /**
   * Configuration of a single Apex application. Properties are the default child element, and the values of
   * the class and path elements are recorded as application aliases on the parent {@link StramConf}.
   */
  private static class AppConf extends Conf
  {

    private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.GATEWAY, StramElement.OPERATOR,
        StramElement.PORT, StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM, StramElement.ATTR,
        StramElement.CLASS, StramElement.PATH, StramElement.PROP, StramElement.UNIFIER};

    @SuppressWarnings("unused")
    AppConf()
    {
    }

    @Override
    public ConfElement getConfElement()
    {
      return ConfElement.APPLICATION;
    }

    @Override
    public StramElement[] getChildElements()
    {
      return CHILD_ELEMENTS;
    }

    @Override
    public StramElement getDefaultChildElement()
    {
      return StramElement.PROP;
    }

    /**
     * Registers this application's id as the alias for the given property value when the element is
     * {@link StramElement#CLASS} or {@link StramElement#PATH}; other elements are ignored.
     */
    @Override
    public void parseElement(StramElement element, String[] keys, int index, String propertyValue)
    {
      boolean aliasElement = (element == StramElement.CLASS) || (element == StramElement.PATH);
      if (!aliasElement) {
        return;
      }
      StramConf owner = getParentConf();
      owner.appAliases.put(propertyValue, getId());
    }
  }

  /**
   * The {@link Conf} for the {@link ConfElement#GATEWAY} element; it only accepts properties as children.
   */
  private static class GatewayConf extends Conf
  {

    // PROP is the only child element a gateway configuration accepts.
    private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.PROP};

    @SuppressWarnings("unused")
    GatewayConf()
    {
    }

    @Override
    public ConfElement getConfElement()
    {
      return ConfElement.GATEWAY;
    }

    @Override
    public StramElement[] getChildElements()
    {
      return CHILD_ELEMENTS;
    }
  }

  /**
   * A named set of properties that streams and operators can adopt as their default properties. The three
   * regular expression properties are kept in dedicated fields instead of the general property set.
   */
  private static class TemplateConf extends Conf
  {

    private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.PROP};

    private String idRegExp;
    private String appNameRegExp;
    private String classNameRegExp;

    @SuppressWarnings("unused")
    TemplateConf()
    {
    }

    @Override
    public ConfElement getConfElement()
    {
      return ConfElement.TEMPLATE;
    }

    @Override
    public StramElement[] getChildElements()
    {
      return CHILD_ELEMENTS;
    }

    /**
     * Stores the value in the matching regular expression field when the name is one of the template
     * regular expression properties; every other property is handled by the parent class.
     */
    @Override
    public void setProperty(String name, String value)
    {
      if (name.equals(TEMPLATE_appNameRegExp)) {
        appNameRegExp = value;
        return;
      }
      if (name.equals(TEMPLATE_idRegExp)) {
        idRegExp = value;
        return;
      }
      if (name.equals(TEMPLATE_classNameRegExp)) {
        classNameRegExp = value;
        return;
      }
      super.setProperty(name, value);
    }

  }

  /**
   * Configuration of a stream: the operator that feeds it, the operators that consume it, and its
   * properties.
   */
  private static class StreamConf extends Conf
  {

    private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.TEMPLATE, StramElement.PROP};

    private OperatorConf sourceNode;
    private final Set<OperatorConf> targetNodes = new HashSet<>();

    @SuppressWarnings("unused")
    StreamConf()
    {
    }

    @Override
    public ConfElement getConfElement()
    {
      return ConfElement.STREAM;
    }

    @Override
    public StramElement[] getChildElements()
    {
      return CHILD_ELEMENTS;
    }

    /**
     * Locality for adjacent operators.
     * @return The {@link DAG.Locality} named by the stream locality property, or null if the property is
     * not set.
     */
    public DAG.Locality getLocality()
    {
      String localityName = properties.getProperty(STREAM_LOCALITY, null);
      if (localityName == null) {
        return null;
      }
      return DAG.Locality.valueOf(localityName);
    }

    /**
     * Set source on stream to the node output port.
     * @param portName The name of the output port on the given node.
     * @param node The operator that feeds this stream.
     * @return This stream configuration.
     * @throws IllegalArgumentException If the stream already has a source.
     */
    public StreamConf setSource(String portName, OperatorConf node)
    {
      if (this.sourceNode != null) {
        throw new IllegalArgumentException(String.format("Stream already receives input from %s", sourceNode));
      }
      node.outputs.put(portName, this);
      this.sourceNode = node;
      return this;
    }

    /**
     * Connects this stream to an input port of the given operator.
     * @param portName The name of the input port on the given operator.
     * @param targetNode The operator that consumes this stream.
     * @return This stream configuration.
     * @throws IllegalArgumentException If the port is already connected to a stream.
     */
    public StreamConf addSink(String portName, OperatorConf targetNode)
    {
      Map<String, StreamConf> connectedInputs = targetNode.inputs;
      if (connectedInputs.containsKey(portName)) {
        throw new IllegalArgumentException(String.format("Port %s already connected to stream %s", portName, targetNode.inputs.get(portName)));
      }
      connectedInputs.put(portName, this);
      targetNodes.add(targetNode);
      return this;
    }

    /**
     * Handles the source, sinks and template properties of a stream; every other property is stored as a
     * regular property. Operators referenced by source and sinks are looked up on, or added to, the parent
     * application configuration.
     */
    @Override
    public void setProperty(String name, String value)
    {
      AppConf appConf = getParentConf();

      if (STREAM_SOURCE.equals(name)) {
        // multiple sources not allowed
        if (sourceNode != null) {
          throw new IllegalArgumentException("Duplicate " + name);
        }
        String[] nodeAndPort = getNodeAndPortId(value);
        OperatorConf sourceOperator = appConf.getOrAddChild(nodeAndPort[0], StramElement.OPERATOR, OperatorConf.class);
        setSource(nodeAndPort[1], sourceOperator);
        return;
      }

      if (STREAM_SINKS.equals(name)) {
        for (String sinkReference : value.split(",")) {
          String[] nodeAndPort = getNodeAndPortId(sinkReference.trim());
          OperatorConf sinkOperator = appConf.getOrAddChild(nodeAndPort[0], StramElement.OPERATOR, OperatorConf.class);
          addSink(nodeAndPort[1], sinkOperator);
        }
        return;
      }

      if (STREAM_TEMPLATE.equals(name)) {
        StramConf stramConf = getAncestorConf(null);
        Class<? extends Conf> templateClass = elementMaps.get(StramElement.TEMPLATE);
        TemplateConf templateConf = (TemplateConf)stramConf.getOrAddChild(value, StramElement.TEMPLATE, templateClass);
        setDefaultProperties(templateConf.properties);
        return;
      }

      super.setProperty(name, value);
    }

    // Splits a "node.port" reference into its two parts, rejecting anything that does not have exactly two.
    private String[] getNodeAndPortId(String s)
    {
      String[] nodeAndPort = s.split("\\.");
      if (nodeAndPort.length == 2) {
        return nodeAndPort;
      }
      throw new IllegalArgumentException("Invalid node.port reference: " + s);
    }

    @Override
    public String toString()
    {
      ToStringBuilder description = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
      description.append("id", this.id);
      return description.toString();
    }

  }

  /**
   * A {@link java.util.Properties} whose default property set can be replaced after construction.
   */
  private static class PropertiesWithModifiableDefaults extends Properties
  {
    private static final long serialVersionUID = -4675421720308249982L;

    /**
     * Replaces the default properties consulted by this property set.
     *
     * @param defaults The properties to use as defaults from now on.
     */
    void setDefaultProperties(Properties defaults)
    {
      // The parameter shadows the inherited field, so the field is addressed through "this".
      this.defaults = defaults;
    }
  }

  /**
   * Configuration of an operator: its properties, the streams connected to its ports and the template it
   * refers to, if any.
   */
  private static class OperatorConf extends Conf
  {
    private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.PORT,
        StramElement.INPUT_PORT, StramElement.OUTPUT_PORT,
        StramElement.ATTR, StramElement.PROP};

    // Streams connected to this operator, keyed by port name.
    private final Map<String, StreamConf> inputs = Maps.newHashMap();
    private final Map<String, StreamConf> outputs = Maps.newHashMap();
    private String templateRef;

    @SuppressWarnings("unused")
    OperatorConf()
    {
    }

    @Override
    public ConfElement getConfElement()
    {
      return ConfElement.OPERATOR;
    }

    @Override
    public StramElement[] getChildElements()
    {
      return CHILD_ELEMENTS;
    }

    @Override
    public StramElement getDefaultChildElement()
    {
      return StramElement.PROP;
    }

    /**
     * Stores a property. The template property is special: its value is remembered as the template
     * reference and the properties of the named template become this operator's default properties.
     */
    @Override
    public void setProperty(String name, String value)
    {
      if (!OPERATOR_TEMPLATE.equals(name)) {
        super.setProperty(name, value);
        return;
      }

      templateRef = value;
      // Setting opProps from the template as default opProps as before
      // Revisit this
      StramConf stramConf = getAncestorConf(null);
      Class<? extends Conf> templateClass = elementMaps.get(StramElement.TEMPLATE);
      TemplateConf templateConf = (TemplateConf)stramConf.getOrAddChild(value, StramElement.TEMPLATE, templateClass);
      setDefaultProperties(templateConf.properties);
    }

    // Returns the operator class name property, failing when it has not been set.
    private String getClassNameReqd()
    {
      String className = properties.getProperty(OPERATOR_CLASSNAME);
      if (className != null) {
        return className;
      }
      throw new IllegalArgumentException(String.format("Operator '%s' is missing property '%s'", getId(), LogicalPlanConfiguration.OPERATOR_CLASSNAME));
    }

    /**
     * Properties for the node. Template values (if set) become property defaults.
     *
     * @return The properties as a map of property name to property value.
     */
    private Map<String, String> getProperties()
    {
      Map<String, String> propertyMap = Maps.fromProperties(properties);
      return propertyMap;
    }

    /**
     * @return A short description containing the id of this operator configuration.
     */
    @Override
    public String toString()
    {
      ToStringBuilder description = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
      description.append("id", this.id);
      return description.toString();
    }

  }

  /**
   * Configuration of a port on an operator. A port accepts attributes and a unifier as children.
   */
  private static class PortConf extends Conf
  {

    private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.ATTR, StramElement.UNIFIER};

    @SuppressWarnings("unused")
    PortConf()
    {
    }

    @Override
    public ConfElement getConfElement()
    {
      return ConfElement.PORT;
    }

    @Override
    public StramElement[] getChildElements()
    {
      return CHILD_ELEMENTS;
    }
  }

  // The Conf class that is instantiated for each StramElement type.
  private static final Map<StramElement, Class<? extends Conf>> elementMaps = Maps.newHashMap();

  static {
    // StramConf is registered under the null key.
    elementMaps.put(null, StramConf.class);
    elementMaps.put(StramElement.APPLICATION, AppConf.class);
    elementMaps.put(StramElement.GATEWAY, GatewayConf.class);
    elementMaps.put(StramElement.TEMPLATE, TemplateConf.class);
    elementMaps.put(StramElement.OPERATOR, OperatorConf.class);
    elementMaps.put(StramElement.STREAM, StreamConf.class);
    // All three port element types share the same configuration class.
    for (StramElement portElement : new StramElement[]{StramElement.PORT, StramElement.INPUT_PORT, StramElement.OUTPUT_PORT}) {
      elementMaps.put(portElement, PortConf.class);
    }
    // A unifier is configured like an operator.
    elementMaps.put(StramElement.UNIFIER, OperatorConf.class);
  }

  /**
   * Resolves the {@link Conf} for the given {@link StramElement} type relative to the given ancestor:
   * <ol>
   *    <li>If the given {@link StramElement} is the type of the given ancestorConf, then the ancestorConf
   * itself is returned.</li>
   *    <li>Otherwise the {@link Conf} for the allowed parent type of the given {@link StramElement} is resolved
   * recursively, and its child of the given type named {@link LogicalPlanConfiguration#WILDCARD} is returned.
   * The child is added if it does not exist yet.</li>
   * </ol>
   * @param element The {@link StramElement} type corresponding to the ancestorConf or to a {@link Conf} below it.
   * @param ancestorConf The {@link Conf} at which the resolution stops.
   * @return The resolved {@link Conf}, or null if the given {@link StramElement} is null and is not the type of
   * the given ancestorConf.
   * @throws IllegalArgumentException If the parent {@link Conf} of the given {@link StramElement} cannot be
   * resolved.
   */
  private static Conf getConf(StramElement element, Conf ancestorConf)
  {
    StramElement ancestorElement = ancestorConf.getConfElement().getStramElement();
    if (element == ancestorElement) {
      return ancestorConf;
    }

    // If top most element is reached and didnt match ancestor conf
    // then terminate search
    if (element == null) {
      return null;
    }

    Conf resolvedParent = getConf(ConfElement.getAllowedParentConf(element), ancestorConf);
    if (resolvedParent == null) {
      throw new IllegalArgumentException("The given StramElement is not the same type as the given ancestorConf, " +
                                         "and it is not a valid type for a parent conf.");
    }

    Class<? extends Conf> childClass = elementMaps.get(element);
    return resolvedParent.getOrAddChild(WILDCARD, element, childClass);
  }

  /**
   * Looks up, or adds, a child {@link Conf} with the given {@link StramElement} type and name below the given
   * ancestorConf. The child is attached to the {@link Conf} resolved for the allowed parent type of the
   * given {@link StramElement}.
   * @param element The {@link StramElement} of the child {@link Conf}.
   * @param name The name of the child {@link Conf}.
   * @param ancestorConf The {@link Conf} below which the child {@link Conf} is placed.
   * @return The child {@link Conf}, or null if no parent {@link Conf} could be resolved.
   */
  private static Conf addConf(StramElement element, String name, Conf ancestorConf)
  {
    Conf parentConf = getConf(ConfElement.getAllowedParentConf(element), ancestorConf);
    if (parentConf == null) {
      return null;
    }
    Class<? extends Conf> childClass = elementMaps.get(element);
    return parentConf.getOrAddChild(name, element, childClass);
  }

  /**
   * Collects, in the order of the given {@link List}, the matching child {@link Conf}s of every {@link Conf}
   * in it, as determined by {@link Conf#getMatchingChildConf(String, StramElement)}.
   * @param <T> The types of the child {@link Conf}s.
   * @param confs The list of {@link Conf}s whose children will be searched.
   * @param name The name of the child {@link Conf}s to return. If the name of the specified child {@link Conf}
   * is null then configurations with the name specified as a {@link LogicalPlanConfiguration#WILDCARD} are matched.
   * @param childType The {@link StramElement} corresponding to the type of a child {@link Conf}.
   * @return The list of child {@link Conf}s with a matching name and {@link StramElement} type.
   */
  private <T extends Conf> List<T> getMatchingChildConf(List<? extends Conf> confs, String name, StramElement childType)
  {
    List<T> collected = new ArrayList<>();
    for (Conf parent : confs) {
      collected.addAll(parent.<T>getMatchingChildConf(name, childType));
    }
    return collected;
  }

  // Receives every property passed to addFromProperties, whether or not its name carries a stram prefix.
  private final Properties properties = new Properties();
  // The configuration this instance was constructed with.
  public final Configuration conf;

  // The Conf that parsing of prefixed property names starts from.
  private final StramConf stramConf = new StramConf();

  /**
   * Creates a logical plan configuration for the given configuration: the configuration is kept, its
   * entries are loaded through {@link #addFromConfiguration(Configuration)}, and the
   * {@link DAGSetupPluginManager} instance for it is obtained.
   *
   * @param conf The configuration to load.
   */
  public LogicalPlanConfiguration(Configuration conf)
  {
    this.conf = conf;
    addFromConfiguration(conf);
    pluginManager = DAGSetupPluginManager.getInstance(conf);
  }

  /**
   * Adds the flattened name value pairs of the given configuration object. The pairs are copied into a
   * {@link Properties} object and passed to {@link #addFromProperties(Properties, Configuration)} without a
   * configuration for variable substitution.
   * @param conf The configuration whose entries are added.
   */
  public final void addFromConfiguration(Configuration conf)
  {
    Properties flattened = toProperties(conf);
    addFromProperties(flattened, null);
  }

  // Copies every entry produced by the configuration's iterator into a new Properties object.
  private static Properties toProperties(Configuration conf)
  {
    Properties copied = new Properties();
    for (Iterator<Entry<String, String>> entries = conf.iterator(); entries.hasNext(); ) {
      Entry<String, String> entry = entries.next();
      copied.put(entry.getKey(), entry.getValue());
    }
    return copied;
  }

  /**
   * Get the application alias name for an application class if one is available.
   * <p>
   * A path ending in the class suffix is first turned into a class name: {@code "/"} is replaced with the
   * key separator and the suffix is cut off. An alias registered in the configuration for that name wins;
   * otherwise the class is loaded through the context class loader of the current thread and the name of its
   * {@link ApplicationAnnotation}, if present and not blank, is used.
   *
   * @param appPath The path of the application class in the jar
   * @return The alias name if one is available, null otherwise
   */
  public String getAppAlias(String appPath)
  {
    String lookupName = appPath;
    if (lookupName.endsWith(CLASS_SUFFIX)) {
      int keptLength = lookupName.length() - CLASS_SUFFIX.length();
      lookupName = lookupName.replace("/", KEY_SEPARATOR).substring(0, keptLength);
    }

    String configuredAlias = stramConf.appAliases.get(lookupName);
    if (configuredAlias != null) {
      return configuredAlias;
    }

    try {
      ClassLoader loader = Thread.currentThread().getContextClassLoader();
      ApplicationAnnotation an = loader.loadClass(lookupName).getAnnotation(ApplicationAnnotation.class);
      if (an != null && StringUtils.isNotBlank(an.name())) {
        return an.name();
      }
    } catch (ClassNotFoundException e) {
      // ignore
    }
    return null;
  }

  /**
   * Translates a JSON description of an application into flattened properties and adds them through
   * {@link #addFromProperties(Properties, Configuration)}.
   * <p>
   * The JSON object must contain an {@code "operators"} array and a {@code "streams"} array; an
   * {@code "attributes"} object with application attributes is optional.
   *
   * @param json The JSON description of the application.
   * @param conf configuration for variable substitution and evaluation
   * @return Logical plan configuration.
   * @throws JSONException If a required entry is missing from the JSON description.
   */
  public LogicalPlanConfiguration addFromJson(JSONObject json, Configuration conf) throws JSONException
  {
    Properties prop = new Properties();

    JSONArray operatorArray = json.getJSONArray("operators");
    for (int i = 0; i < operatorArray.length(); i++) {
      putOperatorJson(operatorArray.getJSONObject(i), prop);
    }

    String appAttributesPrefix = StreamingApplication.APEX_PREFIX + StramElement.ATTR.getValue() + KEY_SEPARATOR;
    putJsonObjectEntries(json.optJSONObject("attributes"), appAttributesPrefix, prop);

    JSONArray streamArray = json.getJSONArray("streams");
    for (int i = 0; i < streamArray.length(); i++) {
      // Streams without a name are named after their position in the array.
      putStreamJson(streamArray.getJSONObject(i), "stream-" + i, prop);
    }
    return addFromProperties(prop, conf);
  }

  // Adds the class name, properties, attributes and port attributes of one JSON operator to the properties.
  private static void putOperatorJson(JSONObject operator, Properties prop) throws JSONException
  {
    String operatorPrefix = StreamingApplication.APEX_PREFIX + StramElement.OPERATOR.getValue() + KEY_SEPARATOR + operator.getString("name") + ".";
    prop.setProperty(operatorPrefix + "classname", operator.getString("class"));

    putJsonObjectEntries(operator.optJSONObject("properties"),
        operatorPrefix + StramElement.PROP.getValue() + KEY_SEPARATOR, prop);
    putJsonObjectEntries(operator.optJSONObject("attributes"),
        operatorPrefix + StramElement.ATTR.getValue() + KEY_SEPARATOR, prop);

    JSONArray portArray = operator.optJSONArray("ports");
    if (portArray == null) {
      return;
    }
    String portsPrefix = operatorPrefix + StramElement.PORT.getValue() + KEY_SEPARATOR;
    for (int j = 0; j < portArray.length(); j++) {
      JSONObject port = portArray.getJSONObject(j);
      JSONObject portAttributes = port.optJSONObject("attributes");
      // The port name is only required when the port actually carries attributes.
      if (portAttributes != null) {
        String portAttributePrefix = portsPrefix + port.getString("name") + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR;
        putJsonObjectEntries(portAttributes, portAttributePrefix, prop);
      }
    }
  }

  // Adds the source, sinks, locality and schema of one JSON stream to the properties.
  private static void putStreamJson(JSONObject stream, String defaultName, Properties prop) throws JSONException
  {
    String name = stream.optString("name", defaultName);
    String streamPrefix = StreamingApplication.APEX_PREFIX + StramElement.STREAM.getValue() + KEY_SEPARATOR + name + KEY_SEPARATOR;

    JSONObject source = stream.getJSONObject("source");
    prop.setProperty(streamPrefix + STREAM_SOURCE, source.getString("operatorName") + KEY_SEPARATOR + source.getString("portName"));

    JSONArray sinks = stream.getJSONArray("sinks");
    StringBuilder sinkPropertyValue = new StringBuilder();
    for (int j = 0; j < sinks.length(); j++) {
      if (sinkPropertyValue.length() > 0) {
        sinkPropertyValue.append(",");
      }
      JSONObject sink = sinks.getJSONObject(j);
      sinkPropertyValue.append(sink.getString("operatorName")).append(KEY_SEPARATOR).append(sink.getString("portName"));
    }
    prop.setProperty(streamPrefix + STREAM_SINKS, sinkPropertyValue.toString());

    String locality = stream.optString("locality", null);
    if (locality != null) {
      prop.setProperty(streamPrefix + STREAM_LOCALITY, locality);
    }

    JSONObject schema = stream.optJSONObject("schema");
    if (schema != null) {
      prop.setProperty(streamPrefix + STREAM_SCHEMA, schema.getString("class"));
    }
  }

  // Copies every entry of the JSON object, if it is present, into the properties under the given key prefix.
  private static void putJsonObjectEntries(JSONObject source, String keyPrefix, Properties target) throws JSONException
  {
    if (source == null) {
      return;
    }
    @SuppressWarnings("unchecked")
    Iterator<String> keys = source.keys();
    while (keys.hasNext()) {
      String key = keys.next();
      // NOTE(review): assumes JSONObject.getString(key) is equivalent to get(key).toString(), so one
      // conversion serves both the former getString and get(...).toString() call sites - confirm.
      target.setProperty(keyPrefix + key, source.get(key).toString());
    }
  }


  /**
   * Adds the given properties to this configuration. The properties may be
   * supplied in any order, provided that together they describe a consistent
   * configuration.
   * <p>
   * Every property is recorded in this builder's property set. Properties
   * whose name starts with {@link StreamingApplication#DT_PREFIX} or
   * {@link StreamingApplication#APEX_PREFIX} are additionally split into
   * tokens and parsed into the configuration tree.
   *
   * @param props the properties to add
   * @param conf configuration for variable substitution and evaluation; skipped when {@code null}
   * @return this logical plan configuration
   */
  public LogicalPlanConfiguration addFromProperties(Properties props, Configuration conf)
  {
    if (conf != null) {
      StramClientUtils.evalProperties(props, conf);
    }
    for (final String name : props.stringPropertyNames()) {
      final String value = props.getProperty(name);
      this.properties.setProperty(name, value);
      final boolean stramProperty = name.startsWith(StreamingApplication.DT_PREFIX)
          || name.startsWith(StreamingApplication.APEX_PREFIX);
      if (!stramProperty) {
        continue;
      }
      // parsing starts at token 1, i.e. right after the prefix token
      final String[] tokens = name.split(KEY_SEPARATOR_SPLIT_REGEX);
      parseStramPropertyTokens(tokens, 1, name, value, stramConf);
    }
    return this;
  }

  /**
   * Interprets the token at the given position of a split Apex property name and
   * hands the property over to the helper responsible for the element found there.
   * Nothing happens when {@code index} is past the last token.
   *
   * @param keys The tokens into which the Apex property name was split.
   * @param index The position of the token that is interpreted by this call.
   * @param propertyName The original unsplit Apex property name.
   * @param propertyValue The value corresponding to the Apex property.
   * @param conf The current {@link Conf} to add properties to.
   */
  private void parseStramPropertyTokens(String[] keys, int index, String propertyName, String propertyValue, Conf conf)
  {
    if (index >= keys.length) {
      return;
    }
    final StramElement element = getElement(keys[index], conf);

    if (element == null) {
      // the token is not an element that is known and allowed at this level
      if (conf.ignoreUnknownChildren()) {
        return;
      }
      // fall back to the default child element of the current configuration, if it is an attribute or a property
      if (conf.getDefaultChildElement() == StramElement.ATTR) {
        parseAttributeElement(element, keys, index, conf, propertyValue, propertyName);
      } else if (conf.getDefaultChildElement() == StramElement.PROP) {
        parsePropertyElement(element, keys, index, conf, propertyValue, propertyName);
      }
      return;
    }

    // elements that are followed by a name token
    if (element == StramElement.APPLICATION || element == StramElement.OPERATOR || element == StramElement.STREAM
        || element == StramElement.PORT || element == StramElement.INPUT_PORT || element == StramElement.OUTPUT_PORT
        || element == StramElement.TEMPLATE) {
      parseAppElement(index, keys, element, conf, propertyName, propertyValue);
      return;
    }
    if (element == StramElement.GATEWAY) {
      parseGatewayElement(element, conf, keys, index, propertyName, propertyValue);
      return;
    }
    if (element == StramElement.UNIFIER) {
      parseUnifierElement(element, conf, keys, index, propertyName, propertyValue);
      return;
    }
    if (element == StramElement.ATTR) {
      parseAttributeElement(element, keys, index, conf, propertyValue, propertyName);
      return;
    }
    if (element == StramElement.PROP) {
      parsePropertyElement(element, keys, index, conf, propertyValue, propertyName);
      return;
    }
    // any other element is handled by the configuration itself
    conf.parseElement(element, keys, index, propertyValue);
  }

  /**
   * This is a helper method for {@link #parseStramPropertyTokens} which handles an element that is
   * followed by a name token. The child configuration for that name is looked up through
   * {@code addConf} and parsing continues with the token after the name.
   * @param index The position of the element token; the name is expected at {@code index + 1}.
   * @param keys The keys that the property being parsed was split into.
   * @param element The current {@link StramElement} of the property being parsed.
   * @param conf1 The {@link Conf} under which the child configuration is looked up.
   * @param propertyName The complete unprocessed name of the property being parsed.
   * @param propertyValue The value associated with the property being parsed.
   */
  private void parseAppElement(int index, String[] keys, StramElement element, Conf conf1, String propertyName, String propertyValue)
  {
    final int nameIndex = index + 1;
    if (nameIndex >= keys.length) {
      // the element token is the last token, so there is no name to resolve
      LOG.warn("Invalid configuration key: {}", propertyName);
      return;
    }
    final Conf childConf = addConf(element, keys[nameIndex], conf1);
    if (childConf == null) {
      LOG.error("Invalid configuration key: {}", propertyName);
      return;
    }
    parseStramPropertyTokens(keys, index + 2, propertyName, propertyValue, childConf);
  }

  /**
   * This is a helper method for {@link #parseStramPropertyTokens} which handles a gateway element.
   * The gateway configuration is looked up without a name and parsing continues with the next token.
   * @param element The current {@link StramElement} of the property being parsed.
   * @param conf1 The {@link Conf} under which the gateway configuration is looked up.
   * @param keys The keys that the property being parsed was split into.
   * @param index The position of the gateway token.
   * @param propertyName The complete unprocessed name of the property being parsed.
   * @param propertyValue The value associated with the property being parsed.
   */
  private void parseGatewayElement(StramElement element, Conf conf1, String[] keys, int index, String propertyName, String propertyValue)
  {
    final Conf gatewayConf = addConf(element, null, conf1);
    if (gatewayConf == null) {
      LOG.error("Invalid configuration key: {}", propertyName);
      return;
    }
    parseStramPropertyTokens(keys, index + 1, propertyName, propertyValue, gatewayConf);
  }

  /**
   * This is a helper method for {@link #parseStramPropertyTokens} which handles a unifier element.
   * The unifier configuration is looked up without a name and parsing continues with the next token.
   * @param element The current {@link StramElement} of the property being parsed.
   * @param conf1 The {@link Conf} under which the unifier configuration is looked up.
   * @param keys The keys that the property being parsed was split into.
   * @param index The position of the unifier token.
   * @param propertyName The complete unprocessed name of the property being parsed.
   * @param propertyValue The value associated with the property being parsed.
   */
  private void parseUnifierElement(StramElement element, Conf conf1, String[] keys, int index, String propertyName, String propertyValue)
  {
    final Conf unifierConf = addConf(element, null, conf1);
    if (unifierConf == null) {
      LOG.error("Invalid configuration key: {}", propertyName);
      return;
    }
    parseStramPropertyTokens(keys, index + 1, propertyName, propertyValue, unifierConf);
  }

  /**
   * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing an attribute.
   * <p>
   * The attribute name may be a simple name or a fully qualified name. A simple name is validated against the
   * known simple attribute names and against the attributes available at the current configuration level; it is
   * then either set directly on the current configuration or handed to
   * {@code AttributeParseUtils.processAllConfsForAttribute}. A fully qualified name is resolved to its context
   * class and set on the configuration returned by {@code ConfElement.addConfs} for that context.
   * @param element The current {@link StramElement} of the property being parsed; anything other than
   *        {@link StramElement#ATTR} triggers a deprecation warning.
   * @param keys The keys that the property being parsed was split into.
   * @param index The current key that the parser is on.
   * @param conf The current {@link Conf}.
   * @param propertyValue The value associated with the property being parsed.
   * @param propertyName The complete unprocessed name of the property being parsed.
   * @throws ValidationException if the attribute name is not a known attribute, or is not defined for the
   *         current configuration level or the referenced context class.
   */
  private void parseAttributeElement(StramElement element, String[] keys, int index, Conf conf, String propertyValue, String propertyName)
  {
    String attributeName = AttributeParseUtils.getAttributeName(element, keys, index);
    if (element != StramElement.ATTR) {
      // The attribute was referenced without the explicit attribute element; log the expected spelling.
      String expName = getCompleteKey(keys, 0, index) + KEY_SEPARATOR + StramElement.ATTR.getValue() + KEY_SEPARATOR + attributeName;
      LOG.warn("Referencing the attribute as {} instead of {} is deprecated!", getCompleteKey(keys, 0), expName);
    }
    if (conf.getConfElement().getStramElement() == null) {
      // The current configuration has no stram element: continue with the wildcard application configuration.
      conf = addConf(StramElement.APPLICATION, WILDCARD, conf);
    }
    if (conf != null) {
      if (AttributeParseUtils.isSimpleAttributeName(attributeName)) {
        // The provided attribute name is a simple name.
        if (!AttributeParseUtils.ALL_SIMPLE_ATTRIBUTE_NAMES.contains(attributeName)) {
          throw new ValidationException("Invalid attribute reference: " + getCompleteKey(keys, 0));
        }
        if (!conf.getConfElement().getAllChildAttributes().contains(attributeName)) {
          throw new ValidationException(attributeName + " is not defined for the " + conf.getConfElement().getStramElement() + " or any of its child configurations.");
        }
        if (conf.getConfElement().getAmbiguousAttributes().contains(attributeName)) {
          // If the attribute name is ambiguous at this configuration level we should tell the user.
          LOG.warn("The attribute " + attributeName + " is ambiguous when specified on an " + conf.getConfElement().getStramElement());
        }
        if (conf.getConfElement().getContextAttributes().contains(attributeName)) {
          // The attribute belongs to the context of the current configuration: set it here.
          @SuppressWarnings(value = "unchecked")
          Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(conf.getConfElement().getContextClass()).get(attributeName);
          conf.setAttribute(attr, propertyValue);
        } else {
          // Not a context attribute of the current configuration: delegate to AttributeParseUtils.
          AttributeParseUtils.processAllConfsForAttribute(conf, attributeName, propertyValue);
        }
      } else {
        // This is a fully qualified attribute name.
        Class<? extends Context> contextClass = AttributeParseUtils.getContainingContextClass(attributeName);
        // Convert to a simple name.
        attributeName = AttributeParseUtils.getSimpleAttributeName(attributeName);
        if (!ContextUtils.CONTEXT_CLASS_TO_ATTRIBUTES.get(contextClass).contains(attributeName)) {
          throw new ValidationException(attributeName + " is not a valid attribute in " + contextClass.getCanonicalName());
        }
        // Look up the configuration element for the context class and set the attribute on the
        // configuration returned by addConfs.
        ConfElement confWithAttr = ConfElement.CONTEXT_TO_CONF_ELEMENT.get(contextClass);
        conf = ConfElement.addConfs(conf, confWithAttr);
        @SuppressWarnings("unchecked")
        Attribute<Object> attr = (Attribute<Object>)ContextUtils.CONTEXT_TO_ATTRIBUTE_NAME_TO_ATTRIBUTE.get(confWithAttr.getContextClass()).get(attributeName);
        conf.setAttribute(attr, propertyValue);
      }
    } else {
      // addConf could not provide the wildcard application configuration.
      LOG.error("Invalid configuration key: {}", propertyName);
    }
  }

  /**
   * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing a prop.
   * <p>
   * The property name is built from the remaining tokens: the tokens after the current one when the
   * current token is the explicit {@link StramElement#PROP} element, otherwise the current token and
   * everything after it. When no tokens are left for the name, a warning is logged and nothing is set.
   * @param element The current {@link StramElement} of the property being parsed.
   * @param keys The keys that the property being parsed was split into.
   * @param index The current key that the parser is on.
   * @param conf The current {@link Conf}.
   * @param propertyValue The value associated with the property being parsed.
   * @param propertyName The complete unprocessed name of the property being parsed.
   */
  private void parsePropertyElement(StramElement element, String[] keys, int index, Conf conf, String propertyValue, String propertyName)
  {
    // Currently opProps are only supported on operators and streams
    // Supporting current implementation where property can be directly specified under operator
    final int nameStart = (element == StramElement.PROP) ? index + 1 : index;
    final String prop = getCompleteKey(keys, nameStart);
    // getCompleteKey yields an empty string (never null) when no tokens remain, so the missing-name
    // case has to be detected by emptiness; otherwise a property named "" would be registered.
    if (prop == null || prop.isEmpty()) {
      LOG.warn("Invalid property specification, no property name specified for {}", propertyName);
      return;
    }
    conf.setProperty(prop, propertyValue);
  }

  /**
   * Resolves a token to a {@link StramElement} that is allowed as a child of the given configuration.
   *
   * @param value the token to resolve
   * @param conf the configuration whose allowed children are consulted
   * @return the element, or {@code null} when the token does not name an element or the element is
   *         not allowed under {@code conf}; such tokens are treated as plain text by the caller
   */
  private StramElement getElement(String value, Conf conf)
  {
    StramElement parsed;
    try {
      parsed = StramElement.fromValue(value);
    } catch (IllegalArgumentException ignored) {
      // not an element name
      return null;
    }
    if (parsed == null || !conf.isAllowedChild(parsed)) {
      return null;
    }
    return parsed;
  }

  /**
   * Joins the keys from the start index (inclusive) up to the end of the array,
   * delegating to {@link #getCompleteKey(String[], int, int)}.
   * @param keys The keys from which to construct a string.
   * @param start The index of the first key to include.
   * @return The completed key.
   */
  private static String getCompleteKey(String[] keys, int start)
  {
    final int end = keys.length;
    return getCompleteKey(keys, start, end);
  }

  /**
   * Joins the keys in the range {@code [start, end)} into a single string, placing the key
   * separator between consecutive keys. An empty range produces an empty string.
   * @param keys The keys from which to construct a string.
   * @param start The index of the first key to include.
   * @param end 1 + the last index to include in the concatenation.
   * @return The completed key.
   */
  private static String getCompleteKey(String[] keys, int start, int end)
  {
    // initial capacity hint: combined length of all keys (separators not counted)
    int capacity = 0;
    for (String key : keys) {
      capacity += key.length();
    }

    final StringBuilder completeKey = new StringBuilder(capacity);
    for (int pos = start; pos < end; pos++) {
      if (pos != start) {
        completeKey.append(KEY_SEPARATOR);
      }
      completeKey.append(keys[pos]);
    }
    return completeKey.toString();
  }

  /**
   * Returns the properties held by this builder. This is the builder's own
   * {@link Properties} instance, not a copy.
   * Can be serialized to property file and used to read back into builder.
   * @return Properties
   */
  public Properties getProperties()
  {
    return properties;
  }

  /**
   * Returns a read-only view of the application aliases held by the root configuration.
   * @return unmodifiable map of application aliases
   */
  public Map<String, String> getAppAliases()
  {
    final Map<String, String> aliases = this.stramConf.appAliases;
    return Collections.unmodifiableMap(aliases);
  }

  /**
   * Creates a new {@link LogicalPlan}, populates it from the given builder, applies the embedded
   * settings of that builder followed by the settings of this configuration, and validates the
   * result. Plugin events are dispatched around each phase.
   *
   * @param tb the builder that defines the DAG structure and its embedded settings
   * @param appName the application name passed to {@code prepareDAG}
   * @return the validated logical plan
   */
  private LogicalPlan populateDAGAndValidate(LogicalPlanConfiguration tb, String appName)
  {
    final LogicalPlan plan = new LogicalPlan();
    final DAGSetupPluginContext pluginContext = new DAGSetupPluginContext(plan, this.conf);

    pluginManager.dispatch(SETUP, pluginContext);

    // structure: operators and streams defined by the builder
    pluginManager.dispatch(PRE_POPULATE, pluginContext);
    tb.populateDAG(plan);
    // settings embedded in the builder
    tb.prepareDAG(plan, null, appName);
    pluginManager.dispatch(POST_POPULATE, pluginContext);

    // settings held by this (external) configuration
    prepareDAG(plan, null, appName);

    pluginManager.dispatch(PRE_VALIDATE, pluginContext);
    plan.validate();
    pluginManager.dispatch(POST_VALIDATE, pluginContext);

    pluginManager.dispatch(TEARDOWN, pluginContext);
    return plan;
  }

  /**
   * Builds and validates a logical plan whose structure is described by the given properties.
   *
   * @param props the properties describing the DAG
   * @param appName the application name
   * @return the validated logical plan
   * @throws IOException declared by the signature
   */
  public LogicalPlan createFromProperties(Properties props, String appName) throws IOException
  {
    // a separate builder holds the DAG definition read from the properties
    final LogicalPlanConfiguration builder = new LogicalPlanConfiguration(new Configuration(false));
    builder.addFromProperties(props, conf);
    return populateDAGAndValidate(builder, appName);
  }

  /**
   * Builds and validates a logical plan whose structure is described by the given JSON object.
   *
   * @param json the JSON description of the DAG
   * @param appName the application name
   * @return the validated logical plan
   * @throws Exception if the JSON cannot be loaded into the builder
   */
  public LogicalPlan createFromJson(JSONObject json, String appName) throws Exception
  {
    // a separate builder holds the DAG definition read from the JSON object
    final LogicalPlanConfiguration builder = new LogicalPlanConfiguration(new Configuration(false));
    builder.addFromJson(json, conf);
    return populateDAGAndValidate(builder, appName);
  }

  /**
   * Builds and validates a logical plan from a builder that has had no properties added to it.
   *
   * @param appName the application name
   * @return the validated logical plan
   */
  public LogicalPlan createEmptyForRecovery(String appName)
  {
    // the builder is left empty on purpose: no properties or JSON are loaded into it
    final LogicalPlanConfiguration emptyBuilder = new LogicalPlanConfiguration(new Configuration(false));
    return populateDAGAndValidate(emptyBuilder, appName);
  }

  /**
   * Builds and validates a logical plan from a {@link StreamingApplication}. The application is
   * populated and configured through {@code prepareDAG}; plugin events are dispatched around
   * setup, validation and teardown.
   *
   * @param app the application that populates the DAG
   * @param appName the application name
   * @return the validated logical plan
   */
  public LogicalPlan createFromStreamingApplication(StreamingApplication app, String appName)
  {
    final LogicalPlan plan = new LogicalPlan();
    final DAGSetupPluginContext pluginContext = new DAGSetupPluginContext(plan, this.conf);

    pluginManager.dispatch(SETUP, pluginContext);
    prepareDAG(plan, app, appName);

    pluginManager.dispatch(PRE_VALIDATE, pluginContext);
    plan.validate();
    pluginManager.dispatch(POST_VALIDATE, pluginContext);

    pluginManager.dispatch(TEARDOWN, pluginContext);
    return plan;
  }

  /**
   * Populate the logical plan structure from properties.
   * <p>
   * Operators and streams are taken from the wildcard application configuration. All non-wildcard
   * operators are added first; afterwards every stream is added and connected to its source and
   * sink ports. When a stream declares a schema class, that class is set as the
   * {@code PortContext.TUPLE_CLASS} attribute on the connected ports.
   * @param dag the logical plan to add the operators and streams to
   * @throws IllegalArgumentException if an operator cannot be created from its JSON definition
   */
  public void populateDAG(LogicalPlan dag)
  {
    // NOTE(review): pconf is filled here but is not read anywhere else in this method - confirm whether it is still needed.
    Configuration pconf = new Configuration(conf);
    for (final String propertyName : this.properties.stringPropertyNames()) {
      String propertyValue = this.properties.getProperty(propertyName);
      pconf.setIfUnset(propertyName, propertyValue);
    }

    AppConf appConf = this.stramConf.getChild(WILDCARD, StramElement.APPLICATION);
    if (appConf == null) {
      LOG.warn("Application configuration not found. Probably an empty app.");
      return;
    }

    Map<String, OperatorConf> operators = appConf.getChildren(StramElement.OPERATOR);

    // operator configuration -> operator instance added to the DAG; used below to wire the streams
    Map<OperatorConf, GenericOperator> nodeMap = Maps.newHashMapWithExpectedSize(operators.size());
    // add all operators first
    for (Map.Entry<String, OperatorConf> nodeConfEntry : operators.entrySet()) {
      OperatorConf nodeConf = nodeConfEntry.getValue();
      // the wildcard operator configuration does not describe a concrete operator
      if (!WILDCARD.equals(nodeConf.id)) {
        Class<? extends GenericOperator> nodeClass = StramUtils.classForName(nodeConf.getClassNameReqd(), GenericOperator.class);
        // a property keyed by the operator class name holds the operator serialized as JSON
        String optJson = nodeConf.getProperties().get(nodeClass.getName());
        GenericOperator operator = null;
        try {
          if (optJson != null) {
            // if there is a special key which is the class name, it means the operator is serialized in json format
            ObjectMapper mapper = ObjectMapperFactory.getOperatorValueDeserializer();
            operator = mapper.readValue("{\"" + nodeClass.getName() + "\":" + optJson + "}", nodeClass);
            addOperator(dag, nodeConfEntry.getKey(), operator);
          } else {
            // no JSON definition: let the DAG create the operator from its class
            operator = addOperator(dag, nodeConfEntry.getKey(), nodeClass);
          }
          setOperatorProperties(operator, nodeConf.getProperties());
        } catch (IOException e) {
          throw new IllegalArgumentException("Error setting operator properties " + e.getMessage(), e);
        }
        nodeMap.put(nodeConf, operator);
      }
    }

    Map<String, StreamConf> streams = appConf.getChildren(StramElement.STREAM);

    // wire operators
    for (Map.Entry<String, StreamConf> streamConfEntry : streams.entrySet()) {
      StreamConf streamConf = streamConfEntry.getValue();
      DAG.StreamMeta sd = dag.addStream(streamConfEntry.getKey());
      sd.setLocality(streamConf.getLocality());

      // optional schema class that is applied to every port connected to this stream
      String schemaClassName = streamConf.properties.getProperty(STREAM_SCHEMA);
      Class<?> schemaClass = null;
      if (schemaClassName != null) {
        schemaClass = StramUtils.classForName(schemaClassName, Object.class);
      }

      if (streamConf.sourceNode != null) {
        // find the name of the source operator's output port that refers to this stream
        // NOTE(review): portName stays null when no output refers to this stream; the lookups below do not guard against that.
        String portName = null;
        for (Map.Entry<String, StreamConf> e : streamConf.sourceNode.outputs.entrySet()) {
          if (e.getValue() == streamConf) {
            portName = e.getKey();
          }
        }
        GenericOperator sourceDecl = nodeMap.get(streamConf.sourceNode);
        Operators.PortMappingDescriptor sourcePortMap = new Operators.PortMappingDescriptor();
        Operators.describe(sourceDecl, sourcePortMap);
        sd.setSource(sourcePortMap.outputPorts.get(portName).component);

        if (schemaClass != null) {
          dag.setOutputPortAttribute(sourcePortMap.outputPorts.get(portName).component, PortContext.TUPLE_CLASS, schemaClass);
        }
      }

      for (OperatorConf targetNode : streamConf.targetNodes) {
        // find the name of the target operator's input port that refers to this stream
        String portName = null;
        for (Map.Entry<String, StreamConf> e : targetNode.inputs.entrySet()) {
          if (e.getValue() == streamConf) {
            portName = e.getKey();
          }
        }
        GenericOperator targetDecl = nodeMap.get(targetNode);
        Operators.PortMappingDescriptor targetPortMap = new Operators.PortMappingDescriptor();
        Operators.describe(targetDecl, targetPortMap);
        sd.addSink(targetPortMap.inputPorts.get(portName).component);

        if (schemaClass != null) {
          dag.setInputPortAttribute(targetPortMap.inputPorts.get(portName).component, PortContext.TUPLE_CLASS, schemaClass);
        }
      }
    }
  }

  /**
   * Adds an existing operator instance to the DAG, as a module when it is a {@link Module} and as
   * an operator when it is an {@link Operator}. An instance of neither type is not added.
   *
   * @param dag the logical plan to add to
   * @param name the name under which the instance is added
   * @param operator the instance to add
   * @return the instance that was passed in
   */
  private GenericOperator addOperator(LogicalPlan dag, String name, GenericOperator operator)
  {
    if (operator instanceof Module) {
      final Module module = (Module)operator;
      dag.addModule(name, module);
      return operator;
    }
    if (operator instanceof Operator) {
      final Operator plainOperator = (Operator)operator;
      dag.addOperator(name, plainOperator);
    }
    return operator;
  }


  /**
   * Adds an operator to the DAG by class, as a module when the class is a {@link Module} type and
   * as an operator when it is an {@link Operator} type.
   *
   * @param dag the logical plan to add to
   * @param name the name under which the operator is added
   * @param clazz the operator or module class
   * @return the object returned by the DAG, or {@code null} when the class is neither a module nor an operator
   */
  private GenericOperator addOperator(LogicalPlan dag, String name, Class<?> clazz)
  {
    final boolean isModule = Module.class.isAssignableFrom(clazz);
    if (isModule) {
      return dag.addModule(name, (Class<Module>)clazz);
    }
    final boolean isOperator = Operator.class.isAssignableFrom(clazz);
    if (isOperator) {
      return dag.addOperator(name, (Class<Operator>)clazz);
    }
    return null;
  }

  /**
   * Populate the logical plan from the streaming application definition and configuration.
   * Configuration is resolved based on application alias, if any: when an alias is found for
   * {@code name}, the alias is used as the application name, otherwise {@code name} itself.
   * @param app The {@link StreamingApplication} to be run; when {@code null} the DAG is only configured, not populated.
   * @param dag This will hold the {@link LogicalPlan} representation of the given {@link StreamingApplication}.
   * @param name The path of the application class in the jar.
   */
  public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
  {
    // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
    String gatewayAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
    if (gatewayAddress == null) {
      // no explicit connect address: use the listen address instead
      gatewayAddress = conf.get(GATEWAY_LISTEN_ADDRESS);
    }
    dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, gatewayAddress);

    final DAGSetupPluginContext pluginContext = new DAGSetupPluginContext(dag, this.conf);
    if (app != null) {
      pluginManager.dispatch(SETUP, pluginContext);
      pluginManager.dispatch(PRE_POPULATE, pluginContext);
      app.populateDAG(dag, conf);
      pluginManager.dispatch(POST_POPULATE, pluginContext);
    }

    pluginManager.dispatch(PRE_CONFIGURE, pluginContext);
    final String alias = getAppAlias(name);
    final String appName;
    if (alias == null) {
      appName = name;
    } else {
      appName = alias;
    }
    final List<AppConf> appConfs = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION);
    setApplicationConfiguration(dag, appConfs, app);
    // only name the application when the configuration has not already done so
    if (dag.getAttributes().get(Context.DAGContext.APPLICATION_NAME) == null) {
      dag.setAttribute(Context.DAGContext.APPLICATION_NAME, appName);
    }

    // Expand the modules within the dag recursively
    setModuleProperties(dag, appName);
    flattenDAG(dag, conf);

    // inject external operator configuration
    setOperatorConfiguration(dag, appConfs, appName);
    setStreamConfiguration(dag, appConfs, appName);
    pluginManager.dispatch(POST_CONFIGURE, pluginContext);
  }

  /**
   * Flattens every module of the DAG and then applies the stream links.
   *
   * @param dag the logical plan whose modules are flattened
   * @param conf the configuration handed to each module while it is flattened
   */
  private void flattenDAG(LogicalPlan dag, Configuration conf)
  {
    for (ModuleMeta module : dag.getAllModules()) {
      module.flattenModule(dag, conf);
    }
    // links are applied once, after all modules have been flattened
    dag.applyStreamLinks();
  }

  /**
   * Loads a properties file. The returned {@link Properties} use the JVM system
   * properties as defaults, so a key that is missing from the file resolves to
   * the system property of the same name, if there is one.
   *
   * @param filePath path of the properties file to read
   * @return the loaded properties
   * @throws IOException if the file cannot be opened or read
   */
  public static Properties readProperties(String filePath) throws IOException
  {
    Properties props = new Properties(System.getProperties());
    // try-with-resources closes the stream even when load() fails
    try (InputStream is = new FileInputStream(filePath)) {
      props.load(is);
    }
    return props;
  }

  /**
   * Get the configuration opProps for the given operator.
   * These can be operator specific settings or settings from matching templates.
   * @param ow the operator whose properties are resolved
   * @param appName the application name used to select the matching application configurations
   * @return the resolved property names and values
   */
  public Map<String, String> getProperties(OperatorMeta ow, String appName)
  {
    final List<AppConf> matchingApps = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION);
    final List<OperatorConf> matchingOperators = getMatchingChildConf(matchingApps, ow.getName(), StramElement.OPERATOR);
    final PropertyArgs args = getPropertyArgs(ow);
    return getProperties(args, matchingOperators, appName);
  }

  /**
   * Merges the properties of the given application configurations into one map.
   *
   * @param appConfs application configurations, highest priority first
   * @return the merged properties; for a key present in several configurations the value of the
   *         configuration closest to the start of the list is kept
   */
  private Map<String, String> getApplicationProperties(List<AppConf> appConfs)
  {
    final Map<String, String> merged = Maps.newHashMap();
    // Walk the list back to front so that entries of higher priority configurations overwrite lower ones
    for (int remaining = appConfs.size(); remaining > 0; remaining--) {
      final AppConf appConf = appConfs.get(remaining - 1);
      merged.putAll(Maps.fromProperties(appConf.properties));
    }
    return merged;
  }

  /**
   * Get the configuration opProps for the given operator.
   * These can be operator specific settings or settings from matching templates.
   * <p>
   * Sources are merged in this order, later ones overwriting earlier ones: templates matched by
   * regular expression, templates referenced directly by the operator configurations, and finally
   * the operator configurations themselves.
   * @param pa name and class name of the operator
   * @param opConfs operator configurations, highest priority first
   * @param appName the application name used for template matching
   * @return the merged property names and values
   */
  private Map<String, String> getProperties(PropertyArgs pa, List<OperatorConf> opConfs, String appName)
  {
    final Map<String, String> merged = Maps.newHashMap();
    final Map<String, TemplateConf> templates = stramConf.getChildren(StramElement.TEMPLATE);
    if (!templates.isEmpty()) {
      // templates selected by regular expression, visited from the highest key down to the lowest
      final TreeMap<Integer, TemplateConf> byPriority = getMatchingTemplates(pa, appName, templates);
      if (byPriority != null) {
        for (TemplateConf template : byPriority.descendingMap().values()) {
          merged.putAll(Maps.fromProperties(template.properties));
        }
      }

      // templates referenced directly by the operator configurations
      for (TemplateConf template : getDirectTemplates(opConfs, templates)) {
        merged.putAll(Maps.fromProperties(template.properties));
      }
    }
    // direct settings
    // Walk the list back to front so that entries of higher priority configurations overwrite lower ones
    for (int remaining = opConfs.size(); remaining > 0; remaining--) {
      final Conf opConf = opConfs.get(remaining - 1);
      merged.putAll(Maps.fromProperties(opConf.properties));
    }
    return merged;
  }

  /**
   * Collects the templates that are referenced through the {@code templateRef} of the given
   * operator configurations. A template is added once for every operator configuration that
   * references it.
   *
   * @param opConfs the operator configurations whose template references are checked
   * @param templates all known templates
   * @return the referenced templates, in the iteration order of {@code templates}
   */
  private List<TemplateConf> getDirectTemplates(List<OperatorConf> opConfs, Map<String, TemplateConf> templates)
  {
    final List<TemplateConf> referenced = Lists.newArrayList();
    for (TemplateConf template : templates.values()) {
      final String templateId = template.id;
      for (OperatorConf opConf : opConfs) {
        if (!templateId.equals(opConf.templateRef)) {
          continue;
        }
        referenced.add(template);
      }
    }
    return referenced;
  }

  /**
   * Holder for the two values that template matching is based on: the name of an operator and
   * the name of its class.
   */
  private static class PropertyArgs
  {
    // operator name
    String name;
    // name of the operator's class
    String className;

    public PropertyArgs(String name, String className)
    {
      this.className = className;
      this.name = name;
    }
  }

  /**
   * Builds the template matching arguments for an operator from its name and the class name of
   * its operator instance.
   *
   * @param om the operator meta object
   * @return the matching arguments
   */
  private PropertyArgs getPropertyArgs(OperatorMeta om)
  {
    final String operatorName = om.getName();
    final String operatorClassName = om.getGenericOperator().getClass().getName();
    return new PropertyArgs(operatorName, operatorClassName);
  }

  /**
   * Produce the collections of templates that apply for the given id.
   * <p>
   * Each template is filed under the first rule it satisfies: key 1 when its id expression matches
   * the operator name, key 2 when its application name expression matches the application name,
   * key 3 when its class name expression matches the operator class name. Because the key is the
   * rule number, a later template filed under the same key replaces an earlier one.
   * @param pa name and class name of the operator
   * @param appName the application name; the application name rule is skipped when {@code null}
   * @param templates all known templates
   * @return TreeMap<Integer, TemplateConf>
   */
  private TreeMap<Integer, TemplateConf> getMatchingTemplates(PropertyArgs pa, String appName, Map<String, TemplateConf> templates)
  {
    final TreeMap<Integer, TemplateConf> matches = Maps.newTreeMap();
    for (TemplateConf template : templates.values()) {
      if (template.idRegExp != null && pa.name.matches(template.idRegExp)) {
        matches.put(1, template);
        continue;
      }
      if (appName != null && template.appNameRegExp != null && appName.matches(template.appNameRegExp)) {
        matches.put(2, template);
        continue;
      }
      if (template.classNameRegExp != null && pa.className.matches(template.classNameRegExp)) {
        matches.put(3, template);
      }
    }
    return matches;
  }

  /**
   * Inject the configuration opProps into the operator instance.
   * @param operator the operator to populate
   * @param properties property names and the values to set
   * @return the operator that was passed in
   * @throws IllegalArgumentException if populating the operator fails
   */
  public static GenericOperator setOperatorProperties(GenericOperator operator, Map<String, String> properties)
  {
    try {
      // populate custom opProps
      BeanUtils.populate(operator, properties);
    } catch (IllegalAccessException | InvocationTargetException e) {
      throw new IllegalArgumentException("Error setting operator properties", e);
    }
    return operator;
  }

  /**
   * Inject the given properties into the application instance.
   * @param application the application to populate
   * @param properties property names and the values to set
   * @return the application that was passed in
   * @throws IllegalArgumentException if populating the application fails
   */
  public static StreamingApplication setApplicationProperties(StreamingApplication application, Map<String, String> properties)
  {
    try {
      BeanUtils.populate(application, properties);
    } catch (IllegalAccessException | InvocationTargetException e) {
      throw new IllegalArgumentException("Error setting application properties", e);
    }
    return application;
  }

  /**
   * Wraps the given object in a {@link BeanMap}.
   * @param obj the object to wrap
   * @return a new bean map for {@code obj}
   */
  public static BeanMap getObjectProperties(Object obj)
  {
    final BeanMap beanView = new BeanMap(obj);
    return beanView;
  }

  /**
   * Set any opProps from configuration on the operators in the DAG. This
   * method may throw unchecked exception if the configuration contains
   * opProps that are invalid for an operator.
   *
   * @param dag the logical plan whose operators are configured
   * @param applicationName the application name used to select the matching application configurations
   */
  public void setOperatorProperties(LogicalPlan dag, String applicationName)
  {
    final List<AppConf> matchingApps = stramConf.getMatchingChildConf(applicationName, StramElement.APPLICATION);
    for (OperatorMeta operatorMeta : dag.getAllOperators()) {
      final List<OperatorConf> matchingOperators = getMatchingChildConf(matchingApps, operatorMeta.getName(), StramElement.OPERATOR);
      final Map<String, String> resolved = getProperties(getPropertyArgs(operatorMeta), matchingOperators, applicationName);
      setOperatorProperties(operatorMeta.getGenericOperator(), resolved);
    }
  }

  /**
   * Set any properties from configuration on the modules in the DAG. This
   * method may throw unchecked exception if the configuration contains
   * properties that are invalid for a module.
   *
   * @param dag the logical plan whose modules are configured
   * @param applicationName the application name used to select the matching application configurations
   */
  public void setModuleProperties(LogicalPlan dag, String applicationName)
  {
    final List<AppConf> matchingApps = stramConf.getMatchingChildConf(applicationName, StramElement.APPLICATION);
    setModuleConfiguration(dag, matchingApps, applicationName);
  }

  /**
   * Set the application configuration, using the application configurations that match the given name.
   * @param dag the logical plan that receives the application attributes
   * @param appName the application name used to select the matching application configurations
   * @param app the application that receives the application properties; may be {@code null}
   */
  public void setApplicationConfiguration(final LogicalPlan dag, String appName, StreamingApplication app)
  {
    final List<AppConf> matchingApps = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION);
    setApplicationConfiguration(dag, matchingApps, app);
  }

  /**
   * Applies the attributes of the given application configurations to the DAG and, when an
   * application instance is supplied, sets the merged application properties on it.
   *
   * @param dag the logical plan that receives the attributes
   * @param appConfs the application configurations to apply
   * @param app the application that receives the properties; skipped when {@code null}
   */
  private void setApplicationConfiguration(final LogicalPlan dag, List<AppConf> appConfs, StreamingApplication app)
  {
    setAttributes(appConfs, dag.getAttributes());
    if (app == null) {
      return;
    }
    setApplicationProperties(app, getApplicationProperties(appConfs));
  }

  /**
   * Applies the external configuration to every operator of the DAG: operator attributes,
   * operator properties, the attributes of its input and output ports and, where unifier
   * configurations exist, the attributes of the output port's unifier.
   *
   * @param dag the logical plan whose operators are configured
   * @param appConfs the application configurations to take the settings from
   * @param appName the application name used for template matching
   */
  private void setOperatorConfiguration(final LogicalPlan dag, List<AppConf> appConfs, String appName)
  {
    for (final OperatorMeta operatorMeta : dag.getAllOperators()) {
      final List<OperatorConf> opConfs = getMatchingChildConf(appConfs, operatorMeta.getName(), StramElement.OPERATOR);

      // operator attributes first, then the operator properties
      setAttributes(opConfs, operatorMeta.getAttributes());
      final Map<String, String> resolvedProps = getProperties(getPropertyArgs(operatorMeta), opConfs, appName);
      setOperatorProperties(operatorMeta.getOperator(), resolvedProps);

      // input ports: specific input port settings followed by the generic port settings
      for (final InputPortMeta inputPort : operatorMeta.getInputStreams().keySet()) {
        final List<PortConf> inPortConfs = getMatchingChildConf(opConfs, inputPort.getPortName(), StramElement.INPUT_PORT);
        final List<PortConf> genericPortConfs = getMatchingChildConf(opConfs, inputPort.getPortName(), StramElement.PORT);
        inPortConfs.addAll(genericPortConfs);
        setAttributes(inPortConfs, inputPort.getAttributes());
      }

      // output ports: specific output port settings followed by the generic port settings
      for (final OutputPortMeta outputPort : operatorMeta.getOutputStreams().keySet()) {
        final List<PortConf> outPortConfs = getMatchingChildConf(opConfs, outputPort.getPortName(), StramElement.OUTPUT_PORT);
        final List<PortConf> genericPortConfs = getMatchingChildConf(opConfs, outputPort.getPortName(), StramElement.PORT);
        outPortConfs.addAll(genericPortConfs);
        setAttributes(outPortConfs, outputPort.getAttributes());

        // the unifier meta is only requested when there is a unifier configuration to apply
        final List<OperatorConf> unifierConfs = getMatchingChildConf(outPortConfs, null, StramElement.UNIFIER);
        if (!unifierConfs.isEmpty()) {
          setAttributes(unifierConfs, outputPort.getUnifierMeta().getAttributes());
        }
      }
      operatorMeta.populateAggregatorMeta();
    }
  }

  /**
   * Sets the configured properties on every module of the DAG. Module settings are looked up
   * among the operator configurations that match the module name.
   *
   * @param dag the logical plan whose modules are configured
   * @param appConfs the application configurations to take the settings from
   * @param appName the application name used for template matching
   */
  private void setModuleConfiguration(final LogicalPlan dag, List<AppConf> appConfs, String appName)
  {
    for (final ModuleMeta moduleMeta : dag.getAllModules()) {
      final List<OperatorConf> moduleConfs = getMatchingChildConf(appConfs, moduleMeta.getName(), StramElement.OPERATOR);
      final Map<String, String> resolvedProps = getProperties(getPropertyArgs(moduleMeta), moduleConfs, appName);
      setOperatorProperties(moduleMeta.getGenericOperator(), resolvedProps);
    }
  }

  /**
   * Applies the configured locality to every stream of the DAG. For each stream the matching
   * stream configurations are checked in order and the first locality that is not {@code null}
   * is set; a stream without any configured locality is left untouched.
   *
   * @param dag the logical plan whose streams are configured
   * @param appConfs the application configurations to take the settings from
   * @param appAlias the application name (not used by this method)
   */
  private void setStreamConfiguration(LogicalPlan dag, List<AppConf> appConfs, String appAlias)
  {
    for (StreamMeta stream : dag.getAllStreams()) {
      final List<StreamConf> candidates = getMatchingChildConf(appConfs, stream.getName(), StramElement.STREAM);
      DAG.Locality chosen = null;
      final Iterator<StreamConf> remaining = candidates.iterator();
      // stop at the first configuration that declares a locality
      while (chosen == null && remaining.hasNext()) {
        chosen = remaining.next().getLocality();
      }
      if (chosen != null) {
        stream.setLocality(chosen);
      }
    }
  }

  /**
   * Copies the attribute values declared on the given configurations into the supplied attribute map.
   * <p>
   * The configurations are visited in list order and only the first value seen for an attribute is
   * applied; values for the same attribute in later configurations are skipped. A value whose trimmed
   * text starts with '{' is decoded with {@link JSONObject2String}, unless the attribute's own codec is
   * a {@link JsonStringCodec}. Every other value, including an empty or blank one, is decoded with the
   * attribute's own codec.
   *
   * @param confs the configurations to read attribute values from
   * @param attributeMap the map that receives the decoded attribute values
   * @throws UnsupportedOperationException if a configured attribute has no codec
   */
  private void setAttributes(List<? extends Conf> confs, Attribute.AttributeMap attributeMap)
  {
    Set<Attribute<Object>> processedAttributes = Sets.newHashSet();
    //json object codec for complex attributes
    JSONObject2String jsonCodec = new JSONObject2String();
    for (Conf conf1 : confs) {
      for (Map.Entry<Attribute<Object>, String> e : conf1.attributes.entrySet()) {
        Attribute<Object> attribute = e.getKey();
        if (attribute.codec == null) {
          String msg = String.format("Attribute does not support property configuration: %s %s", attribute.name, e.getValue());
          throw new UnsupportedOperationException(msg);
        }
        if (!processedAttributes.add(attribute)) {
          // a configuration earlier in the list already supplied this attribute
          continue;
        }
        String val = e.getValue();
        // startsWith() rather than charAt(0): an empty or blank value must reach the attribute's codec
        // instead of failing with StringIndexOutOfBoundsException
        boolean looksLikeJson = val.trim().startsWith("{");
        if (looksLikeJson && !(attribute.codec instanceof JsonStringCodec)) {
          // complex attribute in json
          attributeMap.put(attribute, jsonCodec.fromString(val));
        } else {
          attributeMap.put(attribute, attribute.codec.fromString(val));
        }
      }
    }
  }

}
