package org.apache.helix.rest.server.resources.helix;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;

import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.api.exceptions.HelixConflictException;
import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.manager.zk.ZKUtil;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ControllerHistory;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Message;
import org.apache.helix.model.RESTConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.rest.common.HttpConstants;
import org.apache.helix.rest.server.json.cluster.ClusterTopology;
import org.apache.helix.rest.server.service.ClusterService;
import org.apache.helix.rest.server.service.ClusterServiceImpl;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@Path("/clusters")
public class ClusterAccessor extends AbstractHelixResource {
  private static Logger LOG = LoggerFactory.getLogger(ClusterAccessor.class.getName());

  public enum ClusterProperties {
    controller,
    instances,
    liveInstances,
    resources,
    paused,
    maintenance,
    messages,
    stateModelDefinitions,
    clusters,
    maintenanceSignal,
    maintenanceHistory,
    clusterName
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  public Response getClusters() {
    HelixAdmin helixAdmin = getHelixAdmin();
    List<String> clusters = helixAdmin.getClusters();

    Map<String, List<String>> dataMap = new HashMap<>();
    dataMap.put(ClusterProperties.clusters.name(), clusters);

    return JSONRepresentation(dataMap);
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}")
  public Response getClusterInfo(@PathParam("clusterId") String clusterId) {
    if (!doesClusterExist(clusterId)) {
      return notFound();
    }

    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();

    Map<String, Object> clusterInfo = new HashMap<>();
    clusterInfo.put(Properties.id.name(), clusterId);

    LiveInstance controller = dataAccessor.getProperty(keyBuilder.controllerLeader());
    if (controller != null) {
      clusterInfo.put(ClusterProperties.controller.name(), controller.getInstanceName());
    } else {
      clusterInfo.put(ClusterProperties.controller.name(), "No Lead Controller!");
    }

    boolean paused = dataAccessor.getBaseDataAccessor()
        .exists(keyBuilder.pause().getPath(), AccessOption.PERSISTENT);
    clusterInfo.put(ClusterProperties.paused.name(), paused);
    boolean maintenance = getHelixAdmin().isInMaintenanceMode(clusterId);
    clusterInfo.put(ClusterProperties.maintenance.name(), maintenance);

    List<String> idealStates = dataAccessor.getChildNames(keyBuilder.idealStates());
    clusterInfo.put(ClusterProperties.resources.name(), idealStates);
    List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
    clusterInfo.put(ClusterProperties.instances.name(), instances);
    List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances());
    clusterInfo.put(ClusterProperties.liveInstances.name(), liveInstances);

    return JSONRepresentation(clusterInfo);
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @PUT
  @Path("{clusterId}")
  public Response createCluster(@PathParam("clusterId") String clusterId,
      @DefaultValue("false") @QueryParam("recreate") String recreate,
      @DefaultValue("false") @QueryParam("addCloudConfig") String addCloudConfig,
      String cloudConfigManifest) {

    boolean recreateIfExists = Boolean.parseBoolean(recreate);
    boolean cloudConfigIncluded = Boolean.parseBoolean(addCloudConfig);

    ClusterSetup clusterSetup = getClusterSetup();

    CloudConfig cloudConfig = null;
    if (cloudConfigIncluded) {
      ZNRecord record;
      try {
        record = toZNRecord(cloudConfigManifest);
        cloudConfig = new CloudConfig.Builder(record).build();
      } catch (IOException | HelixException e) {
        String errMsg = "Failed to generate a valid CloudConfig from " + cloudConfigManifest;
        LOG.error(errMsg, e);
        return badRequest(errMsg + " Exception: " + e.getMessage());
      }
    }

    try {
      clusterSetup.addCluster(clusterId, recreateIfExists, cloudConfig);
    } catch (Exception ex) {
      LOG.error("Failed to create cluster {}. Exception: {}.", clusterId, ex);
      return serverError(ex);
    }
    return created();
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @DELETE
  @Path("{clusterId}")
  public Response deleteCluster(@PathParam("clusterId") String clusterId) {
    ClusterSetup clusterSetup = getClusterSetup();

    try {
      clusterSetup.deleteCluster(clusterId);
    } catch (HelixException ex) {
      LOG.info("Failed to delete cluster {}, cluster is still in use. Exception: {}.", clusterId,
          ex);
      return badRequest(ex.getMessage());
    } catch (Exception ex) {
      LOG.error("Failed to delete cluster {}. Exception: {}.", clusterId, ex);
      return serverError(ex);
    }

    return OK();
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @POST
  @Path("{clusterId}")
  public Response updateCluster(@PathParam("clusterId") String clusterId,
      @QueryParam("command") String commandStr, @QueryParam("superCluster") String superCluster,
      @QueryParam("duration") Long duration, String content) {
    Command command;
    try {
      command = getCommand(commandStr);
    } catch (HelixException ex) {
      return badRequest(ex.getMessage());
    }

    ClusterSetup clusterSetup = getClusterSetup();
    HelixAdmin helixAdmin = getHelixAdmin();

    switch (command) {
      case activate:
        if (superCluster == null) {
          return badRequest("Super Cluster name is missing!");
        }
        try {
          clusterSetup.activateCluster(clusterId, superCluster, true);
        } catch (Exception ex) {
          LOG.error("Failed to add cluster {} to super cluster {}.", clusterId, superCluster);
          return serverError(ex);
        }
        break;

      case expand:
        try {
          clusterSetup.expandCluster(clusterId);
        } catch (Exception ex) {
          LOG.error("Failed to expand cluster {}.", clusterId);
          return serverError(ex);
        }
        break;

      case enable:
        try {
          helixAdmin.enableCluster(clusterId, true);
        } catch (Exception ex) {
          LOG.error("Failed to enable cluster {}.", clusterId);
          return serverError(ex);
        }
        break;

      case disable:
        try {
          helixAdmin.enableCluster(clusterId, false);
        } catch (Exception ex) {
          LOG.error("Failed to disable cluster {}.", clusterId);
          return serverError(ex);
        }
        break;

      case enableMaintenanceMode:
      case disableMaintenanceMode:
        // Try to parse the content string. If parseable, use it as a KV mapping. Otherwise, treat it
        // as a REASON String
        Map<String, String> customFieldsMap = null;
        try {
          // Try to parse content
          customFieldsMap =
              OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, String>>() {
              });
          // content is given as a KV mapping. Nullify content
          content = null;
        } catch (Exception e) {
          // NOP
        }
        helixAdmin
            .manuallyEnableMaintenanceMode(clusterId, command == Command.enableMaintenanceMode,
                content, customFieldsMap);
        break;
      case enableWagedRebalanceForAllResources:
        // Enable WAGED rebalance for all resources in the cluster
        List<String> resources = helixAdmin.getResourcesInCluster(clusterId);
        try {
          helixAdmin.enableWagedRebalance(clusterId, resources);
        } catch (HelixException e) {
          return badRequest(e.getMessage());
        }
        break;
      case purgeOfflineParticipants:
        if (duration == null || duration < 0) {
          helixAdmin
              .purgeOfflineInstances(clusterId, ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET);
        } else {
          helixAdmin.purgeOfflineInstances(clusterId, duration);
        }
        break;
      default:
        return badRequest("Unsupported command {}." + command);
    }

    return OK();
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/management-mode")
  public Response getClusterManagementMode(@PathParam("clusterId") String clusterId,
      @QueryParam("showDetails") boolean showDetails) {
    ClusterManagementMode mode = getHelixAdmin().getClusterManagementMode(clusterId);
    if (mode == null) {
      return notFound("Cluster " + clusterId + " is not in management mode");
    }

    Map<String, Object> responseMap = new HashMap<>();
    responseMap.put("cluster", clusterId);
    responseMap.put("mode", mode.getMode());
    responseMap.put("status", mode.getStatus());
    if (showDetails) {
      // To show details, query participants that are in progress to management mode.
      responseMap.put("details", getManagementModeDetails(clusterId, mode));
    }

    return JSONRepresentation(responseMap);
  }

  private Map<String, Object> getManagementModeDetails(String clusterId,
      ClusterManagementMode mode) {
    Map<String, Object> details = new HashMap<>();
    Map<String, Object> participantDetails = new HashMap<>();
    ClusterManagementMode.Status status = mode.getStatus();
    details.put("cluster", ImmutableMap.of("cluster", clusterId, "status", status.name()));

    boolean hasPendingST = false;
    Set<String> liveInstancesInProgress = new HashSet<>();

    if (ClusterManagementMode.Status.IN_PROGRESS.equals(status)) {
      HelixDataAccessor accessor = getDataAccssor(clusterId);
      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
      List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances());
      BaseDataAccessor<ZNRecord> baseAccessor = accessor.getBaseDataAccessor();

      if (ClusterManagementMode.Type.CLUSTER_FREEZE.equals(mode.getMode())) {
        // Entering cluster freeze mode, check live instance freeze status and pending ST
        for (LiveInstance liveInstance : liveInstances) {
          String instanceName = liveInstance.getInstanceName();
          if (!LiveInstance.LiveInstanceStatus.FROZEN.equals(liveInstance.getStatus())) {
            liveInstancesInProgress.add(instanceName);
          }
          Stat stat = baseAccessor
              .getStat(keyBuilder.messages(instanceName).getPath(), AccessOption.PERSISTENT);
          if (stat.getNumChildren() > 0) {
            hasPendingST = true;
            liveInstancesInProgress.add(instanceName);
          }
        }
      } else if (ClusterManagementMode.Type.NORMAL.equals(mode.getMode())) {
        // Exiting freeze mode, check live instance unfreeze status
        for (LiveInstance liveInstance : liveInstances) {
          if (LiveInstance.LiveInstanceStatus.FROZEN.equals(liveInstance.getStatus())) {
            liveInstancesInProgress.add(liveInstance.getInstanceName());
          }
        }
      }
    }

    participantDetails.put("status", status.name());
    participantDetails.put("liveInstancesInProgress", liveInstancesInProgress);
    if (ClusterManagementMode.Type.CLUSTER_FREEZE.equals(mode.getMode())) {
      // Add pending ST result for cluster freeze mode
      participantDetails.put("hasPendingStateTransition", hasPendingST);
    }

    details.put(ClusterProperties.liveInstances.name(), participantDetails);
    return details;
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @POST
  @Path("{clusterId}/management-mode")
  public Response updateClusterManagementMode(@PathParam("clusterId") String clusterId,
      @DefaultValue("{}") String content) {
    ClusterManagementModeRequest request;
    try {
      request = OBJECT_MAPPER.readerFor(ClusterManagementModeRequest.class).readValue(content);
    } catch (JsonProcessingException e) {
      LOG.warn("Failed to parse json string: {}", content, e);
      return badRequest("Invalid payload json body: " + content);
    }

    // Need to add cluster name
    request = ClusterManagementModeRequest.newBuilder()
        .withClusterName(clusterId)
        .withMode(request.getMode())
        .withCancelPendingST(request.isCancelPendingST())
        .withReason(request.getReason())
        .build();

    try {
      getHelixAdmin().setClusterManagementMode(request);
    } catch (HelixConflictException e) {
      return Response.status(Response.Status.CONFLICT).entity(e.getMessage()).build();
    } catch (HelixException e) {
      return serverError(e.getMessage());
    }

    return JSONRepresentation(ImmutableMap.of("acknowledged", true));
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/configs")
  public Response getClusterConfig(@PathParam("clusterId") String clusterId) {
    ConfigAccessor accessor = getConfigAccessor();
    ClusterConfig config = null;
    try {
      config = accessor.getClusterConfig(clusterId);
    } catch (HelixException ex) {
      // cluster not found.
      LOG.info("Failed to get cluster config for cluster {}, cluster not found. Exception: {}.",
          clusterId, ex);
    } catch (Exception ex) {
      LOG.error("Failed to get cluster config for cluster {}. Exception: {}", clusterId, ex);
      return serverError(ex);
    }
    if (config == null) {
      return notFound();
    }
    return JSONRepresentation(config.getRecord());
  }


  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @PUT
  @Path("{clusterId}/customized-state-config")
  public Response addCustomizedStateConfig(@PathParam("clusterId") String clusterId,
      String content) {
    if (!doesClusterExist(clusterId)) {
      return notFound(String.format("Cluster %s does not exist", clusterId));
    }

    HelixAdmin admin = getHelixAdmin();
    ZNRecord record;
    try {
      record = toZNRecord(content);
    } catch (IOException e) {
      return badRequest("Input is not a vaild ZNRecord!");
    }

    try {
      CustomizedStateConfig customizedStateConfig =
          new CustomizedStateConfig.Builder(record).build();
      admin.addCustomizedStateConfig(clusterId, customizedStateConfig);
    } catch (Exception ex) {
      LOG.error("Cannot add CustomizedStateConfig to cluster: {} Exception: {}",
          clusterId, ex);
      return serverError(ex);
    }

    return OK();
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @DELETE
  @Path("{clusterId}/customized-state-config")
  public Response removeCustomizedStateConfig(@PathParam("clusterId") String clusterId) {
    if (!doesClusterExist(clusterId)) {
      return notFound(String.format("Cluster %s does not exist", clusterId));
    }

    HelixAdmin admin = getHelixAdmin();
    try {
      admin.removeCustomizedStateConfig(clusterId);
    } catch (Exception ex) {
      LOG.error(
          "Cannot remove CustomizedStateConfig from cluster: {}, Exception: {}",
          clusterId, ex);
      return serverError(ex);
    }

    return OK();
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/customized-state-config")
  public Response getCustomizedStateConfig(@PathParam("clusterId") String clusterId) {
    if (!doesClusterExist(clusterId)) {
      return notFound(String.format("Cluster %s does not exist", clusterId));
    }

    ConfigAccessor configAccessor = getConfigAccessor();
    CustomizedStateConfig customizedStateConfig =
        configAccessor.getCustomizedStateConfig(clusterId);

    if (customizedStateConfig != null) {
      return JSONRepresentation(customizedStateConfig.getRecord());
    }

    return notFound();
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @POST
  @Path("{clusterId}/customized-state-config")
  public Response updateCustomizedStateConfig(@PathParam("clusterId") String clusterId,
      @QueryParam("command") String commandStr, @QueryParam("type") String type) {
    if (!doesClusterExist(clusterId)) {
      return notFound(String.format("Cluster %s does not exist", clusterId));
    }

    Command command;
    if (commandStr == null || commandStr.isEmpty()) {
      command = Command.add; // Default behavior
    } else {
      try {
        command = getCommand(commandStr);
      } catch (HelixException ex) {
        return badRequest(ex.getMessage());
      }
    }

    HelixAdmin admin = getHelixAdmin();

    try {
      switch (command) {
      case delete:
        admin.removeTypeFromCustomizedStateConfig(clusterId, type);
        break;
      case add:
        admin.addTypeToCustomizedStateConfig(clusterId, type);
        break;
      default:
        return badRequest("Unsupported command " + commandStr);
      }
    } catch (Exception ex) {
      LOG.error("Failed to {} CustomizedStateConfig for cluster {} new type: {}, Exception: {}", command, clusterId, type, ex);
      return serverError(ex);
    }
    return OK();
  }


  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/topology")
  public Response getClusterTopology(@PathParam("clusterId") String clusterId) throws IOException {
    //TODO reduce the GC by dependency injection
    ClusterService clusterService =
        new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
    ObjectMapper objectMapper = new ObjectMapper();
    ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);

    return OK(objectMapper.writeValueAsString(clusterTopology));
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/topologymap")
  public Response getClusterTopologyMap(@PathParam("clusterId") String clusterId) {
    HelixAdmin admin = getHelixAdmin();
    Map<String, List<String>> topologyMap;
    try {
      topologyMap = admin.getClusterTopology(clusterId).getTopologyMap();
    } catch (HelixException ex) {
      return badRequest(ex.getMessage());
    }
    return JSONRepresentation(topologyMap);
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/faultzonemap")
  public Response getClusterFaultZoneMap(@PathParam("clusterId") String clusterId) {
    HelixAdmin admin = getHelixAdmin();
    Map<String, List<String>> faultZoneMap;
    try {
      faultZoneMap = admin.getClusterTopology(clusterId).getFaultZoneMap();
    } catch (HelixException ex) {
      return badRequest(ex.getMessage());
    }
    return JSONRepresentation(faultZoneMap);
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @POST
  @Path("{clusterId}/configs")
  public Response updateClusterConfig(@PathParam("clusterId") String clusterId,
      @QueryParam("command") String commandStr, String content) {
    Command command;
    try {
      command = getCommand(commandStr);
    } catch (HelixException ex) {
      return badRequest(ex.getMessage());
    }

    ZNRecord record;
    try {
      record = toZNRecord(content);
    } catch (IOException e) {
      LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e);
      return badRequest("Input is not a valid ZNRecord!");
    }

    if (!clusterId.equals(record.getId())) {
      return badRequest("ID does not match the cluster name in input!");
    }

    ClusterConfig config = new ClusterConfig(record);
    ConfigAccessor configAccessor = getConfigAccessor();
    try {
      switch (command) {
        case update:
          configAccessor.updateClusterConfig(clusterId, config);
          break;
        case delete: {
          HelixConfigScope clusterScope =
              new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
                  .forCluster(clusterId).build();
          configAccessor.remove(clusterScope, config.getRecord());
        }
        break;

        default:
          return badRequest("Unsupported command " + commandStr);
      }
    } catch (HelixException ex) {
      return notFound(ex.getMessage());
    } catch (Exception ex) {
      LOG
          .error("Failed to {} cluster config, cluster {}, new config: {}. Exception: {}.", command,
              clusterId, content, ex);
      return serverError(ex);
    }
    return OK();
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/controller")
  public Response getClusterController(@PathParam("clusterId") String clusterId) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    Map<String, Object> controllerInfo = new HashMap<>();
    controllerInfo.put(Properties.id.name(), clusterId);

    LiveInstance leader = dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeader());
    if (leader != null) {
      controllerInfo.put(ClusterProperties.controller.name(), leader.getInstanceName());
      controllerInfo.putAll(leader.getRecord().getSimpleFields());
    } else {
      controllerInfo.put(ClusterProperties.controller.name(), "No Lead Controller!");
    }

    return JSONRepresentation(controllerInfo);
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/controller/history")
  public Response getClusterControllerLeadershipHistory(@PathParam("clusterId") String clusterId) {
    return JSONRepresentation(
        getControllerHistory(clusterId, ControllerHistory.HistoryType.CONTROLLER_LEADERSHIP));
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/controller/maintenanceHistory")
  public Response getClusterMaintenanceHistory(@PathParam("clusterId") String clusterId) {
    return JSONRepresentation(
        getControllerHistory(clusterId, ControllerHistory.HistoryType.MAINTENANCE));
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/controller/maintenanceSignal")
  public Response getClusterMaintenanceSignal(@PathParam("clusterId") String clusterId) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    MaintenanceSignal maintenanceSignal =
        dataAccessor.getProperty(dataAccessor.keyBuilder().maintenance());
    if (maintenanceSignal != null) {
      Map<String, String> maintenanceInfo = maintenanceSignal.getRecord().getSimpleFields();
      maintenanceInfo.put(ClusterProperties.clusterName.name(), clusterId);
      return JSONRepresentation(maintenanceInfo);
    }
    return notFound(String.format("Cluster %s is not in maintenance mode!", clusterId));
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/controller/messages")
  public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);

    Map<String, Object> controllerMessages = new HashMap<>();
    controllerMessages.put(Properties.id.name(), clusterId);

    List<String> messages =
        dataAccessor.getChildNames(dataAccessor.keyBuilder().controllerMessages());
    controllerMessages.put(ClusterProperties.messages.name(), messages);
    controllerMessages.put(Properties.count.name(), messages.size());

    return JSONRepresentation(controllerMessages);
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/controller/messages/{messageId}")
  public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId,
      @PathParam("messageId") String messageId) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    Message message =
        dataAccessor.getProperty(dataAccessor.keyBuilder().controllerMessage(messageId));
    return JSONRepresentation(message.getRecord());
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/statemodeldefs")
  public Response getClusterStateModelDefinitions(@PathParam("clusterId") String clusterId) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    List<String> stateModelDefs =
        dataAccessor.getChildNames(dataAccessor.keyBuilder().stateModelDefs());

    Map<String, Object> clusterStateModelDefs = new HashMap<>();
    clusterStateModelDefs.put(Properties.id.name(), clusterId);
    clusterStateModelDefs.put(ClusterProperties.stateModelDefinitions.name(), stateModelDefs);

    return JSONRepresentation(clusterStateModelDefs);
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/statemodeldefs/{statemodel}")
  public Response getClusterStateModelDefinition(@PathParam("clusterId") String clusterId,
      @PathParam("statemodel") String statemodel) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    StateModelDefinition stateModelDef =
        dataAccessor.getProperty(dataAccessor.keyBuilder().stateModelDef(statemodel));

    if (stateModelDef == null) {
      return badRequest("Statemodel not found!");
    }
    return JSONRepresentation(stateModelDef.getRecord());
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @PUT
  @Path("{clusterId}/statemodeldefs/{statemodel}")
  public Response createClusterStateModelDefinition(@PathParam("clusterId") String clusterId,
      @PathParam("statemodel") String statemodel, String content) {
    ZNRecord record;
    try {
      record = toZNRecord(content);
    } catch (IOException e) {
      LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e);
      return badRequest("Input is not a valid ZNRecord!");
    }
    RealmAwareZkClient zkClient = getRealmAwareZkClient();
    String path = PropertyPathBuilder.stateModelDef(clusterId);
    try {
      ZKUtil.createChildren(zkClient, path, record);
    } catch (Exception e) {
      LOG.error("Failed to create zk node with path {}. Exception: {}", path, e);
      return badRequest("Failed to create a Znode for stateModel! " + e);
    }

    return OK();
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @POST
  @Path("{clusterId}/statemodeldefs/{statemodel}")
  public Response setClusterStateModelDefinition(@PathParam("clusterId") String clusterId,
      @PathParam("statemodel") String statemodel, String content) {
    ZNRecord record;
    try {
      record = toZNRecord(content);
    } catch (IOException e) {
      LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e);
      return badRequest("Input is not a valid ZNRecord!");
    }

    StateModelDefinition stateModelDefinition = new StateModelDefinition(record);
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);

    PropertyKey key = dataAccessor.keyBuilder().stateModelDef(stateModelDefinition.getId());
    boolean retcode = true;
    try {
      retcode = dataAccessor.setProperty(key, stateModelDefinition);
    } catch (Exception e) {
      LOG.error("Failed to set StateModelDefinition key: {}. Exception: {}.", key, e);
      return badRequest("Failed to set the content " + content);
    }

    return OK();
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @DELETE
  @Path("{clusterId}/statemodeldefs/{statemodel}")
  public Response removeClusterStateModelDefinition(@PathParam("clusterId") String clusterId,
      @PathParam("statemodel") String statemodel) {
    //Shall we validate the statemodel string not having special character such as ../ etc?
    if (!StringUtils.isAlphanumeric(statemodel)) {
      return badRequest("Invalid statemodel name!");
    }

    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    PropertyKey key = dataAccessor.keyBuilder().stateModelDef(statemodel);
    boolean retcode = true;
    try {
      retcode = dataAccessor.removeProperty(key);
    } catch (Exception e) {
      LOG.error("Failed to remove StateModelDefinition key: {}. Exception: {}.", key, e);
      retcode = false;
    }
    if (!retcode) {
      return badRequest("Failed to remove!");
    }
    return OK();
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @PUT
  @Path("{clusterId}/restconfig")
  public Response createRESTConfig(@PathParam("clusterId") String clusterId,
      String content) {
    ZNRecord record;
    try {
      record = toZNRecord(content);
    } catch (IOException e) {
      LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e);
      return badRequest("Input is not a valid ZNRecord!");
    }

    if (!record.getId().equals(clusterId)) {
      return badRequest("ID does not match the cluster name in input!");
    }

    RESTConfig config = new RESTConfig(record);
    ConfigAccessor configAccessor = getConfigAccessor();
    try {
      configAccessor.setRESTConfig(clusterId, config);
    } catch (HelixException ex) {
      // TODO: Could use a more generic error for HelixException
      return notFound(ex.getMessage());
    } catch (Exception ex) {
      LOG.error("Failed to create rest config, cluster {}, new config: {}. Exception: {}.", clusterId, content, ex);
      return serverError(ex);
    }
    return OK();
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @POST
  @Path("{clusterId}/restconfig")
  public Response updateRESTConfig(@PathParam("clusterId") String clusterId,
      @QueryParam("command") String commandStr, String content) {
    //TODO: abstract out the logic that is duplicated from cluster config methods
    Command command;
    try {
      command = getCommand(commandStr);
    } catch (HelixException ex) {
      return badRequest(ex.getMessage());
    }

    ZNRecord record;
    try {
      record = toZNRecord(content);
    } catch (IOException e) {
      LOG.error("Failed to deserialize user's input {}. Exception: {}", content, e);
      return badRequest("Input is not a valid ZNRecord!");
    }

    RESTConfig config = new RESTConfig(record);
    ConfigAccessor configAccessor = getConfigAccessor();
    try {
      switch (command) {
        case update:
          configAccessor.updateRESTConfig(clusterId, config);
          break;
        case delete: {
          HelixConfigScope scope =
              new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.REST)
                  .forCluster(clusterId).build();
          configAccessor.remove(scope, config.getRecord());
        }
        break;
        default:
          return badRequest("Unsupported command " + commandStr);
      }
    } catch (HelixException ex) {
      return notFound(ex.getMessage());
    } catch (Exception ex) {
      LOG.error(
          "Failed to {} rest config, cluster {}, new config: {}. Exception: {}", command, clusterId, content, ex);
      return serverError(ex);
    }
    return OK();
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/restconfig")
  public Response getRESTConfig(@PathParam("clusterId") String clusterId) {
    ConfigAccessor accessor = getConfigAccessor();
    RESTConfig config = null;
    try {
      config = accessor.getRESTConfig(clusterId);
    } catch (HelixException ex) {
      LOG.info(
          "Failed to get rest config for cluster {}, cluster not found. Exception: {}.", clusterId, ex);
    } catch (Exception ex) {
      LOG.error("Failed to get rest config for cluster {}. Exception: {}.", clusterId, ex);
      return serverError(ex);
    }
    if (config == null) {
      return notFound();
    }
    return JSONRepresentation(config.getRecord());
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @DELETE
  @Path("{clusterId}/restconfig")
  public Response deleteRESTConfig(@PathParam("clusterId") String clusterId) {
    ConfigAccessor accessor = getConfigAccessor();
    try {
      accessor.deleteRESTConfig(clusterId);
    } catch (HelixException ex) {
      LOG.info("Failed to delete rest config for cluster {}, cluster rest config is not found. Exception: {}.", clusterId, ex);
      return notFound(ex.getMessage());
    } catch (Exception ex) {
      LOG.error("Failed to delete rest config, cluster {}, Exception: {}.", clusterId, ex);
      return serverError(ex);
    }
    return OK();
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/maintenance")
  public Response getClusterMaintenanceMode(@PathParam("clusterId") String clusterId) {
    return JSONRepresentation(ImmutableMap
        .of(ClusterProperties.maintenance.name(), getHelixAdmin().isInMaintenanceMode(clusterId)));
  }

  private boolean doesClusterExist(String cluster) {
    RealmAwareZkClient zkClient = getRealmAwareZkClient();
    return ZKUtil.isClusterSetup(cluster, zkClient);
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @PUT
  @Path("{clusterId}/cloudconfig")
  public Response addCloudConfig(@PathParam("clusterId") String clusterId, String content) {

    RealmAwareZkClient zkClient = getRealmAwareZkClient();
    if (!ZKUtil.isClusterSetup(clusterId, zkClient)) {
      return notFound("Cluster is not properly setup!");
    }

    HelixAdmin admin = getHelixAdmin();
    ZNRecord record;
    try {
      record = toZNRecord(content);
    } catch (IOException e) {
      LOG.error("Failed to deserialize user's input " + content + ", Exception: " + e);
      return badRequest("Input is not a vaild ZNRecord!");
    }

    try {
      CloudConfig cloudConfig = new CloudConfig.Builder(record).build();
      admin.addCloudConfig(clusterId, cloudConfig);
    } catch (HelixException ex) {
      LOG.error("Error in adding a CloudConfig to cluster: " + clusterId, ex);
      return badRequest(ex.getMessage());
    } catch (Exception ex) {
      LOG.error("Cannot add CloudConfig to cluster: " + clusterId, ex);
      return serverError(ex);
    }

    return OK();
  }

  @ResponseMetered(name = HttpConstants.READ_REQUEST)
  @Timed(name = HttpConstants.READ_REQUEST)
  @GET
  @Path("{clusterId}/cloudconfig")
  public Response getCloudConfig(@PathParam("clusterId") String clusterId) {

    RealmAwareZkClient zkClient = getRealmAwareZkClient();
    if (!ZKUtil.isClusterSetup(clusterId, zkClient)) {
      return notFound();
    }

    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
    CloudConfig cloudConfig = configAccessor.getCloudConfig(clusterId);

    if (cloudConfig != null) {
      return JSONRepresentation(cloudConfig.getRecord());
    }

    return notFound();
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @DELETE
  @Path("{clusterId}/cloudconfig")
  public Response deleteCloudConfig(@PathParam("clusterId") String clusterId) {
    HelixAdmin admin = getHelixAdmin();
    admin.removeCloudConfig(clusterId);
    return OK();
  }

  @ResponseMetered(name = HttpConstants.WRITE_REQUEST)
  @Timed(name = HttpConstants.WRITE_REQUEST)
  @POST
  @Path("{clusterId}/cloudconfig")
  public Response updateCloudConfig(@PathParam("clusterId") String clusterId,
      @QueryParam("command") String commandStr, String content) {

    RealmAwareZkClient zkClient = getRealmAwareZkClient();
    if (!ZKUtil.isClusterSetup(clusterId, zkClient)) {
      return notFound();
    }

    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
    // Here to update cloud config
    Command command;
    if (commandStr == null || commandStr.isEmpty()) {
      command = Command.update; // Default behavior
    } else {
      try {
        command = getCommand(commandStr);
      } catch (HelixException ex) {
        return badRequest(ex.getMessage());
      }
    }

    ZNRecord record;
    CloudConfig cloudConfig;
    try {
      record = toZNRecord(content);
      cloudConfig = new CloudConfig(record);
    } catch (IOException e) {
      LOG.error("Failed to deserialize user's input " + content + ", Exception: " + e);
      return badRequest("Input is not a vaild ZNRecord!");
    }
    try {
      switch (command) {
      case delete: {
        configAccessor.deleteCloudConfigFields(clusterId, cloudConfig);
      }
      break;
      case update: {
        try {
          configAccessor.updateCloudConfig(clusterId, cloudConfig);
        } catch (HelixException ex) {
          LOG.error("Error in updating a CloudConfig to cluster: " + clusterId, ex);
          return badRequest(ex.getMessage());
        } catch (Exception ex) {
          LOG.error("Cannot update CloudConfig for cluster: " + clusterId, ex);
          return serverError(ex);
        }
      }
      break;
      default:
        return badRequest("Unsupported command " + commandStr);
      }
    } catch (Exception ex) {
      LOG.error("Failed to " + command + " cloud config, cluster " + clusterId + " new config: "
          + content + ", Exception: " + ex);
      return serverError(ex);
    }
    return OK();
  }

  /**
   * Reads HISTORY ZNode from the metadata store and generates a Map object that contains the
   * pertinent history entries depending on the history type.
   * @param clusterId
   * @param historyType
   * @return
   */
  private Map<String, Object> getControllerHistory(String clusterId,
      ControllerHistory.HistoryType historyType) {
    HelixDataAccessor dataAccessor = getDataAccssor(clusterId);
    Map<String, Object> history = new HashMap<>();
    history.put(Properties.id.name(), clusterId);

    ControllerHistory historyRecord =
        dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeaderHistory());

    switch (historyType) {
      case CONTROLLER_LEADERSHIP:
        history.put(Properties.history.name(),
            historyRecord != null ? historyRecord.getHistoryList() : Collections.emptyList());
        break;
      case MAINTENANCE:
        history.put(ClusterProperties.maintenanceHistory.name(),
            historyRecord != null ? historyRecord.getMaintenanceHistoryList()
                : Collections.emptyList());
        break;
    }
    return history;
  }
}
