/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.task;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.json.simple.JSONAware;

import org.apache.heron.api.generated.TopologyAPI;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.tuple.Fields;

// import org.apache.storm.generated.ComponentCommon;
// import org.apache.storm.generated.GlobalStreamId;
// import org.apache.storm.generated.Grouping;
// import org.apache.storm.utils.ThriftTopologyUtils;

public class GeneralTopologyContext implements JSONAware {
  private org.apache.heron.api.topology.GeneralTopologyContext delegate;

  @SuppressWarnings("rawtypes")
  public GeneralTopologyContext(StormTopology topology, Map stormConf,
                                Map<Integer, String> taskToComponent,
                                Map<String, List<Integer>> componentToSortedTasks,
                                Map<String, Map<String, Fields>> componentToStreamToFields,
                                String stormId) {
    throw new RuntimeException("GeneralTopologyContext should never be initiated this way");
  }

  public GeneralTopologyContext(org.apache.heron.api.topology.GeneralTopologyContext delegate) {
    this.delegate = delegate;
  }

  /**
   * Gets the unique id assigned to this topology. The id is the storm name with a
   * unique nonce appended to it.
   *
   * @return the storm id
   */
  public String getStormId() {
    return delegate.getTopologyId();
  }

  /**
   * Gets the Thrift object representing the topology.
   *
   * @return the Thrift definition representing the topology
   */
  @SuppressWarnings("deprecation")
  public StormTopology getRawTopology() {

    StormTopology stormTopology = new StormTopology();
    Map<String, SpoutSpec> spouts = new HashMap<>();
    for (TopologyAPI.Spout spout : this.delegate.getRawTopology().getSpoutsList()) {
      spouts.put(spout.getComp().getName(), new SpoutSpec(spout));
    }

    Map<String, Bolt> bolts = new HashMap<>();
    for (TopologyAPI.Bolt bolt : this.delegate.getRawTopology().getBoltsList()) {
      bolts.put(bolt.getComp().getName(), new Bolt(bolt));
    }

    stormTopology.set_spouts(spouts);
    stormTopology.set_bolts(bolts);
    return stormTopology;
  }


  /**
   * Gets the component id for the specified task id. The component id maps
   * to a component id specified for a Spout or Bolt in the topology definition.
   *
   * @param taskId the task id
   * @return the component id for the input task id
   */
  public String getComponentId(int taskId) {
    return delegate.getComponentId(taskId);
  }

  /**
   * Gets the set of streams declared for the specified component.
   */
  public Set<String> getComponentStreams(String componentId) {
    return delegate.getComponentStreams(componentId);
  }

  /**
   * Gets the task ids allocated for the given component id. The task ids are
   * always returned in ascending order.
   */
  public List<Integer> getComponentTasks(String componentId) {
    return delegate.getComponentTasks(componentId);
  }

  /**
   * Gets the declared output fields for the specified component/stream.
   */
  public Fields getComponentOutputFields(String componentId, String streamId) {
    return new Fields(delegate.getComponentOutputFields(componentId, streamId));
  }

  /**
   * Gets the declared output fields for the specified global stream id.
   */
  /*
  public Fields getComponentOutputFields(GlobalStreamId id) {
  }
  */

  /**
   * Gets the declared inputs to the specified component.
   *
   * @return A map from subscribed component/stream to the grouping subscribed with.
   */
  /*
  public Map<GlobalStreamId, Grouping> getSources(String componentId) {
    return getComponentCommon(componentId).get_inputs();
  }
  */

  /**
   * Gets information about who is consuming the outputs of the specified component,
   * and how.
   *
   * @return Map from stream id to component id to the Grouping used.
   */
  /*
    public Map<String, Map<String, Grouping>> getTargets(String componentId) {
        Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
        for(String otherComponentId: getComponentIds()) {
            Map<GlobalStreamId, Grouping> inputs =
              getComponentCommon(otherComponentId).get_inputs();
            for(GlobalStreamId id: inputs.keySet()) {
                if(id.get_componentId().equals(componentId)) {
                    Map<String, Grouping> curr = ret.get(id.get_streamId());
                    if(curr==null) curr = new HashMap<String, Grouping>();
                    curr.put(otherComponentId, inputs.get(id));
                    ret.put(id.get_streamId(), curr);
                }
            }
        }
        return ret;
    }
  */
  @Override
  public String toJSONString() {
    throw new RuntimeException("toJSONString not implemented");
  }

  /**
   * Gets a map from task id to component id.
   */
  public Map<Integer, String> getTaskToComponent() {
    return delegate.getTaskToComponent();
  }

  /**
   * Gets a list of all component ids in this topology
   */
  public Set<String> getComponentIds() {
    return delegate.getComponentIds();
  }

  /*
    public ComponentCommon getComponentCommon(String componentId) {
        return ThriftTopologyUtils.getComponentCommon(getRawTopology(), componentId);
    }
  */

  /*
    public int maxTopologyMessageTimeout() {
        Integer max = Utils.getInteger(_stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
        for(String spout: getRawTopology().get_spouts().keySet()) {
            ComponentCommon common = getComponentCommon(spout);
            String jsonConf = common.get_json_conf();
            if(jsonConf!=null) {
                Map conf = (Map) JSONValue.parse(jsonConf);
                Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
                if(comp!=null) {
                    max = Math.max(Utils.getInteger(comp), max);
                }
            }
        }
        return max;
    }

  */
}
