| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.kylin.rest.controller; |
| |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; |
| import org.apache.hadoop.hive.metastore.api.FieldSchema; |
| import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; |
| import org.apache.kylin.common.KylinConfig; |
| import org.apache.kylin.common.util.HadoopUtil; |
| import org.apache.kylin.common.util.JsonUtil; |
| import org.apache.kylin.cube.CubeInstance; |
| import org.apache.kylin.dimension.TimeDerivedColumnType; |
| import org.apache.kylin.metadata.datatype.DataType; |
| import org.apache.kylin.metadata.model.ColumnDesc; |
| import org.apache.kylin.metadata.model.ISourceAware; |
| import org.apache.kylin.metadata.model.TableDesc; |
| import org.apache.kylin.metadata.project.ProjectInstance; |
| import org.apache.kylin.metadata.project.ProjectManager; |
| import org.apache.kylin.rest.exception.BadRequestException; |
| import org.apache.kylin.rest.exception.ForbiddenException; |
| import org.apache.kylin.rest.exception.InternalErrorException; |
| import org.apache.kylin.rest.exception.NotFoundException; |
| import org.apache.kylin.rest.request.StreamingRequest; |
| import org.apache.kylin.rest.request.StreamingRequestV2; |
| import org.apache.kylin.rest.response.ResponseCode; |
| import org.apache.kylin.rest.service.CubeService; |
| import org.apache.kylin.rest.service.StreamingV2Service; |
| import org.apache.kylin.rest.service.TableService; |
| import org.apache.kylin.stream.core.model.CubeAssignment; |
| import org.apache.kylin.stream.core.model.ReplicaSet; |
| import org.apache.kylin.stream.core.model.Node; |
| import org.apache.kylin.stream.core.model.stats.ClusterState; |
| import org.apache.kylin.stream.core.model.stats.CubeRealTimeState; |
| import org.apache.kylin.stream.core.model.stats.ReceiverStats; |
| import org.apache.kylin.stream.core.source.Partition; |
| import org.apache.kylin.stream.core.source.StreamingSourceConfig; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.beans.factory.annotation.Qualifier; |
| import org.springframework.security.access.AccessDeniedException; |
| import org.springframework.security.core.context.SecurityContextHolder; |
| import org.springframework.stereotype.Controller; |
| import org.springframework.web.bind.annotation.PathVariable; |
| import org.springframework.web.bind.annotation.RequestBody; |
| import org.springframework.web.bind.annotation.RequestMapping; |
| import org.springframework.web.bind.annotation.RequestMethod; |
| import org.springframework.web.bind.annotation.RequestParam; |
| import org.springframework.web.bind.annotation.ResponseBody; |
| |
| import com.fasterxml.jackson.core.JsonParseException; |
| import com.fasterxml.jackson.core.JsonProcessingException; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.JsonMappingException; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| /** |
| * StreamingController is defined as Restful API entrance for UI. |
| * |
| */ |
| @Controller |
| @RequestMapping(value = "/streaming_v2") |
| public class StreamingV2Controller extends BasicController { |
| private static final Logger logger = LoggerFactory.getLogger(StreamingV2Controller.class); |
| |
| @Autowired |
| private StreamingV2Service streamingService; |
| |
| @Autowired |
| private CubeService cubeMgmtService; |
| |
| @Autowired |
| @Qualifier("tableService") |
| private TableService tableService; |
| |
| @RequestMapping(value = "/getConfig", method = { RequestMethod.GET }) |
| @ResponseBody |
| public List<StreamingSourceConfig> getStreamings(@RequestParam(value = "table", required = false) String table, |
| @RequestParam(value = "limit", required = false) Integer limit, |
| @RequestParam(value = "offset", required = false) Integer offset) { |
| try { |
| return streamingService.getStreamingConfigs(table, limit, offset); |
| } catch (IOException e) { |
| logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e); |
| throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage()); |
| } |
| } |
| |
| /** |
| * |
| * create Streaming Schema |
| * |
| * @throws IOException |
| */ |
| @RequestMapping(value = "", method = { RequestMethod.POST }) |
| @ResponseBody |
| public StreamingRequestV2 saveStreamingConfig(@RequestBody StreamingRequestV2 streamingRequest) { |
| String project = streamingRequest.getProject(); |
| TableDesc tableDesc = deserializeTableDesc(streamingRequest); |
| StreamingSourceConfig streamingSourceConfig = deserializeStreamingConfig(streamingRequest.getStreamingConfig()); |
| |
| validateInput(tableDesc, streamingSourceConfig); |
| |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to saveStreamingConfig with table Identity {}", user, tableDesc.getIdentity()); |
| |
| boolean saveStreamingSuccess = false, saveTableSuccess = false; |
| KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); |
| ProjectInstance projectInstance = ProjectManager.getInstance(kylinConfig).getProject(project); |
| try { |
| try { |
| tableDesc.setUuid(UUID.randomUUID().toString()); |
| tableService.loadTableToProject(tableDesc, null, project); |
| saveTableSuccess = true; |
| } catch (IOException e) { |
| throw new BadRequestException("Failed to add streaming table."); |
| } |
| try { |
| streamingSourceConfig.setName(tableDesc.getIdentity()); |
| streamingSourceConfig.setUuid(UUID.randomUUID().toString()); |
| streamingService.createStreamingConfig(streamingSourceConfig, projectInstance); |
| saveStreamingSuccess = true; |
| } catch (IOException e) { |
| logger.error("Failed to save StreamingSourceConfig:" + e.getLocalizedMessage(), e); |
| throw new InternalErrorException("Failed to save StreamingSourceConfig: " + e.getLocalizedMessage()); |
| } |
| |
| } finally { |
| if (!saveTableSuccess || !saveStreamingSuccess) { |
| if (saveTableSuccess) { |
| try { |
| tableService.unloadHiveTable(tableDesc.getIdentity(), project); |
| } catch (IOException e) { |
| throw new InternalErrorException("Action failed and failed to rollback the create table " |
| + e.getLocalizedMessage(), e); |
| } |
| } |
| if (saveStreamingSuccess) { |
| try { |
| streamingService.dropStreamingConfig(streamingSourceConfig); |
| } catch (IOException e) { |
| throw new InternalErrorException( |
| "Action failed and failed to rollback the created streaming config: " |
| + e.getLocalizedMessage(), e); |
| } |
| } |
| } |
| |
| } |
| streamingRequest.setSuccessful(true); |
| return streamingRequest; |
| } |
| |
| private void validateInput(TableDesc tableDesc, StreamingSourceConfig streamingSourceConfig) { |
| if (StringUtils.isEmpty(tableDesc.getIdentity()) || StringUtils.isEmpty(streamingSourceConfig.getName())) { |
| logger.error("streamingSourceConfig name should not be empty."); |
| throw new BadRequestException("streamingSourceConfig name should not be empty."); |
| } |
| |
| // validate the compatibility for input table schema and the underline hive table schema |
| if (tableDesc.getSourceType() == ISourceAware.ID_KAFKA_HIVE) { |
| List<FieldSchema> fields; |
| try { |
| HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(new HiveConf()); |
| fields = metaStoreClient.getFields(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable(), tableDesc.getName()); |
| } catch (NoSuchObjectException noObjectException) { |
| logger.info("table not exist in hive meta store for table:" + tableDesc.getIdentity(), |
| noObjectException); |
| throw new BadRequestException("table doesn't exist in hive meta store for table:" |
| + tableDesc.getIdentity(), ResponseCode.CODE_UNDEFINED, noObjectException); |
| } catch (Exception e) { |
| logger.error("error when get metadata from hive meta store for table:" + tableDesc.getIdentity(), e); |
| throw new BadRequestException("error when connect hive meta store", ResponseCode.CODE_UNDEFINED, e); |
| } |
| // check the data type compatibility for each column |
| Map<String, FieldSchema> fieldSchemaMap = Maps.newHashMap(); |
| for (FieldSchema field : fields) { |
| fieldSchemaMap.put(field.getName().toUpperCase(Locale.ROOT), field); |
| } |
| List<String> incompatibleMsgs = Lists.newArrayList(); |
| for (ColumnDesc columnDesc : tableDesc.getColumns()) { |
| FieldSchema fieldSchema = fieldSchemaMap.get(columnDesc.getName().toUpperCase(Locale.ROOT)); |
| if (fieldSchema == null) { |
| if (!TimeDerivedColumnType.isTimeDerivedColumn(columnDesc.getName())) { |
| incompatibleMsgs.add("column not exist in hive table:" + columnDesc.getName()); |
| continue; |
| } else { |
| continue; |
| } |
| } |
| if (!checkHiveTableFieldCompatible(fieldSchema, columnDesc)) { |
| String msg = String.format(Locale.ROOT, |
| "column:%s defined in hive type:%s is incompatible with the column definition:%s", |
| columnDesc.getName(), fieldSchema.getType(), columnDesc.getDatatype()); |
| incompatibleMsgs.add(msg); |
| } |
| } |
| if (!incompatibleMsgs.isEmpty()) { |
| logger.info("incompatible for hive and input table schema:{}", incompatibleMsgs); |
| throw new BadRequestException("incompatible for hive schema and input table schema:" + incompatibleMsgs); |
| } |
| } |
| } |
| |
| private static Map<String, Set<String>> COMPATIBLE_MAP = Maps.newHashMap(); |
| static { |
| COMPATIBLE_MAP.put("float", Sets.newHashSet("double")); |
| COMPATIBLE_MAP.put("string", Sets.newHashSet("varchar", "char", "varchar(256)")); |
| COMPATIBLE_MAP.put("varchar", Sets.newHashSet("string", "char")); |
| COMPATIBLE_MAP.put("varchar(256)", Sets.newHashSet("string", "char", "varchar")); |
| COMPATIBLE_MAP.put("long", Sets.newHashSet("bigint", "int", "smallint", "integer")); |
| COMPATIBLE_MAP.put("bigint", Sets.newHashSet("long", "int", "smallint", "integer")); |
| COMPATIBLE_MAP.put("int", Sets.newHashSet("smallint", "integer")); |
| } |
| |
| private boolean checkHiveTableFieldCompatible(FieldSchema fieldSchema, ColumnDesc columnDesc) { |
| DataType normalized = DataType.getType(columnDesc.getDatatype()); |
| String normalizedDataType = normalized == null ? columnDesc.getDatatype() : normalized.toString(); |
| if (fieldSchema.getType().equals(normalizedDataType)) { |
| return true; |
| } |
| Set<String> compatibleSet = COMPATIBLE_MAP.get(fieldSchema.getType()); |
| if (compatibleSet != null && compatibleSet.contains(normalizedDataType)) { |
| return true; |
| } |
| return false; |
| } |
| |
| @RequestMapping(value = "/updateConfig", method = { RequestMethod.PUT }) |
| @ResponseBody |
| public StreamingRequest updateStreamingConfig(@RequestBody StreamingRequest streamingRequest) |
| throws JsonProcessingException { |
| StreamingSourceConfig streamingSourceConfig = deserializeStreamingConfig(streamingRequest.getStreamingConfig()); |
| |
| if (streamingSourceConfig == null) { |
| return streamingRequest; |
| } |
| |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to updateStreamingConfig.", user); |
| try { |
| streamingSourceConfig = streamingService.updateStreamingConfig(streamingSourceConfig); |
| } catch (AccessDeniedException accessDeniedException) { |
| throw new ForbiddenException("You don't have right to update this StreamingSourceConfig."); |
| } catch (Exception e) { |
| logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e); |
| throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage()); |
| } |
| streamingRequest.setSuccessful(true); |
| |
| return streamingRequest; |
| } |
| |
| @RequestMapping(value = "/{configName}", method = { RequestMethod.DELETE }) |
| @ResponseBody |
| public void deleteConfig(@PathVariable String configName) throws IOException { |
| StreamingSourceConfig config = streamingService.getStreamingManagerV2().getConfig(configName); |
| if (null == config) { |
| throw new NotFoundException("StreamingSourceConfig with name " + configName + " not found.."); |
| } |
| |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to delete config: {}", user, configName); |
| |
| try { |
| streamingService.dropStreamingConfig(config); |
| } catch (Exception e) { |
| logger.error(e.getLocalizedMessage(), e); |
| throw new InternalErrorException("Failed to delete StreamingSourceConfig. " + " Caused by: " |
| + e.getMessage(), e); |
| } |
| } |
| |
| @RequestMapping(value = "/parserTemplate", method = { RequestMethod.GET }) |
| @ResponseBody |
| public String getParserTemplate(@RequestParam(value = "sourceType") int sourceType, |
| @RequestParam(value = "streamingConfig") String streamingConfigStr) { |
| StreamingSourceConfig streamingSourceConfig = deserializeStreamingConfig(streamingConfigStr); |
| return streamingService.getParserTemplate(sourceType, streamingSourceConfig); |
| } |
| |
| @RequestMapping(value = "/cubeAssignments", method = { RequestMethod.GET }) |
| @ResponseBody |
| public List<CubeAssignment> getCubeAssignments(@RequestParam(value = "cube", required = false) String cube) { |
| CubeInstance cubeInstance = null; |
| if (cube != null) { |
| cubeInstance = cubeMgmtService.getCubeManager().getCube(cube); |
| } |
| return streamingService.getStreamingCubeAssignments(cubeInstance); |
| } |
| |
| @RequestMapping(value = "/rsAssignments", method = { RequestMethod.GET }) |
| @ResponseBody |
| public Map<Integer, Map<String, List<Partition>>> getReplicaSetAssignments( |
| @RequestParam(value = "replicaSetID", required = false) Integer replicaSetID) { |
| return streamingService.getStreamingReplicaSetAssignments(replicaSetID); |
| } |
| |
| @RequestMapping(value = "/balance/recommend", method = { RequestMethod.GET }) |
| @ResponseBody |
| public Map<Integer, Map<String, List<Partition>>> reBalanceRecommend() { |
| return streamingService.reBalancePlan(); |
| } |
| |
| @RequestMapping(value = "/balance", method = { RequestMethod.POST }) |
| @ResponseBody |
| public void reBalance(@RequestBody String reBalancePlanStr) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to do reBalance.", user); |
| streamingService.reBalance(deserializeRebalancePlan(reBalancePlanStr)); |
| } |
| |
| private Map<Integer, Map<String, List<Partition>>> deserializeRebalancePlan(String reBalancePlanStr) { |
| TypeReference<Map<Integer, Map<String, List<Partition>>>> typeRef = new TypeReference<Map<Integer, Map<String, List<Partition>>>>() { |
| }; |
| ObjectMapper mapper = new ObjectMapper(); |
| try { |
| return mapper.readValue(reBalancePlanStr, typeRef); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @RequestMapping(value = "/cubeAssignments/{cubeName}", method = { RequestMethod.DELETE }) |
| @ResponseBody |
| public void removeCubeAssignment(@PathVariable String cubeName) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to remove CubeAssignment {}", user, cubeName); |
| streamingService.removeCubeAssignment(); |
| } |
| |
| @RequestMapping(value = "/cubes", method = { RequestMethod.GET }) |
| @ResponseBody |
| public List<String> getStreamingCubes() { |
| return streamingService.getStreamingCubes(); |
| } |
| |
| @RequestMapping(value = "/cubes/{cubeName}/consumeState", method = { RequestMethod.GET }) |
| @ResponseBody |
| public String getStreamingCubeConsumeState(@PathVariable String cubeName) { |
| return streamingService.getStreamingCubeConsumeState(cubeName).toString(); |
| } |
| |
| @RequestMapping(value = "/cubes/{cubeName}/assign", method = { RequestMethod.PUT }) |
| @ResponseBody |
| public void assignStreamingCube(@PathVariable String cubeName) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to assign cube {}", user, cubeName); |
| CubeInstance cube = cubeMgmtService.getCubeManager().getCube(cubeName); |
| streamingService.assignCube(cube); |
| } |
| |
| @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = { RequestMethod.PUT }) |
| @ResponseBody |
| public void unAssignStreamingCube(@PathVariable String cubeName) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to unAssign cube {}", user, cubeName); |
| CubeInstance cube = cubeMgmtService.getCubeManager().getCube(cubeName); |
| streamingService.unAssignCube(cube); |
| } |
| |
| @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = { RequestMethod.POST }) |
| @ResponseBody |
| public void reAssignStreamingCube(@PathVariable String cubeName, @RequestBody CubeAssignment newAssignment) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to reAssign cube {}", user, cubeName); |
| streamingService.reAssignCube(cubeName, newAssignment); |
| } |
| |
| @RequestMapping(value = "/receivers", method = { RequestMethod.GET }) |
| @ResponseBody |
| public List<Node> getStreamingReceivers() { |
| return streamingService.getReceivers(); |
| } |
| |
| @RequestMapping(value = "/receivers/{receiverID:.+}", method = { RequestMethod.DELETE }) |
| @ResponseBody |
| public void removeStreamingReceiver(@PathVariable String receiverID) { |
| Node receiver = Node.fromNormalizeString(receiverID); |
| streamingService.removeReceiver(receiver); |
| } |
| |
| @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST }) |
| @ResponseBody |
| public void createReplicaSet(@RequestBody ReplicaSet rs) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to create ReplicaSet {}", user, rs.getReplicaSetID()); |
| streamingService.createReplicaSet(rs); |
| } |
| |
| @RequestMapping(value = "/replicaSet/{replicaSetID}", method = { RequestMethod.DELETE }) |
| @ResponseBody |
| public void removeReplicaSet(@PathVariable Integer replicaSetID) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to remove ReplicaSet {}", user, replicaSetID); |
| streamingService.removeReplicaSet(replicaSetID); |
| } |
| |
| @RequestMapping(value = "/replicaSets", method = { RequestMethod.GET }) |
| @ResponseBody |
| public List<ReplicaSet> getReplicaSets() { |
| return streamingService.getReplicaSets(); |
| } |
| |
| @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.PUT }) |
| @ResponseBody |
| public void addNodeToReplicaSet(@PathVariable Integer replicaSetID, @PathVariable String nodeID) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to add Node {} To ReplicaSet {}", user, nodeID, replicaSetID); |
| streamingService.addNodeToReplicaSet(replicaSetID, nodeID); |
| } |
| |
| @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.DELETE }) |
| @ResponseBody |
| public void removeNodeFromReplicaSet(@PathVariable Integer replicaSetID, @PathVariable String nodeID) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to remove Node {} from ReplicaSet {}", user, nodeID, replicaSetID); |
| streamingService.removeNodeFromReplicaSet(replicaSetID, nodeID); |
| } |
| |
| @RequestMapping(value = "/cubes/{cubeName}/suspendConsume", method = { RequestMethod.PUT }) |
| @ResponseBody |
| public void pauseCubeConsume(@PathVariable String cubeName) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to pause Consumers for cube {}", user, cubeName); |
| CubeInstance cube = cubeMgmtService.getCubeManager().getCube(cubeName); |
| streamingService.pauseConsumers(cube); |
| } |
| |
| @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = { RequestMethod.PUT }) |
| @ResponseBody |
| public void resumeCubeConsume(@PathVariable String cubeName) { |
| final String user = SecurityContextHolder.getContext().getAuthentication().getName(); |
| logger.info("{} try to resume Consumers for cube {}", user, cubeName); |
| CubeInstance cube = cubeMgmtService.getCubeManager().getCube(cubeName); |
| streamingService.resumeConsumers(cube); |
| } |
| |
| @RequestMapping(value = "/cubes/{cubeName}/stats", method = { RequestMethod.GET }) |
| @ResponseBody |
| public CubeRealTimeState getCubeRealTimeState(@PathVariable String cubeName) { |
| CubeInstance cube = cubeMgmtService.getCubeManager().getCube(cubeName); |
| return streamingService.getCubeRealTimeState(cube); |
| } |
| |
| @RequestMapping(value = "/receivers/{receiverID:.+}/stats", method = { RequestMethod.GET }) |
| @ResponseBody |
| public ReceiverStats getReceiverStats(@PathVariable String receiverID) { |
| Node receiver = Node.fromNormalizeString(receiverID); |
| return streamingService.getReceiverStats(receiver); |
| } |
| |
| @RequestMapping(value = "/cluster/state", method = { RequestMethod.GET }) |
| @ResponseBody |
| public ClusterState getClusterState() { |
| return streamingService.getClusterState(); |
| } |
| |
| private TableDesc deserializeTableDesc(StreamingRequestV2 streamingRequest) { |
| TableDesc desc = null; |
| try { |
| logger.debug("Saving TableDesc " + streamingRequest.getTableData()); |
| desc = JsonUtil.readValue(streamingRequest.getTableData(), TableDesc.class); |
| } catch (JsonParseException e) { |
| logger.error("The TableDesc definition is invalid.", e); |
| updateRequest(streamingRequest, false, e.getMessage()); |
| } catch (JsonMappingException e) { |
| logger.error("The data TableDesc definition is invalid.", e); |
| updateRequest(streamingRequest, false, e.getMessage()); |
| } catch (IOException e) { |
| logger.error("Failed to deal with the request.", e); |
| throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e); |
| } |
| |
| String[] dbTable = HadoopUtil.parseHiveTableName(desc.getName()); |
| desc.setName(dbTable[1]); |
| desc.setDatabase(dbTable[0]); |
| desc.getIdentity(); |
| return desc; |
| } |
| |
| private StreamingSourceConfig deserializeStreamingConfig(String streamingConfigStr) { |
| try { |
| logger.debug("Saving StreamingSourceConfig " + streamingConfigStr); |
| return JsonUtil.readValue(streamingConfigStr, StreamingSourceConfig.class); |
| } catch (Exception e) { |
| logger.error("The StreamingSourceConfig definition is invalid.", e); |
| throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e); |
| } |
| } |
| |
| private void updateRequest(StreamingRequestV2 request, boolean success, String message) { |
| request.setSuccessful(success); |
| request.setMessage(message); |
| } |
| |
| public void setCubeService(CubeService cubeService) { |
| this.cubeMgmtService = cubeService; |
| } |
| |
| } |