| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.daemon.worker; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.function.Supplier; |
| import org.apache.storm.Config; |
| import org.apache.storm.Constants; |
| import org.apache.storm.StormTimer; |
| import org.apache.storm.cluster.DaemonType; |
| import org.apache.storm.cluster.IStateStorage; |
| import org.apache.storm.cluster.IStormClusterState; |
| import org.apache.storm.cluster.VersionedData; |
| import org.apache.storm.daemon.StormCommon; |
| import org.apache.storm.daemon.supervisor.AdvancedFSOps; |
| import org.apache.storm.daemon.worker.BackPressureTracker.BackpressureState; |
| import org.apache.storm.executor.IRunningExecutor; |
| import org.apache.storm.generated.Assignment; |
| import org.apache.storm.generated.DebugOptions; |
| import org.apache.storm.generated.Grouping; |
| import org.apache.storm.generated.InvalidTopologyException; |
| import org.apache.storm.generated.NodeInfo; |
| import org.apache.storm.generated.StormBase; |
| import org.apache.storm.generated.StormTopology; |
| import org.apache.storm.generated.StreamInfo; |
| import org.apache.storm.generated.TopologyStatus; |
| import org.apache.storm.grouping.Load; |
| import org.apache.storm.grouping.LoadMapping; |
| import org.apache.storm.hooks.IWorkerHook; |
| import org.apache.storm.messaging.ConnectionWithStatus; |
| import org.apache.storm.messaging.DeserializingConnectionCallback; |
| import org.apache.storm.messaging.IConnection; |
| import org.apache.storm.messaging.IConnectionCallback; |
| import org.apache.storm.messaging.IContext; |
| import org.apache.storm.messaging.TransportFactory; |
| import org.apache.storm.messaging.netty.BackPressureStatus; |
| import org.apache.storm.metrics2.StormMetricRegistry; |
| import org.apache.storm.policy.IWaitStrategy; |
| import org.apache.storm.security.auth.IAutoCredentials; |
| import org.apache.storm.serialization.ITupleSerializer; |
| import org.apache.storm.serialization.KryoTupleSerializer; |
| import org.apache.storm.shade.com.google.common.collect.ImmutableMap; |
| import org.apache.storm.shade.com.google.common.collect.Sets; |
| import org.apache.storm.shade.org.apache.commons.lang.Validate; |
| import org.apache.storm.task.WorkerTopologyContext; |
| import org.apache.storm.tuple.AddressedTuple; |
| import org.apache.storm.tuple.Fields; |
| import org.apache.storm.utils.ConfigUtils; |
| import org.apache.storm.utils.JCQueue; |
| import org.apache.storm.utils.ObjectReader; |
| import org.apache.storm.utils.SupervisorClient; |
| import org.apache.storm.utils.SupervisorIfaceFactory; |
| import org.apache.storm.utils.ThriftTopologyUtils; |
| import org.apache.storm.utils.Utils; |
| import org.apache.storm.utils.Utils.SmartThread; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class WorkerState { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(WorkerState.class); |
| private static final long LOAD_REFRESH_INTERVAL_MS = 5000L; |
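| // If the overflow queue has grown by this many tuples since the last back pressure notification, |
| // the current back pressure status is re-sent in case the previous one was missed (see transferLocalBatch). |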
| private static final int RESEND_BACKPRESSURE_SIZE = 10000; |
| private static long dropCount = 0; |
| final Map<String, Object> conf; |
| final IContext mqContext; |
| final IConnection receiver; |
| final String topologyId; |
| final String assignmentId; |
| private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier; |
| final int port; |
| final String workerId; |
| final IStateStorage stateStorage; |
| final IStormClusterState stormClusterState; |
| // When the worker boots up, it starts to set up its initial connections to the |
| // other workers. Once all connections are ready, this latch is counted down and |
| // the spouts and bolts are activated, assuming the topology is not deactivated. |
| // Used only within the worker, so it is kept as a latch. |
| final CountDownLatch isWorkerActive; |
| final AtomicBoolean isTopologyActive; |
| final AtomicReference<Map<String, DebugOptions>> stormComponentToDebug; |
| // local executors and localTaskIds running in this worker |
| final Set<List<Long>> localExecutors; |
| final ArrayList<Integer> localTaskIds; |
| // [taskId]-> JCQueue : initialized after local executors are initialized |
| final Map<Integer, JCQueue> localReceiveQueues = new HashMap<>(); |
| final Map<String, Object> topologyConf; |
| final StormTopology topology; |
| final StormTopology systemTopology; |
| final Map<Integer, String> taskToComponent; |
| final Map<String, Map<String, Fields>> componentToStreamToFields; |
| final Map<String, List<Integer>> componentToSortedTasks; |
| final ConcurrentMap<String, Long> blobToLastKnownVersion; |
| final ReentrantReadWriteLock endpointSocketLock; |
| final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort; |
| final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket; |
| // an executor id is in the form [start_task_id, end_task_id] |
| final Map<List<Long>, JCQueue> executorReceiveQueueMap; |
| final Map<Integer, JCQueue> taskToExecutorQueue; |
| final Runnable suicideCallback; |
| final Utils.UptimeComputer uptime; |
| final Map<String, Object> defaultSharedResources; |
| final Map<String, Object> userSharedResources; |
| final LoadMapping loadMapping; |
| final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions; |
| // Timers |
| final StormTimer heartbeatTimer = mkHaltingTimer("heartbeat-timer"); |
| final StormTimer refreshLoadTimer = mkHaltingTimer("refresh-load-timer"); |
| final StormTimer refreshConnectionsTimer = mkHaltingTimer("refresh-connections-timer"); |
| final StormTimer refreshCredentialsTimer = mkHaltingTimer("refresh-credentials-timer"); |
| final StormTimer checkForUpdatedBlobsTimer = mkHaltingTimer("check-for-updated-blobs-timer"); |
| final StormTimer resetLogLevelsTimer = mkHaltingTimer("reset-log-levels-timer"); |
| final StormTimer refreshActiveTimer = mkHaltingTimer("refresh-active-timer"); |
| final StormTimer executorHeartbeatTimer = mkHaltingTimer("executor-heartbeat-timer"); |
| final StormTimer flushTupleTimer = mkHaltingTimer("flush-tuple-timer"); |
| final StormTimer userTimer = mkHaltingTimer("user-timer"); |
| final StormTimer backPressureCheckTimer = mkHaltingTimer("backpressure-check-timer"); |
| private final WorkerTransfer workerTransfer; |
| private final BackPressureTracker bpTracker; |
| private final List<IWorkerHook> deserializedWorkerHooks; |
| // fields used only internally in this class |
| private final Set<Integer> outboundTasks; |
| private final AtomicLong nextLoadUpdate = new AtomicLong(0); |
| private final boolean trySerializeLocal; |
| private final Collection<IAutoCredentials> autoCredentials; |
| private final StormMetricRegistry metricRegistry; |
| |
| public WorkerState(Map<String, Object> conf, |
| IContext mqContext, |
| String topologyId, |
| String assignmentId, |
| Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier, |
| int port, |
| String workerId, |
| Map<String, Object> topologyConf, |
| IStateStorage stateStorage, |
| IStormClusterState stormClusterState, |
| Collection<IAutoCredentials> autoCredentials, |
| StormMetricRegistry metricRegistry) throws IOException, |
| InvalidTopologyException { |
| this.metricRegistry = metricRegistry; |
| this.autoCredentials = autoCredentials; |
| this.conf = conf; |
| this.supervisorIfaceSupplier = supervisorIfaceSupplier; |
| this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port)); |
| this.mqContext = (null != mqContext) ? mqContext : TransportFactory.makeContext(topologyConf); |
| this.topologyId = topologyId; |
| this.assignmentId = assignmentId; |
| this.port = port; |
| this.workerId = workerId; |
| this.stateStorage = stateStorage; |
| this.stormClusterState = stormClusterState; |
| this.isWorkerActive = new CountDownLatch(1); |
| this.isTopologyActive = new AtomicBoolean(false); |
| this.stormComponentToDebug = new AtomicReference<>(); |
| this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, localExecutors); |
| this.localTaskIds = new ArrayList<>(); |
| this.taskToExecutorQueue = new HashMap<>(); |
| this.blobToLastKnownVersion = new ConcurrentHashMap<>(); |
| for (Map.Entry<List<Long>, JCQueue> entry : executorReceiveQueueMap.entrySet()) { |
| List<Integer> taskIds = StormCommon.executorIdToTasks(entry.getKey()); |
| for (Integer taskId : taskIds) { |
| this.taskToExecutorQueue.put(taskId, entry.getValue()); |
| } |
| this.localTaskIds.addAll(taskIds); |
| } |
| Collections.sort(localTaskIds); |
| this.topologyConf = topologyConf; |
| this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf)); |
| this.systemTopology = StormCommon.systemTopology(topologyConf, topology); |
| this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf); |
| this.componentToStreamToFields = new HashMap<>(); |
| for (String c : ThriftTopologyUtils.getComponentIds(systemTopology)) { |
| Map<String, Fields> streamToFields = new HashMap<>(); |
| for (Map.Entry<String, StreamInfo> stream : |
| ThriftTopologyUtils.getComponentCommon(systemTopology, c).get_streams().entrySet()) { |
| streamToFields.put(stream.getKey(), new Fields(stream.getValue().get_output_fields())); |
| } |
| componentToStreamToFields.put(c, streamToFields); |
| } |
| this.componentToSortedTasks = Utils.reverseMap(taskToComponent); |
| this.componentToSortedTasks.values().forEach(Collections::sort); |
| this.endpointSocketLock = new ReentrantReadWriteLock(); |
| this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>()); |
| this.cachedTaskToNodePort = new AtomicReference<>(new HashMap<>()); |
| this.suicideCallback = Utils.mkSuicideFn(); |
| this.uptime = Utils.makeUptimeComputer(); |
| this.defaultSharedResources = makeDefaultResources(); |
| this.userSharedResources = makeUserResources(); |
| this.loadMapping = new LoadMapping(); |
| this.assignmentVersions = new AtomicReference<>(new HashMap<>()); |
| this.outboundTasks = workerOutboundTasks(); |
| this.trySerializeLocal = topologyConf.containsKey(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE) |
| && (Boolean) topologyConf.get(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE); |
| if (trySerializeLocal) { |
| LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production)", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE); |
| } |
| int maxTaskId = getMaxTaskId(componentToSortedTasks); |
| this.workerTransfer = new WorkerTransfer(this, topologyConf, maxTaskId); |
| this.bpTracker = new BackPressureTracker(workerId, taskToExecutorQueue); |
| this.deserializedWorkerHooks = deserializeWorkerHooks(); |
| LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port); |
| IConnectionCallback cb = new DeserializingConnectionCallback(topologyConf, |
| getWorkerTopologyContext(), |
| this::transferLocalBatch); |
| Supplier<Object> newConnectionResponse = () -> { |
| BackPressureStatus bpStatus = bpTracker.getCurrStatus(); |
| LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus); |
| return bpStatus; |
| }; |
| this.receiver = this.mqContext.bind(topologyId, port, cb, newConnectionResponse); |
| } |
| |
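| // The load of a queue is its fill ratio: current population divided by capacity (0.0 = empty, 1.0 = full). |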
| private static double getQueueLoad(JCQueue queue) { |
| JCQueue.QueueMetrics queueMetrics = queue.getMetrics(); |
| return ((double) queueMetrics.population()) / queueMetrics.capacity(); |
| } |
| |
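| // A connection that does not report status (i.e. is not a ConnectionWithStatus, e.g. a local connection) is treated as ready. |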
| public static boolean isConnectionReady(IConnection connection) { |
| return !(connection instanceof ConnectionWithStatus) |
| || ((ConnectionWithStatus) connection).status() == ConnectionWithStatus.Status.Ready; |
| } |
| |
| private static int getMaxTaskId(Map<String, List<Integer>> componentToSortedTasks) { |
| int maxTaskId = -1; |
| for (List<Integer> integers : componentToSortedTasks.values()) { |
| if (!integers.isEmpty()) { |
| int tempMax = integers.stream().max(Integer::compareTo).get(); |
| if (tempMax > maxTaskId) { |
| maxTaskId = tempMax; |
| } |
| } |
| } |
| return maxTaskId; |
| } |
| |
| public List<IWorkerHook> getDeserializedWorkerHooks() { |
| return deserializedWorkerHooks; |
| } |
| |
| public Map<String, Object> getConf() { |
| return conf; |
| } |
| |
| public IConnection getReceiver() { |
| return receiver; |
| } |
| |
| public String getTopologyId() { |
| return topologyId; |
| } |
| |
| public int getPort() { |
| return port; |
| } |
| |
| public String getWorkerId() { |
| return workerId; |
| } |
| |
| public IStateStorage getStateStorage() { |
| return stateStorage; |
| } |
| |
| public CountDownLatch getIsWorkerActive() { |
| return isWorkerActive; |
| } |
| |
| public AtomicBoolean getIsTopologyActive() { |
| return isTopologyActive; |
| } |
| |
| public AtomicReference<Map<String, DebugOptions>> getStormComponentToDebug() { |
| return stormComponentToDebug; |
| } |
| |
| public Set<List<Long>> getLocalExecutors() { |
| return localExecutors; |
| } |
| |
| public List<Integer> getLocalTaskIds() { |
| return localTaskIds; |
| } |
| |
| public Map<Integer, JCQueue> getLocalReceiveQueues() { |
| return localReceiveQueues; |
| } |
| |
| public Map<String, Object> getTopologyConf() { |
| return topologyConf; |
| } |
| |
| public StormTopology getTopology() { |
| return topology; |
| } |
| |
| public StormTopology getSystemTopology() { |
| return systemTopology; |
| } |
| |
| public Map<Integer, String> getTaskToComponent() { |
| return taskToComponent; |
| } |
| |
| public Map<String, Map<String, Fields>> getComponentToStreamToFields() { |
| return componentToStreamToFields; |
| } |
| |
| public Map<String, List<Integer>> getComponentToSortedTasks() { |
| return componentToSortedTasks; |
| } |
| |
| public Map<String, Long> getBlobToLastKnownVersion() { |
| return blobToLastKnownVersion; |
| } |
| |
| public AtomicReference<Map<NodeInfo, IConnection>> getCachedNodeToPortSocket() { |
| return cachedNodeToPortSocket; |
| } |
| |
| public Map<List<Long>, JCQueue> getExecutorReceiveQueueMap() { |
| return executorReceiveQueueMap; |
| } |
| |
| public Runnable getSuicideCallback() { |
| return suicideCallback; |
| } |
| |
| public Utils.UptimeComputer getUptime() { |
| return uptime; |
| } |
| |
| public Map<String, Object> getDefaultSharedResources() { |
| return defaultSharedResources; |
| } |
| |
| public Map<String, Object> getUserSharedResources() { |
| return userSharedResources; |
| } |
| |
| public LoadMapping getLoadMapping() { |
| return loadMapping; |
| } |
| |
| public AtomicReference<Map<String, VersionedData<Assignment>>> getAssignmentVersions() { |
| return assignmentVersions; |
| } |
| |
| public StormTimer getUserTimer() { |
| return userTimer; |
| } |
| |
| public SmartThread makeTransferThread() { |
| return workerTransfer.makeTransferThread(); |
| } |
| |
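| /** |
| * Reconcile this worker's outbound connections against the current assignment: read the assignment, |
| * compute the set of remote workers that the local tasks send to, open any newly needed connections, |
| * and close connections that are no longer needed. |
| */ |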
| public void refreshConnections() { |
| Assignment assignment = null; |
| try { |
| assignment = getLocalAssignment(stormClusterState, topologyId); |
| } catch (Exception e) { |
| LOG.warn("Failed to read assignment. This should only happen when topology is shutting down.", e); |
| } |
| |
| Set<NodeInfo> neededConnections = new HashSet<>(); |
| Map<Integer, NodeInfo> newTaskToNodePort = new HashMap<>(); |
| if (null != assignment) { |
| Map<Integer, NodeInfo> taskToNodePort = StormCommon.taskToNodeport(assignment.get_executor_node_port()); |
| for (Map.Entry<Integer, NodeInfo> taskToNodePortEntry : taskToNodePort.entrySet()) { |
| Integer task = taskToNodePortEntry.getKey(); |
| if (outboundTasks.contains(task)) { |
| newTaskToNodePort.put(task, taskToNodePortEntry.getValue()); |
| if (!localTaskIds.contains(task)) { |
| neededConnections.add(taskToNodePortEntry.getValue()); |
| } |
| } |
| } |
| } |
| |
| Set<NodeInfo> currentConnections = cachedNodeToPortSocket.get().keySet(); |
| Set<NodeInfo> newConnections = Sets.difference(neededConnections, currentConnections); |
| Set<NodeInfo> removeConnections = Sets.difference(currentConnections, neededConnections); |
| |
| Map<String, String> nodeHost = assignment != null ? assignment.get_node_host() : null; |
| // Add new connections atomically |
| cachedNodeToPortSocket.getAndUpdate(prev -> { |
| Map<NodeInfo, IConnection> next = new HashMap<>(prev); |
| for (NodeInfo nodeInfo : newConnections) { |
| next.put(nodeInfo, |
| mqContext.connect( |
| topologyId, |
| //nodeHost is not null here, as newConnections is only non-empty if assignment was not null above. |
| nodeHost.get(nodeInfo.get_node()), // Host |
| nodeInfo.get_port().iterator().next().intValue(), // Port |
| workerTransfer.getRemoteBackPressureStatus())); |
| } |
| return next; |
| }); |
| |
| endpointSocketLock.writeLock().lock(); |
| try { |
| cachedTaskToNodePort.set(newTaskToNodePort); |
| } finally { |
| endpointSocketLock.writeLock().unlock(); |
| } |
| |
| for (NodeInfo nodeInfo : removeConnections) { |
| cachedNodeToPortSocket.get().get(nodeInfo).close(); |
| } |
| |
| // Remove old connections atomically |
| cachedNodeToPortSocket.getAndUpdate(prev -> { |
| Map<NodeInfo, IConnection> next = new HashMap<>(prev); |
| removeConnections.forEach(next::remove); |
| return next; |
| }); |
| } |
| |
| public void refreshStormActive() { |
| refreshStormActive(() -> refreshActiveTimer.schedule(0, this::refreshStormActive)); |
| } |
| |
| public void refreshStormActive(Runnable callback) { |
| StormBase base = stormClusterState.stormBase(topologyId, callback); |
| isTopologyActive.set( |
| (null != base) |
| && (base.get_status() == TopologyStatus.ACTIVE)); |
| if (null != base) { |
| Map<String, DebugOptions> debugOptionsMap = new HashMap<>(base.get_component_debug()); |
| for (DebugOptions debugOptions : debugOptionsMap.values()) { |
| if (!debugOptions.is_set_samplingpct()) { |
| debugOptions.set_samplingpct(10); |
| } |
| if (!debugOptions.is_set_enable()) { |
| debugOptions.set_enable(false); |
| } |
| } |
| stormComponentToDebug.set(debugOptionsMap); |
| LOG.debug("Events debug options {}", stormComponentToDebug.get()); |
| } |
| } |
| |
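| /** |
| * Refresh the load mapping used by load-aware groupings: local load is the receive-queue fill ratio |
| * of each local executor, remote load is queried from the cached connections, and the local load is |
| * advertised to other workers at most once per LOAD_REFRESH_INTERVAL_MS. |
| */ |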
| public void refreshLoad(List<IRunningExecutor> execs) { |
| Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(localTaskIds)); |
| Map<Integer, Double> localLoad = new HashMap<>(); |
| for (IRunningExecutor exec : execs) { |
| double receiveLoad = getQueueLoad(exec.getReceiveQueue()); |
| localLoad.put(exec.getExecutorId().get(0).intValue(), receiveLoad); |
| } |
| |
| Map<Integer, Load> remoteLoad = new HashMap<>(); |
| cachedNodeToPortSocket.get().values().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks))); |
| loadMapping.setLocal(localLoad); |
| loadMapping.setRemote(remoteLoad); |
| |
| long now = System.currentTimeMillis(); |
| if (now > nextLoadUpdate.get()) { |
| receiver.sendLoadMetrics(localLoad); |
| nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS); |
| } |
| } |
| |
| // Checks if the tasks that were under back pressure are free again; if so, sends an update to the other workers. |
| public void refreshBackPressureStatus() { |
| LOG.debug("Checking for change in Backpressure status on worker's tasks"); |
| boolean bpSituationChanged = bpTracker.refreshBpTaskList(); |
| if (bpSituationChanged) { |
| BackPressureStatus bpStatus = bpTracker.getCurrStatus(); |
| receiver.sendBackPressureStatus(bpStatus); |
| } |
| } |
| |
| /** |
| * Activate the spouts/bolts when the worker boots up, by counting down the isWorkerActive latch |
| * once all connections to the other workers are ready. |
| */ |
| public void activateWorkerWhenAllConnectionsReady() { |
| int delaySecs = 0; |
| int recurSecs = 1; |
| refreshActiveTimer.schedule(delaySecs, |
| () -> { |
| if (areAllConnectionsReady()) { |
| LOG.info("All connections are ready for worker {}:{} with id {}", assignmentId, port, workerId); |
| isWorkerActive.countDown(); |
| } else { |
| refreshActiveTimer.schedule(recurSecs, this::activateWorkerWhenAllConnectionsReady, false, 0); |
| } |
| } |
| ); |
| } |
| |
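| // Usage note: callers typically keep a per-executor 'pendingEmits' queue and retry it before new emits. |
| // A minimal sketch (hypothetical 'ws', 'pending', and 'serializer' names): |
| //   while (!pending.isEmpty() && ws.tryTransferRemote(pending.peek(), null, serializer)) { |
| //       pending.poll(); |
| //   } |
| //   ws.tryTransferRemote(newTuple, pending, serializer); |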
| /* Not a blocking call. If the tuple cannot be emitted, adds 'tuple' to 'pendingEmits' and returns 'false'. 'pendingEmits' can be null. */ |
| public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) { |
| return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer); |
| } |
| |
| public void flushRemotes() throws InterruptedException { |
| workerTransfer.flushRemotes(); |
| } |
| |
| public boolean tryFlushRemotes() { |
| return workerTransfer.tryFlushRemotes(); |
| } |
| |
| // Receives messages from remote workers and feeds them to the local executors. If any receiving local executor is under |
| // back pressure, informs the other workers about the back pressure situation. Runs in the Netty worker thread. |
| private void transferLocalBatch(ArrayList<AddressedTuple> tupleBatch) { |
| for (int i = 0; i < tupleBatch.size(); i++) { |
| AddressedTuple tuple = tupleBatch.get(i); |
| JCQueue queue = taskToExecutorQueue.get(tuple.dest); |
| |
| // 1- try adding to the main queue if its overflow is empty |
| if (queue.isEmptyOverflow()) { |
| if (queue.tryPublish(tuple)) { |
| continue; |
| } |
| } |
| |
| // 2- back pressure detected (i.e. the main queue is full), so try adding to the overflow |
| int currOverflowCount = queue.getOverflowCount(); |
| // fetch the BP state object once to avoid repeated lookups |
| BackpressureState bpState = bpTracker.getBackpressureState(tuple.dest); |
| if (bpTracker.recordBackPressure(bpState)) { |
| receiver.sendBackPressureStatus(bpTracker.getCurrStatus()); |
| bpTracker.setLastOverflowCount(bpState, currOverflowCount); |
| } else { |
| if (currOverflowCount - bpTracker.getLastOverflowCount(bpState) > RESEND_BACKPRESSURE_SIZE) { |
| // resend BP status, in case prev notification was missed or reordered |
| BackPressureStatus bpStatus = bpTracker.getCurrStatus(); |
| receiver.sendBackPressureStatus(bpStatus); |
| bpTracker.setLastOverflowCount(bpState, currOverflowCount); |
| LOG.debug("Re-sent BackPressure Status. OverflowCount = {}, BP Status ID = {}. ", currOverflowCount, bpStatus.id); |
| } |
| } |
| if (!queue.tryPublishToOverflow(tuple)) { |
| dropMessage(tuple, queue); |
| } |
| } |
| } |
| |
| private void dropMessage(AddressedTuple tuple, JCQueue queue) { |
| ++dropCount; |
| queue.recordMsgDrop(); |
| LOG.warn( |
| "Dropping message as the overflow threshold has been reached for Q = {}. OverflowCount = {}. Total Drop Count = {}, Dropped Message: {}", |
| queue.getName(), queue.getOverflowCount(), dropCount, tuple); |
| } |
| |
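| /** |
| * Testing aid: when Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE is set, eagerly serialize every |
| * tuple so that serialization errors surface even for purely local (intra-worker) transfers. |
| */ |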
| public void checkSerialize(KryoTupleSerializer serializer, AddressedTuple tuple) { |
| if (trySerializeLocal) { |
| serializer.serialize(tuple.getTuple()); |
| } |
| } |
| |
| public final WorkerTopologyContext getWorkerTopologyContext() { |
| try { |
| String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, topologyId)); |
| String pidDir = ConfigUtils.workerPidsRoot(conf, topologyId); |
| return new WorkerTopologyContext(systemTopology, topologyConf, taskToComponent, componentToSortedTasks, |
| componentToStreamToFields, topologyId, codeDir, pidDir, port, localTaskIds, |
| defaultSharedResources, |
| userSharedResources, cachedTaskToNodePort, assignmentId); |
| } catch (IOException e) { |
| throw Utils.wrapInRuntime(e); |
| } |
| } |
| |
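| /** |
| * Deserialize the worker hooks shipped with the topology as serialized Java objects. |
| */ |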
| private List<IWorkerHook> deserializeWorkerHooks() { |
| List<IWorkerHook> myHookList = new ArrayList<>(); |
| if (topology.is_set_worker_hooks()) { |
| for (ByteBuffer hook : topology.get_worker_hooks()) { |
| byte[] hookBytes = Utils.toByteArray(hook); |
| IWorkerHook hookObject = Utils.javaDeserialize(hookBytes, IWorkerHook.class); |
| myHookList.add(hookObject); |
| } |
| } |
| return myHookList; |
| } |
| |
| public void runWorkerStartHooks() { |
| WorkerTopologyContext workerContext = getWorkerTopologyContext(); |
| for (IWorkerHook hook : getDeserializedWorkerHooks()) { |
| hook.start(topologyConf, workerContext); |
| } |
| } |
| |
| public void runWorkerShutdownHooks() { |
| for (IWorkerHook hook : getDeserializedWorkerHooks()) { |
| hook.shutdown(); |
| } |
| } |
| |
| public void closeResources() { |
| LOG.info("Shutting down default resources"); |
| ((ExecutorService) defaultSharedResources.get(WorkerTopologyContext.SHARED_EXECUTOR)).shutdownNow(); |
| LOG.info("Shut down default resources"); |
| } |
| |
| public boolean areAllConnectionsReady() { |
| return cachedNodeToPortSocket.get().values() |
| .stream() |
| .allMatch(WorkerState::isConnectionReady); |
| } |
| |
| public Collection<IAutoCredentials> getAutoCredentials() { |
| return this.autoCredentials; |
| } |
| |
| private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId, |
| int port) { |
| LOG.info("Reading assignments"); |
| List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>(); |
| executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID); |
| Map<List<Long>, NodeInfo> executorToNodePort = |
| getLocalAssignment(stormClusterState, topologyId).get_executor_node_port(); |
| for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) { |
| NodeInfo nodeInfo = entry.getValue(); |
| if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) { |
| executorsAssignedToThisWorker.add(entry.getKey()); |
| } |
| } |
| return executorsAssignedToThisWorker; |
| } |
| |
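| /** |
| * Fetch this topology's assignment, preferring the local supervisor; if the supervisor client |
| * fails for any reason, fall back to the assignment stored in ZooKeeper. |
| */ |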
| private Assignment getLocalAssignment(IStormClusterState stormClusterState, String topologyId) { |
| try (SupervisorIfaceFactory fac = supervisorIfaceSupplier.get()) { |
| return fac.getIface().getLocalAssignmentForStorm(topologyId); |
| } catch (Throwable e) { |
| // if the supervisor client throws for any reason, fall back to fetching the assignment from ZooKeeper |
| Assignment assignment = stormClusterState.remoteAssignmentInfo(topologyId, null); |
| if (assignment == null) { |
| throw new RuntimeException("Failed to read worker assignment." |
| + " Supervisor client threw exception, and assignment in Zookeeper was null", e); |
| } |
| return assignment; |
| } |
| } |
| |
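| /** |
| * Create one receive JCQueue per local executor. The producer batch size must be no more than |
| * half of the receive queue size (validated below). |
| */ |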
| private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyConf, Set<List<Long>> executors) { |
| Integer recvQueueSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE)); |
| Integer recvBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE)); |
| Integer overflowLimit = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT)); |
| |
| if (recvBatchSize > recvQueueSize / 2) { |
| throw new IllegalArgumentException(Config.TOPOLOGY_PRODUCER_BATCH_SIZE + ":" + recvBatchSize |
| + " is greater than half of " + Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE + ":" |
| + recvQueueSize); |
| } |
| |
| IWaitStrategy backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf); |
| Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>(); |
| for (List<Long> executor : executors) { |
| receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), |
| recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy, |
| this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, this.getPort(), metricRegistry)); |
| } |
| return receiveQueueMap; |
| } |
| |
| private Map<String, Object> makeDefaultResources() { |
| int threadPoolSize = ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE)); |
| return ImmutableMap.of(WorkerTopologyContext.SHARED_EXECUTOR, Executors.newFixedThreadPool(threadPoolSize)); |
| } |
| |
| private Map<String, Object> makeUserResources() { |
| /* TODO: need to invoke a hook provided by the topology, giving it a chance to create user resources. |
| * This would be part of the initialization hook. |
| * Need to separate WorkerTopologyContext into WorkerContext and WorkerUserContext, |
| * or actually just do it via interfaces; just make sure to hide setResource from tasks. |
| */ |
| return new HashMap<>(); |
| } |
| |
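| /** |
| * Create a timer whose error handler logs the failure and halts the worker process, so that a |
| * failed timer thread cannot leave the worker running in an inconsistent state. |
| */ |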
| private StormTimer mkHaltingTimer(String name) { |
| return new StormTimer(name, (thread, exception) -> { |
| LOG.error("Error when processing event", exception); |
| Utils.exitProcess(20, "Error when processing an event"); |
| }); |
| } |
| |
| /** |
| * Get worker outbound tasks. |
| * @return the set of task ids that receive messages from this worker |
| */ |
| private Set<Integer> workerOutboundTasks() { |
| WorkerTopologyContext context = getWorkerTopologyContext(); |
| Set<String> components = new HashSet<>(); |
| for (Integer taskId : localTaskIds) { |
| for (Map<String, Grouping> value : context.getTargets(context.getComponentId(taskId)).values()) { |
| components.addAll(value.keySet()); |
| } |
| } |
| |
| Set<Integer> outboundTasks = new HashSet<>(); |
| |
| for (Map.Entry<String, List<Integer>> entry : Utils.reverseMap(taskToComponent).entrySet()) { |
| if (components.contains(entry.getKey())) { |
| outboundTasks.addAll(entry.getValue()); |
| } |
| } |
| return outboundTasks; |
| } |
| |
| public Set<Integer> getOutboundTasks() { |
| return this.outboundTasks; |
| } |
| |
| /** |
| * Check if this worker has remote outbound tasks. |
| * @return true if this worker has remote outbound tasks; false otherwise. |
| */ |
| public boolean hasRemoteOutboundTasks() { |
| Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(localTaskIds)); |
| return !remoteTasks.isEmpty(); |
| } |
| |
| /** |
| * If all the tasks are local tasks, the topology has only one worker. |
| * @return true if this worker is the single worker; false otherwise. |
| */ |
| public boolean isSingleWorker() { |
| Set<Integer> nonLocalTasks = Sets.difference(getTaskToComponent().keySet(), |
| new HashSet<>(localTaskIds)); |
| return nonLocalTasks.isEmpty(); |
| } |
| |
| public void haltWorkerTransfer() { |
| workerTransfer.haltTransferThd(); |
| } |
| |
| public JCQueue getTransferQueue() { |
| return workerTransfer.getTransferQueue(); |
| } |
| |
| public StormMetricRegistry getMetricRegistry() { |
| return metricRegistry; |
| } |
| |
| public interface ILocalTransferCallback { |
| void transfer(ArrayList<AddressedTuple> tupleBatch); |
| } |
| } |