| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| package com.cloud.agent; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.io.StringWriter; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.net.UnknownHostException; |
| import java.nio.channels.ClosedChannelException; |
| import java.nio.charset.Charset; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import javax.naming.ConfigurationException; |
| |
| import org.apache.cloudstack.agent.lb.SetupMSListAnswer; |
| import org.apache.cloudstack.agent.lb.SetupMSListCommand; |
| import org.apache.cloudstack.ca.PostCertificateRenewalCommand; |
| import org.apache.cloudstack.ca.SetupCertificateAnswer; |
| import org.apache.cloudstack.ca.SetupCertificateCommand; |
| import org.apache.cloudstack.ca.SetupKeyStoreCommand; |
| import org.apache.cloudstack.ca.SetupKeystoreAnswer; |
| import org.apache.cloudstack.managed.context.ManagedContextTimerTask; |
| import org.apache.cloudstack.utils.security.KeyStoreUtils; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.lang3.ObjectUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| import org.apache.logging.log4j.ThreadContext; |
| |
| import com.cloud.agent.api.AgentControlAnswer; |
| import com.cloud.agent.api.AgentControlCommand; |
| import com.cloud.agent.api.Answer; |
| import com.cloud.agent.api.Command; |
| import com.cloud.agent.api.CronCommand; |
| import com.cloud.agent.api.MaintainAnswer; |
| import com.cloud.agent.api.MaintainCommand; |
| import com.cloud.agent.api.PingAnswer; |
| import com.cloud.agent.api.PingCommand; |
| import com.cloud.agent.api.ReadyCommand; |
| import com.cloud.agent.api.ShutdownCommand; |
| import com.cloud.agent.api.StartupAnswer; |
| import com.cloud.agent.api.StartupCommand; |
| import com.cloud.agent.transport.Request; |
| import com.cloud.agent.transport.Response; |
| import com.cloud.exception.AgentControlChannelException; |
| import com.cloud.host.Host; |
| import com.cloud.resource.AgentStatusUpdater; |
| import com.cloud.resource.ResourceStatusUpdater; |
| import com.cloud.resource.ServerResource; |
| import com.cloud.utils.NumbersUtil; |
| import com.cloud.utils.PropertiesUtil; |
| import com.cloud.utils.concurrency.NamedThreadFactory; |
| import com.cloud.utils.exception.CloudRuntimeException; |
| import com.cloud.utils.exception.NioConnectionException; |
| import com.cloud.utils.exception.TaskExecutionException; |
| import com.cloud.utils.nio.HandlerFactory; |
| import com.cloud.utils.nio.Link; |
| import com.cloud.utils.nio.NioClient; |
| import com.cloud.utils.nio.NioConnection; |
| import com.cloud.utils.nio.Task; |
| import com.cloud.utils.script.OutputInterpreter; |
| import com.cloud.utils.script.Script; |
| |
| /** |
| * @config |
| * {@table |
| * || Param Name | Description | Values | Default || |
| * || type | Type of server | Storage / Computing / Routing | No Default || |
| * || workers | # of workers to process the requests | int | 1 || |
| * || host | host to connect to | ip address | localhost || |
| * || port | port to connect to | port number | 8250 || |
| * || instance | Used to allow multiple agents running on the same host | String | none || * } |
| * |
| * For more configuration options, see the individual types. |
| * |
| **/ |
| public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater { |
| protected Logger logger = LogManager.getLogger(getClass()); |
| |
| public enum ExitStatus { |
| Normal(0), // Normal status = 0. |
| Upgrade(65), // Exiting for upgrade. |
| Configuration(66), // Exiting due to configuration problems. |
| Error(67); // Exiting because of error. |
| |
| final int value; |
| |
| ExitStatus(final int value) { |
| this.value = value; |
| } |
| |
| public int value() { |
| return value; |
| } |
| } |
| |
| CopyOnWriteArrayList<IAgentControlListener> controlListeners = new CopyOnWriteArrayList<>(); |
| |
| IAgentShell shell; |
| NioConnection connection; |
| ServerResource serverResource; |
| Link link; |
| Long id; |
| String _uuid; |
| String _name; |
| |
| ScheduledExecutorService selfTaskExecutor; |
| ScheduledExecutorService certExecutor; |
| ScheduledExecutorService hostLbCheckExecutor; |
| |
| CopyOnWriteArrayList<ScheduledFuture<?>> watchList = new CopyOnWriteArrayList<>(); |
| AtomicLong sequence = new AtomicLong(0); |
| AtomicLong lastPingResponseTime = new AtomicLong(0L); |
| long pingInterval = 0; |
| AtomicInteger commandsInProgress = new AtomicInteger(0); |
| |
| private final AtomicReference<StartupTask> startupTask = new AtomicReference<>(); |
| private static final long DEFAULT_STARTUP_WAIT = 180; |
| long startupWait = DEFAULT_STARTUP_WAIT; |
| boolean reconnectAllowed = true; |
| |
| //For time sensitive task, e.g. PingTask |
| ThreadPoolExecutor outRequestHandler; |
| ExecutorService requestHandler; |
| |
| Thread shutdownThread = new ShutdownThread(this); |
| |
| private String keystoreSetupSetupPath; |
| private String keystoreCertImportScriptPath; |
| |
| private String hostname; |
| |
| protected String getLinkLog(final Link link) { |
| if (link == null) { |
| return ""; |
| } |
| StringBuilder str = new StringBuilder(); |
| if (logger.isTraceEnabled()) { |
| str.append(System.identityHashCode(link)).append("-"); |
| } |
| str.append(link.getSocketAddress()); |
| return str.toString(); |
| } |
| |
| protected String getAgentName() { |
| return (serverResource != null && serverResource.isAppendAgentNameToLogs() && |
| StringUtils.isNotBlank(serverResource.getName())) ? |
| serverResource.getName() : |
| "Agent"; |
| } |
| |
| protected void setupShutdownHookAndInitExecutors() { |
| logger.trace("Adding shutdown hook"); |
| Runtime.getRuntime().addShutdownHook(shutdownThread); |
| selfTaskExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Agent-SelfTask")); |
| outRequestHandler = new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, |
| new SynchronousQueue<>(), new NamedThreadFactory("AgentOutRequest-Handler")); |
| requestHandler = new ThreadPoolExecutor(shell.getWorkers(), 5 * shell.getWorkers(), 1, TimeUnit.DAYS, |
| new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentRequest-Handler")); |
| } |
| |
| /** |
| * Constructor for the {@code Agent} class, intended for simulator use only. |
| * |
| * <p>This constructor initializes the agent with a provided {@link IAgentShell}. |
| * It sets up the necessary NIO client connection, establishes a shutdown hook, |
| * and initializes the thread executors. |
| * |
| * @param shell the {@link IAgentShell} instance that provides agent configuration and runtime information. |
| */ |
| public Agent(final IAgentShell shell) { |
| this.shell = shell; |
| this.link = null; |
| this.connection = new NioClient( |
| getAgentName(), |
| this.shell.getNextHost(), |
| this.shell.getPort(), |
| this.shell.getWorkers(), |
| this.shell.getSslHandshakeTimeout(), |
| this |
| ); |
| setupShutdownHookAndInitExecutors(); |
| } |
| |
| public Agent(final IAgentShell shell, final int localAgentId, final ServerResource resource) throws ConfigurationException { |
| this.shell = shell; |
| serverResource = resource; |
| link = null; |
| resource.setAgentControl(this); |
| final String value = shell.getPersistentProperty(getResourceName(), "id"); |
| _uuid = shell.getPersistentProperty(getResourceName(), "uuid"); |
| _name = shell.getPersistentProperty(getResourceName(), "name"); |
| id = value != null ? Long.parseLong(value) : null; |
| logger.info("Initialising agent [id: {}, uuid: {}, name: {}]", ObjectUtils.defaultIfNull(id, ""), _uuid, _name); |
| |
| final Map<String, Object> params = new HashMap<>(); |
| // merge with properties from command line to let resource access command line parameters |
| for (final Map.Entry<String, Object> cmdLineProp : this.shell.getCmdLineProperties().entrySet()) { |
| params.put(cmdLineProp.getKey(), cmdLineProp.getValue()); |
| } |
| if (!serverResource.configure(getResourceName(), params)) { |
| throw new ConfigurationException("Unable to configure " + serverResource.getName()); |
| } |
| ThreadContext.put("agentname", getAgentName()); |
| final String host = this.shell.getNextHost(); |
| connection = new NioClient(getAgentName(), host, this.shell.getPort(), this.shell.getWorkers(), |
| this.shell.getSslHandshakeTimeout(), this); |
| setupShutdownHookAndInitExecutors(); |
| logger.info("{} with host = {}, local id = {}", this, host, localAgentId); |
| } |
| |
| |
| @Override |
| public String toString() { |
| return String.format("Agent [id = %s, uuid = %s, name = %s, type = %s, zone = %s, pod = %s, workers = %d, port = %d]", |
| ObjectUtils.defaultIfNull(id, "new"), |
| _uuid, |
| _name, |
| getResourceName(), |
| this.shell.getZone(), |
| this.shell.getPod(), |
| this.shell.getWorkers(), |
| this.shell.getPort()); |
| } |
| |
| public String getVersion() { |
| return shell.getVersion(); |
| } |
| |
| public String getResourceGuid() { |
| final String guid = shell.getGuid(); |
| return guid + "-" + getResourceName(); |
| } |
| |
| public String getZone() { |
| return shell.getZone(); |
| } |
| |
| public String getPod() { |
| return shell.getPod(); |
| } |
| |
| protected void setLink(final Link link) { |
| this.link = link; |
| } |
| |
| public ServerResource getResource() { |
| return serverResource; |
| } |
| |
| public String getResourceName() { |
| return serverResource.getClass().getSimpleName(); |
| } |
| |
| /** |
| * In case of a software based agent restart, this method |
| * can help to perform explicit garbage collection of any old |
| * agent instances and its inner objects. |
| */ |
| private void scavengeOldAgentObjects() { |
| requestHandler.submit(() -> { |
| try { |
| Thread.sleep(2000L); |
| } catch (final InterruptedException ignored) { |
| } finally { |
| System.gc(); |
| } |
| }); |
| } |
| |
| public void start() { |
| if (!serverResource.start()) { |
| String msg = String.format("Unable to start the resource: %s", serverResource.getName()); |
| logger.error(msg); |
| throw new CloudRuntimeException(msg); |
| } |
| |
| keystoreSetupSetupPath = Script.findScript("scripts/util/", KeyStoreUtils.KS_SETUP_SCRIPT); |
| if (keystoreSetupSetupPath == null) { |
| throw new CloudRuntimeException(String.format("Unable to find the '%s' script", KeyStoreUtils.KS_SETUP_SCRIPT)); |
| } |
| |
| keystoreCertImportScriptPath = Script.findScript("scripts/util/", KeyStoreUtils.KS_IMPORT_SCRIPT); |
| if (keystoreCertImportScriptPath == null) { |
| throw new CloudRuntimeException(String.format("Unable to find the '%s' script", KeyStoreUtils.KS_IMPORT_SCRIPT)); |
| } |
| |
| try { |
| connection.start(); |
| } catch (final NioConnectionException e) { |
| logger.warn("Attempt to connect to server generated NIO Connection Exception {}, trying again", e.getLocalizedMessage()); |
| } |
| while (!connection.isStartup()) { |
| final String host = shell.getNextHost(); |
| shell.getBackoffAlgorithm().waitBeforeRetry(); |
| connection = new NioClient(getAgentName(), host, shell.getPort(), shell.getWorkers(), |
| shell.getSslHandshakeTimeout(), this); |
| logger.info("Connecting to host: {}", host); |
| try { |
| connection.start(); |
| } catch (final NioConnectionException e) { |
| stopAndCleanupConnection(false); |
| logger.info("Attempted to connect to the server, but received an unexpected exception, trying again...", e); |
| } |
| } |
| shell.updateConnectedHost(); |
| scavengeOldAgentObjects(); |
| |
| } |
| |
| public void stop(final String reason, final String detail) { |
| logger.info("Stopping the agent: Reason = {}{}", reason, (detail != null ? ": Detail = " + detail : "")); |
| reconnectAllowed = false; |
| if (connection != null) { |
| final ShutdownCommand cmd = new ShutdownCommand(reason, detail); |
| try { |
| if (link != null) { |
| final Request req = new Request(id != null ? id : -1, -1, cmd, false); |
| link.send(req.toBytes()); |
| } |
| } catch (final ClosedChannelException e) { |
| logger.warn("Unable to send: {}", cmd.toString()); |
| } catch (final Exception e) { |
| logger.warn("Unable to send: {} due to exception: {}", cmd.toString(), e); |
| } |
| logger.debug("Sending shutdown to management server"); |
| try { |
| Thread.sleep(1000); |
| } catch (final InterruptedException e) { |
| logger.debug("Who the heck interrupted me here?"); |
| } |
| connection.stop(); |
| connection = null; |
| link = null; |
| } |
| |
| if (serverResource != null) { |
| serverResource.stop(); |
| serverResource = null; |
| } |
| |
| if (startupTask.get() != null) { |
| startupTask.set(null); |
| } |
| |
| if (outRequestHandler != null) { |
| outRequestHandler.shutdownNow(); |
| outRequestHandler = null; |
| } |
| |
| if (requestHandler != null) { |
| requestHandler.shutdown(); |
| requestHandler = null; |
| } |
| |
| if (selfTaskExecutor != null) { |
| selfTaskExecutor.shutdown(); |
| selfTaskExecutor = null; |
| } |
| |
| if (hostLbCheckExecutor != null) { |
| hostLbCheckExecutor.shutdown(); |
| hostLbCheckExecutor = null; |
| } |
| |
| if (certExecutor != null) { |
| certExecutor.shutdown(); |
| certExecutor = null; |
| } |
| } |
| |
| public Long getId() { |
| return id; |
| } |
| |
| public void setId(final Long id) { |
| logger.debug("Set agent id {}", id); |
| this.id = id; |
| shell.setPersistentProperty(getResourceName(), "id", Long.toString(id)); |
| } |
| |
| public String getUuid() { |
| return _uuid; |
| } |
| |
| public void setUuid(String uuid) { |
| this._uuid = uuid; |
| shell.setPersistentProperty(getResourceName(), "uuid", uuid); |
| } |
| |
| public String getName() { |
| return _name; |
| } |
| |
| public void setName(String name) { |
| this._name = name; |
| shell.setPersistentProperty(getResourceName(), "name", name); |
| } |
| |
| private void scheduleCertificateRenewalTask() { |
| String name = "CertificateRenewalTask"; |
| if (certExecutor != null && !certExecutor.isShutdown()) { |
| certExecutor.shutdown(); |
| try { |
| if (!certExecutor.awaitTermination(1, TimeUnit.SECONDS)) { |
| certExecutor.shutdownNow(); |
| } |
| } catch (InterruptedException e) { |
| logger.debug("Forcing {} shutdown as it did not shutdown in the desired time due to: {}", |
| name, e.getMessage()); |
| certExecutor.shutdownNow(); |
| } |
| } |
| certExecutor = Executors.newSingleThreadScheduledExecutor((new NamedThreadFactory(name))); |
| certExecutor.schedule(new PostCertificateRenewalTask(this), 5, TimeUnit.SECONDS); |
| } |
| |
| private void scheduleHostLBCheckerTask(final long checkInterval) { |
| String name = "HostLBCheckerTask"; |
| if (hostLbCheckExecutor != null && !hostLbCheckExecutor.isShutdown()) { |
| hostLbCheckExecutor.shutdown(); |
| try { |
| if (!hostLbCheckExecutor.awaitTermination(1, TimeUnit.SECONDS)) { |
| hostLbCheckExecutor.shutdownNow(); |
| } |
| } catch (InterruptedException e) { |
| logger.debug("Forcing {} shutdown as it did not shutdown in the desired time due to: {}", |
| name, e.getMessage()); |
| hostLbCheckExecutor.shutdownNow(); |
| } |
| } |
| if (checkInterval > 0L) { |
| logger.info("Scheduling preferred host task with host.lb.interval={}ms", checkInterval); |
| hostLbCheckExecutor = Executors.newSingleThreadScheduledExecutor((new NamedThreadFactory(name))); |
| hostLbCheckExecutor.scheduleAtFixedRate(new PreferredHostCheckerTask(), checkInterval, checkInterval, |
| TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| public void scheduleWatch(final Link link, final Request request, final long delay, final long period) { |
| logger.debug("Adding a watch list"); |
| final WatchTask task = new WatchTask(link, request, this); |
| final ScheduledFuture<?> future = selfTaskExecutor.scheduleAtFixedRate(task, delay, period, TimeUnit.MILLISECONDS); |
| watchList.add(future); |
| } |
| |
| public void triggerUpdate() { |
| PingCommand command = serverResource.getCurrentStatus(getId()); |
| command.setOutOfBand(true); |
| logger.debug("Sending out of band ping"); |
| final Request request = new Request(id, -1, command, false); |
| request.setSequence(getNextSequence()); |
| try { |
| link.send(request.toBytes()); |
| } catch (final ClosedChannelException e) { |
| logger.warn("Unable to send ping update: {}", request.toString()); |
| } |
| } |
| |
| protected void cancelTasks() { |
| for (final ScheduledFuture<?> task : watchList) { |
| task.cancel(true); |
| } |
| logger.debug("Clearing watch list: {}", () -> watchList.size()); |
| watchList.clear(); |
| } |
| |
| /** |
| * Cleanup agent zone properties. |
| * |
| * Unset zone, cluster and pod values so that host is not added back |
| * when service is restarted. This will be set to proper values |
| * when host is added back |
| */ |
| protected void cleanupAgentZoneProperties() { |
| shell.setPersistentProperty(null, "zone", ""); |
| shell.setPersistentProperty(null, "cluster", ""); |
| shell.setPersistentProperty(null, "pod", ""); |
| } |
| |
| public void lockStartupTask(final Link link) { |
| logger.debug("Creating startup task for link: {}", () -> getLinkLog(link)); |
| StartupTask currentTask = startupTask.get(); |
| if (currentTask != null) { |
| logger.warn("A Startup task is already locked or in progress, cannot create for link {}", |
| getLinkLog(link)); |
| return; |
| } |
| currentTask = new StartupTask(link); |
| if (startupTask.compareAndSet(null, currentTask)) { |
| selfTaskExecutor.schedule(currentTask, startupWait, TimeUnit.SECONDS); |
| return; |
| } |
| logger.warn("Failed to lock a StartupTask for link: {}", getLinkLog(link)); |
| } |
| |
| protected boolean cancelStartupTask() { |
| StartupTask task = startupTask.getAndSet(null); |
| if (task != null) { |
| task.cancel(); |
| return true; |
| } |
| return false; |
| } |
| |
| public void sendStartup(final Link link) { |
| final StartupCommand[] startup = serverResource.initialize(); |
| if (startup != null) { |
| final String msHostList = shell.getPersistentProperty(null, "host"); |
| final Command[] commands = new Command[startup.length]; |
| for (int i = 0; i < startup.length; i++) { |
| setupStartupCommand(startup[i]); |
| startup[i].setMSHostList(msHostList); |
| commands[i] = startup[i]; |
| } |
| final Request request = new Request(id != null ? id : -1, -1, commands, false, false); |
| request.setSequence(getNextSequence()); |
| |
| logger.debug("Sending Startup: {}", request.toString()); |
| lockStartupTask(link); |
| try { |
| link.send(request.toBytes()); |
| } catch (final ClosedChannelException e) { |
| logger.warn("Unable to send request to {} due to '{}', request: {}", |
| getLinkLog(link), e.getMessage(), request); |
| } |
| |
| if (serverResource instanceof ResourceStatusUpdater) { |
| ((ResourceStatusUpdater) serverResource).registerStatusUpdater(this); |
| } |
| } |
| } |
| |
| protected String retrieveHostname() { |
| logger.trace("Retrieving hostname with resource={}", () -> serverResource.getClass().getSimpleName()); |
| final String result = Script.runSimpleBashScript(Script.getExecutableAbsolutePath("hostname"), 500); |
| if (StringUtils.isNotBlank(result)) { |
| return result; |
| } |
| try { |
| InetAddress address = InetAddress.getLocalHost(); |
| return address.toString(); |
| } catch (final UnknownHostException e) { |
| logger.warn("unknown host? ", e); |
| throw new CloudRuntimeException("Cannot get local IP address"); |
| } |
| } |
| |
| protected void setupStartupCommand(final StartupCommand startup) { |
| startup.setId(getId()); |
| if (StringUtils.isBlank(startup.getName())) { |
| if (StringUtils.isBlank(hostname)) { |
| hostname = retrieveHostname(); |
| } |
| startup.setName(hostname); |
| } |
| startup.setDataCenter(getZone()); |
| startup.setPod(getPod()); |
| startup.setGuid(getResourceGuid()); |
| startup.setResourceName(getResourceName()); |
| startup.setVersion(getVersion()); |
| startup.setArch(getAgentArch()); |
| } |
| |
| protected String getAgentArch() { |
| final Script command = new Script("/usr/bin/arch", 500, logger); |
| final OutputInterpreter.OneLineParser parser = new OutputInterpreter.OneLineParser(); |
| return command.execute(parser); |
| } |
| |
| @Override |
| public Task create(final Task.Type type, final Link link, final byte[] data) { |
| return new ServerHandler(type, link, data); |
| } |
| |
| protected void closeAndTerminateLink(final Link link) { |
| if (link == null) { |
| return; |
| } |
| link.close(); |
| link.terminated(); |
| } |
| |
| protected void stopAndCleanupConnection(boolean waitForStop) { |
| if (connection == null) { |
| return; |
| } |
| connection.stop(); |
| try { |
| connection.cleanUp(); |
| } catch (final IOException e) { |
| logger.warn("Fail to clean up old connection. {}", e); |
| } |
| if (!waitForStop) { |
| return; |
| } |
| do { |
| shell.getBackoffAlgorithm().waitBeforeRetry(); |
| } while (connection.isStartup()); |
| } |
| |
| protected void reconnect(final Link link) { |
| if (!reconnectAllowed) { |
| logger.debug("Reconnect requested but it is not allowed {}", () -> getLinkLog(link)); |
| return; |
| } |
| cancelStartupTask(); |
| closeAndTerminateLink(link); |
| closeAndTerminateLink(this.link); |
| setLink(null); |
| cancelTasks(); |
| serverResource.disconnected(); |
| logger.info("Lost connection to host: {}. Attempting reconnection while we still have {} commands in progress.", shell.getConnectedHost(), commandsInProgress.get()); |
| stopAndCleanupConnection(true); |
| do { |
| final String host = shell.getNextHost(); |
| connection = new NioClient(getAgentName(), host, shell.getPort(), shell.getWorkers(), shell.getSslHandshakeTimeout(), this); |
| logger.info("Reconnecting to host: {}", host); |
| try { |
| connection.start(); |
| } catch (final NioConnectionException e) { |
| logger.info("Attempted to re-connect to the server, but received an unexpected exception, trying again...", e); |
| stopAndCleanupConnection(false); |
| } |
| shell.getBackoffAlgorithm().waitBeforeRetry(); |
| } while (!connection.isStartup()); |
| shell.updateConnectedHost(); |
| logger.info("Connected to the host: {}", shell.getConnectedHost()); |
| } |
| |
| public void processStartupAnswer(final Answer answer, final Response response, final Link link) { |
| boolean answerValid = cancelStartupTask(); |
| final StartupAnswer startup = (StartupAnswer)answer; |
| if (!startup.getResult()) { |
| logger.error("Not allowed to connect to the server: {}", answer.getDetails()); |
| if (serverResource != null && !serverResource.isExitOnFailures()) { |
| logger.trace("{} does not allow exit on failure, reconnecting", |
| serverResource.getClass().getSimpleName()); |
| reconnect(link); |
| return; |
| } |
| System.exit(1); |
| } |
| if (!answerValid) { |
| logger.warn("Threw away a startup answer because we're reconnecting."); |
| return; |
| } |
| |
| logger.info("Process agent startup answer, agent [id: {}, uuid: {}, name: {}] connected to the server", |
| startup.getHostId(), startup.getHostUuid(), startup.getHostName()); |
| |
| setId(startup.getHostId()); |
| setUuid(startup.getHostUuid()); |
| setName(startup.getHostName()); |
| pingInterval = startup.getPingInterval() * 1000L; // change to ms. |
| |
| updateLastPingResponseTime(); |
| scheduleWatch(link, response, pingInterval, pingInterval); |
| |
| outRequestHandler.setKeepAliveTime(2 * pingInterval, TimeUnit.MILLISECONDS); |
| |
| logger.info("Startup Response Received: agent [id: {}, uuid: {}, name: {}]", |
| startup.getHostId(), startup.getHostUuid(), startup.getHostName()); |
| } |
| |
| protected void processRequest(final Request request, final Link link) { |
| boolean requestLogged = false; |
| Response response = null; |
| try { |
| final Command[] cmds = request.getCommands(); |
| final Answer[] answers = new Answer[cmds.length]; |
| |
| for (int i = 0; i < cmds.length; i++) { |
| final Command cmd = cmds[i]; |
| Answer answer; |
| try { |
| if (logger.isDebugEnabled()) { |
| if (!requestLogged) // ensures request is logged only once per method call |
| { |
| final String requestMsg = request.toString(); |
| if (requestMsg != null) { |
| logger.debug("Request:{}",requestMsg); |
| } |
| requestLogged = true; |
| } |
| logger.debug("Processing command: {}", cmd.toString()); |
| } |
| |
| if (cmd instanceof CronCommand) { |
| final CronCommand watch = (CronCommand)cmd; |
| scheduleWatch(link, request, watch.getInterval() * 1000L, watch.getInterval() * 1000L); |
| answer = new Answer(cmd, true, null); |
| } else if (cmd instanceof ShutdownCommand) { |
| final ShutdownCommand shutdown = (ShutdownCommand)cmd; |
| logger.debug("Received shutdownCommand, due to: {}", shutdown.getReason()); |
| cancelTasks(); |
| if (shutdown.isRemoveHost()) { |
| cleanupAgentZoneProperties(); |
| } |
| reconnectAllowed = false; |
| answer = new Answer(cmd, true, null); |
| } else if (cmd instanceof ReadyCommand && ((ReadyCommand)cmd).getDetails() != null) { |
| |
| logger.debug("Not ready to connect to mgt server: {}", ((ReadyCommand)cmd).getDetails()); |
| if (serverResource != null && !serverResource.isExitOnFailures()) { |
| logger.trace("{} does not allow exit on failure, reconnecting", |
| serverResource.getClass().getSimpleName()); |
| reconnect(link); |
| return; |
| } |
| System.exit(1); |
| return; |
| } else if (cmd instanceof MaintainCommand) { |
| logger.debug("Received maintainCommand, do not cancel current tasks"); |
| answer = new MaintainAnswer((MaintainCommand)cmd); |
| } else if (cmd instanceof AgentControlCommand) { |
| answer = null; |
| for (final IAgentControlListener listener : controlListeners) { |
| answer = listener.processControlRequest(request, (AgentControlCommand)cmd); |
| if (answer != null) { |
| break; |
| } |
| } |
| |
| if (answer == null) { |
| logger.warn("No handler found to process cmd: {}", cmd.toString()); |
| answer = new AgentControlAnswer(cmd); |
| } |
| } else if (cmd instanceof SetupKeyStoreCommand && ((SetupKeyStoreCommand) cmd).isHandleByAgent()) { |
| answer = setupAgentKeystore((SetupKeyStoreCommand) cmd); |
| } else if (cmd instanceof SetupCertificateCommand && ((SetupCertificateCommand) cmd).isHandleByAgent()) { |
| answer = setupAgentCertificate((SetupCertificateCommand) cmd); |
| if (Host.Type.Routing.equals(serverResource.getType())) { |
| scheduleCertificateRenewalTask(); |
| } |
| } else if (cmd instanceof SetupMSListCommand) { |
| answer = setupManagementServerList((SetupMSListCommand) cmd); |
| } else { |
| if (cmd instanceof ReadyCommand) { |
| processReadyCommand(cmd); |
| } |
| commandsInProgress.incrementAndGet(); |
| try { |
| answer = serverResource.executeRequest(cmd); |
| } finally { |
| commandsInProgress.decrementAndGet(); |
| } |
| if (answer == null) { |
| logger.debug("Response: unsupported command {}", cmd.toString()); |
| answer = Answer.createUnsupportedCommandAnswer(cmd); |
| } |
| } |
| } catch (final Throwable th) { |
| logger.warn("Caught: ", th); |
| final StringWriter writer = new StringWriter(); |
| th.printStackTrace(new PrintWriter(writer)); |
| answer = new Answer(cmd, false, writer.toString()); |
| } |
| |
| answers[i] = answer; |
| if (!answer.getResult() && request.stopOnError()) { |
| for (i++; i < cmds.length; i++) { |
| answers[i] = new Answer(cmds[i], false, "Stopped by previous failure"); |
| } |
| break; |
| } |
| } |
| response = new Response(request, answers); |
| } finally { |
| if (logger.isDebugEnabled()) { |
| final String responseMsg = response.toString(); |
| if (responseMsg != null) { |
| logger.debug(response.toString()); |
| } |
| } |
| |
| if (response != null) { |
| try { |
| link.send(response.toBytes()); |
| } catch (final ClosedChannelException e) { |
| logger.warn("Unable to send response: {}", response.toString()); |
| } |
| } |
| } |
| } |
| |
| public Answer setupAgentKeystore(final SetupKeyStoreCommand cmd) { |
| final String keyStorePassword = cmd.getKeystorePassword(); |
| final long validityDays = cmd.getValidityDays(); |
| |
| logger.debug("Setting up agent keystore file and generating CSR"); |
| |
| final File agentFile = PropertiesUtil.findConfigFile("agent.properties"); |
| if (agentFile == null) { |
| return new Answer(cmd, false, "Failed to find agent.properties file"); |
| } |
| final String keyStoreFile = agentFile.getParent() + "/" + KeyStoreUtils.KS_FILENAME; |
| final String csrFile = agentFile.getParent() + "/" + KeyStoreUtils.CSR_FILENAME; |
| |
| String storedPassword = shell.getPersistentProperty(null, KeyStoreUtils.KS_PASSPHRASE_PROPERTY); |
| if (StringUtils.isEmpty(storedPassword)) { |
| storedPassword = keyStorePassword; |
| shell.setPersistentProperty(null, KeyStoreUtils.KS_PASSPHRASE_PROPERTY, storedPassword); |
| } |
| |
| Script script = new Script(keystoreSetupSetupPath, 300000, logger); |
| script.add(agentFile.getAbsolutePath()); |
| script.add(keyStoreFile); |
| script.add(storedPassword); |
| script.add(String.valueOf(validityDays)); |
| script.add(csrFile); |
| String result = script.execute(); |
| if (result != null) { |
| throw new CloudRuntimeException("Unable to setup keystore file"); |
| } |
| |
| final String csrString; |
| try { |
| csrString = FileUtils.readFileToString(new File(csrFile), Charset.defaultCharset()); |
| } catch (IOException e) { |
| throw new CloudRuntimeException("Unable to read generated CSR file", e); |
| } |
| return new SetupKeystoreAnswer(csrString); |
| } |
| |
| private Answer setupAgentCertificate(final SetupCertificateCommand cmd) { |
| final String certificate = cmd.getCertificate(); |
| final String privateKey = cmd.getPrivateKey(); |
| final String caCertificates = cmd.getCaCertificates(); |
| |
| logger.debug("Importing received certificate to agent's keystore"); |
| |
| final File agentFile = PropertiesUtil.findConfigFile("agent.properties"); |
| if (agentFile == null) { |
| return new Answer(cmd, false, "Failed to find agent.properties file"); |
| } |
| final String keyStoreFile = agentFile.getParent() + "/" + KeyStoreUtils.KS_FILENAME; |
| final String certFile = agentFile.getParent() + "/" + KeyStoreUtils.CERT_FILENAME; |
| final String privateKeyFile = agentFile.getParent() + "/" + KeyStoreUtils.PKEY_FILENAME; |
| final String caCertFile = agentFile.getParent() + "/" + KeyStoreUtils.CACERT_FILENAME; |
| |
| try { |
| FileUtils.writeStringToFile(new File(certFile), certificate, Charset.defaultCharset()); |
| FileUtils.writeStringToFile(new File(caCertFile), caCertificates, Charset.defaultCharset()); |
| logger.debug("Saved received client certificate to: {}", certFile); |
| } catch (IOException e) { |
| throw new CloudRuntimeException("Unable to save received agent client and ca certificates", e); |
| } |
| |
| String ksPassphrase = shell.getPersistentProperty(null, KeyStoreUtils.KS_PASSPHRASE_PROPERTY); |
| Script script = new Script(keystoreCertImportScriptPath, 300000, logger); |
| script.add(agentFile.getAbsolutePath()); |
| script.add(ksPassphrase); |
| script.add(keyStoreFile); |
| script.add(KeyStoreUtils.AGENT_MODE); |
| script.add(certFile); |
| script.add(""); |
| script.add(caCertFile); |
| script.add(""); |
| script.add(privateKeyFile); |
| script.add(privateKey); |
| String result = script.execute(); |
| if (result != null) { |
| throw new CloudRuntimeException("Unable to import certificate into keystore file"); |
| } |
| return new SetupCertificateAnswer(true); |
| } |
| |
| private void processManagementServerList(final List<String> msList, final String lbAlgorithm, final Long lbCheckInterval) { |
| if (CollectionUtils.isNotEmpty(msList) && StringUtils.isNotEmpty(lbAlgorithm)) { |
| try { |
| final String newMSHosts = String.format("%s%s%s", com.cloud.utils.StringUtils.toCSVList(msList), IAgentShell.hostLbAlgorithmSeparator, lbAlgorithm); |
| shell.setPersistentProperty(null, "host", newMSHosts); |
| shell.setHosts(newMSHosts); |
| shell.resetHostCounter(); |
| logger.info("Processed new management server list: {}", newMSHosts); |
| } catch (final Exception e) { |
| throw new CloudRuntimeException("Could not persist received management servers list", e); |
| } |
| } |
| if ("shuffle".equals(lbAlgorithm)) { |
| scheduleHostLBCheckerTask(0); |
| } else { |
| scheduleHostLBCheckerTask(shell.getLbCheckerInterval(lbCheckInterval)); |
| } |
| } |
| |
| private Answer setupManagementServerList(final SetupMSListCommand cmd) { |
| processManagementServerList(cmd.getMsList(), cmd.getLbAlgorithm(), cmd.getLbCheckInterval()); |
| return new SetupMSListAnswer(true); |
| } |
| |
| public void processResponse(final Response response, final Link link) { |
| final Answer answer = response.getAnswer(); |
| logger.debug("Received response: {}", response.toString()); |
| if (answer instanceof StartupAnswer) { |
| processStartupAnswer(answer, response, link); |
| } else if (answer instanceof AgentControlAnswer) { |
| // Notice, we are doing callback while holding a lock! |
| for (final IAgentControlListener listener : controlListeners) { |
| listener.processControlResponse(response, (AgentControlAnswer)answer); |
| } |
| } else if (answer instanceof PingAnswer && (((PingAnswer) answer).isSendStartup()) && reconnectAllowed) { |
| logger.info("Management server requested startup command to reinitialize the agent"); |
| sendStartup(link); |
| } else { |
| updateLastPingResponseTime(); |
| } |
| } |
| |
| public void processReadyCommand(final Command cmd) { |
| final ReadyCommand ready = (ReadyCommand)cmd; |
| // Set human readable sizes; |
| Boolean humanReadable = ready.getEnableHumanReadableSizes(); |
| if (humanReadable != null){ |
| NumbersUtil.enableHumanReadableSizes = humanReadable; |
| } |
| |
| logger.info("Processing agent ready command, agent id = {}, uuid = {}, name = {}", ready.getHostId(), ready.getHostUuid(), ready.getHostName()); |
| if (ready.getHostId() != null) { |
| setId(ready.getHostId()); |
| setUuid(ready.getHostUuid()); |
| setName(ready.getHostName()); |
| } |
| |
| verifyAgentArch(ready.getArch()); |
| processManagementServerList(ready.getMsHostList(), ready.getLbAlgorithm(), ready.getLbCheckInterval()); |
| |
| logger.info("Ready command is processed for agent [id: {}, uuid: {}, name: {}]", getId(), getUuid(), getName()); |
| } |
| |
| private void verifyAgentArch(String arch) { |
| if (StringUtils.isNotBlank(arch)) { |
| String agentArch = getAgentArch(); |
| if (!arch.equals(agentArch)) { |
| logger.error("Unexpected arch {}, expected {}", agentArch, arch); |
| } |
| } |
| } |
| |
| public void processOtherTask(final Task task) { |
| final Object obj = task.get(); |
| if (obj instanceof Response) { |
| if (System.currentTimeMillis() - lastPingResponseTime.get() > pingInterval * shell.getPingRetries()) { |
| logger.error("Ping Interval has gone past {}. Won't reconnect to mgt server, as connection is still alive", |
| pingInterval * shell.getPingRetries()); |
| return; |
| } |
| |
| final PingCommand ping = serverResource.getCurrentStatus(getId()); |
| final Request request = new Request(id, -1, ping, false); |
| request.setSequence(getNextSequence()); |
| logger.debug("Sending ping: {}", request.toString()); |
| |
| try { |
| task.getLink().send(request.toBytes()); |
| //if i can send pingcommand out, means the link is ok |
| updateLastPingResponseTime(); |
| } catch (final ClosedChannelException e) { |
| logger.warn("Unable to send request to {} due to '{}', request: {}", |
| getLinkLog(task.getLink()), e.getMessage(), request); |
| } |
| |
| } else if (obj instanceof Request) { |
| final Request req = (Request)obj; |
| final Command command = req.getCommand(); |
| if (command.getContextParam("logid") != null) { |
| ThreadContext.put("logcontextid", command.getContextParam("logid")); |
| } |
| Answer answer = null; |
| commandsInProgress.incrementAndGet(); |
| try { |
| answer = serverResource.executeRequest(command); |
| } finally { |
| commandsInProgress.decrementAndGet(); |
| } |
| if (answer != null) { |
| final Response response = new Response(req, answer); |
| |
| logger.debug("Watch Sent: {}", response.toString()); |
| try { |
| task.getLink().send(response.toBytes()); |
| } catch (final ClosedChannelException e) { |
| logger.warn("Unable to send response: {}", response.toString()); |
| } |
| } |
| } else { |
| logger.warn("Ignoring an unknown task"); |
| } |
| } |
| |
| public void updateLastPingResponseTime() { |
| lastPingResponseTime.set(System.currentTimeMillis()); |
| } |
| |
| protected long getNextSequence() { |
| return sequence.getAndIncrement(); |
| } |
| |
| @Override |
| public void registerControlListener(final IAgentControlListener listener) { |
| controlListeners.add(listener); |
| } |
| |
| @Override |
| public void unregisterControlListener(final IAgentControlListener listener) { |
| controlListeners.remove(listener); |
| } |
| |
| @Override |
| public AgentControlAnswer sendRequest(final AgentControlCommand cmd, final int timeoutInMilliseconds) throws AgentControlChannelException { |
| final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false); |
| request.setSequence(getNextSequence()); |
| final AgentControlListener listener = new AgentControlListener(request); |
| registerControlListener(listener); |
| try { |
| postRequest(request); |
| synchronized (listener) { |
| try { |
| listener.wait(timeoutInMilliseconds); |
| } catch (final InterruptedException e) { |
| logger.warn("sendRequest is interrupted, exit waiting"); |
| } |
| } |
| return listener.getAnswer(); |
| } finally { |
| unregisterControlListener(listener); |
| } |
| } |
| |
| @Override |
| public void postRequest(final AgentControlCommand cmd) throws AgentControlChannelException { |
| final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false); |
| request.setSequence(getNextSequence()); |
| postRequest(request); |
| } |
| |
| private void postRequest(final Request request) throws AgentControlChannelException { |
| if (link != null) { |
| try { |
| link.send(request.toBytes()); |
| } catch (final ClosedChannelException e) { |
| logger.warn("Unable to post agent control request: {}", request.toString()); |
| throw new AgentControlChannelException("Unable to post agent control request due to " + e.getMessage()); |
| } |
| } else { |
| throw new AgentControlChannelException("Unable to post agent control request as link is not available"); |
| } |
| } |
| |
| public class AgentControlListener implements IAgentControlListener { |
| private AgentControlAnswer _answer; |
| private final Request _request; |
| |
| public AgentControlListener(final Request request) { |
| _request = request; |
| } |
| |
| public AgentControlAnswer getAnswer() { |
| return _answer; |
| } |
| |
| @Override |
| public Answer processControlRequest(final Request request, final AgentControlCommand cmd) { |
| return null; |
| } |
| |
| @Override |
| public void processControlResponse(final Response response, final AgentControlAnswer answer) { |
| if (_request.getSequence() == response.getSequence()) { |
| _answer = answer; |
| synchronized (this) { |
| notifyAll(); |
| } |
| } |
| } |
| } |
| |
| protected class ShutdownThread extends Thread { |
| Agent _agent; |
| |
| public ShutdownThread(final Agent agent) { |
| super("AgentShutdownThread"); |
| _agent = agent; |
| } |
| |
| @Override |
| public void run() { |
| _agent.stop(ShutdownCommand.Requested, null); |
| } |
| } |
| |
| public class WatchTask implements Runnable { |
| protected Request _request; |
| protected Agent _agent; |
| protected Link _link; |
| |
| public WatchTask(final Link link, final Request request, final Agent agent) { |
| super(); |
| _request = request; |
| _link = link; |
| _agent = agent; |
| } |
| |
| @Override |
| public void run() { |
| logger.trace("Scheduling {}", (_request instanceof Response ? "Ping" : "Watch Task")); |
| try { |
| if (_request instanceof Response) { |
| outRequestHandler.submit(new ServerHandler(Task.Type.OTHER, _link, _request)); |
| } else { |
| _link.schedule(new ServerHandler(Task.Type.OTHER, _link, _request)); |
| } |
| } catch (final ClosedChannelException e) { |
| logger.warn("Unable to schedule task because channel is closed"); |
| } |
| } |
| } |
| |
| public class StartupTask implements Runnable { |
| protected Link _link; |
| private final AtomicBoolean cancelled = new AtomicBoolean(false); |
| |
| public StartupTask(final Link link) { |
| logger.debug("Startup task created"); |
| _link = link; |
| } |
| |
| public boolean cancel() { |
| // TimerTask.cancel may fail depends on the calling context |
| if (cancelled.compareAndSet(false, true)) { |
| startupWait = DEFAULT_STARTUP_WAIT; |
| logger.debug("Startup task cancelled"); |
| } |
| return true; |
| } |
| |
| @Override |
| public void run() { |
| if (cancelled.compareAndSet(false, true)) { |
| logger.info("The running startup command is now invalid. Attempting reconnect"); |
| startupTask.set(null); |
| startupWait = DEFAULT_STARTUP_WAIT * 2; |
| logger.debug("Executing reconnect from task - {}", () -> getLinkLog(_link)); |
| reconnect(_link); |
| } |
| } |
| } |
| |
| public class AgentRequestHandler extends Task { |
| public AgentRequestHandler(final Task.Type type, final Link link, final Request req) { |
| super(type, link, req); |
| } |
| |
| @Override |
| protected void doTask(final Task task) throws TaskExecutionException { |
| final Request req = (Request)get(); |
| if (!(req instanceof Response)) { |
| processRequest(req, task.getLink()); |
| } |
| } |
| } |
| |
| public class ServerHandler extends Task { |
| public ServerHandler(final Task.Type type, final Link link, final byte[] data) { |
| super(type, link, data); |
| } |
| |
| public ServerHandler(final Task.Type type, final Link link, final Request req) { |
| super(type, link, req); |
| } |
| |
| @Override |
| public void doTask(final Task task) throws TaskExecutionException { |
| if (task.getType() == Task.Type.CONNECT) { |
| shell.getBackoffAlgorithm().reset(); |
| setLink(task.getLink()); |
| sendStartup(task.getLink()); |
| } else if (task.getType() == Task.Type.DATA) { |
| Request request; |
| try { |
| request = Request.parse(task.getData()); |
| if (request instanceof Response) { |
| //It's for pinganswer etc, should be processed immediately. |
| processResponse((Response)request, task.getLink()); |
| } else { |
| //put the requests from mgt server into another thread pool, as the request may take a longer time to finish. Don't block the NIO main thread pool |
| //processRequest(request, task.getLink()); |
| requestHandler.submit(new AgentRequestHandler(getType(), getLink(), request)); |
| } |
| } catch (final ClassNotFoundException e) { |
| logger.error("Unable to find this request "); |
| } catch (final Exception e) { |
| logger.error("Error parsing task", e); |
| } |
| } else if (task.getType() == Task.Type.DISCONNECT) { |
| logger.debug("Executing disconnect task - {}", () -> getLinkLog(task.getLink())); |
| reconnect(task.getLink()); |
| } else if (task.getType() == Task.Type.OTHER) { |
| processOtherTask(task); |
| } |
| } |
| } |
| |
| /** |
| * Task stops the current agent and launches a new agent |
| * when there are no outstanding jobs in the agent's task queue |
| */ |
| public class PostCertificateRenewalTask extends ManagedContextTimerTask { |
| |
| private Agent agent; |
| |
| public PostCertificateRenewalTask(final Agent agent) { |
| this.agent = agent; |
| } |
| |
| @Override |
| protected void runInContext() { |
| while (true) { |
| try { |
| if (commandsInProgress.get() == 0) { |
| logger.debug("Running post certificate renewal task to restart services."); |
| |
| // Let the resource perform any post certificate renewal cleanups |
| serverResource.executeRequest(new PostCertificateRenewalCommand()); |
| |
| IAgentShell shell = agent.shell; |
| ServerResource resource = agent.serverResource.getClass().getDeclaredConstructor().newInstance(); |
| |
| // Stop current agent |
| agent.cancelTasks(); |
| agent.reconnectAllowed = false; |
| Runtime.getRuntime().removeShutdownHook(agent.shutdownThread); |
| agent.stop(ShutdownCommand.Requested, "Restarting due to new X509 certificates"); |
| |
| // Nullify references for GC |
| agent.shell = null; |
| agent.watchList = null; |
| agent.shutdownThread = null; |
| agent.controlListeners = null; |
| agent = null; |
| |
| // Start a new agent instance |
| shell.launchNewAgent(resource); |
| return; |
| } |
| logger.debug("Other tasks are in progress, will retry post certificate renewal command after few seconds"); |
| Thread.sleep(5000); |
| } catch (final Exception e) { |
| logger.warn("Failed to execute post certificate renewal command:", e); |
| break; |
| } |
| } |
| } |
| } |
| |
| public class PreferredHostCheckerTask extends ManagedContextTimerTask { |
| |
| @Override |
| protected void runInContext() { |
| try { |
| final String[] msList = shell.getHosts(); |
| if (msList == null || msList.length < 1) { |
| return; |
| } |
| final String preferredHost = msList[0]; |
| final String connectedHost = shell.getConnectedHost(); |
| logger.debug("Running preferred host checker task, connected host={}, preferred host={}", |
| connectedHost, preferredHost); |
| if (preferredHost == null || preferredHost.equals(connectedHost) || link == null) { |
| return; |
| } |
| boolean isHostUp = false; |
| try (final Socket socket = new Socket()) { |
| socket.connect(new InetSocketAddress(preferredHost, shell.getPort()), 5000); |
| isHostUp = true; |
| } catch (final IOException e) { |
| logger.debug("Host: {} is not reachable", preferredHost); |
| } |
| if (isHostUp && link != null && commandsInProgress.get() == 0) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Preferred host {} is found to be reachable, trying to reconnect", preferredHost); |
| } |
| shell.resetHostCounter(); |
| reconnect(link); |
| } |
| } catch (Throwable t) { |
| logger.error("Error caught while attempting to connect to preferred host", t); |
| } |
| } |
| } |
| } |