| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| package com.cloud.agent.manager; |
| |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.UnknownHostException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.SocketChannel; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.inject.Inject; |
| import javax.naming.ConfigurationException; |
| import javax.net.ssl.SSLContext; |
| import javax.net.ssl.SSLEngine; |
| |
| import org.apache.cloudstack.ca.CAManager; |
| import org.apache.cloudstack.framework.config.ConfigDepot; |
| import org.apache.cloudstack.framework.config.ConfigKey; |
| import org.apache.cloudstack.framework.config.dao.ConfigurationDao; |
| import org.apache.cloudstack.ha.dao.HAConfigDao; |
| import org.apache.cloudstack.managed.context.ManagedContextRunnable; |
| import org.apache.cloudstack.managed.context.ManagedContextTimerTask; |
| import org.apache.cloudstack.outofbandmanagement.dao.OutOfBandManagementDao; |
| import org.apache.cloudstack.shutdown.ShutdownManager; |
| import org.apache.cloudstack.shutdown.command.CancelShutdownManagementServerHostCommand; |
| import org.apache.cloudstack.shutdown.command.PrepareForShutdownManagementServerHostCommand; |
| import org.apache.cloudstack.shutdown.command.BaseShutdownManagementServerHostCommand; |
| import org.apache.cloudstack.shutdown.command.TriggerShutdownManagementServerHostCommand; |
| import org.apache.cloudstack.utils.identity.ManagementServerNode; |
| import org.apache.cloudstack.utils.security.SSLUtils; |
| |
| import com.cloud.agent.api.Answer; |
| import com.cloud.agent.api.CancelCommand; |
| import com.cloud.agent.api.ChangeAgentAnswer; |
| import com.cloud.agent.api.ChangeAgentCommand; |
| import com.cloud.agent.api.Command; |
| import com.cloud.agent.api.PropagateResourceEventCommand; |
| import com.cloud.agent.api.ScheduleHostScanTaskCommand; |
| import com.cloud.agent.api.TransferAgentCommand; |
| import com.cloud.agent.transport.Request; |
| import com.cloud.agent.transport.Request.Version; |
| import com.cloud.agent.transport.Response; |
| import com.cloud.cluster.ClusterManager; |
| import com.cloud.cluster.ClusterManagerListener; |
| import com.cloud.cluster.ClusterServicePdu; |
| import com.cloud.cluster.ClusteredAgentRebalanceService; |
| import org.apache.cloudstack.management.ManagementServerHost; |
| import com.cloud.cluster.ManagementServerHostVO; |
| import com.cloud.cluster.agentlb.AgentLoadBalancerPlanner; |
| import com.cloud.cluster.agentlb.HostTransferMapVO; |
| import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState; |
| import com.cloud.cluster.agentlb.dao.HostTransferMapDao; |
| import com.cloud.cluster.dao.ManagementServerHostDao; |
| import com.cloud.exception.AgentUnavailableException; |
| import com.cloud.exception.OperationTimedoutException; |
| import com.cloud.exception.UnsupportedVersionException; |
| import com.cloud.host.Host; |
| import com.cloud.host.HostVO; |
| import com.cloud.host.Status; |
| import com.cloud.host.Status.Event; |
| import com.cloud.resource.ServerResource; |
| import com.cloud.serializer.GsonHelper; |
| import com.cloud.utils.DateUtil; |
| import com.cloud.utils.concurrency.NamedThreadFactory; |
| import com.cloud.utils.db.QueryBuilder; |
| import com.cloud.utils.db.SearchCriteria.Op; |
| import com.cloud.utils.db.TransactionLegacy; |
| import com.cloud.utils.exception.CloudRuntimeException; |
| import com.cloud.utils.exception.TaskExecutionException; |
| import com.cloud.utils.nio.Link; |
| import com.cloud.utils.nio.Task; |
| import com.google.gson.Gson; |
| |
| public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService { |
| private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Cluster-AgentRebalancingExecutor")); |
| private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list |
| |
| public final static long STARTUP_DELAY = 5000; |
| public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login |
| public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds |
| protected Set<Long> _agentToTransferIds = new HashSet<Long>(); |
| Gson _gson; |
| protected HashMap<String, SocketChannel> _peers; |
| protected HashMap<String, SSLEngine> _sslEngines; |
| private final Timer _timer = new Timer("ClusteredAgentManager Timer"); |
| boolean _agentLbHappened = false; |
| |
| @Inject |
| protected ClusterManager _clusterMgr = null; |
| @Inject |
| protected ManagementServerHostDao _mshostDao; |
| @Inject |
| protected HostTransferMapDao _hostTransferDao; |
| @Inject |
| protected List<AgentLoadBalancerPlanner> _lbPlanners; |
| @Inject |
| ConfigurationDao _configDao; |
| @Inject |
| ConfigDepot _configDepot; |
| @Inject |
| private OutOfBandManagementDao outOfBandManagementDao; |
| @Inject |
| private HAConfigDao haConfigDao; |
| @Inject |
| private CAManager caService; |
| @Inject |
| private ShutdownManager shutdownManager; |
| |
| protected ClusteredAgentManagerImpl() { |
| super(); |
| } |
| |
| protected final ConfigKey<Boolean> EnableLB = new ConfigKey<Boolean>(Boolean.class, "agent.lb.enabled", "Advanced", "false", "Enable agent load balancing between management server nodes", true); |
| protected final ConfigKey<Double> ConnectedAgentThreshold = new ConfigKey<Double>(Double.class, "agent.load.threshold", "Advanced", "0.7", |
| "What percentage of the agents can be held by one management server before load balancing happens", true, EnableLB.key()); |
| protected final ConfigKey<Integer> LoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", "16", "How many agents to connect to in each round", true); |
| protected final ConfigKey<Integer> ScanInterval = new ConfigKey<Integer>(Integer.class, "direct.agent.scan.interval", "Advanced", "90", "Interval between scans to load agents", false, |
| ConfigKey.Scope.Global, 1000); |
| |
| @Override |
| public boolean configure(final String name, final Map<String, Object> xmlParams) throws ConfigurationException { |
| _peers = new HashMap<String, SocketChannel>(7); |
| _sslEngines = new HashMap<String, SSLEngine>(7); |
| _nodeId = ManagementServerNode.getManagementServerId(); |
| |
| logger.info("Configuring ClusterAgentManagerImpl. management server node id(msid): {}", _nodeId); |
| |
| ClusteredAgentAttache.initialize(this); |
| |
| _clusterMgr.registerListener(this); |
| _clusterMgr.registerDispatcher(new ClusterDispatcher()); |
| |
| _gson = GsonHelper.getGson(); |
| |
| return super.configure(name, xmlParams); |
| } |
| |
| @Override |
| public boolean start() { |
| if (!super.start()) { |
| return false; |
| } |
| _timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, ScanInterval.value()); |
| logger.debug("Scheduled direct agent scan task to run at an interval of {} seconds", ScanInterval.value()); |
| |
| // Schedule tasks for agent rebalancing |
| if (isAgentRebalanceEnabled()) { |
| cleanupTransferMap(_nodeId); |
| s_transferExecutor.scheduleAtFixedRate(getAgentRebalanceScanTask(), 60000, 60000, TimeUnit.MILLISECONDS); |
| s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, TimeUnit.MILLISECONDS); |
| } |
| |
| return true; |
| } |
| |
| public void scheduleHostScanTask() { |
| _timer.schedule(new DirectAgentScanTimerTask(), 0); |
| logger.debug("Scheduled a direct agent scan task"); |
| } |
| |
| private void runDirectAgentScanTimerTask() { |
| scanDirectAgentToLoad(); |
| } |
| |
| private void scanDirectAgentToLoad() { |
| logger.trace("Begin scanning directly connected hosts"); |
| |
| // for agents that are self-managed, threshold to be considered as disconnected after pingtimeout |
| final long cutSeconds = (System.currentTimeMillis() >> 10) - mgmtServiceConf.getTimeout(); |
| final List<HostVO> hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, LoadSize.value().longValue(), _nodeId); |
| final List<HostVO> appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId); |
| |
| if (hosts != null) { |
| hosts.addAll(appliances); |
| if (hosts.size() > 0) { |
| logger.debug("Found {} unmanaged direct hosts, processing connect for them...", hosts.size()); |
| for (final HostVO host : hosts) { |
| try { |
| final AgentAttache agentattache = findAttache(host.getId()); |
| if (agentattache != null) { |
| // already loaded, skip |
| if (agentattache.forForward()) { |
| logger.info("{} is detected down, but we have a forward attache running, disconnect this one before launching the host", host); |
| removeAgent(agentattache, Status.Disconnected); |
| } else { |
| continue; |
| } |
| } |
| |
| logger.debug("Loading directly connected host {}({})", host.getId(), host.getName()); |
| loadDirectlyConnectedHost(host, false); |
| } catch (final Throwable e) { |
| logger.warn(" can not load directly connected host {}({}) due to ", host.getId(), host.getName(), e); |
| } |
| } |
| } |
| } |
| logger.trace("End scanning directly connected hosts"); |
| } |
| |
| private class DirectAgentScanTimerTask extends ManagedContextTimerTask { |
| @Override |
| protected void runInContext() { |
| try { |
| runDirectAgentScanTimerTask(); |
| } catch (final Throwable e) { |
| logger.error("Unexpected exception {}", e.getMessage(), e); |
| } |
| } |
| } |
| |
| @Override |
| public Task create(final Task.Type type, final Link link, final byte[] data) { |
| return new ClusteredAgentHandler(type, link, data); |
| } |
| |
| protected AgentAttache createAttache(final long id) { |
| logger.debug("create forwarding ClusteredAgentAttache for {}", id); |
| final HostVO host = _hostDao.findById(id); |
| final AgentAttache attache = new ClusteredAgentAttache(this, id, host.getName()); |
| AgentAttache old = null; |
| synchronized (_agents) { |
| old = _agents.get(id); |
| _agents.put(id, attache); |
| } |
| if (old != null) { |
| logger.debug("Remove stale agent attache from current management server"); |
| removeAgent(old, Status.Removed); |
| } |
| return attache; |
| } |
| |
| @Override |
| protected AgentAttache createAttacheForConnect(final HostVO host, final Link link) { |
| logger.debug("create ClusteredAgentAttache for {}", host.getId()); |
| final AgentAttache attache = new ClusteredAgentAttache(this, host.getId(), host.getName(), link, host.isInMaintenanceStates()); |
| link.attach(attache); |
| AgentAttache old = null; |
| synchronized (_agents) { |
| old = _agents.get(host.getId()); |
| _agents.put(host.getId(), attache); |
| } |
| if (old != null) { |
| old.disconnect(Status.Removed); |
| } |
| return attache; |
| } |
| |
| @Override |
| protected AgentAttache createAttacheForDirectConnect(final Host host, final ServerResource resource) { |
| logger.debug("Create ClusteredDirectAgentAttache for {}.", host); |
| final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), host.getName(), _nodeId, resource, host.isInMaintenanceStates()); |
| AgentAttache old = null; |
| synchronized (_agents) { |
| old = _agents.get(host.getId()); |
| _agents.put(host.getId(), attache); |
| } |
| if (old != null) { |
| old.disconnect(Status.Removed); |
| } |
| return attache; |
| } |
| |
| @Override |
| protected boolean handleDisconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event, final boolean transitState, final boolean removeAgent) { |
| return handleDisconnect(attache, event, false, true, removeAgent); |
| } |
| |
| @Override |
| protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, final Status.Event event) { |
| return handleDisconnect(attache, event, true, true, true); |
| } |
| |
| protected boolean handleDisconnect(final AgentAttache agent, final Status.Event event, final boolean investigate, final boolean broadcast, final boolean removeAgent) { |
| boolean res; |
| if (!investigate) { |
| res = super.handleDisconnectWithoutInvestigation(agent, event, true, removeAgent); |
| } else { |
| res = super.handleDisconnectWithInvestigation(agent, event); |
| } |
| |
| if (res) { |
| if (broadcast) { |
| notifyNodesInCluster(agent); |
| } |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| @Override |
| public boolean executeUserRequest(final long hostId, final Event event) throws AgentUnavailableException { |
| if (event == Event.AgentDisconnected) { |
| logger.debug("Received agent disconnect event for host {}", hostId); |
| final AgentAttache attache = findAttache(hostId); |
| if (attache != null) { |
| // don't process disconnect if the host is being rebalanced |
| if (isAgentRebalanceEnabled()) { |
| final HostTransferMapVO transferVO = _hostTransferDao.findById(hostId); |
| if (transferVO != null) { |
| if (transferVO.getFutureOwner() == _nodeId && transferVO.getState() == HostTransferState.TransferStarted) { |
| logger.debug("Not processing {} event for the host id={} as the host is being connected to {}",Event.AgentDisconnected, hostId, _nodeId); |
| return true; |
| } |
| } |
| } |
| |
| // don't process disconnect if the disconnect came for the host via delayed cluster notification, |
| // but the host has already reconnected to the current management server |
| if (!attache.forForward()) { |
| logger.debug("Not processing {} event for the host id={} as the host is directly connected to the current management server {}", Event.AgentDisconnected, hostId, _nodeId); |
| return true; |
| } |
| |
| return super.handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, false, true); |
| } |
| |
| return true; |
| } else { |
| return super.executeUserRequest(hostId, event); |
| } |
| } |
| |
| @Override |
| public void reconnect(final long hostId) throws CloudRuntimeException, AgentUnavailableException { |
| Boolean result = propagateAgentEvent(hostId, Event.ShutdownRequested); |
| if (result == null) { |
| super.reconnect(hostId); |
| return; |
| } |
| if (!result) { |
| throw new CloudRuntimeException("Failed to propagate agent change request event:" + Event.ShutdownRequested + " to host:" + hostId); |
| } |
| } |
| |
| public void notifyNodesInCluster(final AgentAttache attache) { |
| logger.debug("Notifying other nodes of to disconnect"); |
| final Command[] cmds = new Command[] {new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)}; |
| _clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds)); |
| } |
| |
| // notifies MS peers to schedule a host scan task immediately, triggered during addHost operation |
| public void notifyNodesInClusterToScheduleHostScanTask() { |
| logger.debug("Notifying other MS nodes to run host scan task"); |
| final Command[] cmds = new Command[] {new ScheduleHostScanTaskCommand()}; |
| _clusterMgr.broadcast(0, _gson.toJson(cmds)); |
| } |
| |
| protected void logT(final byte[] bytes, final String msg) { |
| logger.trace("Seq {}-{}: MgmtId {} : {}", Request.getAgentId(bytes), Request.getSequence(bytes), |
| Request.getManagementServerId(bytes), (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); |
| } |
| |
| protected void logD(final byte[] bytes, final String msg) { |
| logger.debug("Seq {}-{}: MgmtId {} : {}", Request.getAgentId(bytes), Request.getSequence(bytes), |
| Request.getManagementServerId(bytes), (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); |
| } |
| |
| protected void logI(final byte[] bytes, final String msg) { |
| logger.info("Seq {}-{}: MgmtId {} : {}", Request.getAgentId(bytes), Request.getSequence(bytes), |
| Request.getManagementServerId(bytes), (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg); |
| } |
| |
| public boolean routeToPeer(final String peer, final byte[] bytes) { |
| int i = 0; |
| SocketChannel ch = null; |
| SSLEngine sslEngine = null; |
| while (i++ < 5) { |
| ch = connectToPeer(peer, ch); |
| if (ch == null) { |
| try { |
| logD(bytes, "Unable to route to peer: " + Request.parse(bytes).toString()); |
| } catch (ClassNotFoundException | UnsupportedVersionException e) { |
| // Request.parse thrown exception when we try to log it, log as much as we can |
| logD(bytes, "Unable to route to peer, and Request.parse further caught exception" + e.getMessage()); |
| } |
| return false; |
| } |
| sslEngine = getSSLEngine(peer); |
| if (sslEngine == null) { |
| logD(bytes, "Unable to get SSLEngine of peer: " + peer); |
| return false; |
| } |
| try { |
| logD(bytes, "Routing to peer"); |
| Link.write(ch, new ByteBuffer[] {ByteBuffer.wrap(bytes)}, sslEngine); |
| return true; |
| } catch (final IOException e) { |
| try { |
| logI(bytes, "Unable to route to peer: " + Request.parse(bytes).toString() + " due to " + e.getMessage()); |
| } catch (ClassNotFoundException | UnsupportedVersionException ex) { |
| // Request.parse thrown exception when we try to log it, log as much as we can |
| logI(bytes, "Unable to route to peer due to" + e.getMessage() + ". Also caught exception when parsing request: " + ex.getMessage()); |
| } |
| } |
| } |
| return false; |
| } |
| |
| public String findPeer(final long hostId) { |
| return getPeerName(hostId); |
| } |
| |
| public SSLEngine getSSLEngine(final String peerName) { |
| return _sslEngines.get(peerName); |
| } |
| |
| public void cancel(final String peerName, final long hostId, final long sequence, final String reason) { |
| final CancelCommand cancel = new CancelCommand(sequence, reason); |
| final Request req = new Request(hostId, _nodeId, cancel, true); |
| req.setControl(true); |
| routeToPeer(peerName, req.getBytes()); |
| } |
| |
| public void closePeer(final String peerName) { |
| synchronized (_peers) { |
| final SocketChannel ch = _peers.get(peerName); |
| if (ch != null) { |
| try { |
| ch.close(); |
| } catch (final IOException e) { |
| logger.warn("Unable to close peer socket connection to {}", peerName); |
| } |
| } |
| _peers.remove(peerName); |
| _sslEngines.remove(peerName); |
| } |
| } |
| |
| public SocketChannel connectToPeer(final String peerName, final SocketChannel prevCh) { |
| synchronized (_peers) { |
| final SocketChannel ch = _peers.get(peerName); |
| SSLEngine sslEngine = null; |
| if (prevCh != null) { |
| try { |
| prevCh.close(); |
| } catch (final Exception e) { |
| logger.info("[ignored] failed to get close resource for previous channel Socket: {}", e.getLocalizedMessage()); |
| } |
| } |
| if (ch == null || ch == prevCh) { |
| final ManagementServerHost ms = _clusterMgr.getPeer(peerName); |
| if (ms == null) { |
| logger.info("Unable to find peer: {}", peerName); |
| return null; |
| } |
| final String ip = ms.getServiceIP(); |
| InetAddress addr; |
| int port = Port.value(); |
| try { |
| addr = InetAddress.getByName(ip); |
| } catch (final UnknownHostException e) { |
| throw new CloudRuntimeException("Unable to resolve " + ip); |
| } |
| SocketChannel ch1 = null; |
| try { |
| ch1 = SocketChannel.open(new InetSocketAddress(addr, port)); |
| ch1.configureBlocking(false); |
| ch1.socket().setKeepAlive(true); |
| ch1.socket().setSoTimeout(60 * 1000); |
| try { |
| SSLContext sslContext = Link.initManagementSSLContext(caService); |
| sslEngine = sslContext.createSSLEngine(ip, port); |
| sslEngine.setUseClientMode(true); |
| sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols())); |
| sslEngine.beginHandshake(); |
| if (!Link.doHandshake(ch1, sslEngine)) { |
| ch1.close(); |
| throw new IOException(String.format("SSL: Handshake failed with peer management server '%s' on %s:%d ", peerName, ip, port)); |
| } |
| logger.info("SSL: Handshake done with peer management server '{}' on {}:{} ", peerName, ip, port); |
| } catch (final Exception e) { |
| ch1.close(); |
| throw new IOException("SSL: Fail to init SSL! " + e); |
| } |
| logger.debug("Connection to peer opened: {}, ip: {}", peerName, ip); |
| _peers.put(peerName, ch1); |
| _sslEngines.put(peerName, sslEngine); |
| return ch1; |
| } catch (final IOException e) { |
| if (ch1 != null) { |
| try { |
| ch1.close(); |
| } catch (final IOException ex) { |
| logger.error("failed to close failed peer socket: {}", ex); |
| } |
| } |
| logger.warn("Unable to connect to peer management server: {}, ip {} due to {}", peerName, ip, e.getMessage(), e); |
| return null; |
| } |
| } |
| |
| logger.trace("Found open channel for peer: {}", peerName); |
| return ch; |
| } |
| } |
| |
| public SocketChannel connectToPeer(final long hostId, final SocketChannel prevCh) { |
| final String peerName = getPeerName(hostId); |
| if (peerName == null) { |
| return null; |
| } |
| |
| return connectToPeer(peerName, prevCh); |
| } |
| |
| @Override |
| protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableException { |
| assert hostId != null : "Who didn't check their id value?"; |
| final HostVO host = _hostDao.findById(hostId); |
| if (host == null) { |
| throw new AgentUnavailableException("Can't find the host ", hostId); |
| } |
| |
| AgentAttache agent = findAttache(hostId); |
| if (agent == null || !agent.forForward()) { |
| if (isHostOwnerSwitched(host)) { |
| logger.debug("Host {} has switched to another management server, need to update agent map with a forwarding agent attache", hostId); |
| agent = createAttache(hostId); |
| } |
| } |
| if (agent == null) { |
| final AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId); |
| ex.addProxyObject(_entityMgr.findById(Host.class, hostId).getUuid()); |
| throw ex; |
| } |
| |
| return agent; |
| } |
| |
| @Override |
| public boolean stop() { |
| if (_peers != null) { |
| for (final SocketChannel ch : _peers.values()) { |
| try { |
| logger.info("Closing: {}", ch.toString()); |
| ch.close(); |
| } catch (final IOException e) { |
| logger.info("[ignored] error on closing channel: {}", ch.toString(), e); |
| } |
| } |
| } |
| _timer.cancel(); |
| |
| // cancel all transfer tasks |
| s_transferExecutor.shutdownNow(); |
| cleanupTransferMap(_nodeId); |
| |
| return super.stop(); |
| } |
| |
| @Override |
| public void startDirectlyConnectedHosts() { |
| // override and let it be dummy for purpose, we will scan and load direct agents periodically. |
| // We may also pickup agents that have been left over from other crashed management server |
| } |
| |
| public class ClusteredAgentHandler extends AgentHandler { |
| |
| public ClusteredAgentHandler(final Task.Type type, final Link link, final byte[] data) { |
| super(type, link, data); |
| } |
| |
| @Override |
| protected void doTask(final Task task) throws TaskExecutionException { |
| final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); |
| try { |
| if (task.getType() != Task.Type.DATA) { |
| super.doTask(task); |
| return; |
| } |
| |
| final byte[] data = task.getData(); |
| final Version ver = Request.getVersion(data); |
| if (ver.ordinal() != Version.v1.ordinal() && ver.ordinal() != Version.v3.ordinal()) { |
| logger.warn("Wrong version for clustered agent request"); |
| super.doTask(task); |
| return; |
| } |
| |
| final long hostId = Request.getAgentId(data); |
| final Link link = task.getLink(); |
| |
| if (Request.fromServer(data)) { |
| |
| final AgentAttache agent = findAttache(hostId); |
| |
| if (Request.isControl(data)) { |
| if (agent == null) { |
| logD(data, "No attache to process cancellation"); |
| return; |
| } |
| final Request req = Request.parse(data); |
| final Command[] cmds = req.getCommands(); |
| final CancelCommand cancel = (CancelCommand)cmds[0]; |
| logD(data, "Cancel request received"); |
| agent.cancel(cancel.getSequence()); |
| final Long current = agent._currentSequence; |
| // if the request is the current request, always have to trigger sending next request in |
| // sequence, |
| // otherwise the agent queue will be blocked |
| if (req.executeInSequence() && current != null && current == Request.getSequence(data)) { |
| agent.sendNext(Request.getSequence(data)); |
| } |
| return; |
| } |
| |
| try { |
| if (agent == null || agent.isClosed()) { |
| throw new AgentUnavailableException("Unable to route to agent ", hostId); |
| } |
| |
| if (Request.isRequest(data) && Request.requiresSequentialExecution(data)) { |
| // route it to the agent. |
| // But we have the serialize the control commands here so we have |
| // to deserialize this and send it through the agent attache. |
| final Request req = Request.parse(data); |
| agent.send(req, null); |
| return; |
| } else { |
| if (agent instanceof Routable) { |
| final Routable cluster = (Routable)agent; |
| cluster.routeToAgent(data); |
| } else { |
| agent.send(Request.parse(data)); |
| } |
| return; |
| } |
| } catch (final AgentUnavailableException e) { |
| logD(data, e.getMessage()); |
| cancel(Long.toString(Request.getManagementServerId(data)), hostId, Request.getSequence(data), e.getMessage()); |
| } |
| } else { |
| |
| final long mgmtId = Request.getManagementServerId(data); |
| if (mgmtId != -1 && mgmtId != _nodeId) { |
| routeToPeer(Long.toString(mgmtId), data); |
| if (Request.requiresSequentialExecution(data)) { |
| final AgentAttache attache = (AgentAttache)link.attachment(); |
| if (attache != null) { |
| attache.sendNext(Request.getSequence(data)); |
| } |
| logD(data, "No attache to process " + Request.parse(data).toString()); |
| } |
| return; |
| } else { |
| if (Request.isRequest(data)) { |
| super.doTask(task); |
| } else { |
| // received an answer. |
| final Response response = Response.parse(data); |
| final AgentAttache attache = findAttache(response.getAgentId()); |
| if (attache == null) { |
| logger.info("SeqA {}-{} Unable to find attache to forward {}", response.getAgentId(), response.getSequence(), response.toString()); |
| return; |
| } |
| if (!attache.processAnswers(response.getSequence(), response)) { |
| logger.info("SeqA {}-{}: Response is not processed: {}", attache.getId(), response.getSequence(), response.toString()); |
| } |
| } |
| return; |
| } |
| } |
| } catch (final ClassNotFoundException e) { |
| final String message = String.format("ClassNotFoundException occurred when executing tasks! Error '%s'", e.getMessage()); |
| logger.error(message); |
| throw new TaskExecutionException(message, e); |
| } catch (final UnsupportedVersionException e) { |
| final String message = String.format("UnsupportedVersionException occurred when executing tasks! Error '%s'", e.getMessage()); |
| logger.error(message); |
| throw new TaskExecutionException(message, e); |
| } finally { |
| txn.close(); |
| } |
| } |
| } |
| |
| @Override |
| public void onManagementNodeJoined(final List<? extends ManagementServerHost> nodeList, final long selfNodeId) { |
| } |
| |
| @Override |
| public void onManagementNodeLeft(final List<? extends ManagementServerHost> nodeList, final long selfNodeId) { |
| for (final ManagementServerHost vo : nodeList) { |
| logger.info("Marking hosts as disconnected on Management server {}", vo.getMsid()); |
| final long lastPing = (System.currentTimeMillis() >> 10) - mgmtServiceConf.getTimeout(); |
| _hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing); |
| outOfBandManagementDao.expireServerOwnership(vo.getMsid()); |
| haConfigDao.expireServerOwnership(vo.getMsid()); |
| logger.info("Deleting entries from op_host_transfer table for Management server {}", vo.getMsid()); |
| cleanupTransferMap(vo.getMsid()); |
| } |
| } |
| |
| @Override |
| public void onManagementNodeIsolated() { |
| } |
| |
| @Override |
| public void removeAgent(final AgentAttache attache, final Status nextState) { |
| if (attache == null) { |
| return; |
| } |
| |
| super.removeAgent(attache, nextState); |
| } |
| |
| @Override |
| public boolean executeRebalanceRequest(final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event) throws AgentUnavailableException, OperationTimedoutException { |
| boolean result = false; |
| if (event == Event.RequestAgentRebalance) { |
| return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId); |
| } else if (event == Event.StartAgentRebalance) { |
| try { |
| result = rebalanceHost(agentId, currentOwnerId, futureOwnerId); |
| } catch (final Exception e) { |
| logger.warn("Unable to rebalance host id={}", agentId, e); |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| public void scheduleRebalanceAgents() { |
| _timer.schedule(new AgentLoadBalancerTask(), 30000); |
| } |
| |
| public class AgentLoadBalancerTask extends ManagedContextTimerTask { |
| protected volatile boolean cancelled = false; |
| |
| public AgentLoadBalancerTask() { |
| logger.debug("Agent load balancer task created"); |
| } |
| |
| @Override |
| public synchronized boolean cancel() { |
| if (!cancelled) { |
| cancelled = true; |
| logger.debug("Agent load balancer task cancelled"); |
| return super.cancel(); |
| } |
| return true; |
| } |
| |
| @Override |
| protected synchronized void runInContext() { |
| try { |
| if (!cancelled) { |
| startRebalanceAgents(); |
| logger.info("The agent load balancer task is now being cancelled"); |
| cancelled = true; |
| } |
| } catch (final Throwable e) { |
| logger.error("Unexpected exception {}", e.toString(), e); |
| } |
| } |
| } |
| |
| public void startRebalanceAgents() { |
| logger.debug("Management server {} is asking other peers to rebalance their agents", _nodeId); |
| final List<ManagementServerHostVO> allMS = _mshostDao.listBy(ManagementServerHost.State.Up); |
| final QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class); |
| sc.and(sc.entity().getManagementServerId(), Op.NNULL); |
| sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); |
| final List<HostVO> allManagedAgents = sc.list(); |
| |
| int avLoad = 0; |
| |
| if (!allManagedAgents.isEmpty() && !allMS.isEmpty()) { |
| avLoad = allManagedAgents.size() / allMS.size(); |
| } else { |
| logger.debug("There are no hosts to rebalance in the system. Current number of active management server nodes in the system is {};" + |
| "number of managed agents is {}", allMS.size(), allManagedAgents.size()); |
| return; |
| } |
| |
| if (avLoad == 0L) { |
| logger.debug("As calculated average load is less than 1, rounding it to 1"); |
| avLoad = 1; |
| } |
| |
| for (final ManagementServerHostVO node : allMS) { |
| if (node.getMsid() != _nodeId) { |
| |
| List<HostVO> hostsToRebalance = new ArrayList<HostVO>(); |
| for (final AgentLoadBalancerPlanner lbPlanner : _lbPlanners) { |
| hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad); |
| if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) { |
| break; |
| } |
| logger.debug("Agent load balancer planner " + lbPlanner.getName() + " found no hosts to be rebalanced from management server " + node.getMsid()); |
| } |
| |
| if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) { |
| logger.debug("Found {} hosts to rebalance from management server {}", hostsToRebalance.size(), node.getMsid()); |
| for (final HostVO host : hostsToRebalance) { |
| final long hostId = host.getId(); |
| logger.debug("Asking management server {} to give away host id={}", node.getMsid(), hostId); |
| boolean result = true; |
| |
| if (_hostTransferDao.findById(hostId) != null) { |
| logger.warn("Somebody else is already rebalancing host id: {}", hostId); |
| continue; |
| } |
| |
| HostTransferMapVO transfer = null; |
| try { |
| transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId); |
| final Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance); |
| if (answer == null) { |
| logger.warn("Failed to get host id={} from management server {}", hostId, node.getMsid()); |
| result = false; |
| } |
| } catch (final Exception ex) { |
| logger.warn("Failed to get host id={} from management server {}", hostId, node.getMsid(), ex); |
| result = false; |
| } finally { |
| if (transfer != null) { |
| final HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId); |
| if (!result && transferState != null && transferState.getState() == HostTransferState.TransferRequested) { |
| logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode"); |
| // just remove the mapping (if exists) as nothing was done on the peer management |
| // server yet |
| _hostTransferDao.remove(transfer.getId()); |
| } |
| } |
| } |
| } |
| } else { |
| logger.debug("Found no hosts to rebalance from the management server {}", node.getMsid()); |
| } |
| } |
| } |
| } |
| |
| private Answer[] sendRebalanceCommand(final long peer, final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event) { |
| final TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event); |
| final Commands commands = new Commands(Command.OnError.Stop); |
| commands.addCommand(transfer); |
| |
| final Command[] cmds = commands.toCommands(); |
| |
| try { |
| logger.debug("Forwarding {} to {}", cmds[0].toString(), peer); |
| final String peerName = Long.toString(peer); |
| final String cmdStr = _gson.toJson(cmds); |
| final String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true); |
| final Answer[] answers = _gson.fromJson(ansStr, Answer[].class); |
| return answers; |
| } catch (final Exception e) { |
| logger.warn("Caught exception while talking to {}", currentOwnerId, e); |
| return null; |
| } |
| } |
| |
| public String getPeerName(final long agentHostId) { |
| |
| final HostVO host = _hostDao.findById(agentHostId); |
| if (host != null && host.getManagementServerId() != null) { |
| if (_clusterMgr.getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) { |
| return null; |
| } |
| |
| return Long.toString(host.getManagementServerId()); |
| } |
| return null; |
| } |
| |
| public Boolean propagateAgentEvent(final long agentId, final Event event) throws AgentUnavailableException { |
| final String msPeer = getPeerName(agentId); |
| if (msPeer == null) { |
| return null; |
| } |
| |
| logger.debug("Propagating agent change request event: {} to agent: {}", event.toString(), agentId); |
| final Command[] cmds = new Command[1]; |
| cmds[0] = new ChangeAgentCommand(agentId, event); |
| |
| final String ansStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true); |
| if (ansStr == null) { |
| throw new AgentUnavailableException(agentId); |
| } |
| |
| final Answer[] answers = _gson.fromJson(ansStr, Answer[].class); |
| |
| logger.debug("Result for agent change is {}", answers[0].getResult()); |
| |
| return answers[0].getResult(); |
| } |
| |
| private Runnable getTransferScanTask() { |
| return new ManagedContextRunnable() { |
| @Override |
| protected void runInContext() { |
| try { |
| logger.trace("Clustered agent transfer scan check, management server id: {}", _nodeId); |
| synchronized (_agentToTransferIds) { |
| if (_agentToTransferIds.size() > 0) { |
| logger.debug("Found {} agents to transfer", _agentToTransferIds.size()); |
| // for (Long hostId : _agentToTransferIds) { |
| for (final Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) { |
| final Long hostId = iterator.next(); |
| final AgentAttache attache = findAttache(hostId); |
| |
| // if the thread: |
| // 1) timed out waiting for the host to reconnect |
| // 2) recipient management server is not active any more |
| // 3) if the management server doesn't own the host any more |
| // remove the host from re-balance list and delete from op_host_transfer DB |
| // no need to do anything with the real attache as we haven't modified it yet |
| final Date cutTime = DateUtil.currentGMTTime(); |
| final HostTransferMapVO transferMap = _hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut)); |
| |
| if (transferMap == null) { |
| logger.debug("Timed out waiting for the host id={} to be ready to transfer, skipping rebalance for the host" + hostId); |
| iterator.remove(); |
| _hostTransferDao.completeAgentTransfer(hostId); |
| continue; |
| } |
| |
| if (transferMap.getInitialOwner() != _nodeId || attache == null || attache.forForward()) { |
| logger.debug("Management server {} doesn't own host id={} any more, skipping rebalance for the host", _nodeId, hostId); |
| iterator.remove(); |
| _hostTransferDao.completeAgentTransfer(hostId); |
| continue; |
| } |
| |
| final ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner()); |
| if (ms != null && ms.getState() != ManagementServerHost.State.Up) { |
| logger.debug("Can't transfer host {} as it's future owner is not in UP state: {}, skipping rebalance for the host", hostId, ms); |
| iterator.remove(); |
| _hostTransferDao.completeAgentTransfer(hostId); |
| continue; |
| } |
| |
| if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { |
| iterator.remove(); |
| try { |
| _executor.execute(new RebalanceTask(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner())); |
| } catch (final RejectedExecutionException ex) { |
| logger.warn("Failed to submit rebalance task for host id={}; postponing the execution", hostId); |
| continue; |
| } |
| |
| } else { |
| logger.debug("Agent {} can't be transferred yet as its request queue size is {} and listener queue size is {}", |
| hostId, attache.getQueueSize(), attache.getNonRecurringListenersSize()); |
| } |
| } |
| } else { |
| logger.trace("Found no agents to be transferred by the management server {}", _nodeId); |
| } |
| } |
| |
| } catch (final Throwable e) { |
| logger.error("Problem with the clustered agent transfer scan check!", e); |
| } |
| } |
| }; |
| } |
| |
| private boolean setToWaitForRebalance(final long hostId, final long currentOwnerId, final long futureOwnerId) { |
| logger.debug("Adding agent {} to the list of agents to transfer", hostId); |
| synchronized (_agentToTransferIds) { |
| return _agentToTransferIds.add(hostId); |
| } |
| } |
| |
| protected boolean rebalanceHost(final long hostId, final long currentOwnerId, final long futureOwnerId) throws AgentUnavailableException { |
| |
| boolean result = true; |
| if (currentOwnerId == _nodeId) { |
| if (!startRebalance(hostId)) { |
| logger.debug("Failed to start agent rebalancing"); |
| finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed); |
| return false; |
| } |
| try { |
| final Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance); |
| if (answer == null || !answer[0].getResult()) { |
| result = false; |
| } |
| |
| } catch (final Exception ex) { |
| logger.warn("Host {} failed to connect to the management server {} as a part of rebalance process", hostId, futureOwnerId, ex); |
| result = false; |
| } |
| |
| if (result) { |
| logger.debug("Successfully transferred host id={} to management server {}", hostId, futureOwnerId); |
| finishRebalance(hostId, futureOwnerId, Event.RebalanceCompleted); |
| } else { |
| logger.warn("Failed to transfer host id={} to management server {}", hostId, futureOwnerId); |
| finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed); |
| } |
| |
| } else if (futureOwnerId == _nodeId) { |
| final HostVO host = _hostDao.findById(hostId); |
| try { |
| logger.debug("Disconnecting host {}({}) as a part of rebalance process without notification", host.getId(), host.getName()); |
| |
| final AgentAttache attache = findAttache(hostId); |
| if (attache != null) { |
| result = handleDisconnect(attache, Event.AgentDisconnected, false, false, true); |
| } |
| |
| if (result) { |
| logger.debug("Loading directly connected host {}({}) to the management server {} as a part of rebalance process", host.getId(), host.getName(), _nodeId); |
| result = loadDirectlyConnectedHost(host, true); |
| } else { |
| logger.warn("Failed to disconnect {}({}) as a part of rebalance process without notification" + host.getId(), host.getName()); |
| } |
| |
| } catch (final Exception ex) { |
| logger.warn("Failed to load directly connected host {}({}) to the management server {} a part of rebalance process without notification", host.getId(), host.getName(), _nodeId, ex); |
| result = false; |
| } |
| |
| if (result) { |
| logger.debug("Successfully loaded directly connected host {}({}) to the management server {} a part of rebalance process without notification", host.getId(), host.getName(), _nodeId); |
| } else { |
| logger.warn("Failed to load directly connected host {}({}) to the management server {} a part of rebalance process without notification", host.getId(), host.getName(), _nodeId); |
| } |
| } |
| |
| return result; |
| } |
| |
| protected void finishRebalance(final long hostId, final long futureOwnerId, final Event event) { |
| |
| final boolean success = event == Event.RebalanceCompleted ? true : false; |
| logger.debug("Finishing rebalancing for the agent {} with event {}", hostId, event); |
| |
| final AgentAttache attache = findAttache(hostId); |
| if (attache == null || !(attache instanceof ClusteredAgentAttache)) { |
| logger.debug("Unable to find forward attache for the host id={} assuming that the agent disconnected already", hostId); |
| _hostTransferDao.completeAgentTransfer(hostId); |
| return; |
| } |
| |
| final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache; |
| |
| if (success) { |
| |
| // 1) Set transfer mode to false - so the agent can start processing requests normally |
| forwardAttache.setTransferMode(false); |
| |
| // 2) Get all transfer requests and route them to peer |
| Request requestToTransfer = forwardAttache.getRequestToTransfer(); |
| while (requestToTransfer != null) { |
| logger.debug("Forwarding request {} held in transfer attache {} from the management server {} to {}", requestToTransfer.getSequence(), hostId, _nodeId, futureOwnerId); |
| final boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes()); |
| if (!routeResult) { |
| logD(requestToTransfer.getBytes(), "Failed to route request to peer"); |
| } |
| |
| requestToTransfer = forwardAttache.getRequestToTransfer(); |
| } |
| |
| logger.debug("Management server {} completed agent {} rebalance to {}", _nodeId, hostId, futureOwnerId); |
| |
| } else { |
| failRebalance(hostId); |
| } |
| |
| logger.debug("Management server {} completed agent {} rebalance", _nodeId, hostId); |
| _hostTransferDao.completeAgentTransfer(hostId); |
| } |
| |
| protected void failRebalance(final long hostId) { |
| try { |
| logger.debug("Management server {} failed to rebalance agent {}", _nodeId, hostId); |
| _hostTransferDao.completeAgentTransfer(hostId); |
| handleDisconnectWithoutInvestigation(findAttache(hostId), Event.RebalanceFailed, true, true); |
| } catch (final Exception ex) { |
| logger.warn("Failed to reconnect host id={} as a part of failed rebalance task cleanup", hostId); |
| } |
| } |
| |
| protected boolean startRebalance(final long hostId) { |
| final HostVO host = _hostDao.findById(hostId); |
| |
| if (host == null || host.getRemoved() != null) { |
| logger.warn("Unable to find host record, fail start rebalancing process"); |
| return false; |
| } |
| |
| synchronized (_agents) { |
| final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId); |
| if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { |
| handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true); |
| final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId); |
| if (forwardAttache == null) { |
| logger.warn("Unable to create a forward attache for the host {} as a part of rebalance process", hostId); |
| return false; |
| } |
| logger.debug("Putting agent id={} to transfer mode", hostId); |
| forwardAttache.setTransferMode(true); |
| _agents.put(hostId, forwardAttache); |
| } else { |
| if (attache == null) { |
| logger.warn("Attache for the agent {} no longer exists on management server, can't start host rebalancing", hostId, _nodeId); |
| } else { |
| logger.warn("Attache for the agent {} has request queue size= {} and listener queue size {}, can't start host rebalancing", |
| hostId, attache.getQueueSize(), attache.getNonRecurringListenersSize()); |
| } |
| return false; |
| } |
| } |
| _hostTransferDao.startAgentTransfer(hostId); |
| return true; |
| } |
| |
| protected void cleanupTransferMap(final long msId) { |
| final List<HostTransferMapVO> hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(msId); |
| |
| for (final HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) { |
| _hostTransferDao.remove(hostJoingingCluster.getId()); |
| } |
| |
| final List<HostTransferMapVO> hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId); |
| for (final HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) { |
| _hostTransferDao.remove(hostLeavingCluster.getId()); |
| } |
| } |
| |
| protected class RebalanceTask extends ManagedContextRunnable { |
| Long hostId = null; |
| Long currentOwnerId = null; |
| Long futureOwnerId = null; |
| |
| public RebalanceTask(final long hostId, final long currentOwnerId, final long futureOwnerId) { |
| this.hostId = hostId; |
| this.currentOwnerId = currentOwnerId; |
| this.futureOwnerId = futureOwnerId; |
| } |
| |
| @Override |
| protected void runInContext() { |
| try { |
| logger.debug("Rebalancing host id={}", hostId); |
| rebalanceHost(hostId, currentOwnerId, futureOwnerId); |
| } catch (final Exception e) { |
| logger.warn("Unable to rebalance host id={}", hostId, e); |
| } |
| } |
| } |
| |
| private String handleScheduleHostScanTaskCommand(final ScheduleHostScanTaskCommand cmd) { |
| logger.debug("Intercepting resource manager command: {}", _gson.toJson(cmd)); |
| |
| try { |
| scheduleHostScanTask(); |
| } catch (final Exception e) { |
| // Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan |
| // happens at fixed intervals anyways. So handling any exceptions that may be thrown |
| logger.warn("Exception happened while trying to schedule host scan task on mgmt server {}, ignoring as regular host scan happens at fixed " + |
| "interval anyways", _clusterMgr.getSelfPeerName(), e); |
| return null; |
| } |
| |
| final Answer[] answers = new Answer[1]; |
| answers[0] = new Answer(cmd, true, null); |
| return _gson.toJson(answers); |
| } |
| |
| public Answer[] sendToAgent(final Long hostId, final Command[] cmds, final boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException { |
| final Commands commands = new Commands(stopOnError ? Command.OnError.Stop : Command.OnError.Continue); |
| for (final Command cmd : cmds) { |
| commands.addCommand(cmd); |
| } |
| return send(hostId, commands); |
| } |
| |
| protected class ClusterDispatcher implements ClusterManager.Dispatcher { |
| @Override |
| public String getName() { |
| return "ClusterDispatcher"; |
| } |
| |
| @Override |
| public String dispatch(final ClusterServicePdu pdu) { |
| |
| logger.debug("Dispatch ->{}, json: {}", pdu.getAgentId(), pdu.getJsonPackage()); |
| |
| Command[] cmds = null; |
| try { |
| cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class); |
| } catch (final Throwable e) { |
| assert false; |
| logger.error("Exception in gson decoding : ", e); |
| } |
| |
| if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { // intercepted |
| final ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; |
| |
| logger.debug("Intercepting command for agent change: agent {} event: {}", cmd.getAgentId(), cmd.getEvent()); |
| boolean result = false; |
| try { |
| result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent()); |
| logger.debug("Result is {}", result); |
| |
| } catch (final AgentUnavailableException e) { |
| logger.warn("Agent is unavailable", e); |
| return null; |
| } |
| |
| final Answer[] answers = new Answer[1]; |
| answers[0] = new ChangeAgentAnswer(cmd, result); |
| return _gson.toJson(answers); |
| } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) { |
| final TransferAgentCommand cmd = (TransferAgentCommand)cmds[0]; |
| |
| logger.debug("Intercepting command for agent rebalancing: agent {} event: {}", cmd.getAgentId(), cmd.getEvent()); |
| boolean result = false; |
| try { |
| result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner()); |
| logger.debug("Result is {}", result); |
| |
| } catch (final AgentUnavailableException e) { |
| logger.warn("Agent is unavailable", e); |
| return null; |
| } catch (final OperationTimedoutException e) { |
| logger.warn("Operation timed out", e); |
| return null; |
| } |
| final Answer[] answers = new Answer[1]; |
| answers[0] = new Answer(cmd, result, null); |
| return _gson.toJson(answers); |
| } else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) { |
| final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0]; |
| |
| logger.debug("Intercepting command to propagate event {} for host {}", cmd.getEvent().name(), cmd.getHostId()); |
| |
| boolean result = false; |
| try { |
| result = _resourceMgr.executeUserRequest(cmd.getHostId(), cmd.getEvent()); |
| logger.debug("Result is {}", result); |
| } catch (final AgentUnavailableException ex) { |
| logger.warn("Agent is unavailable", ex); |
| return null; |
| } |
| |
| final Answer[] answers = new Answer[1]; |
| answers[0] = new Answer(cmd, result, null); |
| return _gson.toJson(answers); |
| } else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) { |
| final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0]; |
| return handleScheduleHostScanTaskCommand(cmd); |
| } else if (cmds.length == 1 && cmds[0] instanceof BaseShutdownManagementServerHostCommand) { |
| final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand)cmds[0]; |
| return handleShutdownManagementServerHostCommand(cmd); |
| } |
| |
| try { |
| final long startTick = System.currentTimeMillis(); |
| logger.debug("Dispatch -> {}, json: {}", pdu.getAgentId(), pdu.getJsonPackage()); |
| |
| final Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError()); |
| if (answers != null) { |
| final String jsonReturn = _gson.toJson(answers); |
| |
| logger.debug("Completed dispatching -> {}, json: {} in {} ms, return result: {}", pdu.getAgentId(), |
| pdu.getJsonPackage(), (System.currentTimeMillis() - startTick), jsonReturn); |
| |
| return jsonReturn; |
| } else { |
| logger.debug("Completed dispatching -> {}, json: {} in {} ms, return null result", pdu.getAgentId(), |
| pdu.getJsonPackage(), (System.currentTimeMillis() - startTick)); |
| } |
| } catch (final AgentUnavailableException e) { |
| logger.warn("Agent is unavailable", e); |
| } catch (final OperationTimedoutException e) { |
| logger.warn("Timed Out", e); |
| } |
| |
| return null; |
| } |
| |
| private String handleShutdownManagementServerHostCommand(BaseShutdownManagementServerHostCommand cmd) { |
| if (cmd instanceof PrepareForShutdownManagementServerHostCommand) { |
| logger.debug("Received BaseShutdownManagementServerHostCommand - preparing to shut down"); |
| try { |
| shutdownManager.prepareForShutdown(); |
| return "Successfully prepared for shutdown"; |
| } catch(CloudRuntimeException e) { |
| return e.getMessage(); |
| } |
| } |
| if (cmd instanceof TriggerShutdownManagementServerHostCommand) { |
| logger.debug("Received TriggerShutdownManagementServerHostCommand - triggering a shut down"); |
| try { |
| shutdownManager.triggerShutdown(); |
| return "Successfully triggered shutdown"; |
| } catch(CloudRuntimeException e) { |
| return e.getMessage(); |
| } |
| } |
| if (cmd instanceof CancelShutdownManagementServerHostCommand) { |
| logger.debug("Received CancelShutdownManagementServerHostCommand - cancelling shut down"); |
| try { |
| shutdownManager.cancelShutdown(); |
| return "Successfully prepared for shutdown"; |
| } catch(CloudRuntimeException e) { |
| return e.getMessage(); |
| } |
| } |
| throw new CloudRuntimeException("Unknown BaseShutdownManagementServerHostCommand command received : " + cmd); |
| } |
| } |
| |
| public boolean executeAgentUserRequest(final long agentId, final Event event) throws AgentUnavailableException { |
| return executeUserRequest(agentId, event); |
| } |
| |
| public boolean rebalanceAgent(final long agentId, final Event event, final long currentOwnerId, final long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException { |
| return executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event); |
| } |
| |
| public boolean isAgentRebalanceEnabled() { |
| return EnableLB.value(); |
| } |
| |
| private Runnable getAgentRebalanceScanTask() { |
| return new ManagedContextRunnable() { |
| @Override |
| protected void runInContext() { |
| try { |
| logger.trace("Agent rebalance task check, management server id:{}", _nodeId); |
| // initiate agent lb task will be scheduled and executed only once, and only when number of agents |
| // loaded exceeds _connectedAgentsThreshold |
| if (!_agentLbHappened) { |
| QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class); |
| sc.and(sc.entity().getManagementServerId(), Op.NNULL); |
| sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); |
| final List<HostVO> allManagedRoutingAgents = sc.list(); |
| |
| sc = QueryBuilder.create(HostVO.class); |
| sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing); |
| final List<HostVO> allAgents = sc.list(); |
| final double allHostsCount = allAgents.size(); |
| final double managedHostsCount = allManagedRoutingAgents.size(); |
| if (allHostsCount > 0.0) { |
| final double load = managedHostsCount / allHostsCount; |
| if (load > ConnectedAgentThreshold.value()) { |
| logger.debug("Scheduling agent rebalancing task as the average agent load {} is more than the threshold {}", load, ConnectedAgentThreshold.value()); |
| scheduleRebalanceAgents(); |
| _agentLbHappened = true; |
| } else { |
| logger.debug("Not scheduling agent rebalancing task as the average load {} has not crossed the threshold", load, ConnectedAgentThreshold.value()); |
| } |
| } |
| } |
| } catch (final Throwable e) { |
| logger.error("Problem with the clustered agent transfer scan check!", e); |
| } |
| } |
| }; |
| } |
| |
| @Override |
| public void rescan() { |
| // schedule a scan task immediately |
| logger.debug("Scheduling a host scan task"); |
| // schedule host scan task on current MS |
| scheduleHostScanTask(); |
| logger.debug("Notifying all peer MS to schedule host scan task"); |
| } |
| |
| @Override |
| public ConfigKey<?>[] getConfigKeys() { |
| final ConfigKey<?>[] keys = super.getConfigKeys(); |
| |
| final List<ConfigKey<?>> keysLst = new ArrayList<ConfigKey<?>>(); |
| keysLst.addAll(Arrays.asList(keys)); |
| keysLst.add(EnableLB); |
| keysLst.add(ConnectedAgentThreshold); |
| keysLst.add(LoadSize); |
| keysLst.add(ScanInterval); |
| return keysLst.toArray(new ConfigKey<?>[keysLst.size()]); |
| } |
| } |