| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.kylin.rest.service; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.kylin.common.KylinConfig; |
| import org.apache.kylin.common.util.Pair; |
| import org.apache.kylin.stream.coordinator.Coordinator; |
| import org.apache.kylin.stream.coordinator.StreamMetadataStore; |
| import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory; |
| import org.apache.kylin.stream.coordinator.client.CoordinatorClient; |
| import org.apache.kylin.stream.coordinator.coordinate.StreamingCoordinator; |
| import org.apache.kylin.stream.core.model.CubeAssignment; |
| import org.apache.kylin.stream.core.model.ReplicaSet; |
| import org.apache.kylin.stream.core.model.Node; |
| import org.apache.kylin.stream.core.source.Partition; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.stereotype.Component; |
| |
| import org.apache.kylin.shaded.com.google.common.collect.Sets; |
| |
| @Component("streamingCoordinatorService") |
| public class StreamingCoordinatorService extends BasicService { |
| private static final Logger logger = LoggerFactory.getLogger(StreamingCoordinatorService.class); |
| |
| private StreamMetadataStore streamMetadataStore; |
| |
| private CoordinatorClient streamingCoordinator; |
| |
| public StreamingCoordinatorService() { |
| streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore(); |
| if (KylinConfig.getInstanceFromEnv().isNewCoordinatorEnabled()) { |
| logger.info("Use new version coordinator."); |
| streamingCoordinator = StreamingCoordinator.getInstance(); |
| } else { |
| logger.info("Use old version coordinator."); |
| streamingCoordinator = Coordinator.getInstance(); |
| } |
| } |
| |
| public synchronized Map<Integer, Map<String, List<Partition>>> reBalanceRecommend() { |
| return streamingCoordinator.reBalanceRecommend(); |
| } |
| |
| public synchronized void reBalance(Map<Integer, Map<String, List<Partition>>> reBalancePlan) { |
| streamingCoordinator.reBalance(reBalancePlan); |
| } |
| |
| public void assignCube(String cubeName) { |
| streamingCoordinator.assignCube(cubeName); |
| } |
| |
| public void unAssignCube(String cubeName) { |
| streamingCoordinator.unAssignCube(cubeName); |
| } |
| |
| public void reAssignCube(String cubeName, CubeAssignment newAssignment) { |
| validateAssignment(newAssignment); |
| streamingCoordinator.reAssignCube(cubeName, newAssignment); |
| } |
| |
| private void validateAssignment(CubeAssignment newAssignment) { |
| Map<Integer, List<Partition>> assignments = newAssignment.getAssignments(); |
| Set<Integer> inputReplicaSetIDs = assignments.keySet(); |
| Set<Integer> allReplicaSetIDs = Sets.newHashSet(streamMetadataStore.getReplicaSetIDs()); |
| for (Integer inputReplicaSetID : inputReplicaSetIDs) { |
| if (!allReplicaSetIDs.contains(inputReplicaSetID)) { |
| throw new IllegalArgumentException("the replica set id:" + inputReplicaSetID + " does not exist"); |
| } |
| } |
| } |
| |
| public void pauseConsumers(String cubeName) { |
| streamingCoordinator.pauseConsumers(cubeName); |
| } |
| |
| public void resumeConsumers(String cubeName) { |
| streamingCoordinator.resumeConsumers(cubeName); |
| } |
| |
| public void replicaSetLeaderChange(int replicaSetID, Node newLeader) { |
| streamingCoordinator.replicaSetLeaderChange(replicaSetID, newLeader); |
| } |
| |
| public void createReplicaSet(ReplicaSet rs) { |
| streamingCoordinator.createReplicaSet(rs); |
| } |
| |
| public void removeReplicaSet(int rsID) { |
| streamingCoordinator.removeReplicaSet(rsID); |
| } |
| |
| public void addNodeToReplicaSet(Integer replicaSetID, String nodeID) { |
| streamingCoordinator.addNodeToReplicaSet(replicaSetID, nodeID); |
| } |
| |
| public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID) { |
| streamingCoordinator.removeNodeFromReplicaSet(replicaSetID, nodeID); |
| } |
| |
| public void onSegmentRemoteStoreComplete(String cubeName, Pair<Long, Long> segment, Node receiver) { |
| streamingCoordinator.segmentRemoteStoreComplete(receiver, cubeName, segment); |
| } |
| } |