blob: 6c7803e943063f59346a45e4a7044b67b68d6db9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.agent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Singleton;
@Singleton
public class ActionQueue {
private static Logger LOG = LoggerFactory.getLogger(ActionQueue.class);
private static HashSet<String> EMPTY_HOST_LIST = new HashSet<String>();
final ConcurrentMap<String, Queue<AgentCommand>> hostQueues;
HashSet<String> hostsWithPendingTask = new HashSet<String>();
public ActionQueue() {
hostQueues = new ConcurrentHashMap<String, Queue<AgentCommand>>();
}
private Queue<AgentCommand> getQueue(String hostname) {
return hostQueues.get(hostname);
}
/**
* Adds command to queue for given hostname
* @param hostname - hostname of node
* @param cmd - command to add to queue
* @throws NullPointerException - if hostname is {@code}null{@code}
*/
public void enqueue(String hostname, AgentCommand cmd) {
Queue<AgentCommand> q = getQueue(hostname);
if (q == null) {
//try to add new queue to map if not found
q = hostQueues.putIfAbsent(hostname, new ConcurrentLinkedQueue<AgentCommand>());
if (q == null) {
//null means that new queue was added to map, get it
q = getQueue(hostname);
}
//otherwise we got existing queue (and put nothing!)
}
q.add(cmd);
}
/**
* Get command from queue for given hostname
* @param hostname
* @return
*/
public AgentCommand dequeue(String hostname) {
Queue<AgentCommand> q = getQueue(hostname);
if (q == null) {
return null;
}
return q.poll();
}
/**
* Dequeue's all commands of a specified type for the given host.
*
* @param hostname
* the host to remove commands for (not {@code null}).
* @param commandType
* the type of command to remove (not {@code null}).
* @return the commands removed, or an empty list if none (never {@code null}
* ).
*/
public List<AgentCommand> dequeue(String hostname,
AgentCommandType commandType) {
if (null == hostname || null == commandType) {
return Collections.emptyList();
}
Queue<AgentCommand> queue = getQueue(hostname);
if (null == queue) {
return null;
}
List<AgentCommand> removedCommands = new ArrayList<AgentCommand>(
queue.size());
Iterator<AgentCommand> iterator = queue.iterator();
while (iterator.hasNext()) {
AgentCommand command = iterator.next();
if (command.getCommandType() == commandType) {
removedCommands.add(command);
iterator.remove();
}
}
return removedCommands;
}
/**
* Try to dequeue command with provided id.
*
* @param hostname
* @param commandId
* @return
*/
public AgentCommand dequeue(String hostname, String commandId) {
Queue<AgentCommand> q = getQueue(hostname);
if (q == null) {
return null;
}
if (q.isEmpty()) {
return null;
} else {
AgentCommand c = null;
for (Iterator<AgentCommand> it = q.iterator(); it.hasNext();) {
AgentCommand ac = it.next();
if (ac instanceof ExecutionCommand && ((ExecutionCommand) ac)
.getCommandId().equals(commandId)) {
c = ac;
it.remove();
break;
}
}
return c;
}
}
public int size(String hostname) {
Queue<AgentCommand> q = getQueue(hostname);
if (q == null) {
return 0;
}
return q.size();
}
public List<AgentCommand> dequeueAll(String hostname) {
Queue<AgentCommand> q = getQueue(hostname);
if (q == null) {
return null;
}
List<AgentCommand> l = new ArrayList<AgentCommand>();
AgentCommand command;
do {
//get commands from queue until empty
command = q.poll();
if (command != null) {
l.add(command);
}
} while (command != null);
return l;
}
/**
* Update the cache of hosts that have pending tasks
*
* @param hosts
*/
public void updateListOfHostsWithPendingTask(HashSet<String> hosts) {
if (hosts != null) {
hostsWithPendingTask = hosts;
} else if (hostsWithPendingTask.size() > 0) {
hostsWithPendingTask = EMPTY_HOST_LIST;
}
}
/**
* Checks whether host has pending tasks
* @param hostName
* @return
*/
public boolean hasPendingTask(String hostName) {
HashSet<String> copyHostsWithTaskPending = hostsWithPendingTask;
if (copyHostsWithTaskPending != null) {
return copyHostsWithTaskPending.contains(hostName);
}
return false;
}
}