blob: 3459fceba537437568769ced965772efe83fa5cd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.rest.service;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.stream.coordinator.Coordinator;
import org.apache.kylin.stream.coordinator.StreamMetadataStore;
import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.coordinator.coordinate.StreamingCoordinator;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.source.Partition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
/**
 * Spring component (registered as {@code streamingCoordinatorService}) that forwards
 * streaming coordination requests to a {@link CoordinatorClient}.
 *
 * <p>The concrete client is chosen once, in the constructor, based on
 * {@code KylinConfig.isNewCoordinatorEnabled()}: {@link StreamingCoordinator} when the flag
 * is set, {@link Coordinator} otherwise. Apart from {@link #reAssignCube}, which validates
 * its input first, every public method is a direct delegation to that client.
 */
@Component("streamingCoordinatorService")
public class StreamingCoordinatorService extends BasicService {
    private static final Logger logger = LoggerFactory.getLogger(StreamingCoordinatorService.class);

    /** Metadata store used to look up the known replica set IDs during validation. */
    private final StreamMetadataStore streamMetadataStore;

    /** Coordinator client that every request is delegated to; selected in the constructor. */
    private final CoordinatorClient streamingCoordinator;

    /**
     * Obtains the stream metadata store from {@link StreamMetadataStoreFactory} and selects the
     * coordinator implementation according to the environment's {@link KylinConfig}.
     */
    public StreamingCoordinatorService() {
        streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
        if (KylinConfig.getInstanceFromEnv().isNewCoordinatorEnabled()) {
            logger.info("Use new version coordinator.");
            streamingCoordinator = StreamingCoordinator.getInstance();
        } else {
            logger.info("Use old version coordinator.");
            streamingCoordinator = Coordinator.getInstance();
        }
    }

    /**
     * Returns the coordinator's re-balance recommendation. Synchronized on this service instance.
     *
     * @return the map returned by the coordinator client
     *         (presumably replica set ID -> cube name -> partitions; confirm against the client)
     */
    public synchronized Map<Integer, Map<String, List<Partition>>> reBalanceRecommend() {
        return streamingCoordinator.reBalanceRecommend();
    }

    /**
     * Passes the given re-balance plan to the coordinator. Synchronized on this service instance.
     *
     * @param reBalancePlan plan in the same shape as returned by {@link #reBalanceRecommend()}
     */
    public synchronized void reBalance(Map<Integer, Map<String, List<Partition>>> reBalancePlan) {
        streamingCoordinator.reBalance(reBalancePlan);
    }

    /**
     * Delegates to the coordinator's {@code assignCube}.
     *
     * @param cubeName name of the cube
     */
    public void assignCube(String cubeName) {
        streamingCoordinator.assignCube(cubeName);
    }

    /**
     * Delegates to the coordinator's {@code unAssignCube}.
     *
     * @param cubeName name of the cube
     */
    public void unAssignCube(String cubeName) {
        streamingCoordinator.unAssignCube(cubeName);
    }

    /**
     * Validates the new assignment and, if it is valid, hands it to the coordinator's
     * {@code reAssignCube}.
     *
     * @param cubeName      name of the cube
     * @param newAssignment the assignment to apply
     * @throws IllegalArgumentException if the assignment is null, carries no assignment map, or
     *                                  references a replica set ID that the metadata store does
     *                                  not list
     */
    public void reAssignCube(String cubeName, CubeAssignment newAssignment) {
        validateAssignment(newAssignment);
        streamingCoordinator.reAssignCube(cubeName, newAssignment);
    }

    /**
     * Checks that every replica set ID used as a key in the assignment is present in the
     * metadata store's replica set ID list.
     *
     * @param newAssignment the assignment to check
     * @throws IllegalArgumentException on a null assignment, a null assignment map, or an
     *                                  unknown replica set ID
     */
    private void validateAssignment(CubeAssignment newAssignment) {
        // Fail with a descriptive argument error instead of a bare NullPointerException.
        if (newAssignment == null) {
            throw new IllegalArgumentException("the cube assignment must not be null");
        }
        Map<Integer, List<Partition>> assignments = newAssignment.getAssignments();
        if (assignments == null) {
            throw new IllegalArgumentException("the cube assignment does not contain any assignments");
        }

        Set<Integer> inputReplicaSetIDs = assignments.keySet();
        // Copied into a hash set so each membership test below is a hash lookup.
        Set<Integer> allReplicaSetIDs = Sets.newHashSet(streamMetadataStore.getReplicaSetIDs());
        for (Integer inputReplicaSetID : inputReplicaSetIDs) {
            if (!allReplicaSetIDs.contains(inputReplicaSetID)) {
                throw new IllegalArgumentException("the replica set id:" + inputReplicaSetID + " does not exist");
            }
        }
    }

    /**
     * Delegates to the coordinator's {@code pauseConsumers}.
     *
     * @param cubeName name of the cube
     */
    public void pauseConsumers(String cubeName) {
        streamingCoordinator.pauseConsumers(cubeName);
    }

    /**
     * Delegates to the coordinator's {@code resumeConsumers}.
     *
     * @param cubeName name of the cube
     */
    public void resumeConsumers(String cubeName) {
        streamingCoordinator.resumeConsumers(cubeName);
    }

    /**
     * Delegates to the coordinator's {@code replicaSetLeaderChange}.
     *
     * @param replicaSetID ID of the replica set
     * @param newLeader    node passed to the coordinator as the new leader
     */
    public void replicaSetLeaderChange(int replicaSetID, Node newLeader) {
        streamingCoordinator.replicaSetLeaderChange(replicaSetID, newLeader);
    }

    /**
     * Delegates to the coordinator's {@code createReplicaSet}.
     *
     * @param rs the replica set to create
     */
    public void createReplicaSet(ReplicaSet rs) {
        streamingCoordinator.createReplicaSet(rs);
    }

    /**
     * Delegates to the coordinator's {@code removeReplicaSet}.
     *
     * @param rsID ID of the replica set to remove
     */
    public void removeReplicaSet(int rsID) {
        streamingCoordinator.removeReplicaSet(rsID);
    }

    /**
     * Delegates to the coordinator's {@code addNodeToReplicaSet}.
     *
     * @param replicaSetID ID of the replica set
     * @param nodeID       identifier of the node to add
     */
    public void addNodeToReplicaSet(Integer replicaSetID, String nodeID) {
        streamingCoordinator.addNodeToReplicaSet(replicaSetID, nodeID);
    }

    /**
     * Delegates to the coordinator's {@code removeNodeFromReplicaSet}.
     *
     * @param replicaSetID ID of the replica set
     * @param nodeID       identifier of the node to remove
     */
    public void removeNodeFromReplicaSet(Integer replicaSetID, String nodeID) {
        streamingCoordinator.removeNodeFromReplicaSet(replicaSetID, nodeID);
    }

    /**
     * Forwards a segment notification to the coordinator's {@code segmentRemoteStoreComplete}.
     *
     * <p>Note the argument order differs from this method's: the coordinator receives
     * {@code (receiver, cubeName, segment)}.
     *
     * @param cubeName name of the cube
     * @param segment  pair of longs identifying the segment
     *                 (presumably its start and end; confirm against the coordinator)
     * @param receiver node passed to the coordinator as the receiver
     */
    public void onSegmentRemoteStoreComplete(String cubeName, Pair<Long, Long> segment, Node receiver) {
        streamingCoordinator.segmentRemoteStoreComplete(receiver, cubeName, segment);
    }
}