// blob: 927da34104ffafb0e725158ced892c70452b334d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.agent.manager;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.CronCommand;
import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.StartupAnswer;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.Status;
import com.cloud.resource.ServerResource;
import org.apache.logging.log4j.ThreadContext;
/**
 * Agent attache that serves requests by invoking a {@link ServerResource} held by this
 * object, using the thread pools supplied by the agent manager.
 *
 * <p>Regular requests are queued and run as {@link Task}s on the direct agent pool, with the
 * number of concurrently outstanding tasks capped by
 * {@code _agentMgr.getDirectAgentThreadCap()}. Requests whose first command is a
 * {@link CronCommand}, and the periodic ping started by a {@link StartupAnswer}, are scheduled
 * at a fixed rate on the cron job pool.
 */
public class DirectAgentAttache extends AgentAttache {
    /** How many extra times {@link PingTask} asks the resource for its status when it returns {@code null}. */
    protected final ConfigKey<Integer> _HostPingRetryCount = new ConfigKey<Integer>("Advanced", Integer.class, "host.ping.retry.count", "0",
            "Number of times retrying a host ping while waiting for check results", true);
    /** Pause, in seconds, between two status retries performed by {@link PingTask}. */
    protected final ConfigKey<Integer> _HostPingRetryTimer = new ConfigKey<Integer>("Advanced", Integer.class, "host.ping.retry.timer", "5",
            "Interval to wait before retrying a host ping while waiting for check results", true);

    /** Resource that executes the commands; set to {@code null} by {@link #disconnect(Status)}. */
    ServerResource _resource;
    /** Handles of every fixed-rate job (ping or cron) scheduled by this attache. */
    List<ScheduledFuture<?>> _futures = new ArrayList<ScheduledFuture<?>>();
    /** Sequence number handed to the agent manager with each ping. */
    long _seq = 0;
    /** Regular tasks waiting for a free slot on the direct agent pool. */
    LinkedList<Task> tasks = new LinkedList<Task>();
    /** Number of {@link Task}s submitted to the pool that have not finished yet. */
    AtomicInteger _outstandingTaskCount;
    /** Number of {@link PingTask}/{@link CronTask} runs currently in progress. */
    AtomicInteger _outstandingCronTaskCount;

    /**
     * Creates an attache bound to the given resource.
     *
     * @param agentMgr    agent manager providing thread pools and command handling
     * @param id          identifier passed to the super class
     * @param name        name passed to the super class
     * @param resource    resource that will execute the commands
     * @param maintenance maintenance flag passed to the super class
     */
    public DirectAgentAttache(AgentManagerImpl agentMgr, long id, String name, ServerResource resource, boolean maintenance) {
        super(agentMgr, id, name, maintenance);
        _resource = resource;
        _outstandingTaskCount = new AtomicInteger(0);
        _outstandingCronTaskCount = new AtomicInteger(0);
    }

    /**
     * Cancels every scheduled job without interrupting runs already in progress, then notifies
     * and drops the resource. Calling it again after the resource was dropped only re-cancels
     * the futures.
     *
     * @param state not used by this implementation
     */
    @Override
    public void disconnect(Status state) {
        logger.debug("Processing disconnect {}({})", _id, _name);

        for (ScheduledFuture<?> future : _futures) {
            future.cancel(false);
        }

        synchronized (this) {
            if (_resource != null) {
                _resource.disconnected();
                _resource = null;
            }
        }
    }

    /**
     * Equal only to another {@code DirectAgentAttache} that the super class also considers equal.
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof DirectAgentAttache)) {
            return false;
        }
        return super.equals(obj);
    }

    /**
     * @return {@code true} once the resource has been dropped by {@link #disconnect(Status)}
     */
    @Override
    public synchronized boolean isClosed() {
        return _resource == null;
    }

    /**
     * Dispatches a request.
     *
     * <ul>
     * <li>A {@link Response} whose first answer is a {@link StartupAnswer} starts the periodic ping.</li>
     * <li>A request whose first command is a {@link CronCommand} is scheduled at the command's interval.</li>
     * <li>Any other request, including one that carries no commands, is queued as a regular {@link Task}.</li>
     * </ul>
     *
     * @param req request or response to dispatch
     * @throws AgentUnavailableException declared by the overridden method; not thrown directly here
     */
    @Override
    public void send(Request req) throws AgentUnavailableException {
        req.logD("Executing: ", true);

        if (req instanceof Response) {
            Response resp = (Response)req;
            StartupAnswer startup = firstStartupAnswer(resp.getAnswers());
            if (startup != null) {
                int interval = startup.getPingInterval();
                _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
            }
            return;
        }

        Command[] cmds = req.getCommands();
        // Only a request that really starts with a CronCommand is scheduled as cron. An empty
        // command array used to fall into the cron branch and fail on cmds[0].
        if (cmds.length > 0 && cmds[0] instanceof CronCommand) {
            CronCommand cmd = (CronCommand)cmds[0];
            _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new CronTask(req), cmd.getInterval(), cmd.getInterval(), TimeUnit.SECONDS));
        } else {
            queueTask(new Task(req));
            scheduleFromQueue();
        }
    }

    /**
     * Starts the periodic ping when the first answer is a {@link StartupAnswer}; other answers
     * (including {@code null} or an empty array) are ignored.
     *
     * @param answers answers to inspect, may be {@code null}
     */
    @Override
    public void process(Answer[] answers) {
        StartupAnswer startup = firstStartupAnswer(answers);
        if (startup != null) {
            int interval = startup.getPingInterval();
            logger.info("StartupAnswer received {} Interval = {}", startup.getHostId(), interval);
            _futures.add(_agentMgr.getCronJobPool().scheduleAtFixedRate(new PingTask(), interval, interval, TimeUnit.SECONDS));
        }
    }

    /**
     * Returns the first answer as a {@link StartupAnswer}, or {@code null} when the array is
     * {@code null}, empty, or starts with a different answer type.
     */
    private static StartupAnswer firstStartupAnswer(Answer[] answers) {
        if (answers == null || answers.length == 0 || !(answers[0] instanceof StartupAnswer)) {
            return null;
        }
        return (StartupAnswer)answers[0];
    }

    /**
     * Safety net: disconnects the resource if the attache is collected while still holding one.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            assert _resource == null : "Come on now....If you're going to dabble in agent code, you better know how to close out our resources. Ever considered why there's a method called disconnect()?";

            synchronized (this) {
                if (_resource != null) {
                    logger.warn("Lost attache for {}({})", _id, _name);
                    disconnect(Status.Alert);
                }
            }
        } finally {
            super.finalize();
        }
    }

    /** Appends a task to the pending queue. */
    private synchronized void queueTask(Task task) {
        tasks.add(task);
    }

    /**
     * Moves queued tasks onto the direct agent pool until either the queue is empty or the
     * number of outstanding tasks reaches the direct agent thread cap.
     */
    private synchronized void scheduleFromQueue() {
        logger.trace("Agent attache={}, task queue size={}, outstanding tasks={}", _id, tasks.size(), _outstandingTaskCount.get());
        while (!tasks.isEmpty() && _outstandingTaskCount.get() < _agentMgr.getDirectAgentThreadCap()) {
            _outstandingTaskCount.incrementAndGet();
            _agentMgr.getDirectAgentPool().execute(tasks.remove());
        }
    }

    /**
     * Periodic job that asks the resource for its current status and forwards the resulting
     * {@link PingCommand} to the agent manager.
     */
    protected class PingTask extends ManagedContextRunnable {
        @Override
        protected synchronized void runInContext() {
            try {
                if (_outstandingCronTaskCount.incrementAndGet() >= _agentMgr.getDirectAgentThreadCap()) {
                    logger.warn("PingTask execution for direct attache({}) has reached maximum outstanding limit({}), bailing out", _id, _agentMgr.getDirectAgentThreadCap());
                    return;
                }

                // Work on a local snapshot: disconnect() may null the field while we run.
                ServerResource resource = _resource;
                if (resource == null) {
                    logger.debug("Unable to send ping because agent is disconnected {}({})", _id, _name);
                    return;
                }

                PingCommand cmd = resource.getCurrentStatus(_id);
                int retried = 0;
                while (cmd == null && ++retried <= _HostPingRetryCount.value()) {
                    // 1000L keeps the seconds-to-milliseconds conversion in long arithmetic.
                    Thread.sleep(1000L * _HostPingRetryTimer.value());
                    cmd = resource.getCurrentStatus(_id);
                }

                if (cmd == null) {
                    logger.warn("Unable to get current status on {}({})", _id, _name);
                    return;
                }

                if (cmd.getContextParam("logid") != null) {
                    ThreadContext.put("logcontextid", cmd.getContextParam("logid"));
                }

                logger.debug("Ping from {}({})", _id, _name);
                long seq = _seq++;

                // Building and stringifying a Request is only worth it when trace is on.
                if (logger.isTraceEnabled()) {
                    logger.trace("SeqA {}-{}: {}", _id, seq, new Request(_id, -1, cmd, false).toString());
                }

                _agentMgr.handleCommands(DirectAgentAttache.this, seq, new Command[] {cmd});
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can still observe the interruption.
                Thread.currentThread().interrupt();
                logger.warn("Unable to complete the ping task", e);
            } catch (Exception e) {
                logger.warn("Unable to complete the ping task", e);
            } finally {
                // Runs on every exit path, including the early bail-out above.
                _outstandingCronTaskCount.decrementAndGet();
            }
        }
    }

    /**
     * Periodic job that executes the commands of a cron request against the resource and
     * reports the answers through {@code processAnswers}.
     */
    protected class CronTask extends ManagedContextRunnable {
        Request _req;

        public CronTask(Request req) {
            _req = req;
        }

        /**
         * Answers every command of the request with a failure, used when the outstanding
         * cron task limit has been reached.
         */
        private void bailout() {
            long seq = _req.getSequence();
            try {
                Command[] cmds = _req.getCommands();
                ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
                for (Command cmd : cmds) {
                    Answer answer = new Answer(cmd, false, "Bailed out as maximum outstanding task limit reached");
                    answers.add(answer);
                }
                Response resp = new Response(_req, answers.toArray(new Answer[answers.size()]));
                processAnswers(seq, resp);
            } catch (Exception e) {
                logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Exception caught in bailout ", e);
            }
        }

        @Override
        protected void runInContext() {
            long seq = _req.getSequence();
            try {
                if (_outstandingCronTaskCount.incrementAndGet() >= _agentMgr.getDirectAgentThreadCap()) {
                    logger.warn("CronTask execution for direct attache({}) has reached maximum outstanding limit({}), bailing out", _id, _agentMgr.getDirectAgentThreadCap());
                    bailout();
                    return;
                }

                // Work on a local snapshot: disconnect() may null the field while we run.
                ServerResource resource = _resource;
                Command[] cmds = _req.getCommands();
                boolean stopOnError = _req.stopOnError();

                logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Executing request");
                ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
                for (int i = 0; i < cmds.length; i++) {
                    Answer answer = null;
                    Command currentCmd = cmds[i];
                    if (currentCmd.getContextParam("logid") != null) {
                        ThreadContext.put("logcontextid", currentCmd.getContextParam("logid"));
                    }
                    try {
                        if (resource != null) {
                            answer = resource.executeRequest(cmds[i]);
                            if (answer == null) {
                                logger.warn("Resource returned null answer!");
                                answer = new Answer(cmds[i], false, "Resource returned null answer");
                            }
                        } else {
                            answer = new Answer(cmds[i], false, "Agent is disconnected");
                        }
                    } catch (Exception e) {
                        logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Exception Caught while executing command", e);
                        answer = new Answer(cmds[i], false, e.toString());
                    }
                    answers.add(answer);
                    if (!answer.getResult() && stopOnError) {
                        if (i < cmds.length - 1 && logger.isDebugEnabled()) {
                            logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Cancelling because one of the answers is false and it is stop on error.");
                        }
                        break;
                    }
                }

                Response resp = new Response(_req, answers.toArray(new Answer[answers.size()]));
                logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Response Received: ");

                processAnswers(seq, resp);
            } catch (Exception e) {
                logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Exception caught ", e);
            } finally {
                // Runs on every exit path, including the early bail-out above.
                _outstandingCronTaskCount.decrementAndGet();
            }
        }
    }

    /**
     * One-shot job that executes the commands of a regular request against the resource,
     * reports the answers through {@code processAnswers}, and then frees its slot so the
     * next queued task can be scheduled.
     */
    protected class Task extends ManagedContextRunnable {
        Request _req;

        public Task(Request req) {
            _req = req;
        }

        @Override
        protected void runInContext() {
            long seq = _req.getSequence();
            try {
                // Work on a local snapshot: disconnect() may null the field while we run.
                ServerResource resource = _resource;
                Command[] cmds = _req.getCommands();
                boolean stopOnError = _req.stopOnError();

                logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Executing request");
                ArrayList<Answer> answers = new ArrayList<Answer>(cmds.length);
                for (int i = 0; i < cmds.length; i++) {
                    Answer answer = null;
                    Command currentCmd = cmds[i];
                    if (currentCmd.getContextParam("logid") != null) {
                        ThreadContext.put("logcontextid", currentCmd.getContextParam("logid"));
                    }
                    try {
                        if (resource != null) {
                            answer = resource.executeRequest(cmds[i]);
                            if (answer == null) {
                                logger.warn("Resource returned null answer!");
                                answer = new Answer(cmds[i], false, "Resource returned null answer");
                            }
                        } else {
                            answer = new Answer(cmds[i], false, "Agent is disconnected");
                        }
                    } catch (Throwable t) {
                        // Catch Throwable as all exceptions will otherwise be eaten by the executor framework
                        logger.warn(LOG_SEQ_FORMATTED_STRING, seq, "Throwable caught while executing command", t);
                        answer = new Answer(cmds[i], false, t.toString());
                    }
                    answers.add(answer);
                    if (!answer.getResult() && stopOnError) {
                        if (i < cmds.length - 1 && logger.isDebugEnabled()) {
                            logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Cancelling because one of the answers is false and it is stop on error.");
                        }
                        break;
                    }
                }

                Response resp = new Response(_req, answers.toArray(new Answer[answers.size()]));
                logger.debug(LOG_SEQ_FORMATTED_STRING, seq, "Response Received: ");

                processAnswers(seq, resp);
            } catch (Throwable t) {
                // This is pretty serious as processAnswers might not be called and the calling process is stuck waiting for the full timeout
                logger.error(LOG_SEQ_FORMATTED_STRING, seq, "Throwable caught in runInContext, this will cause the management to become unpredictable", t);
            } finally {
                // Free this task's slot first, then let the next queued task use it.
                _outstandingTaskCount.decrementAndGet();
                scheduleFromQueue();
            }
        }
    }
}