| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.nimbus; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.storm.DaemonConfig; |
| import org.apache.storm.daemon.supervisor.Supervisor; |
| import org.apache.storm.generated.SupervisorAssignments; |
| import org.apache.storm.utils.ConfigUtils; |
| import org.apache.storm.utils.ObjectReader; |
| import org.apache.storm.utils.SupervisorClient; |
| import org.apache.storm.utils.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A service for distributing master assignments to supervisors, this service makes the assignments notification |
| * asynchronous. |
| * |
| * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer. |
| * |
| * <p>Master will shuffle its node request to the queues, if the target queue is full, we just discard the request, |
| * let the supervisors sync instead. |
| * |
| * <p>Caution: this class is not thread safe. |
| * |
| * <pre>{@code |
| * Working mode |
| * +--------+ +-----------------+ |
| * | queue1 | ==> | Working thread1 | |
| * +--------+ shuffle +--------+ +-----------------+ |
| * | Master | ==> |
| * +--------+ +--------+ +-----------------+ |
| * | queue2 | ==> | Working thread2 | |
| * +--------+ +-----------------+ |
| * } |
| * </pre> |
| */ |
| public class AssignmentDistributionService implements Closeable { |
| private static final Logger LOG = LoggerFactory.getLogger(AssignmentDistributionService.class); |
| private ExecutorService service; |
| /** |
| * Flag to indicate if the service is active. |
| */ |
| private volatile boolean active = false; |
| |
| private Random random; |
| /** |
| * Working threads num. |
| */ |
| private int threadsNum = 0; |
| /** |
| * Working thread queue size. |
| */ |
| private int queueSize = 0; |
| |
| /** |
| * Assignments request queue. |
| */ |
| private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue; |
| |
| /** |
| * local supervisors for local cluster assignments distribution. |
| */ |
| private Map<String, Supervisor> localSupervisors; |
| |
| private Map conf; |
| |
| private boolean isLocalMode = false; // boolean cache for local mode decision |
| |
| /** |
| * Factory method for initialize a instance. |
| * @param conf config. |
| * @return an instance of {@link AssignmentDistributionService} |
| */ |
| public static AssignmentDistributionService getInstance(Map conf) { |
| AssignmentDistributionService service = new AssignmentDistributionService(); |
| service.prepare(conf); |
| return service; |
| } |
| |
| /** |
| * Function for initialization. |
| * |
| * @param conf config |
| */ |
| public void prepare(Map conf) { |
| this.conf = conf; |
| this.random = new Random(47); |
| |
| this.threadsNum = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10); |
| this.queueSize = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100); |
| |
| this.assignmentsQueue = new HashMap<>(); |
| for (int i = 0; i < threadsNum; i++) { |
| this.assignmentsQueue.put(i, new LinkedBlockingQueue<NodeAssignments>(queueSize)); |
| } |
| //start the thread pool |
| this.service = Executors.newFixedThreadPool(threadsNum); |
| this.active = true; |
| //start the threads |
| for (int i = 0; i < threadsNum; i++) { |
| this.service.submit(new DistributeTask(this, i)); |
| } |
| // for local cluster |
| localSupervisors = new HashMap<>(); |
| if (ConfigUtils.isLocalMode(conf)) { |
| isLocalMode = true; |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| this.active = false; |
| this.service.shutdownNow(); |
| try { |
| this.service.awaitTermination(1L, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| LOG.error("Failed to close assignments distribute service"); |
| } |
| this.assignmentsQueue = null; |
| } |
| |
| /** |
| * Add an assignments for a node/supervisor for distribution. |
| * @param node node id of supervisor. |
| * @param host host name for the node. |
| * @param serverPort node thrift server port. |
| * @param assignments the {@link org.apache.storm.generated.SupervisorAssignments} |
| */ |
| public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) { |
| try { |
| //For some reasons, we can not get supervisor port info, eg: supervisor shutdown, |
| //Just skip for this scheduling round. |
| if (serverPort == null) { |
| LOG.warn("Discard an assignment distribution for node {} because server port info is missing.", node); |
| return; |
| } |
| |
| boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS); |
| if (!success) { |
| LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node); |
| } |
| |
| } catch (InterruptedException e) { |
| LOG.error("Add node assignments interrupted: {}", e.getMessage()); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public void addLocalSupervisor(Supervisor supervisor) { |
| this.localSupervisors.put(supervisor.getId(), supervisor); |
| } |
| |
| private Integer nextQueueId() { |
| return this.random.nextInt(threadsNum); |
| } |
| |
| private LinkedBlockingQueue<NodeAssignments> nextQueue() { |
| return this.assignmentsQueue.get(nextQueueId()); |
| } |
| |
| private LinkedBlockingQueue<NodeAssignments> getQueueById(Integer queueIndex) { |
| return this.assignmentsQueue.get(queueIndex); |
| } |
| |
| /** |
| * Get an assignments from the target queue with the specific index. |
| * @param queueIndex index of the queue |
| * @return an {@link NodeAssignments} |
| */ |
| public NodeAssignments nextAssignments(Integer queueIndex) throws InterruptedException { |
| NodeAssignments target = null; |
| while (true) { |
| target = getQueueById(queueIndex).poll(); |
| if (target != null) { |
| return target; |
| } |
| Time.sleep(100L); |
| } |
| } |
| |
| public boolean isActive() { |
| return this.active; |
| } |
| |
| public Map getConf() { |
| return this.conf; |
| } |
| |
| static class NodeAssignments { |
| private String node; |
| private String host; |
| private Integer serverPort; |
| private SupervisorAssignments assignments; |
| |
| private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments) { |
| this.node = node; |
| this.host = host; |
| this.serverPort = serverPort; |
| this.assignments = assignments; |
| } |
| |
| public static NodeAssignments getInstance(String node, String host, Integer serverPort, |
| SupervisorAssignments assignments) { |
| return new NodeAssignments(node, host, serverPort, assignments); |
| } |
| |
| //supervisor assignment id/supervisor id |
| public String getNode() { |
| return this.node; |
| } |
| |
| public String getHost() { |
| return host; |
| } |
| |
| public Integer getServerPort() { |
| return serverPort; |
| } |
| |
| public SupervisorAssignments getAssignments() { |
| return this.assignments; |
| } |
| |
| } |
| |
| /** |
| * Task to distribute assignments. |
| */ |
| static class DistributeTask implements Runnable { |
| private AssignmentDistributionService service; |
| private Integer queueIndex; |
| |
| DistributeTask(AssignmentDistributionService service, Integer index) { |
| this.service = service; |
| this.queueIndex = index; |
| } |
| |
| @Override |
| public void run() { |
| while (service.isActive()) { |
| try { |
| NodeAssignments nodeAssignments = this.service.nextAssignments(queueIndex); |
| sendAssignmentsToNode(nodeAssignments); |
| } catch (InterruptedException e) { |
| if (service.isActive()) { |
| LOG.error("Get an unexpected interrupt when distributing assignments to node, {}", e.getCause()); |
| } else { |
| // service is off now just interrupt it. |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| |
| private void sendAssignmentsToNode(NodeAssignments assignments) { |
| if (this.service.isLocalMode) { |
| //local node |
| Supervisor supervisor = this.service.localSupervisors.get(assignments.getNode()); |
| if (supervisor != null) { |
| supervisor.sendSupervisorAssignments(assignments.getAssignments()); |
| } else { |
| LOG.error("Can not find node {} for assignments distribution", assignments.getNode()); |
| throw new RuntimeException("null for node " + assignments.getNode() + " supervisor instance."); |
| } |
| } else { |
| // distributed mode |
| try (SupervisorClient client = SupervisorClient.getConfiguredClient(service.getConf(), |
| assignments.getHost(), assignments.getServerPort())) { |
| try { |
| client.getIface().sendSupervisorAssignments(assignments.getAssignments()); |
| } catch (Exception e) { |
| //just ignore the exception. |
| LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage()); |
| } |
| } catch (Throwable e) { |
| //just ignore any error/exception. |
| LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage()); |
| } |
| |
| } |
| } |
| } |
| } |