blob: e61487cedd39f4c79e8244ff4d5bd49900258d54 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.agent.processors;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.uima.ducc.agent.NodeAgent;
import org.apache.uima.ducc.common.utils.DuccLogger;
import org.apache.uima.ducc.common.utils.id.DuccId;
import org.apache.uima.ducc.transport.agent.IUimaPipelineAEComponent;
import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
import org.apache.uima.ducc.transport.event.common.IDuccProcess;
import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
/**
*
*
*/
public class DefaultNodeInventoryProcessor implements NodeInventoryProcessor {
DuccLogger logger = new DuccLogger(this.getClass(), "AGENT");
boolean inventoryChanged = true;
private NodeAgent agent;
private Map<DuccId, IDuccProcess> previousInventory;
private int forceInventoryUpdateMaxThreshold = 0;
private long counter = 0;
public DefaultNodeInventoryProcessor(NodeAgent agent, String inventoryPublishRateSkipCount) {
this.agent = agent;
try {
forceInventoryUpdateMaxThreshold = Integer.parseInt(inventoryPublishRateSkipCount);
} catch (Exception e) {
}
// Dont allow 0
if (forceInventoryUpdateMaxThreshold == 0) {
forceInventoryUpdateMaxThreshold = 1;
}
}
/**
* Get a copy of agent {@code Process} inventory
*/
public Map<DuccId, IDuccProcess> getInventory() {
return agent.getInventoryCopy();
}
public void dispatchInventoryUpdate(DuccEventDispatcher dispatcher, String targetEndpoint,
Map<DuccId, IDuccProcess> inventory) throws Exception {
NodeInventoryUpdateDuccEvent duccEvent = new NodeInventoryUpdateDuccEvent(inventory,
agent.getLastORSequence(), agent.getIdentity());
dispatcher.dispatch(targetEndpoint, duccEvent);
logger.info("dispatchInventoryUpdate", null,
"Agent dispatched inventory update event to endpoint:" + targetEndpoint);
}
/**
*
*/
public void process(Exchange outgoingMessage) throws Exception {
String methodName = "process";
// Get a deep copy of agent's inventory
Map<DuccId, IDuccProcess> inventory = getInventory();
// Determine if the inventory changed since the last publishing was done
// First check if the inventory expanded or shrunk. If the same in size,
// compare process states and PID. If either of the two changed for any
// of the processes trigger immediate publish. If no changes found,
// publish
// according to skip counter
// (ducc.agent.node.inventory.publish.rate.skip)
// configured in ducc.properties.
if (previousInventory != null) {
if (agent.getEventListener().forceInvotoryUpdate()) {
inventoryChanged = true;
agent.getEventListener().resetForceInventoryUpdateFlag();
}
if (inventory.size() != previousInventory.size()) {
inventoryChanged = true;
} else {
// Inventory maps are equal in size, check if all processes in
// the current
// inventory exist in the previous inventory snapshot. If not,
// it means that
// that perhaps a new process was added and one was removed. In
// this case,
// force the publish, since there was a change.
for (Map.Entry<DuccId, IDuccProcess> currentProcess : inventory.entrySet()) {
// Check if a process in the current inventory exists in a
// previous
// inventory snapshot
if (previousInventory.containsKey(currentProcess.getKey())) {
IDuccProcess previousProcess = previousInventory.get(currentProcess.getKey());
// check if either PID or process state has changed
if (currentProcess.getValue().getPID() != null && previousProcess.getPID() == null) {
inventoryChanged = true;
break;
} else if (!currentProcess.getValue().getProcessState()
.equals(previousProcess.getProcessState())) {
inventoryChanged = true;
break;
} else {
List<IUimaPipelineAEComponent> breakdown = currentProcess.getValue()
.getUimaPipelineComponents();
if (breakdown != null && breakdown.size() > 0) {
List<IUimaPipelineAEComponent> previousBreakdown = previousProcess
.getUimaPipelineComponents();
if (previousBreakdown == null || previousBreakdown.size() == 0
|| breakdown.size() != previousBreakdown.size()) {
inventoryChanged = true;
} else {
for (IUimaPipelineAEComponent uimaAeState : breakdown) {
boolean found = false;
for (IUimaPipelineAEComponent previousUimaAeState : previousBreakdown) {
if (uimaAeState.getAeName().equals(previousUimaAeState.getAeName())) {
found = true;
if (!uimaAeState.getAeState().equals(previousUimaAeState.getAeState())
|| uimaAeState.getInitializationTime() != previousUimaAeState
.getInitializationTime()) {
inventoryChanged = true;
break;
}
}
}
if (!found) {
inventoryChanged = true;
}
if (inventoryChanged) {
break;
}
}
}
}
}
} else {
// New inventory contains a process not in the previous
// snapshot
inventoryChanged = true;
break;
}
}
}
}
// Get this inventory snapshot
previousInventory = inventory;
// Broadcast inventory if there is a change or configured number of
// epochs
// passed since the last broadcast. This is configured in
// ducc.properties with
// property ducc.agent.node.inventory.publish.rate.skip
try {
if (inventory.size() > 0 && (inventoryChanged || // if there is
// inventory
// change,
// publish
forceInventoryUpdateMaxThreshold == 0 || // skip rate in
// ducc.properties
// is zero,
// publish
(counter > 0 && (counter % forceInventoryUpdateMaxThreshold) == 0))) { // if
// reached
// skip
// rate,
// publish
StringBuffer sb = new StringBuffer("Node Inventory (" + inventory.size() + ")");
for (Map.Entry<DuccId, IDuccProcess> p : inventory.entrySet()) {
/*
* long endInitLong = 0; String endInit = ""; ITimeWindow wInit =
* p.getValue().getTimeWindowInit(); if(wInit != null) { endInit = wInit.getEnd();
* endInitLong = wInit.getEndLong(); } long startRunLong = 0; String startRun = "";
* ITimeWindow wRun = p.getValue().getTimeWindowRun(); if(wRun != null) { startRun =
* wRun.getStart(); startRunLong = wRun.getStartLong(); } if(endInitLong > startRunLong) {
* logger.warn(methodName, null, "endInit:"+endInitLong+" "+"startRun:"+startRunLong); }
*/
if (p.getValue().getUimaPipelineComponents() == null) {
p.getValue().setUimaPipelineComponents(new ArrayList<IUimaPipelineAEComponent>());
}
if (!p.getValue().getProcessState().equals(ProcessState.Initializing)) {
p.getValue().getUimaPipelineComponents().clear();
}
int pipelineInitStats = (p.getValue().getUimaPipelineComponents() == null) ? 0
: p.getValue().getUimaPipelineComponents().size();
StringBuffer gcInfo = new StringBuffer();
if (p.getValue().getGarbageCollectionStats() != null) {
gcInfo.append(" GC Total=")
.append(p.getValue().getGarbageCollectionStats().getCollectionCount())
.append(" GC Time=")
.append(p.getValue().getGarbageCollectionStats().getCollectionTime())
.append(" ");
}
sb.append("\n\t[Process Type=").append(p.getValue().getProcessType()).append(" DUCC ID=")
.append(p.getValue().getDuccId()).append(" PID=").append(p.getValue().getPID())
.append(" State=").append(p.getValue().getProcessState())
.append(" Resident Memory=").append(p.getValue().getResidentMemory())
.append(gcInfo.toString()).append(" Init Stats List Size:" + pipelineInitStats)
.append(" Reason: " + p.getValue().getReasonForStoppingProcess()).append("] ");
if (p.getValue().getProcessState().equals(ProcessState.Stopped)
|| p.getValue().getProcessState().equals(ProcessState.Failed)
|| p.getValue().getProcessState().equals(ProcessState.Killed)) {
sb.append(" Reason:" + p.getValue().getReasonForStoppingProcess());
sb.append(" Extended Reason:" + p.getValue().getExtendedReasonForStoppingProcess());
}
if (!p.getValue().getProcessState().equals(ProcessState.Running)
&& !p.getValue().getProcessState().equals(ProcessState.Initializing)) {
sb.append(" Exit Code=" + p.getValue().getProcessExitCode());
}
}
logger.info(methodName, null, "Agent " + agent.getIdentity().getCanonicalName()
+ " Posting Inventory:" + sb.toString());
outgoingMessage.getIn().setBody(new NodeInventoryUpdateDuccEvent(inventory,
agent.getLastORSequence(), agent.getIdentity()));
} else {
// Add null to the body of the message. A filter
// defined in the Camel route (AgentConfiguration.java)
// has a predicate to check for null body and throws
// away such a message.
outgoingMessage.getIn().setBody(null);
}
} catch (Exception e) {
logger.error(methodName, null, e);
} finally {
if (inventoryChanged) {
counter = 0;
} else {
counter++;
}
inventoryChanged = false;
}
}
}