| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| package com.cloud.agent.manager; |
| |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.InvocationTargetException; |
| import java.nio.channels.ClosedChannelException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import javax.inject.Inject; |
| import javax.naming.ConfigurationException; |
| |
| import com.cloud.configuration.Config; |
| import com.cloud.utils.NumbersUtil; |
| import com.cloud.utils.db.GlobalLock; |
| import org.apache.cloudstack.agent.lb.IndirectAgentLB; |
| import org.apache.cloudstack.ca.CAManager; |
| import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService; |
| import org.apache.cloudstack.framework.config.ConfigKey; |
| import org.apache.cloudstack.framework.config.Configurable; |
| import org.apache.cloudstack.framework.config.dao.ConfigurationDao; |
| import org.apache.cloudstack.framework.jobs.AsyncJob; |
| import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; |
| import org.apache.cloudstack.managed.context.ManagedContextRunnable; |
| import org.apache.cloudstack.outofbandmanagement.dao.OutOfBandManagementDao; |
| import org.apache.cloudstack.utils.identity.ManagementServerNode; |
| import org.apache.commons.lang3.BooleanUtils; |
| |
| import com.cloud.agent.AgentManager; |
| import com.cloud.agent.Listener; |
| import com.cloud.agent.StartupCommandProcessor; |
| import com.cloud.agent.api.AgentControlAnswer; |
| import com.cloud.agent.api.AgentControlCommand; |
| import com.cloud.agent.api.Answer; |
| import com.cloud.agent.api.CheckHealthCommand; |
| import com.cloud.agent.api.Command; |
| import com.cloud.agent.api.PingAnswer; |
| import com.cloud.agent.api.PingCommand; |
| import com.cloud.agent.api.PingRoutingCommand; |
| import com.cloud.agent.api.ReadyAnswer; |
| import com.cloud.agent.api.ReadyCommand; |
| import com.cloud.agent.api.SetHostParamsCommand; |
| import com.cloud.agent.api.ShutdownCommand; |
| import com.cloud.agent.api.StartupAnswer; |
| import com.cloud.agent.api.StartupCommand; |
| import com.cloud.agent.api.StartupProxyCommand; |
| import com.cloud.agent.api.StartupRoutingCommand; |
| import com.cloud.agent.api.StartupSecondaryStorageCommand; |
| import com.cloud.agent.api.StartupStorageCommand; |
| import com.cloud.agent.api.UnsupportedAnswer; |
| import com.cloud.agent.transport.Request; |
| import com.cloud.agent.transport.Response; |
| import com.cloud.alert.AlertManager; |
| import com.cloud.configuration.ManagementServiceConfiguration; |
| import com.cloud.dc.ClusterVO; |
| import com.cloud.dc.DataCenterVO; |
| import com.cloud.dc.HostPodVO; |
| import com.cloud.dc.dao.ClusterDao; |
| import com.cloud.dc.dao.DataCenterDao; |
| import com.cloud.dc.dao.HostPodDao; |
| import com.cloud.exception.AgentUnavailableException; |
| import com.cloud.exception.ConnectionException; |
| import com.cloud.exception.OperationTimedoutException; |
| import com.cloud.exception.UnsupportedVersionException; |
| import com.cloud.ha.HighAvailabilityManager; |
| import com.cloud.host.Host; |
| import com.cloud.host.HostVO; |
| import com.cloud.host.Status; |
| import com.cloud.host.Status.Event; |
| import com.cloud.host.dao.HostDao; |
| import com.cloud.hypervisor.Hypervisor.HypervisorType; |
| import com.cloud.hypervisor.HypervisorGuruManager; |
| import com.cloud.resource.Discoverer; |
| import com.cloud.resource.ResourceManager; |
| import com.cloud.resource.ResourceState; |
| import com.cloud.resource.ServerResource; |
| import com.cloud.utils.Pair; |
| import com.cloud.utils.component.ManagerBase; |
| import com.cloud.utils.concurrency.NamedThreadFactory; |
| import com.cloud.utils.db.DB; |
| import com.cloud.utils.db.EntityManager; |
| import com.cloud.utils.db.QueryBuilder; |
| import com.cloud.utils.db.SearchCriteria.Op; |
| import com.cloud.utils.db.TransactionLegacy; |
| import com.cloud.utils.exception.CloudRuntimeException; |
| import com.cloud.utils.exception.HypervisorVersionChangedException; |
| import com.cloud.utils.exception.NioConnectionException; |
| import com.cloud.utils.exception.TaskExecutionException; |
| import com.cloud.utils.fsm.NoTransitionException; |
| import com.cloud.utils.fsm.StateMachine2; |
| import com.cloud.utils.nio.HandlerFactory; |
| import com.cloud.utils.nio.Link; |
| import com.cloud.utils.nio.NioServer; |
| import com.cloud.utils.nio.Task; |
| import com.cloud.utils.time.InaccurateClock; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.logging.log4j.ThreadContext; |
| |
| /** |
| * Implementation of the Agent Manager. This class controls the connection to the agents. |
| **/ |
| public class AgentManagerImpl extends ManagerBase implements AgentManager, HandlerFactory, Configurable { |
| |
| /** |
| * _agents is a ConcurrentHashMap, but it is used from within a synchronized block. This will be reported by findbugs as JLM_JSR166_UTILCONCURRENT_MONITORENTER. Maybe a |
| * ConcurrentHashMap is not the right thing to use here, but i'm not sure so i leave it alone. |
| */ |
| protected ConcurrentHashMap<Long, AgentAttache> _agents = new ConcurrentHashMap<Long, AgentAttache>(10007); |
| protected List<Pair<Integer, Listener>> _hostMonitors = new ArrayList<Pair<Integer, Listener>>(17); |
| protected List<Pair<Integer, Listener>> _cmdMonitors = new ArrayList<Pair<Integer, Listener>>(17); |
| protected List<Pair<Integer, StartupCommandProcessor>> _creationMonitors = new ArrayList<Pair<Integer, StartupCommandProcessor>>(17); |
| protected List<Long> _loadingAgents = new ArrayList<Long>(); |
| private int _monitorId = 0; |
| private final Lock _agentStatusLock = new ReentrantLock(); |
| |
| @Inject |
| protected CAManager caService; |
| @Inject |
| protected EntityManager _entityMgr; |
| |
| protected NioServer _connection; |
| @Inject |
| protected HostDao _hostDao = null; |
| @Inject |
| protected OutOfBandManagementDao outOfBandManagementDao; |
| @Inject |
| protected DataCenterDao _dcDao = null; |
| @Inject |
| protected HostPodDao _podDao = null; |
| @Inject |
| protected ConfigurationDao _configDao = null; |
| @Inject |
| protected ClusterDao _clusterDao = null; |
| |
| @Inject |
| protected HighAvailabilityManager _haMgr = null; |
| @Inject |
| protected AlertManager _alertMgr = null; |
| |
| @Inject |
| protected HypervisorGuruManager _hvGuruMgr; |
| |
| @Inject |
| protected IndirectAgentLB indirectAgentLB; |
| |
| protected int _retry = 2; |
| |
| protected long _nodeId = -1; |
| |
| protected ExecutorService _executor; |
| protected ThreadPoolExecutor _connectExecutor; |
| protected ScheduledExecutorService _directAgentExecutor; |
| protected ScheduledExecutorService _cronJobExecutor; |
| protected ScheduledExecutorService _monitorExecutor; |
| |
| private int _directAgentThreadCap; |
| |
| protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = Status.getStateMachine(); |
| private final ConcurrentHashMap<Long, Long> _pingMap = new ConcurrentHashMap<Long, Long>(10007); |
| |
| @Inject |
| ResourceManager _resourceMgr; |
| @Inject |
| ManagementServiceConfiguration mgmtServiceConf; |
| |
| protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>("Advanced", Integer.class, "workers", "5", |
| "Number of worker threads handling remote agent connections.", false); |
| protected final ConfigKey<Integer> Port = new ConfigKey<Integer>("Advanced", Integer.class, "port", "8250", "Port to listen on for remote agent connections.", false); |
| protected final ConfigKey<Integer> AlertWait = new ConfigKey<Integer>("Advanced", Integer.class, "alert.wait", "1800", |
| "Seconds to wait before alerting on a disconnected agent", true); |
| protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<Integer>("Advanced", Integer.class, "direct.agent.load.size", "16", |
| "The number of direct agents to load each time", false); |
| protected final ConfigKey<Integer> DirectAgentPoolSize = new ConfigKey<Integer>("Advanced", Integer.class, "direct.agent.pool.size", "500", |
| "Default size for DirectAgentPool", false); |
| protected final ConfigKey<Float> DirectAgentThreadCap = new ConfigKey<Float>("Advanced", Float.class, "direct.agent.thread.cap", "1", |
| "Percentage (as a value between 0 and 1) of direct.agent.pool.size to be used as upper thread cap for a single direct agent to process requests", false); |
| protected final ConfigKey<Boolean> CheckTxnBeforeSending = new ConfigKey<Boolean>("Developer", Boolean.class, "check.txn.before.sending.agent.commands", "false", |
| "This parameter allows developers to enable a check to see if a transaction wraps commands that are sent to the resource. This is not to be enabled on production systems.", true); |
| |
| @Override |
| public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException { |
| |
| logger.info("Ping Timeout is {}.", mgmtServiceConf.getPingTimeout()); |
| |
| final int threads = DirectAgentLoadSize.value(); |
| |
| _nodeId = ManagementServerNode.getManagementServerId(); |
| logger.info("Configuring AgentManagerImpl. management server node id(msid): {}.", _nodeId); |
| |
| final long lastPing = (System.currentTimeMillis() >> 10) - mgmtServiceConf.getTimeout(); |
| _hostDao.markHostsAsDisconnected(_nodeId, lastPing); |
| |
| registerForHostEvents(new BehindOnPingListener(), true, true, false); |
| |
| registerForHostEvents(new SetHostParamsListener(), true, true, false); |
| |
| _executor = new ThreadPoolExecutor(threads, threads, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentTaskPool")); |
| |
| _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentConnectTaskPool")); |
| // allow core threads to time out even when there are no items in the queue |
| _connectExecutor.allowCoreThreadTimeOut(true); |
| |
| _connection = new NioServer("AgentManager", Port.value(), Workers.value() + 10, this, caService); |
| logger.info("Listening on {} with {} workers.", Port.value(), Workers.value()); |
| |
| // executes all agent commands other than cron and ping |
| _directAgentExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgent")); |
| // executes cron and ping agent commands |
| _cronJobExecutor = new ScheduledThreadPoolExecutor(DirectAgentPoolSize.value(), new NamedThreadFactory("DirectAgentCronJob")); |
| logger.debug("Created DirectAgentAttache pool with size: {}.", DirectAgentPoolSize.value()); |
| _directAgentThreadCap = Math.round(DirectAgentPoolSize.value() * DirectAgentThreadCap.value()) + 1; // add 1 to always make the value > 0 |
| |
| _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); |
| |
| return true; |
| } |
| |
| @Override |
| public Task create(final Task.Type type, final Link link, final byte[] data) { |
| return new AgentHandler(type, link, data); |
| } |
| |
| @Override |
| public int registerForHostEvents(final Listener listener, final boolean connections, final boolean commands, final boolean priority) { |
| synchronized (_hostMonitors) { |
| _monitorId++; |
| if (connections) { |
| if (priority) { |
| _hostMonitors.add(0, new Pair<Integer, Listener>(_monitorId, listener)); |
| } else { |
| _hostMonitors.add(new Pair<Integer, Listener>(_monitorId, listener)); |
| } |
| } |
| if (commands) { |
| if (priority) { |
| _cmdMonitors.add(0, new Pair<Integer, Listener>(_monitorId, listener)); |
| } else { |
| _cmdMonitors.add(new Pair<Integer, Listener>(_monitorId, listener)); |
| } |
| } |
| logger.debug("Registering listener {} with id {}", listener.getClass().getSimpleName(), _monitorId); |
| return _monitorId; |
| } |
| } |
| |
| @Override |
| public int registerForInitialConnects(final StartupCommandProcessor creator, final boolean priority) { |
| synchronized (_hostMonitors) { |
| _monitorId++; |
| if (priority) { |
| _creationMonitors.add(0, new Pair<Integer, StartupCommandProcessor>(_monitorId, creator)); |
| } else { |
| _creationMonitors.add(new Pair<Integer, StartupCommandProcessor>(_monitorId, creator)); |
| } |
| return _monitorId; |
| } |
| } |
| |
| @Override |
| public void unregisterForHostEvents(final int id) { |
| logger.debug("Deregistering {}", id); |
| _hostMonitors.remove(id); |
| } |
| |
| private AgentControlAnswer handleControlCommand(final AgentAttache attache, final AgentControlCommand cmd) { |
| AgentControlAnswer answer = null; |
| |
| for (final Pair<Integer, Listener> listener : _cmdMonitors) { |
| answer = listener.second().processControlCommand(attache.getId(), cmd); |
| |
| if (answer != null) { |
| return answer; |
| } |
| } |
| |
| logger.warn("No handling of agent control command: {} sent from {}", cmd, attache.getId()); |
| return new AgentControlAnswer(cmd); |
| } |
| |
| public void handleCommands(final AgentAttache attache, final long sequence, final Command[] cmds) { |
| for (final Pair<Integer, Listener> listener : _cmdMonitors) { |
| final boolean processed = listener.second().processCommands(attache.getId(), sequence, cmds); |
| logger.trace("SeqA {}-{}: {} by {}", attache.getId(), sequence, (processed ? "processed" : "not processed"), listener.getClass()); |
| } |
| } |
| |
| public void notifyAnswersToMonitors(final long agentId, final long seq, final Answer[] answers) { |
| for (final Pair<Integer, Listener> listener : _cmdMonitors) { |
| listener.second().processAnswers(agentId, seq, answers); |
| } |
| } |
| |
| public AgentAttache findAttache(final long hostId) { |
| AgentAttache attache = null; |
| synchronized (_agents) { |
| attache = _agents.get(hostId); |
| } |
| return attache; |
| } |
| |
| @Override |
| public Answer sendTo(final Long dcId, final HypervisorType type, final Command cmd) { |
| final List<ClusterVO> clusters = _clusterDao.listByDcHyType(dcId, type.toString()); |
| int retry = 0; |
| for (final ClusterVO cluster : clusters) { |
| final List<HostVO> hosts = _resourceMgr.listAllUpAndEnabledHosts(Host.Type.Routing, cluster.getId(), null, dcId); |
| for (final HostVO host : hosts) { |
| retry++; |
| if (retry > _retry) { |
| return null; |
| } |
| Answer answer = null; |
| try { |
| final long targetHostId = _hvGuruMgr.getGuruProcessedCommandTargetHost(host.getId(), cmd, host.getHypervisorType()); |
| answer = easySend(targetHostId, cmd); |
| } catch (final Exception e) { |
| String errorMsg = String.format("Error sending command %s to host %s, due to %s", cmd.getClass().getName(), |
| host.getUuid(), e.getLocalizedMessage()); |
| logger.error(errorMsg); |
| logger.debug(errorMsg, e); |
| } |
| if (answer != null) { |
| return answer; |
| } |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public Answer send(final Long hostId, final Command cmd) throws AgentUnavailableException, OperationTimedoutException { |
| final Commands cmds = new Commands(Command.OnError.Stop); |
| cmds.addCommand(cmd); |
| send(hostId, cmds, cmd.getWait()); |
| final Answer[] answers = cmds.getAnswers(); |
| if (answers != null && !(answers[0] instanceof UnsupportedAnswer)) { |
| return answers[0]; |
| } |
| |
| if (answers != null && answers[0] instanceof UnsupportedAnswer) { |
| logger.warn("Unsupported Command: {}", answers[0].getDetails()); |
| return answers[0]; |
| } |
| |
| return null; |
| } |
| |
| @DB |
| protected boolean noDbTxn() { |
| final TransactionLegacy txn = TransactionLegacy.currentTxn(); |
| return !txn.dbTxnStarted(); |
| } |
| |
| private static void tagCommand(final Command cmd) { |
| final AsyncJobExecutionContext context = AsyncJobExecutionContext.getCurrent(); |
| if (context != null && context.getJob() != null) { |
| final AsyncJob job = context.getJob(); |
| |
| if (job.getRelated() != null && !job.getRelated().isEmpty()) { |
| cmd.setContextParam("job", "job-" + job.getRelated() + "/" + "job-" + job.getId()); |
| } else { |
| cmd.setContextParam("job", "job-" + job.getId()); |
| } |
| } |
| String logcontextid = ThreadContext.get("logcontextid"); |
| if (StringUtils.isNotEmpty(logcontextid)) { |
| cmd.setContextParam("logid", logcontextid); |
| } |
| } |
| |
| /** |
| * @param commands |
| * @return |
| */ |
| private Command[] checkForCommandsAndTag(final Commands commands) { |
| final Command[] cmds = commands.toCommands(); |
| |
| assert cmds.length > 0 : "Ask yourself this about a hundred times. Why am I sending zero length commands?"; |
| |
| setEmptyAnswers(commands, cmds); |
| |
| for (final Command cmd : cmds) { |
| tagCommand(cmd); |
| } |
| return cmds; |
| } |
| |
| /** |
| * @param commands |
| * @param cmds |
| */ |
| private void setEmptyAnswers(final Commands commands, final Command[] cmds) { |
| if (cmds.length == 0) { |
| commands.setAnswers(new Answer[0]); |
| } |
| } |
| |
| @Override |
| public Answer[] send(final Long hostId, final Commands commands, int timeout) throws AgentUnavailableException, OperationTimedoutException { |
| assert hostId != null : "Who's not checking the agent id before sending? ... (finger wagging)"; |
| if (hostId == null) { |
| throw new AgentUnavailableException(-1); |
| } |
| |
| if (timeout <= 0) { |
| timeout = Wait.value(); |
| } |
| |
| if (CheckTxnBeforeSending.value()) { |
| if (!noDbTxn()) { |
| throw new CloudRuntimeException("We do not allow transactions to be wrapped around commands sent to be executed on remote agents. " |
| + "We cannot predict how long it takes a command to complete. " + "The transaction may be rolled back because the connection took too long."); |
| } |
| } else { |
| assert noDbTxn() : "I know, I know. Why are we so strict as to not allow txn across an agent call? ... Why are we so cruel ... Why are we such a dictator .... Too bad... Sorry...but NO AGENT COMMANDS WRAPPED WITHIN DB TRANSACTIONS!"; |
| } |
| |
| final Command[] cmds = checkForCommandsAndTag(commands); |
| |
| //check what agent is returned. |
| final AgentAttache agent = getAttache(hostId); |
| if (agent == null || agent.isClosed()) { |
| throw new AgentUnavailableException("agent not logged into this management server", hostId); |
| } |
| |
| final Request req = new Request(hostId, agent.getName(), _nodeId, cmds, commands.stopOnError(), true); |
| req.setSequence(agent.getNextSequence()); |
| final Answer[] answers = agent.send(req, timeout); |
| notifyAnswersToMonitors(hostId, req.getSequence(), answers); |
| commands.setAnswers(answers); |
| return answers; |
| } |
| |
| protected Status investigate(final AgentAttache agent) { |
| final Long hostId = agent.getId(); |
| final HostVO host = _hostDao.findById(hostId); |
| if (host != null && host.getType() != null && !host.getType().isVirtual()) { |
| logger.debug("Checking if agent ({}) is alive", hostId); |
| final Answer answer = easySend(hostId, new CheckHealthCommand()); |
| if (answer != null && answer.getResult()) { |
| final Status status = Status.Up; |
| logger.debug("Agent ({}) responded to checkHealthCommand, reporting that agent is {}", hostId, status); |
| return status; |
| } |
| return _haMgr.investigate(hostId); |
| } |
| return Status.Alert; |
| } |
| |
| protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableException { |
| if (hostId == null) { |
| return null; |
| } |
| final AgentAttache agent = findAttache(hostId); |
| if (agent == null) { |
| logger.debug("Unable to find agent for {}", hostId); |
| throw new AgentUnavailableException("Unable to find agent ", hostId); |
| } |
| |
| return agent; |
| } |
| |
| @Override |
| public long send(final Long hostId, final Commands commands, final Listener listener) throws AgentUnavailableException { |
| final AgentAttache agent = getAttache(hostId); |
| if (agent.isClosed()) { |
| throw new AgentUnavailableException("Agent " + agent.getId() + " is closed", agent.getId()); |
| } |
| |
| final Command[] cmds = checkForCommandsAndTag(commands); |
| |
| final Request req = new Request(hostId, agent.getName(), _nodeId, cmds, commands.stopOnError(), true); |
| req.setSequence(agent.getNextSequence()); |
| |
| agent.send(req, listener); |
| return req.getSequence(); |
| } |
| |
| public void removeAgent(final AgentAttache attache, final Status nextState) { |
| if (attache == null) { |
| return; |
| } |
| final long hostId = attache.getId(); |
| logger.debug("Remove Agent : {}", hostId); |
| AgentAttache removed = null; |
| boolean conflict = false; |
| synchronized (_agents) { |
| removed = _agents.remove(hostId); |
| if (removed != null && removed != attache) { |
| conflict = true; |
| _agents.put(hostId, removed); |
| removed = attache; |
| } |
| } |
| if (conflict) { |
| logger.debug("Agent for host {} is created when it is being disconnected", hostId); |
| } |
| if (removed != null) { |
| removed.disconnect(nextState); |
| } |
| |
| for (final Pair<Integer, Listener> monitor : _hostMonitors) { |
| logger.debug("Sending Disconnect to listener: {}", monitor.second().getClass().getName()); |
| monitor.second().processDisconnect(hostId, nextState); |
| } |
| } |
| |
| @Override |
| public void notifyMonitorsOfNewlyAddedHost(long hostId) { |
| for (final Pair<Integer, Listener> monitor : _hostMonitors) { |
| logger.debug("Sending host added to listener: {}", monitor.second().getClass().getSimpleName()); |
| |
| monitor.second().processHostAdded(hostId); |
| } |
| } |
| |
| protected AgentAttache notifyMonitorsOfConnection(final AgentAttache attache, final StartupCommand[] cmd, final boolean forRebalance) throws ConnectionException { |
| final long hostId = attache.getId(); |
| final HostVO host = _hostDao.findById(hostId); |
| for (final Pair<Integer, Listener> monitor : _hostMonitors) { |
| logger.debug("Sending Connect to listener: {}", monitor.second().getClass().getSimpleName()); |
| for (int i = 0; i < cmd.length; i++) { |
| try { |
| monitor.second().processConnect(host, cmd[i], forRebalance); |
| } catch (final Exception e) { |
| if (e instanceof ConnectionException) { |
| final ConnectionException ce = (ConnectionException)e; |
| if (ce.isSetupError()) { |
| logger.warn("Monitor " + monitor.second().getClass().getSimpleName() + " says there is an error in the connect process for " + hostId + " due to " + e.getMessage()); |
| handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, true, true); |
| throw ce; |
| } else { |
| logger.info("Monitor " + monitor.second().getClass().getSimpleName() + " says not to continue the connect process for " + hostId + " due to " + e.getMessage()); |
| handleDisconnectWithoutInvestigation(attache, Event.ShutdownRequested, true, true); |
| return attache; |
| } |
| } else if (e instanceof HypervisorVersionChangedException) { |
| handleDisconnectWithoutInvestigation(attache, Event.ShutdownRequested, true, true); |
| throw new CloudRuntimeException("Unable to connect " + attache.getId(), e); |
| } else { |
| logger.error("Monitor {} says there is an error in the connect process for {} due to {}", monitor.second().getClass().getSimpleName(), hostId, e.getMessage(), e); |
| handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, true, true); |
| throw new CloudRuntimeException("Unable to connect " + attache.getId(), e); |
| } |
| } |
| } |
| } |
| |
| final Long dcId = host.getDataCenterId(); |
| final ReadyCommand ready = new ReadyCommand(dcId, host.getId(), NumbersUtil.enableHumanReadableSizes); |
| ready.setWait(ReadyCommandWait.value()); |
| final Answer answer = easySend(hostId, ready); |
| if (answer == null || !answer.getResult()) { |
| // this is tricky part for secondary storage |
| // make it as disconnected, wait for secondary storage VM to be up |
| // return the attache instead of null, even it is disconnectede |
| handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, true, true); |
| } |
| if (answer instanceof ReadyAnswer) { |
| ReadyAnswer readyAnswer = (ReadyAnswer)answer; |
| Map<String, String> detailsMap = readyAnswer.getDetailsMap(); |
| if (detailsMap != null) { |
| String uefiEnabled = detailsMap.get(Host.HOST_UEFI_ENABLE); |
| logger.debug("Got HOST_UEFI_ENABLE [{}] for hostId [{}]:", uefiEnabled, host.getUuid()); |
| if (uefiEnabled != null) { |
| _hostDao.loadDetails(host); |
| if (!uefiEnabled.equals(host.getDetails().get(Host.HOST_UEFI_ENABLE))) { |
| host.getDetails().put(Host.HOST_UEFI_ENABLE, uefiEnabled); |
| _hostDao.saveDetails(host); |
| } |
| } |
| } |
| } |
| |
| agentStatusTransitTo(host, Event.Ready, _nodeId); |
| attache.ready(); |
| return attache; |
| } |
| |
| @Override |
| public boolean start() { |
| startDirectlyConnectedHosts(); |
| |
| if (_connection != null) { |
| try { |
| _connection.start(); |
| } catch (final NioConnectionException e) { |
| logger.error("Error when connecting to the NioServer!", e); |
| } |
| } |
| |
| _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), mgmtServiceConf.getPingInterval(), mgmtServiceConf.getPingInterval(), TimeUnit.SECONDS); |
| |
| return true; |
| } |
| |
| public void startDirectlyConnectedHosts() { |
| final List<HostVO> hosts = _resourceMgr.findDirectlyConnectedHosts(); |
| for (final HostVO host : hosts) { |
| loadDirectlyConnectedHost(host, false); |
| } |
| } |
| |
| private ServerResource loadResourcesWithoutHypervisor(final HostVO host) { |
| final String resourceName = host.getResource(); |
| ServerResource resource = null; |
| try { |
| final Class<?> clazz = Class.forName(resourceName); |
| final Constructor<?> constructor = clazz.getConstructor(); |
| resource = (ServerResource)constructor.newInstance(); |
| } catch (final ClassNotFoundException e) { |
| logger.warn("Unable to find class " + host.getResource(), e); |
| } catch (final InstantiationException e) { |
| logger.warn("Unable to instantiate class " + host.getResource(), e); |
| } catch (final IllegalAccessException e) { |
| logger.warn("Illegal access " + host.getResource(), e); |
| } catch (final SecurityException e) { |
| logger.warn("Security error on " + host.getResource(), e); |
| } catch (final NoSuchMethodException e) { |
| logger.warn("NoSuchMethodException error on " + host.getResource(), e); |
| } catch (final IllegalArgumentException e) { |
| logger.warn("IllegalArgumentException error on " + host.getResource(), e); |
| } catch (final InvocationTargetException e) { |
| logger.warn("InvocationTargetException error on " + host.getResource(), e); |
| } |
| |
| if (resource != null) { |
| _hostDao.loadDetails(host); |
| |
| final HashMap<String, Object> params = new HashMap<String, Object>(host.getDetails().size() + 5); |
| params.putAll(host.getDetails()); |
| |
| params.put("guid", host.getGuid()); |
| params.put("zone", Long.toString(host.getDataCenterId())); |
| if (host.getPodId() != null) { |
| params.put("pod", Long.toString(host.getPodId())); |
| } |
| if (host.getClusterId() != null) { |
| params.put("cluster", Long.toString(host.getClusterId())); |
| String guid = null; |
| final ClusterVO cluster = _clusterDao.findById(host.getClusterId()); |
| if (cluster.getGuid() == null) { |
| guid = host.getDetail("pool"); |
| } else { |
| guid = cluster.getGuid(); |
| } |
| if (guid != null && !guid.isEmpty()) { |
| params.put("pool", guid); |
| } |
| } |
| |
| params.put("ipaddress", host.getPrivateIpAddress()); |
| params.put("secondary.storage.vm", "false"); |
| |
| try { |
| resource.configure(host.getName(), params); |
| } catch (final ConfigurationException e) { |
| logger.warn("Unable to configure resource due to {}", e.getMessage()); |
| return null; |
| } |
| |
| if (!resource.start()) { |
| logger.warn("Unable to start the resource"); |
| return null; |
| } |
| } |
| return resource; |
| } |
| |
| @Override |
| public void rescan() { |
| } |
| |
| protected boolean loadDirectlyConnectedHost(final HostVO host, final boolean forRebalance) { |
| boolean initialized = false; |
| ServerResource resource = null; |
| try { |
| // load the respective discoverer |
| final Discoverer discoverer = _resourceMgr.getMatchingDiscover(host.getHypervisorType()); |
| if (discoverer == null) { |
| logger.info("Could not to find a Discoverer to load the resource: {} for hypervisor type: {}", host.getId(), host.getHypervisorType()); |
| resource = loadResourcesWithoutHypervisor(host); |
| } else { |
| resource = discoverer.reloadResource(host); |
| } |
| |
| if (resource == null) { |
| logger.warn("Unable to load the resource: {}", host.getId()); |
| return false; |
| } |
| |
| initialized = true; |
| } finally { |
| if (!initialized) { |
| if (host != null) { |
| agentStatusTransitTo(host, Event.AgentDisconnected, _nodeId); |
| } |
| } |
| } |
| |
| if (forRebalance) { |
| tapLoadingAgents(host.getId(), TapAgentsAction.Add); |
| final Host h = _resourceMgr.createHostAndAgent(host.getId(), resource, host.getDetails(), false, null, true); |
| tapLoadingAgents(host.getId(), TapAgentsAction.Del); |
| |
| return h == null ? false : true; |
| } else { |
| _executor.execute(new SimulateStartTask(host.getId(), resource, host.getDetails())); |
| return true; |
| } |
| } |
| |
| protected AgentAttache createAttacheForDirectConnect(final Host host, final ServerResource resource) throws ConnectionException { |
| logger.debug("create DirectAgentAttache for {}", host.getId()); |
| final DirectAgentAttache attache = new DirectAgentAttache(this, host.getId(), host.getName(), resource, host.isInMaintenanceStates()); |
| |
| AgentAttache old = null; |
| synchronized (_agents) { |
| old = _agents.put(host.getId(), attache); |
| } |
| if (old != null) { |
| old.disconnect(Status.Removed); |
| } |
| |
| return attache; |
| } |
| |
| @Override |
| public boolean stop() { |
| |
| if (_connection != null) { |
| _connection.stop(); |
| } |
| |
| logger.info("Disconnecting agents: {}", _agents.size()); |
| synchronized (_agents) { |
| for (final AgentAttache agent : _agents.values()) { |
| final HostVO host = _hostDao.findById(agent.getId()); |
| if (host == null) { |
| logger.debug("Cant not find host {}", agent.getId()); |
| } else { |
| if (!agent.forForward()) { |
| agentStatusTransitTo(host, Event.ManagementServerDown, _nodeId); |
| } |
| } |
| } |
| } |
| |
| _connectExecutor.shutdownNow(); |
| _monitorExecutor.shutdownNow(); |
| return true; |
| } |
| |
| protected Status getNextStatusOnDisconnection(Host host, final Status.Event event) { |
| final Status currentStatus = host.getStatus(); |
| Status nextStatus; |
| if (currentStatus == Status.Down || currentStatus == Status.Alert || currentStatus == Status.Removed) { |
| logger.debug("Host {} is already {}", host.getUuid(), currentStatus); |
| nextStatus = currentStatus; |
| } else { |
| try { |
| nextStatus = currentStatus.getNextStatus(event); |
| } catch (final NoTransitionException e) { |
| final String err = String.format("Cannot find next status for %s as current status is %s for agent %s", event, currentStatus, host.getUuid()); |
| logger.debug(err); |
| throw new CloudRuntimeException(err); |
| } |
| logger.debug("The next status of agent {} is {}, current status is {}", host.getUuid(), nextStatus, currentStatus); |
| } |
| return nextStatus; |
| } |
| |
| protected boolean handleDisconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event, final boolean transitState, final boolean removeAgent) { |
| final long hostId = attache.getId(); |
| |
| boolean result = false; |
| GlobalLock joinLock = getHostJoinLock(hostId); |
| if (joinLock.lock(60)) { |
| try { |
| logger.info("Host {} is disconnecting with event {}", hostId, event); |
| Status nextStatus = null; |
| final HostVO host = _hostDao.findById(hostId); |
| if (host == null) { |
| logger.warn("Can't find host with {}", hostId); |
| nextStatus = Status.Removed; |
| } else { |
| nextStatus = getNextStatusOnDisconnection(host, event); |
| caService.purgeHostCertificate(host); |
| } |
| logger.debug("Deregistering link for {} with state {}", hostId, nextStatus); |
| |
| removeAgent(attache, nextStatus); |
| |
| if (host != null && transitState) { |
| // update the state for host in DB as per the event |
| disconnectAgent(host, event, _nodeId); |
| } |
| } finally { |
| joinLock.unlock(); |
| } |
| result = true; |
| } |
| joinLock.releaseRef(); |
| return result; |
| } |
| |
| protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, Status.Event event) { |
| final long hostId = attache.getId(); |
| HostVO host = _hostDao.findById(hostId); |
| |
| if (host != null) { |
| Status nextStatus = null; |
| try { |
| nextStatus = host.getStatus().getNextStatus(event); |
| } catch (final NoTransitionException ne) { |
| /* |
| * Agent may be currently in status of Down, Alert, Removed, namely there is no next status for some events. Why this can happen? Ask God not me. I hate there was |
| * no piece of comment for code handling race condition. God knew what race condition the code dealt with! |
| */ |
| logger.debug("Caught exception while getting agent's next status", ne); |
| } |
| |
| if (nextStatus == Status.Alert) { |
| /* OK, we are going to the bad status, let's see what happened */ |
| logger.info("Investigating why host {} has disconnected with event", hostId, event); |
| |
| Status determinedState = investigate(attache); |
| // if state cannot be determined do nothing and bail out |
| if (determinedState == null) { |
| if ((System.currentTimeMillis() >> 10) - host.getLastPinged() > AlertWait.value()) { |
| logger.warn("Agent {} state cannot be determined for more than {}({}) seconds, will go to Alert state", hostId, AlertWait, AlertWait.value()); |
| determinedState = Status.Alert; |
| } else { |
| logger.warn("Agent {} state cannot be determined, do nothing", hostId); |
| return false; |
| } |
| } |
| |
| final Status currentStatus = host.getStatus(); |
| logger.info("The agent from host {} state determined is {}", hostId, determinedState); |
| |
| if (determinedState == Status.Down) { |
| final String message = "Host is down: " + host.getId() + "-" + host.getName() + ". Starting HA on the VMs"; |
| logger.error(message); |
| if (host.getType() != Host.Type.SecondaryStorage && host.getType() != Host.Type.ConsoleProxy) { |
| _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Host down, " + host.getId(), message); |
| } |
| event = Status.Event.HostDown; |
| } else if (determinedState == Status.Up) { |
| /* Got ping response from host, bring it back */ |
| logger.info("Agent is determined to be up and running"); |
| agentStatusTransitTo(host, Status.Event.Ping, _nodeId); |
| return false; |
| } else if (determinedState == Status.Disconnected) { |
| logger.warn("Agent is disconnected but the host is still up: {}-{}", host.getId(), host.getName() + |
| '-' + host.getResourceState()); |
| if (currentStatus == Status.Disconnected || |
| (currentStatus == Status.Up && host.getResourceState() == ResourceState.PrepareForMaintenance)) { |
| if ((System.currentTimeMillis() >> 10) - host.getLastPinged() > AlertWait.value()) { |
| logger.warn("Host {} has been disconnected past the wait time it should be disconnected.", host.getId()); |
| event = Status.Event.WaitedTooLong; |
| } else { |
| logger.debug("Host {} has been determined to be disconnected but it hasn't passed the wait time yet.", host.getId()); |
| return false; |
| } |
| } else if (currentStatus == Status.Up) { |
| final DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId()); |
| final HostPodVO podVO = _podDao.findById(host.getPodId()); |
| final String hostDesc = "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName(); |
| if (host.getType() != Host.Type.SecondaryStorage && host.getType() != Host.Type.ConsoleProxy) { |
| _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Host disconnected, " + hostDesc, |
| "If the agent for host [" + hostDesc + "] is not restarted within " + AlertWait + " seconds, host will go to Alert state"); |
| } |
| event = Status.Event.AgentDisconnected; |
| } |
| } else { |
| // if we end up here we are in alert state, send an alert |
| final DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId()); |
| final HostPodVO podVO = _podDao.findById(host.getPodId()); |
| final String podName = podVO != null ? podVO.getName() : "NO POD"; |
| final String hostDesc = "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podName; |
| _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Host in ALERT state, " + hostDesc, |
| "In availability zone " + host.getDataCenterId() + ", host is in alert state: " + host.getId() + "-" + host.getName()); |
| } |
| } else { |
| logger.debug("The next status of agent {} is not Alert, no need to investigate what happened", host.getId()); |
| } |
| } |
| handleDisconnectWithoutInvestigation(attache, event, true, true); |
| host = _hostDao.findById(hostId); // Maybe the host magically reappeared? |
| if (host != null && host.getStatus() == Status.Down) { |
| _haMgr.scheduleRestartForVmsOnHost(host, true); |
| } |
| return true; |
| } |
| |
| protected class DisconnectTask extends ManagedContextRunnable { |
| AgentAttache _attache; |
| Status.Event _event; |
| boolean _investigate; |
| |
| DisconnectTask(final AgentAttache attache, final Status.Event event, final boolean investigate) { |
| _attache = attache; |
| _event = event; |
| _investigate = investigate; |
| } |
| |
| @Override |
| protected void runInContext() { |
| try { |
| if (_investigate == true) { |
| handleDisconnectWithInvestigation(_attache, _event); |
| } else { |
| handleDisconnectWithoutInvestigation(_attache, _event, true, false); |
| } |
| } catch (final Exception e) { |
| logger.error("Exception caught while handling disconnect: ", e); |
| } |
| } |
| } |
| |
| @Override |
| public Answer easySend(final Long hostId, final Command cmd) { |
| try { |
| final Host h = _hostDao.findById(hostId); |
| if (h == null || h.getRemoved() != null) { |
| logger.debug("Host with id {} doesn't exist", hostId); |
| return null; |
| } |
| final Status status = h.getStatus(); |
| if (!status.equals(Status.Up) && !status.equals(Status.Connecting)) { |
| logger.debug("Can not send command {} due to Host {} not being up", cmd, hostId); |
| return null; |
| } |
| final Answer answer = send(hostId, cmd); |
| if (answer == null) { |
| logger.warn("send returns null answer"); |
| return null; |
| } |
| |
| if (logger.isDebugEnabled() && answer.getDetails() != null) { |
| logger.debug("Details from executing {}: {}", cmd.getClass(), answer.getDetails()); |
| } |
| |
| return answer; |
| |
| } catch (final AgentUnavailableException e) { |
| logger.warn(e.getMessage()); |
| return null; |
| } catch (final OperationTimedoutException e) { |
| logger.warn("Operation timed out: {}", e.getMessage()); |
| return null; |
| } catch (final Exception e) { |
| logger.warn("Exception while sending", e); |
| return null; |
| } |
| } |
| |
| @Override |
| public Answer[] send(final Long hostId, final Commands cmds) throws AgentUnavailableException, OperationTimedoutException { |
| int wait = 0; |
| for (final Command cmd : cmds) { |
| if (cmd.getWait() > wait) { |
| wait = cmd.getWait(); |
| } |
| } |
| return send(hostId, cmds, wait); |
| } |
| |
| @Override |
| public void reconnect(final long hostId) throws AgentUnavailableException { |
| HostVO host = _hostDao.findById(hostId); |
| if (host == null) { |
| throw new CloudRuntimeException("Unable to find host: " + hostId); |
| } |
| |
| if (host.getRemoved() != null) { |
| throw new CloudRuntimeException("Host has already been removed: " + hostId); |
| } |
| |
| if (host.getStatus() == Status.Disconnected) { |
| logger.debug("Host is already disconnected, no work to be done: {}", hostId); |
| return; |
| } |
| |
| if (host.getStatus() != Status.Up && host.getStatus() != Status.Alert && host.getStatus() != Status.Rebalancing) { |
| throw new CloudRuntimeException("Unable to disconnect host because it is not in the correct state: host=" + hostId + "; Status=" + host.getStatus()); |
| } |
| |
| AgentAttache attache = findAttache(hostId); |
| if (attache == null) { |
| throw new CloudRuntimeException("Unable to disconnect host because it is not connected to this server: " + hostId); |
| } |
| disconnectWithoutInvestigation(attache, Event.ShutdownRequested); |
| } |
| |
| @Override |
| public void notifyMonitorsOfHostAboutToBeRemoved(long hostId) { |
| for (final Pair<Integer, Listener> monitor : _hostMonitors) { |
| logger.debug("Sending host about to be removed to listener: {}", monitor.second().getClass().getSimpleName()); |
| |
| monitor.second().processHostAboutToBeRemoved(hostId); |
| } |
| } |
| |
| @Override |
| public void notifyMonitorsOfRemovedHost(long hostId, long clusterId) { |
| for (final Pair<Integer, Listener> monitor : _hostMonitors) { |
| logger.debug("Sending host removed to listener: {}", monitor.second().getClass().getSimpleName()); |
| |
| monitor.second().processHostRemoved(hostId, clusterId); |
| } |
| } |
| |
| public boolean executeUserRequest(final long hostId, final Event event) throws AgentUnavailableException { |
| if (event == Event.AgentDisconnected) { |
| logger.debug("Received agent disconnect event for host {}", hostId); |
| AgentAttache attache = null; |
| attache = findAttache(hostId); |
| if (attache != null) { |
| handleDisconnectWithoutInvestigation(attache, Event.AgentDisconnected, true, true); |
| } |
| return true; |
| } |
| if (event == Event.ShutdownRequested) { |
| try { |
| reconnect(hostId); |
| } catch (CloudRuntimeException e) { |
| logger.debug("Error on shutdown request for hostID: {}", hostId, e); |
| return false; |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean isAgentAttached(final long hostId) { |
| final AgentAttache agentAttache = findAttache(hostId); |
| return agentAttache != null; |
| } |
| |
| protected AgentAttache createAttacheForConnect(final HostVO host, final Link link) throws ConnectionException { |
| logger.debug("create ConnectedAgentAttache for {}", host.getId()); |
| final AgentAttache attache = new ConnectedAgentAttache(this, host.getId(), host.getName(), link, host.isInMaintenanceStates()); |
| link.attach(attache); |
| |
| AgentAttache old = null; |
| synchronized (_agents) { |
| old = _agents.put(host.getId(), attache); |
| } |
| if (old != null) { |
| old.disconnect(Status.Removed); |
| } |
| |
| return attache; |
| } |
| |
| private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startup) throws ConnectionException { |
| final List<String> agentMSHostList = new ArrayList<>(); |
| String lbAlgorithm = null; |
| if (startup != null && startup.length > 0) { |
| final String agentMSHosts = startup[0].getMsHostList(); |
| if (StringUtils.isNotEmpty(agentMSHosts)) { |
| String[] msHosts = agentMSHosts.split("@"); |
| if (msHosts.length > 1) { |
| lbAlgorithm = msHosts[1]; |
| } |
| agentMSHostList.addAll(Arrays.asList(msHosts[0].split(","))); |
| } |
| } |
| AgentAttache attache = null; |
| GlobalLock joinLock = getHostJoinLock(host.getId()); |
| if (joinLock.lock(60)) { |
| try { |
| |
| if (!indirectAgentLB.compareManagementServerList(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) { |
| final List<String> newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null); |
| ready.setMsHostList(newMSList); |
| ready.setLbAlgorithm(indirectAgentLB.getLBAlgorithmName()); |
| ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId())); |
| logger.debug("Agent's management server host list is not up to date, sending list update: {}", newMSList); |
| } |
| |
| attache = createAttacheForConnect(host, link); |
| attache = notifyMonitorsOfConnection(attache, startup, false); |
| } finally { |
| joinLock.unlock(); |
| } |
| } else { |
| throw new ConnectionException(true, "Unable to acquire lock on host " + host.getUuid()); |
| } |
| joinLock.releaseRef(); |
| return attache; |
| } |
| |
| private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, final Request request) { |
| AgentAttache attache = null; |
| ReadyCommand ready = null; |
| try { |
| final HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup); |
| if (host != null) { |
| ready = new ReadyCommand(host.getDataCenterId(), host.getId(), NumbersUtil.enableHumanReadableSizes); |
| attache = sendReadyAndGetAttache(host, ready, link, startup); |
| } |
| } catch (final Exception e) { |
| logger.debug("Failed to handle host connection: ", e); |
| ready = new ReadyCommand(null); |
| ready.setDetails(e.toString()); |
| } finally { |
| if (ready == null) { |
| ready = new ReadyCommand(null); |
| } |
| } |
| |
| try { |
| if (attache == null) { |
| final Request readyRequest = new Request(-1, -1, ready, false); |
| link.send(readyRequest.getBytes()); |
| } else { |
| easySend(attache.getId(), ready); |
| } |
| } catch (final Exception e) { |
| logger.debug("Failed to send ready command:" + e.toString()); |
| } |
| return attache; |
| } |
| |
| protected class SimulateStartTask extends ManagedContextRunnable { |
| ServerResource resource; |
| Map<String, String> details; |
| long id; |
| |
| public SimulateStartTask(final long id, final ServerResource resource, final Map<String, String> details) { |
| this.id = id; |
| this.resource = resource; |
| this.details = details; |
| } |
| |
| @Override |
| protected void runInContext() { |
| try { |
| logger.debug("Simulating start for resource {} id {}", resource.getName(), id); |
| |
| if (tapLoadingAgents(id, TapAgentsAction.Add)) { |
| try { |
| final AgentAttache agentattache = findAttache(id); |
| if (agentattache == null) { |
| logger.debug("Creating agent for host {}", id); |
| _resourceMgr.createHostAndAgent(id, resource, details, false, null, false); |
| logger.debug("Completed creating agent for host {}", id); |
| } else { |
| logger.debug("Agent already created in another thread for host {}, ignore this", id); |
| } |
| } finally { |
| tapLoadingAgents(id, TapAgentsAction.Del); |
| } |
| } else { |
| logger.debug("Agent creation already getting processed in another thread for host {}, ignore this", id); |
| } |
| } catch (final Exception e) { |
| logger.warn("Unable to simulate start on resource {} name {}", id, resource.getName(), e); |
| } |
| } |
| } |
| |
| protected class HandleAgentConnectTask extends ManagedContextRunnable { |
| Link _link; |
| Command[] _cmds; |
| Request _request; |
| |
| HandleAgentConnectTask(final Link link, final Command[] cmds, final Request request) { |
| _link = link; |
| _cmds = cmds; |
| _request = request; |
| } |
| |
| @Override |
| protected void runInContext() { |
| _request.logD("Processing the first command "); |
| final StartupCommand[] startups = new StartupCommand[_cmds.length]; |
| for (int i = 0; i < _cmds.length; i++) { |
| startups[i] = (StartupCommand)_cmds[i]; |
| } |
| |
| final AgentAttache attache = handleConnectedAgent(_link, startups, _request); |
| if (attache == null) { |
| logger.warn("Unable to create attache for agent: {}", _request); |
| } |
| } |
| } |
| |
| protected void connectAgent(final Link link, final Command[] cmds, final Request request) { |
| // send startupanswer to agent in the very beginning, so agent can move on without waiting for the answer for an undetermined time, if we put this logic into another |
| // thread pool. |
| final StartupAnswer[] answers = new StartupAnswer[cmds.length]; |
| Command cmd; |
| for (int i = 0; i < cmds.length; i++) { |
| cmd = cmds[i]; |
| if (cmd instanceof StartupRoutingCommand || cmd instanceof StartupProxyCommand || cmd instanceof StartupSecondaryStorageCommand || |
| cmd instanceof StartupStorageCommand) { |
| answers[i] = new StartupAnswer((StartupCommand) cmds[i], 0, mgmtServiceConf.getPingInterval()); |
| break; |
| } |
| } |
| Response response = null; |
| response = new Response(request, answers[0], _nodeId, -1); |
| try { |
| link.send(response.toBytes()); |
| } catch (final ClosedChannelException e) { |
| logger.debug("Failed to send startupanswer: {}", e.toString()); |
| } |
| _connectExecutor.execute(new HandleAgentConnectTask(link, cmds, request)); |
| } |
| |
| public class AgentHandler extends Task { |
| public AgentHandler(final Task.Type type, final Link link, final byte[] data) { |
| super(type, link, data); |
| } |
| |
| private void processHostHealthCheckResult(Boolean hostHealthCheckResult, long hostId) { |
| if (hostHealthCheckResult == null) { |
| return; |
| } |
| HostVO host = _hostDao.findById(hostId); |
| if (host == null) { |
| logger.error("Unable to find host with ID: {}", hostId); |
| return; |
| } |
| if (!BooleanUtils.toBoolean(EnableKVMAutoEnableDisable.valueIn(host.getClusterId()))) { |
| logger.debug("{} is disabled for the cluster {}, cannot process the health check result " + |
| "received for the host {}", EnableKVMAutoEnableDisable.key(), host.getClusterId(), host.getName()); |
| return; |
| } |
| |
| ResourceState.Event resourceEvent = hostHealthCheckResult ? ResourceState.Event.Enable : ResourceState.Event.Disable; |
| |
| try { |
| logger.info("Host health check {}, auto {} KVM host: {}", |
| hostHealthCheckResult ? "succeeds" : "fails", |
| hostHealthCheckResult ? "enabling" : "disabling", |
| host.getName()); |
| _resourceMgr.autoUpdateHostAllocationState(hostId, resourceEvent); |
| } catch (NoTransitionException e) { |
| logger.error("Cannot Auto {} host: {}", resourceEvent, host.getName(), e); |
| } |
| } |
| |
| private void processStartupRoutingCommand(StartupRoutingCommand startup, long hostId) { |
| if (startup == null) { |
| logger.error("Empty StartupRoutingCommand received"); |
| return; |
| } |
| Boolean hostHealthCheckResult = startup.getHostHealthCheckResult(); |
| processHostHealthCheckResult(hostHealthCheckResult, hostId); |
| } |
| |
| private void processPingRoutingCommand(PingRoutingCommand pingRoutingCommand, long hostId) { |
| if (pingRoutingCommand == null) { |
| logger.error("Empty PingRoutingCommand received"); |
| return; |
| } |
| Boolean hostHealthCheckResult = pingRoutingCommand.getHostHealthCheckResult(); |
| processHostHealthCheckResult(hostHealthCheckResult, hostId); |
| } |
| |
| protected void processRequest(final Link link, final Request request) { |
| final AgentAttache attache = (AgentAttache)link.attachment(); |
| final Command[] cmds = request.getCommands(); |
| Command cmd = cmds[0]; |
| boolean logD = true; |
| |
| if (attache == null) { |
| if (!(cmd instanceof StartupCommand)) { |
| logger.warn("Throwing away a request because it came through as the first command on a connect: {}", request); |
| } else { |
| // submit the task for execution |
| request.logD("Scheduling the first command "); |
| connectAgent(link, cmds, request); |
| } |
| return; |
| } else if (cmd instanceof StartupCommand) { |
| connectAgent(link, cmds, request); |
| } |
| |
| final long hostId = attache.getId(); |
| final String hostName = attache.getName(); |
| |
| if (logger.isDebugEnabled()) { |
| if (cmd instanceof PingRoutingCommand) { |
| logD = false; |
| logger.debug("Ping from Routing host {}({})", hostId, hostName); |
| logger.trace("SeqA {}-{}: Processing {}", hostId, request.getSequence(), request); |
| } else if (cmd instanceof PingCommand) { |
| logD = false; |
| logger.debug("Ping from {}({})", hostId, hostName); |
| logger.trace("SeqA {}-{}: Processing {}", hostId, request.getSequence(), request); |
| } else { |
| logger.debug("SeqA {}-{}: {}", hostId, request.getSequence(), request); |
| } |
| } |
| |
| final Answer[] answers = new Answer[cmds.length]; |
| for (int i = 0; i < cmds.length; i++) { |
| cmd = cmds[i]; |
| Answer answer = null; |
| try { |
| if (cmd instanceof StartupRoutingCommand) { |
| final StartupRoutingCommand startup = (StartupRoutingCommand) cmd; |
| processStartupRoutingCommand(startup, hostId); |
| answer = new StartupAnswer(startup, attache.getId(), mgmtServiceConf.getPingInterval()); |
| } else if (cmd instanceof StartupProxyCommand) { |
| final StartupProxyCommand startup = (StartupProxyCommand) cmd; |
| answer = new StartupAnswer(startup, attache.getId(), mgmtServiceConf.getPingInterval()); |
| } else if (cmd instanceof StartupSecondaryStorageCommand) { |
| final StartupSecondaryStorageCommand startup = (StartupSecondaryStorageCommand) cmd; |
| answer = new StartupAnswer(startup, attache.getId(), mgmtServiceConf.getPingInterval()); |
| } else if (cmd instanceof StartupStorageCommand) { |
| final StartupStorageCommand startup = (StartupStorageCommand) cmd; |
| answer = new StartupAnswer(startup, attache.getId(), mgmtServiceConf.getPingInterval()); |
| } else if (cmd instanceof ShutdownCommand) { |
| final ShutdownCommand shutdown = (ShutdownCommand)cmd; |
| final String reason = shutdown.getReason(); |
| logger.info("Host {} has informed us that it is shutting down with reason {} and detail {}", attache.getId(), reason, shutdown.getDetail()); |
| if (reason.equals(ShutdownCommand.Update)) { |
| // disconnectWithoutInvestigation(attache, Event.UpdateNeeded); |
| throw new CloudRuntimeException("Agent update not implemented"); |
| } else if (reason.equals(ShutdownCommand.Requested)) { |
| disconnectWithoutInvestigation(attache, Event.ShutdownRequested); |
| } |
| return; |
| } else if (cmd instanceof AgentControlCommand) { |
| answer = handleControlCommand(attache, (AgentControlCommand)cmd); |
| } else { |
| handleCommands(attache, request.getSequence(), new Command[] {cmd}); |
| if (cmd instanceof PingCommand) { |
| final long cmdHostId = ((PingCommand)cmd).getHostId(); |
| boolean requestStartupCommand = false; |
| |
| final HostVO host = _hostDao.findById(Long.valueOf(cmdHostId)); |
| boolean gatewayAccessible = true; |
| // if the router is sending a ping, verify the |
| // gateway was pingable |
| if (cmd instanceof PingRoutingCommand) { |
| processPingRoutingCommand((PingRoutingCommand) cmd, hostId); |
| gatewayAccessible = ((PingRoutingCommand)cmd).isGatewayAccessible(); |
| |
| if (host != null) { |
| if (!gatewayAccessible) { |
| // alert that host lost connection to |
| // gateway (cannot ping the default route) |
| final DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId()); |
| final HostPodVO podVO = _podDao.findById(host.getPodId()); |
| final String hostDesc = "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName(); |
| |
| _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_ROUTING, host.getDataCenterId(), host.getPodId(), "Host lost connection to gateway, " + hostDesc, |
| "Host [" + hostDesc + "] lost connection to gateway (default route) and is possibly having network connection issues."); |
| } else { |
| _alertMgr.clearAlert(AlertManager.AlertType.ALERT_TYPE_ROUTING, host.getDataCenterId(), host.getPodId()); |
| } |
| } else { |
| logger.debug("Not processing {} for agent id={}; can't find the host in the DB", PingRoutingCommand.class.getSimpleName(), cmdHostId); |
| } |
| } |
| if (host!= null && host.getStatus() != Status.Up && gatewayAccessible) { |
| requestStartupCommand = true; |
| } |
| answer = new PingAnswer((PingCommand)cmd, requestStartupCommand); |
| } else if (cmd instanceof ReadyAnswer) { |
| final HostVO host = _hostDao.findById(attache.getId()); |
| if (host == null) { |
| logger.debug("Cant not find host {}", attache.getId()); |
| } |
| answer = new Answer(cmd); |
| } else { |
| answer = new Answer(cmd); |
| } |
| } |
| } catch (final Throwable th) { |
| logger.warn("Caught: ", th); |
| answer = new Answer(cmd, false, th.getMessage()); |
| } |
| answers[i] = answer; |
| } |
| |
| final Response response = new Response(request, answers, _nodeId, attache.getId()); |
| if (logD) { |
| logger.debug("SeqA {}-: Sending {}", attache.getId(), response.getSequence(), response); |
| } else { |
| logger.trace("SeqA {}-: Sending {}" + attache.getId(), response.getSequence(), response); |
| } |
| try { |
| link.send(response.toBytes()); |
| } catch (final ClosedChannelException e) { |
| logger.warn("Unable to send response because connection is closed: {}", response); |
| } |
| } |
| |
| protected void processResponse(final Link link, final Response response) { |
| final AgentAttache attache = (AgentAttache)link.attachment(); |
| if (attache == null) { |
| logger.warn("Unable to process: {}", response); |
| } else if (!attache.processAnswers(response.getSequence(), response)) { |
| logger.info("Host {} - Seq {}: Response is not processed: {}", attache.getId(), response.getSequence(), response); |
| } |
| } |
| |
| @Override |
| protected void doTask(final Task task) throws TaskExecutionException { |
| final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB); |
| try { |
| final Type type = task.getType(); |
| if (type == Task.Type.DATA) { |
| final byte[] data = task.getData(); |
| try { |
| final Request event = Request.parse(data); |
| if (event instanceof Response) { |
| processResponse(task.getLink(), (Response)event); |
| } else { |
| processRequest(task.getLink(), event); |
| } |
| } catch (final UnsupportedVersionException e) { |
| logger.warn(e.getMessage()); |
| // upgradeAgent(task.getLink(), data, e.getReason()); |
| } catch (final ClassNotFoundException e) { |
| final String message = String.format("Exception occurred when executing tasks! Error '%s'", e.getMessage()); |
| logger.error(message); |
| throw new TaskExecutionException(message, e); |
| } |
| } else if (type == Task.Type.CONNECT) { |
| } else if (type == Task.Type.DISCONNECT) { |
| final Link link = task.getLink(); |
| final AgentAttache attache = (AgentAttache)link.attachment(); |
| if (attache != null) { |
| disconnectWithInvestigation(attache, Event.AgentDisconnected); |
| } else { |
| logger.info("Connection from {} closed but no cleanup was done.", link.getIpAddress()); |
| link.close(); |
| link.terminated(); |
| } |
| } |
| } finally { |
| txn.close(); |
| } |
| } |
| } |
| |
| protected AgentManagerImpl() { |
| } |
| |
| public boolean tapLoadingAgents(final Long hostId, final TapAgentsAction action) { |
| synchronized (_loadingAgents) { |
| if (action == TapAgentsAction.Add) { |
| if (_loadingAgents.contains(hostId)) { |
| return false; |
| } else { |
| _loadingAgents.add(hostId); |
| } |
| } else if (action == TapAgentsAction.Del) { |
| _loadingAgents.remove(hostId); |
| } else if (action == TapAgentsAction.Contains) { |
| return _loadingAgents.contains(hostId); |
| } else { |
| throw new CloudRuntimeException("Unknown TapAgentsAction " + action); |
| } |
| } |
| return true; |
| } |
| |
| @Override |
| public boolean agentStatusTransitTo(final HostVO host, final Status.Event e, final long msId) { |
| try { |
| _agentStatusLock.lock(); |
| logger.debug("[Resource state = {}, Agent event = , Host id = {}, name = {}]", host.getResourceState(), e.toString(), host.getId(), host.getName()); |
| |
| host.setManagementServerId(msId); |
| try { |
| return _statusStateMachine.transitTo(host, e, host.getId(), _hostDao); |
| } catch (final NoTransitionException e1) { |
| logger.debug("Cannot transit agent status with event {} for host {}, name={}, management server id is {}", e, host.getId(), host.getName(), msId); |
| throw new CloudRuntimeException("Cannot transit agent status with event " + e + " for host " + host.getId() + ", management server id is " + msId + "," + e1.getMessage()); |
| } |
| } finally { |
| _agentStatusLock.unlock(); |
| } |
| } |
| |
| public boolean disconnectAgent(final HostVO host, final Status.Event e, final long msId) { |
| host.setDisconnectedOn(new Date()); |
| if (e.equals(Status.Event.Remove)) { |
| host.setGuid(null); |
| host.setClusterId(null); |
| } |
| |
| return agentStatusTransitTo(host, e, msId); |
| } |
| |
| protected void disconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event) { |
| _executor.submit(new DisconnectTask(attache, event, false)); |
| } |
| |
| public void disconnectWithInvestigation(final AgentAttache attache, final Status.Event event) { |
| _executor.submit(new DisconnectTask(attache, event, true)); |
| } |
| |
| protected boolean isHostOwnerSwitched(final long hostId) { |
| final HostVO host = _hostDao.findById(hostId); |
| if (host == null) { |
| logger.warn("Can't find the host {}", hostId); |
| return false; |
| } |
| return isHostOwnerSwitched(host); |
| } |
| |
| protected boolean isHostOwnerSwitched(final HostVO host) { |
| if (host.getStatus() == Status.Up && host.getManagementServerId() != null && host.getManagementServerId() != _nodeId) { |
| return true; |
| } |
| return false; |
| } |
| |
| private void disconnectInternal(final long hostId, final Status.Event event, final boolean invstigate) { |
| final AgentAttache attache = findAttache(hostId); |
| |
| if (attache != null) { |
| if (!invstigate) { |
| disconnectWithoutInvestigation(attache, event); |
| } else { |
| disconnectWithInvestigation(attache, event); |
| } |
| } else { |
| /* Agent is still in connecting process, don't allow to disconnect right away */ |
| if (tapLoadingAgents(hostId, TapAgentsAction.Contains)) { |
| logger.info("Host {} is being loaded no disconnects needed.", hostId); |
| return; |
| } |
| |
| final HostVO host = _hostDao.findById(hostId); |
| if (host != null && host.getRemoved() == null) { |
| disconnectAgent(host, event, _nodeId); |
| } |
| } |
| } |
| |
| @Override |
| public void disconnectWithInvestigation(final long hostId, final Status.Event event) { |
| disconnectInternal(hostId, event, true); |
| } |
| |
| @Override |
| public void disconnectWithoutInvestigation(final long hostId, final Status.Event event) { |
| disconnectInternal(hostId, event, false); |
| } |
| |
| @Override |
| public boolean handleDirectConnectAgent(final Host host, final StartupCommand[] cmds, final ServerResource resource, final boolean forRebalance, boolean newHost) throws ConnectionException { |
| AgentAttache attache; |
| |
| attache = createAttacheForDirectConnect(host, resource); |
| final StartupAnswer[] answers = new StartupAnswer[cmds.length]; |
| for (int i = 0; i < answers.length; i++) { |
| answers[i] = new StartupAnswer(cmds[i], attache.getId(), mgmtServiceConf.getPingInterval()); |
| } |
| attache.process(answers); |
| |
| if (newHost) { |
| notifyMonitorsOfNewlyAddedHost(host.getId()); |
| } |
| |
| attache = notifyMonitorsOfConnection(attache, cmds, forRebalance); |
| |
| return attache != null; |
| } |
| |
| @Override |
| public void pullAgentToMaintenance(final long hostId) { |
| final AgentAttache attache = findAttache(hostId); |
| if (attache != null) { |
| attache.setMaintenanceMode(true); |
| // Now cancel all of the commands except for the active one. |
| attache.cancelAllCommands(Status.Disconnected, false); |
| } |
| } |
| |
| @Override |
| public void pullAgentOutMaintenance(final long hostId) { |
| final AgentAttache attache = findAttache(hostId); |
| if (attache != null) { |
| attache.setMaintenanceMode(false); |
| } |
| } |
| |
| public ScheduledExecutorService getDirectAgentPool() { |
| return _directAgentExecutor; |
| } |
| |
| public ScheduledExecutorService getCronJobPool() { |
| return _cronJobExecutor; |
| } |
| |
| public int getDirectAgentThreadCap() { |
| return _directAgentThreadCap; |
| } |
| |
| public Long getAgentPingTime(final long agentId) { |
| return _pingMap.get(agentId); |
| } |
| |
| public void pingBy(final long agentId) { |
| // Update PingMap with the latest time if agent entry exists in the PingMap |
| if (_pingMap.replace(agentId, InaccurateClock.getTimeInSeconds()) == null) { |
| logger.info("PingMap for agent: " + agentId + " will not be updated because agent is no longer in the PingMap"); |
| } |
| } |
| |
| protected class MonitorTask extends ManagedContextRunnable { |
| @Override |
| protected void runInContext() { |
| logger.trace("Agent Monitor is started."); |
| |
| try { |
| final List<Long> behindAgents = findAgentsBehindOnPing(); |
| for (final Long agentId : behindAgents) { |
| final QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class); |
| sc.and(sc.entity().getId(), Op.EQ, agentId); |
| final HostVO h = sc.find(); |
| if (h != null) { |
| final ResourceState resourceState = h.getResourceState(); |
| if (resourceState == ResourceState.Disabled || resourceState == ResourceState.Maintenance) { |
| /* |
| * Host is in non-operation state, so no investigation and direct put agent to Disconnected |
| */ |
| logger.debug("Ping timeout but agent {} is in resource state of {}, so no investigation", agentId, resourceState); |
| disconnectWithoutInvestigation(agentId, Event.ShutdownRequested); |
| } else { |
| final HostVO host = _hostDao.findById(agentId); |
| if (host != null |
| && (host.getType() == Host.Type.ConsoleProxy || host.getType() == Host.Type.SecondaryStorageVM || host.getType() == Host.Type.SecondaryStorageCmdExecutor)) { |
| |
| logger.warn("Disconnect agent for CPVM/SSVM due to physical connection close. host: {}", host.getId()); |
| disconnectWithoutInvestigation(agentId, Event.ShutdownRequested); |
| } else { |
| logger.debug("Ping timeout for agent {}, do investigation", agentId); |
| disconnectWithInvestigation(agentId, Event.PingTimeout); |
| } |
| } |
| } |
| } |
| |
| final QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class); |
| sc.and(sc.entity().getResourceState(), Op.IN, |
| ResourceState.PrepareForMaintenance, |
| ResourceState.ErrorInPrepareForMaintenance); |
| final List<HostVO> hosts = sc.list(); |
| |
| for (final HostVO host : hosts) { |
| if (_resourceMgr.checkAndMaintain(host.getId())) { |
| final DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId()); |
| final HostPodVO podVO = _podDao.findById(host.getPodId()); |
| final String hostDesc = "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName(); |
| _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Migration Complete for host " + hostDesc, |
| "Host [" + hostDesc + "] is ready for maintenance"); |
| } |
| } |
| } catch (final Throwable th) { |
| logger.error("Caught the following exception: ", th); |
| } |
| |
| logger.trace("Agent Monitor is leaving the building!"); |
| } |
| |
| protected List<Long> findAgentsBehindOnPing() { |
| final List<Long> agentsBehind = new ArrayList<Long>(); |
| final long cutoffTime = InaccurateClock.getTimeInSeconds() - mgmtServiceConf.getTimeout(); |
| for (final Map.Entry<Long, Long> entry : _pingMap.entrySet()) { |
| if (entry.getValue() < cutoffTime) { |
| agentsBehind.add(entry.getKey()); |
| } |
| } |
| |
| if (agentsBehind.size() > 0) { |
| logger.info("Found the following agents behind on ping: {}", agentsBehind); |
| } |
| |
| return agentsBehind; |
| } |
| } |
| |
| protected class BehindOnPingListener implements Listener { |
| @Override |
| public boolean isRecurring() { |
| return true; |
| } |
| |
| @Override |
| public boolean processAnswers(final long agentId, final long seq, final Answer[] answers) { |
| return false; |
| } |
| |
| @Override |
| public boolean processCommands(final long agentId, final long seq, final Command[] commands) { |
| final boolean processed = false; |
| for (final Command cmd : commands) { |
| if (cmd instanceof PingCommand) { |
| pingBy(agentId); |
| } |
| } |
| return processed; |
| } |
| |
| @Override |
| public AgentControlAnswer processControlCommand(final long agentId, final AgentControlCommand cmd) { |
| return null; |
| } |
| |
| @Override |
| public void processHostAdded(long hostId) { |
| } |
| |
| @Override |
| public void processConnect(final Host host, final StartupCommand cmd, final boolean forRebalance) { |
| if (host.getType().equals(Host.Type.TrafficMonitor) || host.getType().equals(Host.Type.SecondaryStorage)) { |
| return; |
| } |
| |
| // NOTE: We don't use pingBy here because we're initiating. |
| _pingMap.put(host.getId(), InaccurateClock.getTimeInSeconds()); |
| } |
| |
| @Override |
| public boolean processDisconnect(final long agentId, final Status state) { |
| _pingMap.remove(agentId); |
| return true; |
| } |
| |
| @Override |
| public void processHostAboutToBeRemoved(long hostId) { |
| } |
| |
| @Override |
| public void processHostRemoved(long hostId, long clusterId) { |
| } |
| |
| @Override |
| public boolean processTimeout(final long agentId, final long seq) { |
| return true; |
| } |
| |
| @Override |
| public int getTimeout() { |
| return -1; |
| } |
| |
| } |
| |
| @Override |
| public String getConfigComponentName() { |
| return AgentManager.class.getSimpleName(); |
| } |
| |
| @Override |
| public ConfigKey<?>[] getConfigKeys() { |
| return new ConfigKey<?>[] { CheckTxnBeforeSending, Workers, Port, Wait, AlertWait, DirectAgentLoadSize, |
| DirectAgentPoolSize, DirectAgentThreadCap, EnableKVMAutoEnableDisable, ReadyCommandWait }; |
| } |
| |
| protected class SetHostParamsListener implements Listener { |
| @Override |
| public boolean isRecurring() { |
| return false; |
| } |
| |
| @Override |
| public boolean processAnswers(final long agentId, final long seq, final Answer[] answers) { |
| return false; |
| } |
| |
| @Override |
| public boolean processCommands(final long agentId, final long seq, final Command[] commands) { |
| return false; |
| } |
| |
| @Override |
| public AgentControlAnswer processControlCommand(final long agentId, final AgentControlCommand cmd) { |
| return null; |
| } |
| |
| @Override |
| public void processHostAdded(long hostId) { |
| } |
| |
| @Override |
| public void processConnect(final Host host, final StartupCommand cmd, final boolean forRebalance) { |
| if (cmd instanceof StartupRoutingCommand) { |
| if (((StartupRoutingCommand)cmd).getHypervisorType() == HypervisorType.KVM || ((StartupRoutingCommand)cmd).getHypervisorType() == HypervisorType.LXC) { |
| Map<String, String> params = new HashMap<String, String>(); |
| params.put(Config.RouterAggregationCommandEachTimeout.toString(), _configDao.getValue(Config.RouterAggregationCommandEachTimeout.toString())); |
| params.put(Config.MigrateWait.toString(), _configDao.getValue(Config.MigrateWait.toString())); |
| params.put(NetworkOrchestrationService.TUNGSTEN_ENABLED.key(), String.valueOf(NetworkOrchestrationService.TUNGSTEN_ENABLED.valueIn(host.getDataCenterId()))); |
| |
| try { |
| SetHostParamsCommand cmds = new SetHostParamsCommand(params); |
| Commands c = new Commands(cmds); |
| send(host.getId(), c, this); |
| } catch (AgentUnavailableException e) { |
| logger.debug("Failed to send host params on host: " + host.getId()); |
| } |
| } |
| } |
| |
| } |
| |
| @Override |
| public boolean processDisconnect(final long agentId, final Status state) { |
| return true; |
| } |
| |
| @Override |
| public void processHostAboutToBeRemoved(long hostId) { |
| } |
| |
| @Override |
| public void processHostRemoved(long hostId, long clusterId) { |
| } |
| |
| @Override |
| public boolean processTimeout(final long agentId, final long seq) { |
| return false; |
| } |
| |
| @Override |
| public int getTimeout() { |
| return -1; |
| } |
| |
| } |
| |
| protected Map<Long, List<Long>> getHostsPerZone() { |
| List<HostVO> allHosts = _resourceMgr.listAllHostsInAllZonesByType(Host.Type.Routing); |
| if (allHosts == null) { |
| return null; |
| } |
| Map<Long, List<Long>> hostsByZone = new HashMap<Long, List<Long>>(); |
| for (HostVO host : allHosts) { |
| if (host.getHypervisorType() == HypervisorType.KVM || host.getHypervisorType() == HypervisorType.LXC) { |
| Long zoneId = host.getDataCenterId(); |
| List<Long> hostIds = hostsByZone.get(zoneId); |
| if (hostIds == null) { |
| hostIds = new ArrayList<Long>(); |
| } |
| hostIds.add(host.getId()); |
| hostsByZone.put(zoneId, hostIds); |
| } |
| } |
| return hostsByZone; |
| } |
| |
| private void sendCommandToAgents(Map<Long, List<Long>> hostsPerZone, Map<String, String> params) { |
| SetHostParamsCommand cmds = new SetHostParamsCommand(params); |
| for (Long zoneId : hostsPerZone.keySet()) { |
| List<Long> hostIds = hostsPerZone.get(zoneId); |
| for (Long hostId : hostIds) { |
| Answer answer = easySend(hostId, cmds); |
| if (answer == null || !answer.getResult()) { |
| logger.error("Error sending parameters to agent {}", hostId); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void propagateChangeToAgents(Map<String, String> params) { |
| if (params != null && ! params.isEmpty()) { |
| logger.debug("Propagating changes on host parameters to the agents"); |
| Map<Long, List<Long>> hostsPerZone = getHostsPerZone(); |
| sendCommandToAgents(hostsPerZone, params); |
| } |
| } |
| |
| private GlobalLock getHostJoinLock(Long hostId) { |
| return GlobalLock.getInternLock(String.format("%s-%s", "Host-Join", hostId)); |
| } |
| } |