| package org.apache.helix.rest.server.resources.helix; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import javax.ws.rs.DELETE; |
| import javax.ws.rs.DefaultValue; |
| import javax.ws.rs.GET; |
| import javax.ws.rs.POST; |
| import javax.ws.rs.PUT; |
| import javax.ws.rs.Path; |
| import javax.ws.rs.PathParam; |
| import javax.ws.rs.QueryParam; |
| import javax.ws.rs.core.Response; |
| |
| import com.codahale.metrics.annotation.ResponseMetered; |
| import com.codahale.metrics.annotation.Timed; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.collect.ImmutableMap; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.helix.AccessOption; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.HelixAdmin; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.PropertyPathBuilder; |
| import org.apache.helix.manager.zk.ZKUtil; |
| import org.apache.helix.model.CloudConfig; |
| import org.apache.helix.model.ClusterConfig; |
| import org.apache.helix.model.ControllerHistory; |
| import org.apache.helix.model.CustomizedStateConfig; |
| import org.apache.helix.model.HelixConfigScope; |
| import org.apache.helix.model.LiveInstance; |
| import org.apache.helix.model.MaintenanceSignal; |
| import org.apache.helix.model.Message; |
| import org.apache.helix.model.RESTConfig; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.apache.helix.model.builder.HelixConfigScopeBuilder; |
| import org.apache.helix.rest.common.HttpConstants; |
| import org.apache.helix.rest.server.json.cluster.ClusterTopology; |
| import org.apache.helix.rest.server.service.ClusterService; |
| import org.apache.helix.rest.server.service.ClusterServiceImpl; |
| import org.apache.helix.tools.ClusterSetup; |
| import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| @Path("/clusters") |
| public class ClusterAccessor extends AbstractHelixResource { |
| private static Logger LOG = LoggerFactory.getLogger(ClusterAccessor.class.getName()); |
| |
| public enum ClusterProperties { |
| controller, |
| instances, |
| liveInstances, |
| resources, |
| paused, |
| maintenance, |
| messages, |
| stateModelDefinitions, |
| clusters, |
| maintenanceSignal, |
| maintenanceHistory, |
| clusterName |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| public Response getClusters() { |
| HelixAdmin helixAdmin = getHelixAdmin(); |
| List<String> clusters = helixAdmin.getClusters(); |
| |
| Map<String, List<String>> dataMap = new HashMap<>(); |
| dataMap.put(ClusterProperties.clusters.name(), clusters); |
| |
| return JSONRepresentation(dataMap); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}") |
| public Response getClusterInfo(@PathParam("clusterId") String clusterId) { |
| if (!doesClusterExist(clusterId)) { |
| return notFound(); |
| } |
| |
| HelixDataAccessor dataAccessor = getDataAccssor(clusterId); |
| PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder(); |
| |
| Map<String, Object> clusterInfo = new HashMap<>(); |
| clusterInfo.put(Properties.id.name(), clusterId); |
| |
| LiveInstance controller = dataAccessor.getProperty(keyBuilder.controllerLeader()); |
| if (controller != null) { |
| clusterInfo.put(ClusterProperties.controller.name(), controller.getInstanceName()); |
| } else { |
| clusterInfo.put(ClusterProperties.controller.name(), "No Lead Controller!"); |
| } |
| |
| boolean paused = dataAccessor.getBaseDataAccessor() |
| .exists(keyBuilder.pause().getPath(), AccessOption.PERSISTENT); |
| clusterInfo.put(ClusterProperties.paused.name(), paused); |
| boolean maintenance = getHelixAdmin().isInMaintenanceMode(clusterId); |
| clusterInfo.put(ClusterProperties.maintenance.name(), maintenance); |
| |
| List<String> idealStates = dataAccessor.getChildNames(keyBuilder.idealStates()); |
| clusterInfo.put(ClusterProperties.resources.name(), idealStates); |
| List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs()); |
| clusterInfo.put(ClusterProperties.instances.name(), instances); |
| List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances()); |
| clusterInfo.put(ClusterProperties.liveInstances.name(), liveInstances); |
| |
| return JSONRepresentation(clusterInfo); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @PUT |
| @Path("{clusterId}") |
| public Response createCluster(@PathParam("clusterId") String clusterId, |
| @DefaultValue("false") @QueryParam("recreate") String recreate, |
| @DefaultValue("false") @QueryParam("addCloudConfig") String addCloudConfig, |
| String cloudConfigManifest) { |
| |
| boolean recreateIfExists = Boolean.parseBoolean(recreate); |
| boolean cloudConfigIncluded = Boolean.parseBoolean(addCloudConfig); |
| |
| ClusterSetup clusterSetup = getClusterSetup(); |
| |
| CloudConfig cloudConfig = null; |
| if (cloudConfigIncluded) { |
| ZNRecord record; |
| try { |
| record = toZNRecord(cloudConfigManifest); |
| cloudConfig = new CloudConfig.Builder(record).build(); |
| } catch (IOException | HelixException e) { |
| String errMsg = "Failed to generate a valid CloudConfig from " + cloudConfigManifest; |
| LOG.error(errMsg, e); |
| return badRequest(errMsg + " Exception: " + e.getMessage()); |
| } |
| } |
| |
| try { |
| clusterSetup.addCluster(clusterId, recreateIfExists, cloudConfig); |
| } catch (Exception ex) { |
| LOG.error("Failed to create cluster {}. Exception: {}.", clusterId, ex); |
| return serverError(ex); |
| } |
| return created(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @DELETE |
| @Path("{clusterId}") |
| public Response deleteCluster(@PathParam("clusterId") String clusterId) { |
| ClusterSetup clusterSetup = getClusterSetup(); |
| |
| try { |
| clusterSetup.deleteCluster(clusterId); |
| } catch (HelixException ex) { |
| LOG.info("Failed to delete cluster {}, cluster is still in use. Exception: {}.", clusterId, |
| ex); |
| return badRequest(ex.getMessage()); |
| } catch (Exception ex) { |
| LOG.error("Failed to delete cluster {}. Exception: {}.", clusterId, ex); |
| return serverError(ex); |
| } |
| |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @POST |
| @Path("{clusterId}") |
| public Response updateCluster(@PathParam("clusterId") String clusterId, |
| @QueryParam("command") String commandStr, @QueryParam("superCluster") String superCluster, |
| @QueryParam("duration") Long duration, String content) { |
| Command command; |
| try { |
| command = getCommand(commandStr); |
| } catch (HelixException ex) { |
| return badRequest(ex.getMessage()); |
| } |
| |
| ClusterSetup clusterSetup = getClusterSetup(); |
| HelixAdmin helixAdmin = getHelixAdmin(); |
| |
| switch (command) { |
| case activate: |
| if (superCluster == null) { |
| return badRequest("Super Cluster name is missing!"); |
| } |
| try { |
| clusterSetup.activateCluster(clusterId, superCluster, true); |
| } catch (Exception ex) { |
| LOG.error("Failed to add cluster {} to super cluster {}.", clusterId, superCluster); |
| return serverError(ex); |
| } |
| break; |
| |
| case expand: |
| try { |
| clusterSetup.expandCluster(clusterId); |
| } catch (Exception ex) { |
| LOG.error("Failed to expand cluster {}.", clusterId); |
| return serverError(ex); |
| } |
| break; |
| |
| case enable: |
| try { |
| helixAdmin.enableCluster(clusterId, true); |
| } catch (Exception ex) { |
| LOG.error("Failed to enable cluster {}.", clusterId); |
| return serverError(ex); |
| } |
| break; |
| |
| case disable: |
| try { |
| helixAdmin.enableCluster(clusterId, false); |
| } catch (Exception ex) { |
| LOG.error("Failed to disable cluster {}.", clusterId); |
| return serverError(ex); |
| } |
| break; |
| |
| case enableMaintenanceMode: |
| case disableMaintenanceMode: |
| // Try to parse the content string. If parseable, use it as a KV mapping. Otherwise, treat it |
| // as a REASON String |
| Map<String, String> customFieldsMap = null; |
| try { |
| // Try to parse content |
| customFieldsMap = |
| OBJECT_MAPPER.readValue(content, new TypeReference<HashMap<String, String>>() { |
| }); |
| // content is given as a KV mapping. Nullify content |
| content = null; |
| } catch (Exception e) { |
| // NOP |
| } |
| helixAdmin |
| .manuallyEnableMaintenanceMode(clusterId, command == Command.enableMaintenanceMode, |
| content, customFieldsMap); |
| break; |
| case enableWagedRebalanceForAllResources: |
| // Enable WAGED rebalance for all resources in the cluster |
| List<String> resources = helixAdmin.getResourcesInCluster(clusterId); |
| try { |
| helixAdmin.enableWagedRebalance(clusterId, resources); |
| } catch (HelixException e) { |
| return badRequest(e.getMessage()); |
| } |
| break; |
| case purgeOfflineParticipants: |
| if (duration == null || duration < 0) { |
| helixAdmin |
| .purgeOfflineInstances(clusterId, ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET); |
| } else { |
| helixAdmin.purgeOfflineInstances(clusterId, duration); |
| } |
| break; |
| default: |
| return badRequest("Unsupported command {}." + command); |
| } |
| |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/configs") |
| public Response getClusterConfig(@PathParam("clusterId") String clusterId) { |
| ConfigAccessor accessor = getConfigAccessor(); |
| ClusterConfig config = null; |
| try { |
| config = accessor.getClusterConfig(clusterId); |
| } catch (HelixException ex) { |
| // cluster not found. |
| LOG.info("Failed to get cluster config for cluster {}, cluster not found. Exception: {}.", |
| clusterId, ex); |
| } catch (Exception ex) { |
| LOG.error("Failed to get cluster config for cluster {}. Exception: {}", clusterId, ex); |
| return serverError(ex); |
| } |
| if (config == null) { |
| return notFound(); |
| } |
| return JSONRepresentation(config.getRecord()); |
| } |
| |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @PUT |
| @Path("{clusterId}/customized-state-config") |
| public Response addCustomizedStateConfig(@PathParam("clusterId") String clusterId, |
| String content) { |
| if (!doesClusterExist(clusterId)) { |
| return notFound(String.format("Cluster %s does not exist", clusterId)); |
| } |
| |
| HelixAdmin admin = getHelixAdmin(); |
| ZNRecord record; |
| try { |
| record = toZNRecord(content); |
| } catch (IOException e) { |
| return badRequest("Input is not a vaild ZNRecord!"); |
| } |
| |
| try { |
| CustomizedStateConfig customizedStateConfig = |
| new CustomizedStateConfig.Builder(record).build(); |
| admin.addCustomizedStateConfig(clusterId, customizedStateConfig); |
| } catch (Exception ex) { |
| LOG.error("Cannot add CustomizedStateConfig to cluster: {} Exception: {}", |
| clusterId, ex); |
| return serverError(ex); |
| } |
| |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @DELETE |
| @Path("{clusterId}/customized-state-config") |
| public Response removeCustomizedStateConfig(@PathParam("clusterId") String clusterId) { |
| if (!doesClusterExist(clusterId)) { |
| return notFound(String.format("Cluster %s does not exist", clusterId)); |
| } |
| |
| HelixAdmin admin = getHelixAdmin(); |
| try { |
| admin.removeCustomizedStateConfig(clusterId); |
| } catch (Exception ex) { |
| LOG.error( |
| "Cannot remove CustomizedStateConfig from cluster: {}, Exception: {}", |
| clusterId, ex); |
| return serverError(ex); |
| } |
| |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/customized-state-config") |
| public Response getCustomizedStateConfig(@PathParam("clusterId") String clusterId) { |
| if (!doesClusterExist(clusterId)) { |
| return notFound(String.format("Cluster %s does not exist", clusterId)); |
| } |
| |
| ConfigAccessor configAccessor = getConfigAccessor(); |
| CustomizedStateConfig customizedStateConfig = |
| configAccessor.getCustomizedStateConfig(clusterId); |
| |
| if (customizedStateConfig != null) { |
| return JSONRepresentation(customizedStateConfig.getRecord()); |
| } |
| |
| return notFound(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @POST |
| @Path("{clusterId}/customized-state-config") |
| public Response updateCustomizedStateConfig(@PathParam("clusterId") String clusterId, |
| @QueryParam("command") String commandStr, @QueryParam("type") String type) { |
| if (!doesClusterExist(clusterId)) { |
| return notFound(String.format("Cluster %s does not exist", clusterId)); |
| } |
| |
| Command command; |
| if (commandStr == null || commandStr.isEmpty()) { |
| command = Command.add; // Default behavior |
| } else { |
| try { |
| command = getCommand(commandStr); |
| } catch (HelixException ex) { |
| return badRequest(ex.getMessage()); |
| } |
| } |
| |
| HelixAdmin admin = getHelixAdmin(); |
| |
| try { |
| switch (command) { |
| case delete: |
| admin.removeTypeFromCustomizedStateConfig(clusterId, type); |
| break; |
| case add: |
| admin.addTypeToCustomizedStateConfig(clusterId, type); |
| break; |
| default: |
| return badRequest("Unsupported command " + commandStr); |
| } |
| } catch (Exception ex) { |
| LOG.error("Failed to {} CustomizedStateConfig for cluster {} new type: {}, Exception: {}", command, clusterId, type, ex); |
| return serverError(ex); |
| } |
| return OK(); |
| } |
| |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/topology") |
| public Response getClusterTopology(@PathParam("clusterId") String clusterId) throws IOException { |
| //TODO reduce the GC by dependency injection |
| ClusterService clusterService = |
| new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor()); |
| ObjectMapper objectMapper = new ObjectMapper(); |
| ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId); |
| |
| return OK(objectMapper.writeValueAsString(clusterTopology)); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/topologymap") |
| public Response getClusterTopologyMap(@PathParam("clusterId") String clusterId) { |
| HelixAdmin admin = getHelixAdmin(); |
| Map<String, List<String>> topologyMap; |
| try { |
| topologyMap = admin.getClusterTopology(clusterId).getTopologyMap(); |
| } catch (HelixException ex) { |
| return badRequest(ex.getMessage()); |
| } |
| return JSONRepresentation(topologyMap); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/faultzonemap") |
| public Response getClusterFaultZoneMap(@PathParam("clusterId") String clusterId) { |
| HelixAdmin admin = getHelixAdmin(); |
| Map<String, List<String>> faultZoneMap; |
| try { |
| faultZoneMap = admin.getClusterTopology(clusterId).getFaultZoneMap(); |
| } catch (HelixException ex) { |
| return badRequest(ex.getMessage()); |
| } |
| return JSONRepresentation(faultZoneMap); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @POST |
| @Path("{clusterId}/configs") |
| public Response updateClusterConfig(@PathParam("clusterId") String clusterId, |
| @QueryParam("command") String commandStr, String content) { |
| Command command; |
| try { |
| command = getCommand(commandStr); |
| } catch (HelixException ex) { |
| return badRequest(ex.getMessage()); |
| } |
| |
| ZNRecord record; |
| try { |
| record = toZNRecord(content); |
| } catch (IOException e) { |
| LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e); |
| return badRequest("Input is not a valid ZNRecord!"); |
| } |
| |
| if (!clusterId.equals(record.getId())) { |
| return badRequest("ID does not match the cluster name in input!"); |
| } |
| |
| ClusterConfig config = new ClusterConfig(record); |
| ConfigAccessor configAccessor = getConfigAccessor(); |
| try { |
| switch (command) { |
| case update: |
| configAccessor.updateClusterConfig(clusterId, config); |
| break; |
| case delete: { |
| HelixConfigScope clusterScope = |
| new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER) |
| .forCluster(clusterId).build(); |
| configAccessor.remove(clusterScope, config.getRecord()); |
| } |
| break; |
| |
| default: |
| return badRequest("Unsupported command " + commandStr); |
| } |
| } catch (HelixException ex) { |
| return notFound(ex.getMessage()); |
| } catch (Exception ex) { |
| LOG |
| .error("Failed to {} cluster config, cluster {}, new config: {}. Exception: {}.", command, |
| clusterId, content, ex); |
| return serverError(ex); |
| } |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/controller") |
| public Response getClusterController(@PathParam("clusterId") String clusterId) { |
| HelixDataAccessor dataAccessor = getDataAccssor(clusterId); |
| Map<String, Object> controllerInfo = new HashMap<>(); |
| controllerInfo.put(Properties.id.name(), clusterId); |
| |
| LiveInstance leader = dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeader()); |
| if (leader != null) { |
| controllerInfo.put(ClusterProperties.controller.name(), leader.getInstanceName()); |
| controllerInfo.putAll(leader.getRecord().getSimpleFields()); |
| } else { |
| controllerInfo.put(ClusterProperties.controller.name(), "No Lead Controller!"); |
| } |
| |
| return JSONRepresentation(controllerInfo); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/controller/history") |
| public Response getClusterControllerLeadershipHistory(@PathParam("clusterId") String clusterId) { |
| return JSONRepresentation( |
| getControllerHistory(clusterId, ControllerHistory.HistoryType.CONTROLLER_LEADERSHIP)); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/controller/maintenanceHistory") |
| public Response getClusterMaintenanceHistory(@PathParam("clusterId") String clusterId) { |
| return JSONRepresentation( |
| getControllerHistory(clusterId, ControllerHistory.HistoryType.MAINTENANCE)); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/controller/maintenanceSignal") |
| public Response getClusterMaintenanceSignal(@PathParam("clusterId") String clusterId) { |
| HelixDataAccessor dataAccessor = getDataAccssor(clusterId); |
| MaintenanceSignal maintenanceSignal = |
| dataAccessor.getProperty(dataAccessor.keyBuilder().maintenance()); |
| if (maintenanceSignal != null) { |
| Map<String, String> maintenanceInfo = maintenanceSignal.getRecord().getSimpleFields(); |
| maintenanceInfo.put(ClusterProperties.clusterName.name(), clusterId); |
| return JSONRepresentation(maintenanceInfo); |
| } |
| return notFound(String.format("Cluster %s is not in maintenance mode!", clusterId)); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/controller/messages") |
| public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId) { |
| HelixDataAccessor dataAccessor = getDataAccssor(clusterId); |
| |
| Map<String, Object> controllerMessages = new HashMap<>(); |
| controllerMessages.put(Properties.id.name(), clusterId); |
| |
| List<String> messages = |
| dataAccessor.getChildNames(dataAccessor.keyBuilder().controllerMessages()); |
| controllerMessages.put(ClusterProperties.messages.name(), messages); |
| controllerMessages.put(Properties.count.name(), messages.size()); |
| |
| return JSONRepresentation(controllerMessages); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/controller/messages/{messageId}") |
| public Response getClusterControllerMessages(@PathParam("clusterId") String clusterId, |
| @PathParam("messageId") String messageId) { |
| HelixDataAccessor dataAccessor = getDataAccssor(clusterId); |
| Message message = |
| dataAccessor.getProperty(dataAccessor.keyBuilder().controllerMessage(messageId)); |
| return JSONRepresentation(message.getRecord()); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/statemodeldefs") |
| public Response getClusterStateModelDefinitions(@PathParam("clusterId") String clusterId) { |
| HelixDataAccessor dataAccessor = getDataAccssor(clusterId); |
| List<String> stateModelDefs = |
| dataAccessor.getChildNames(dataAccessor.keyBuilder().stateModelDefs()); |
| |
| Map<String, Object> clusterStateModelDefs = new HashMap<>(); |
| clusterStateModelDefs.put(Properties.id.name(), clusterId); |
| clusterStateModelDefs.put(ClusterProperties.stateModelDefinitions.name(), stateModelDefs); |
| |
| return JSONRepresentation(clusterStateModelDefs); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/statemodeldefs/{statemodel}") |
| public Response getClusterStateModelDefinition(@PathParam("clusterId") String clusterId, |
| @PathParam("statemodel") String statemodel) { |
| HelixDataAccessor dataAccessor = getDataAccssor(clusterId); |
| StateModelDefinition stateModelDef = |
| dataAccessor.getProperty(dataAccessor.keyBuilder().stateModelDef(statemodel)); |
| |
| if (stateModelDef == null) { |
| return badRequest("Statemodel not found!"); |
| } |
| return JSONRepresentation(stateModelDef.getRecord()); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @PUT |
| @Path("{clusterId}/statemodeldefs/{statemodel}") |
| public Response createClusterStateModelDefinition(@PathParam("clusterId") String clusterId, |
| @PathParam("statemodel") String statemodel, String content) { |
| ZNRecord record; |
| try { |
| record = toZNRecord(content); |
| } catch (IOException e) { |
| LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e); |
| return badRequest("Input is not a valid ZNRecord!"); |
| } |
| RealmAwareZkClient zkClient = getRealmAwareZkClient(); |
| String path = PropertyPathBuilder.stateModelDef(clusterId); |
| try { |
| ZKUtil.createChildren(zkClient, path, record); |
| } catch (Exception e) { |
| LOG.error("Failed to create zk node with path {}. Exception: {}", path, e); |
| return badRequest("Failed to create a Znode for stateModel! " + e); |
| } |
| |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @POST |
| @Path("{clusterId}/statemodeldefs/{statemodel}") |
| public Response setClusterStateModelDefinition(@PathParam("clusterId") String clusterId, |
| @PathParam("statemodel") String statemodel, String content) { |
| ZNRecord record; |
| try { |
| record = toZNRecord(content); |
| } catch (IOException e) { |
| LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e); |
| return badRequest("Input is not a valid ZNRecord!"); |
| } |
| |
| StateModelDefinition stateModelDefinition = new StateModelDefinition(record); |
| HelixDataAccessor dataAccessor = getDataAccssor(clusterId); |
| |
| PropertyKey key = dataAccessor.keyBuilder().stateModelDef(stateModelDefinition.getId()); |
| boolean retcode = true; |
| try { |
| retcode = dataAccessor.setProperty(key, stateModelDefinition); |
| } catch (Exception e) { |
| LOG.error("Failed to set StateModelDefinition key: {}. Exception: {}.", key, e); |
| return badRequest("Failed to set the content " + content); |
| } |
| |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @DELETE |
| @Path("{clusterId}/statemodeldefs/{statemodel}") |
| public Response removeClusterStateModelDefinition(@PathParam("clusterId") String clusterId, |
| @PathParam("statemodel") String statemodel) { |
| //Shall we validate the statemodel string not having special character such as ../ etc? |
| if (!StringUtils.isAlphanumeric(statemodel)) { |
| return badRequest("Invalid statemodel name!"); |
| } |
| |
| HelixDataAccessor dataAccessor = getDataAccssor(clusterId); |
| PropertyKey key = dataAccessor.keyBuilder().stateModelDef(statemodel); |
| boolean retcode = true; |
| try { |
| retcode = dataAccessor.removeProperty(key); |
| } catch (Exception e) { |
| LOG.error("Failed to remove StateModelDefinition key: {}. Exception: {}.", key, e); |
| retcode = false; |
| } |
| if (!retcode) { |
| return badRequest("Failed to remove!"); |
| } |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @PUT |
| @Path("{clusterId}/restconfig") |
| public Response createRESTConfig(@PathParam("clusterId") String clusterId, |
| String content) { |
| ZNRecord record; |
| try { |
| record = toZNRecord(content); |
| } catch (IOException e) { |
| LOG.error("Failed to deserialize user's input {}. Exception: {}.", content, e); |
| return badRequest("Input is not a valid ZNRecord!"); |
| } |
| |
| if (!record.getId().equals(clusterId)) { |
| return badRequest("ID does not match the cluster name in input!"); |
| } |
| |
| RESTConfig config = new RESTConfig(record); |
| ConfigAccessor configAccessor = getConfigAccessor(); |
| try { |
| configAccessor.setRESTConfig(clusterId, config); |
| } catch (HelixException ex) { |
| // TODO: Could use a more generic error for HelixException |
| return notFound(ex.getMessage()); |
| } catch (Exception ex) { |
| LOG.error("Failed to create rest config, cluster {}, new config: {}. Exception: {}.", clusterId, content, ex); |
| return serverError(ex); |
| } |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @POST |
| @Path("{clusterId}/restconfig") |
| public Response updateRESTConfig(@PathParam("clusterId") String clusterId, |
| @QueryParam("command") String commandStr, String content) { |
| //TODO: abstract out the logic that is duplicated from cluster config methods |
| Command command; |
| try { |
| command = getCommand(commandStr); |
| } catch (HelixException ex) { |
| return badRequest(ex.getMessage()); |
| } |
| |
| ZNRecord record; |
| try { |
| record = toZNRecord(content); |
| } catch (IOException e) { |
| LOG.error("Failed to deserialize user's input {}. Exception: {}", content, e); |
| return badRequest("Input is not a valid ZNRecord!"); |
| } |
| |
| RESTConfig config = new RESTConfig(record); |
| ConfigAccessor configAccessor = getConfigAccessor(); |
| try { |
| switch (command) { |
| case update: |
| configAccessor.updateRESTConfig(clusterId, config); |
| break; |
| case delete: { |
| HelixConfigScope scope = |
| new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.REST) |
| .forCluster(clusterId).build(); |
| configAccessor.remove(scope, config.getRecord()); |
| } |
| break; |
| default: |
| return badRequest("Unsupported command " + commandStr); |
| } |
| } catch (HelixException ex) { |
| return notFound(ex.getMessage()); |
| } catch (Exception ex) { |
| LOG.error( |
| "Failed to {} rest config, cluster {}, new config: {}. Exception: {}", command, clusterId, content, ex); |
| return serverError(ex); |
| } |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/restconfig") |
| public Response getRESTConfig(@PathParam("clusterId") String clusterId) { |
| ConfigAccessor accessor = getConfigAccessor(); |
| RESTConfig config = null; |
| try { |
| config = accessor.getRESTConfig(clusterId); |
| } catch (HelixException ex) { |
| LOG.info( |
| "Failed to get rest config for cluster {}, cluster not found. Exception: {}.", clusterId, ex); |
| } catch (Exception ex) { |
| LOG.error("Failed to get rest config for cluster {}. Exception: {}.", clusterId, ex); |
| return serverError(ex); |
| } |
| if (config == null) { |
| return notFound(); |
| } |
| return JSONRepresentation(config.getRecord()); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @DELETE |
| @Path("{clusterId}/restconfig") |
| public Response deleteRESTConfig(@PathParam("clusterId") String clusterId) { |
| ConfigAccessor accessor = getConfigAccessor(); |
| try { |
| accessor.deleteRESTConfig(clusterId); |
| } catch (HelixException ex) { |
| LOG.info("Failed to delete rest config for cluster {}, cluster rest config is not found. Exception: {}.", clusterId, ex); |
| return notFound(ex.getMessage()); |
| } catch (Exception ex) { |
| LOG.error("Failed to delete rest config, cluster {}, Exception: {}.", clusterId, ex); |
| return serverError(ex); |
| } |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/maintenance") |
| public Response getClusterMaintenanceMode(@PathParam("clusterId") String clusterId) { |
| return JSONRepresentation(ImmutableMap |
| .of(ClusterProperties.maintenance.name(), getHelixAdmin().isInMaintenanceMode(clusterId))); |
| } |
| |
| private boolean doesClusterExist(String cluster) { |
| RealmAwareZkClient zkClient = getRealmAwareZkClient(); |
| return ZKUtil.isClusterSetup(cluster, zkClient); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @PUT |
| @Path("{clusterId}/cloudconfig") |
| public Response addCloudConfig(@PathParam("clusterId") String clusterId, String content) { |
| |
| RealmAwareZkClient zkClient = getRealmAwareZkClient(); |
| if (!ZKUtil.isClusterSetup(clusterId, zkClient)) { |
| return notFound("Cluster is not properly setup!"); |
| } |
| |
| HelixAdmin admin = getHelixAdmin(); |
| ZNRecord record; |
| try { |
| record = toZNRecord(content); |
| } catch (IOException e) { |
| LOG.error("Failed to deserialize user's input " + content + ", Exception: " + e); |
| return badRequest("Input is not a vaild ZNRecord!"); |
| } |
| |
| try { |
| CloudConfig cloudConfig = new CloudConfig.Builder(record).build(); |
| admin.addCloudConfig(clusterId, cloudConfig); |
| } catch (HelixException ex) { |
| LOG.error("Error in adding a CloudConfig to cluster: " + clusterId, ex); |
| return badRequest(ex.getMessage()); |
| } catch (Exception ex) { |
| LOG.error("Cannot add CloudConfig to cluster: " + clusterId, ex); |
| return serverError(ex); |
| } |
| |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.READ_REQUEST) |
| @Timed(name = HttpConstants.READ_REQUEST) |
| @GET |
| @Path("{clusterId}/cloudconfig") |
| public Response getCloudConfig(@PathParam("clusterId") String clusterId) { |
| |
| RealmAwareZkClient zkClient = getRealmAwareZkClient(); |
| if (!ZKUtil.isClusterSetup(clusterId, zkClient)) { |
| return notFound(); |
| } |
| |
| ConfigAccessor configAccessor = new ConfigAccessor(zkClient); |
| CloudConfig cloudConfig = configAccessor.getCloudConfig(clusterId); |
| |
| if (cloudConfig != null) { |
| return JSONRepresentation(cloudConfig.getRecord()); |
| } |
| |
| return notFound(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @DELETE |
| @Path("{clusterId}/cloudconfig") |
| public Response deleteCloudConfig(@PathParam("clusterId") String clusterId) { |
| HelixAdmin admin = getHelixAdmin(); |
| admin.removeCloudConfig(clusterId); |
| return OK(); |
| } |
| |
| @ResponseMetered(name = HttpConstants.WRITE_REQUEST) |
| @Timed(name = HttpConstants.WRITE_REQUEST) |
| @POST |
| @Path("{clusterId}/cloudconfig") |
| public Response updateCloudConfig(@PathParam("clusterId") String clusterId, |
| @QueryParam("command") String commandStr, String content) { |
| |
| RealmAwareZkClient zkClient = getRealmAwareZkClient(); |
| if (!ZKUtil.isClusterSetup(clusterId, zkClient)) { |
| return notFound(); |
| } |
| |
| ConfigAccessor configAccessor = new ConfigAccessor(zkClient); |
| // Here to update cloud config |
| Command command; |
| if (commandStr == null || commandStr.isEmpty()) { |
| command = Command.update; // Default behavior |
| } else { |
| try { |
| command = getCommand(commandStr); |
| } catch (HelixException ex) { |
| return badRequest(ex.getMessage()); |
| } |
| } |
| |
| ZNRecord record; |
| CloudConfig cloudConfig; |
| try { |
| record = toZNRecord(content); |
| cloudConfig = new CloudConfig(record); |
| } catch (IOException e) { |
| LOG.error("Failed to deserialize user's input " + content + ", Exception: " + e); |
| return badRequest("Input is not a vaild ZNRecord!"); |
| } |
| try { |
| switch (command) { |
| case delete: { |
| configAccessor.deleteCloudConfigFields(clusterId, cloudConfig); |
| } |
| break; |
| case update: { |
| try { |
| configAccessor.updateCloudConfig(clusterId, cloudConfig); |
| } catch (HelixException ex) { |
| LOG.error("Error in updating a CloudConfig to cluster: " + clusterId, ex); |
| return badRequest(ex.getMessage()); |
| } catch (Exception ex) { |
| LOG.error("Cannot update CloudConfig for cluster: " + clusterId, ex); |
| return serverError(ex); |
| } |
| } |
| break; |
| default: |
| return badRequest("Unsupported command " + commandStr); |
| } |
| } catch (Exception ex) { |
| LOG.error("Failed to " + command + " cloud config, cluster " + clusterId + " new config: " |
| + content + ", Exception: " + ex); |
| return serverError(ex); |
| } |
| return OK(); |
| } |
| |
| /** |
| * Reads HISTORY ZNode from the metadata store and generates a Map object that contains the |
| * pertinent history entries depending on the history type. |
| * @param clusterId |
| * @param historyType |
| * @return |
| */ |
| private Map<String, Object> getControllerHistory(String clusterId, |
| ControllerHistory.HistoryType historyType) { |
| HelixDataAccessor dataAccessor = getDataAccssor(clusterId); |
| Map<String, Object> history = new HashMap<>(); |
| history.put(Properties.id.name(), clusterId); |
| |
| ControllerHistory historyRecord = |
| dataAccessor.getProperty(dataAccessor.keyBuilder().controllerLeaderHistory()); |
| |
| switch (historyType) { |
| case CONTROLLER_LEADERSHIP: |
| history.put(Properties.history.name(), |
| historyRecord != null ? historyRecord.getHistoryList() : Collections.emptyList()); |
| break; |
| case MAINTENANCE: |
| history.put(ClusterProperties.maintenanceHistory.name(), |
| historyRecord != null ? historyRecord.getMaintenanceHistoryList() |
| : Collections.emptyList()); |
| break; |
| } |
| return history; |
| } |
| } |