blob: 3e9ee442bdcb2f311371345f24e240a8a3b121bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.ping;
/*
* IMPORTANT: This is a sample custom pinger for illustration purposes only. It is not
* supported in any way.
*/
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.uima.UIMAFramework;
import org.apache.uima.aae.client.UimaASProcessStatus;
import org.apache.uima.aae.client.UimaAsBaseCallbackListener;
import org.apache.uima.aae.client.UimaAsynchronousEngine;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngineCommon_impl;
import org.apache.uima.adapter.jms.client.BaseUIMAAsynchronousEngine_impl;
import org.apache.uima.cas.CAS;
import org.apache.uima.collection.EntityProcessStatus;
import org.apache.uima.ducc.cli.AServicePing;
import org.apache.uima.ducc.cli.ServiceStatistics;
import org.apache.uima.ducc.cli.UimaAsServiceMonitor;
import org.apache.uima.ducc.common.IServiceStatistics;
import org.apache.uima.ducc.common.TcpStreamHandler;
import org.apache.uima.ducc.common.utils.DuccProperties;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Level;
// 'q_thresh=nn,window=mm,broker_jmx=1100,meta_timeout=10000'
public class SamplePing
extends AServicePing
{
String ep; // Endpoint, passed in during initialization
String qname; // Service queue name, parsed from ep
String broker; // Service broker, parsed from ep
int meta_timeout = 5000; // default
String broker_host; // AMQ hostname, parsed from 'broker' URL
int broker_jmx_port = 1099; // AMQ jmx port, default
UimaAsServiceMonitor monitor; // Part of ping API, knows how to query AMQ for broker stats
String nodeIp; // For UIMA-AS get-meta callback, IP of node answering get-meta
String pid; // " " " " , process answering get-meta
boolean gmfail = false; // Did get=meta work?
boolean fast_shrink = true; // If false, don't shrink instances if there are producers
// still connected to the Q.
int additions = 0; // number of additions to signal to SM
Long[] deletions = null; // which specific instances to shrink
int min_instances = 0; // minimum instances to maintain
int max_instances = 10; // max instances to allow
int max_growth = 5; // max processes to grow at any time
double goal = 2.00; // want to get enqueue time to within this factor of individual service time
int cursor = 0; // growth and shrinkage window cursor
int expansion_period = 5; // size of window in minutes
int window_size = expansion_period; // size of window in pings (panes)
int[] expansion_window; // expansion window
int[] deletion_window; // shrinkage window
public SamplePing()
{
}
public void init(String args, String ep)
throws Exception
{
String methodName = "init";
this.ep = ep;
doLog(methodName, "Ping.init(" + args + ", " + ep + " start.");
// Ep is of the form UIMA-AS:queuename:broker. Parse out queue name and broker URL
int ndx = ep.indexOf(":");
ep = ep.substring(ndx+1);
ndx = ep.indexOf(":");
this.qname = ep.substring(0, ndx).trim();
this.broker = ep.substring(ndx+1).trim();
// broker is a URL that we need to parse in order to get the actual host for jmx calls
URL url = null;
try {
url = new URL(null, broker, new TcpStreamHandler());
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Invalid broker URL: " + broker);
}
broker_host = url.getHost();
// inhibit noisy default UIMAAs logging
UIMAFramework.getLogger(BaseUIMAAsynchronousEngineCommon_impl.class).setLevel(Level.OFF);
UIMAFramework.getLogger(BaseUIMAAsynchronousEngine_impl.class).setLevel(Level.OFF);
UIMAFramework.getLogger().setLevel(Level.INFO);
// Parse out the pinger arguments which are comma-delimeted in the argument string
// We do this by splitting on ',', then writing as strings to a StringReader, and
// finally loading a PropertiesFile from them. DuccProperties is used because it
// smart typed extraction of the properties.
if ( args != null ) {
String[] as = args.split(",");
StringWriter sw = new StringWriter();
for ( String s : as ) sw.write(s + "\n");
StringReader sr = new StringReader(sw.toString());
DuccProperties props = new DuccProperties();
try {
props.load(sr);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
meta_timeout = props.getIntProperty("meta-timeout", meta_timeout);
broker_jmx_port = props.getIntProperty("broker-jmx-port", broker_jmx_port);
expansion_period = props.getIntProperty("window", expansion_period);
min_instances = props.getIntProperty("min", min_instances);
max_instances = props.getIntProperty("max", max_instances);
max_growth = props.getIntProperty("max-growth", max_growth);
fast_shrink = props.getBooleanProperty("fast-shrink", true);
goal = props.getDoubleProperty("goal", goal);
}
// Set up expansion/deletion windows with window size always 1 minute regardless
// of ping frequency.
double calls_per_minute = 60000.00 / monitor_rate;
window_size = (int) ( ((double)expansion_period) * calls_per_minute);
expansion_window = new int[window_size];
deletion_window = new int[window_size];
doLog("<ctr>",
"INIT: meta-timeout", meta_timeout,
"broker-host", broker_host,
"broker-jmx-port", broker_jmx_port,
"monitor-window", expansion_period,
"window-size", window_size,
"monitor rate", monitor_rate,
"max-instances", max_instances,
"min-instances", min_instances,
"max-growth", max_growth,
"goal", goal,
"fast-shrink", fast_shrink);
// Set up the jmx queue monitor and reset statistics to insure each calculation is
// based on current data, not old data.
this.monitor = new UimaAsServiceMonitor(qname, broker_host, broker_jmx_port);
monitor.resetStatistics();
}
// Release resources when terminated.
public void stop()
{
String methodName = "stop";
if ( monitor != null ) monitor.stop();
doLog(methodName, "------------ Stop signal arrives, pinger exits.");
}
// This is called by the Service Manager to collect the pinger data. This pinger
// issues a get-meta to the service to insure it's alive, then collects queue statistics via JMX
// and determines expansion and deletion for return when SM asks about them.
public IServiceStatistics getStatistics()
{
String methodName = "getStatistics";
IServiceStatistics statistics = new ServiceStatistics(false, false, "<NA>");
nodeIp = "N/A";
pid = "N/A";
// "health" has no real meaning. maybe we can get rid of it one day?
try {
monitor.collect(); // Get jmx stuff
statistics.setHealthy(true);
} catch ( Throwable t ) {
statistics.setHealthy(false);
monitor.setJmxFailure(t.getMessage());
}
// Instantiate Uima AS Client and issue get-meta
BaseUIMAAsynchronousEngine_impl uimaAsEngine = new BaseUIMAAsynchronousEngine_impl();
UimaCbListener listener = new UimaCbListener();
uimaAsEngine.addStatusCallbackListener(listener);
Map<String, Object> appCtx = new HashMap<String, Object>();
appCtx.put(UimaAsynchronousEngine.ServerUri, broker);
appCtx.put(UimaAsynchronousEngine.ENDPOINT, qname);
appCtx.put(UimaAsynchronousEngine.GetMetaTimeout, meta_timeout);
try {
uimaAsEngine.initialize(appCtx); // This performs the get-meta
statistics.setAlive(true);
statistics.setHealthy(true && statistics.isHealthy());
listener.ok();
} catch( ResourceInitializationException e) {
listener.timeout(); // Service not responding
doLog(methodName, "Cannot issue getMeta to: " + qname + ":" + broker);
statistics.setHealthy(false);
statistics.setAlive(false);
} finally {
try {
uimaAsEngine.stop();
} catch (Throwable e) {
doLog(methodName, "Exception on UIMA-AS connection stop:" + e.toString());
}
}
monitor.setSource(nodeIp, pid, gmfail); // remember who responded
statistics.setInfo(monitor.format()); // set string for web server
calculateNewDeployment(statistics); // Decide on instance expansion or deletion
return statistics;
}
/**
* Override from AServicePing, set in calculateNewDeployment while analyzing queue statistics
*/
public long getLastUse()
{
return last_use;
}
// ================================================================================
// Service Mini-scheduler
// ================================================================================
void calculateNewDeployment(IServiceStatistics stats)
{
String methodName = "calculateNewDeployment";
// Rate per overall service
// Rs = Qt / Qd
// Per-instance response
// Ri = Rs * ninst
// This scheduler is going to chase Ri, attempting to keep Qt <= 4 seconds
double eT = monitor.getEnqueueTime(); // average time things stay in queue
long Q = monitor.getQueueSize(); // current queue size (depth)
long cc = monitor.getConsumerCount();
long pc = monitor.getProducerCount();
double Ti; // service time per instance (process)
double Tt; // service time per thread
Long[] instances = (Long[]) smState.get("all-instances");
int ninst = instances.length;
Long[] act_instances = (Long[]) smState.get("active-instances");
int active = act_instances.length;
if ( pc > 0 ) {
last_use = System.currentTimeMillis();
}
//
// Must calculate number of threads per instance up front in order to properly
// count ninstances later as it changes.
//
// let cc = number of comsumers
// ni = number of instances
// nt = threads per instance
// then
// cc = ni * nt + ni
// and
// nt = cc / ni - 1
//
// Then later, knowing cc, we calculate ni:
// ni = cc / (nt + 1)
//
int nthreads = (int) (cc / ninst) - 1;
doLog(methodName, stats.getInfo());
additions = 0;
deletions = null;
int new_ni = 0;
// what we're looking for it Tt: the average time it takes on thread to compute one unit of work.
if ( Q == 0 ) {
if ( (pc != 0) && ( !fast_shrink) ) {
doLog(methodName, "Inhibit shrinkage because pc =", pc, "Q =", Q);
deletion_window[cursor] = 0;
} else {
deletion_window[cursor] = 1;
}
expansion_window[cursor] = 0; // no expansion if no queue
} else {
Ti = (eT / Q) * active;
Tt = Ti * nthreads;
// we want to try to keep the eT at some factor of the Tt. This stuff varies linearly so it's
// relatively easy. If eTd is desired eT, and the goal is g:
double g = Tt * goal; // we want to get eT close to this
double r = eT / g; // the ratio of current queue to to desired, which is the amount we need to
// change the instances by
doLog(methodName, "eT", eT, "Q", Q, "cc", cc, "Ti", Ti, "Tt", Tt, "g", g, "r", r,
"active", active, "ninstances", ninst, "max_instances", max_instances);
if ( r > 1 ) { // we want moref
// We must smooth it out. First get the delta instances.
// Then cap it accounting for instances started but not yet connected to the q
// Then cap it on the max for the service.
// Then cap it on (arbitrarily) 5 so we don't blast too many new instances at once
new_ni = (int) Math.ceil(active * r); // delta
new_ni = Math.max(new_ni - ninst, 0); // here we account for intances that aren't yet started
new_ni = Math.min(new_ni, max_instances); // cap on configured max
new_ni = Math.min(new_ni, max_growth); // cap on growth rate
if ( new_ni > 0 ) { // do we actually expand in the end?
doLog(methodName, "Expand, new_n1:", new_ni);
expansion_window[cursor] = 1;
} else {
doLog(methodName, "Don't expand, new_n1:", new_ni);
expansion_window[cursor] = 0;
}
} else {
doLog(methodName, "Don't expand, r < 1.0:", r);
expansion_window[cursor] = 0;
}
if ( r < .5 ) { // we're over-provisioned to the goal
if ( (pc != 0) && (!fast_shrink) ) {
// never shrink if there are producers
doLog(methodName, "Inhibit shrinkage because pc =", pc, "r=", r);
deletion_window[cursor] = 0;
} else {
doLog(methodName, "Allow shrinkage: r =", r, "pc =", pc);
deletion_window[cursor] = 1; // if we stay that way we'll try dropping one instance
}
} else {
deletion_window[cursor] = 0; // don't delete if there's a queue
}
}
doLog(methodName, "Expansion window:", Arrays.toString(expansion_window));
doLog(methodName, "Deletion window:", Arrays.toString(deletion_window));
// more smoothing, only expand every few (expansion_period) instances, and only if we've been needing more
// for at while
int etot = 0;
int dtot = 0;
for (int i = 0; i < window_size; i++ ) {
etot += expansion_window[i];
dtot += deletion_window[i];
}
if ( (etot == expansion_window.length) && (new_ni > 0) ) {
additions = new_ni;;
deletions = null;
if ( ninst > active ) {
additions = 0; // don't expand if we're still waiting for these to come alive
} else {
expansion_window[cursor] = 0; // if we expand we use one slot in order to govern expansion
}
}
if ( dtot == deletion_window.length ) {
additions = 0;
if ( ninst > min_instances ) {
deletions = new Long[1];
deletions[0] = act_instances[act_instances.length - 1];
doLog(methodName, "Deletions:", deletions[0]);
deletion_window[cursor] = 0; // if we shrink we lose one slot to govern shrinkage
}
}
doLog(methodName, "Cursor before:", cursor, "window_size", window_size);
cursor = ++cursor % window_size;
doLog(methodName, "Cursor after:", cursor);
}
// ================================================================================
// NEW INTERFACES
// ================================================================================
/**
* Implement this to indicate how many new instances to start. The value here
* is calculted above in calculateNewDeployment.
*/
public int getAdditions()
{
return additions;
}
/**
* Implement this to indicate how many instances to stop. The value here
* is calculted above in calculateNewDeployment.
*/
public Long[] getDeletions()
{
return deletions;
}
// ================================================================================
// END NEW INTERFACES
// ================================================================================
/**
* This is a callback class for the UIMA-AS get-meta, that tells us which process on
* which host responded to the get-meta.
*/
class UimaCbListener extends UimaAsBaseCallbackListener
{
public UimaCbListener()
{
}
public void ok()
{
// String methodName = "UimaAsPing:get-meta";
// logger.info(methodName, null, "Get-Meta received from ", nodeIp, "PID", pid);
gmfail = false;
}
public void timeout()
{
String methodName = "UimaAsPing:get-meta";
doLog(methodName, null, "Get-Meta timeout from ", nodeIp, "PID", pid);
gmfail = true;
}
public void onBeforeMessageSend(UimaASProcessStatus status)
{
}
// private void onBeforeMessageSendHandler(UimaASProcessStatus status)
// {
// }
public void onBeforeProcessMeta(String IP, String p)
{
//String methodName = "UimaAsPing:onBeforeProcessMeta";
//doLog(methodName, null, "Get-Meta received from ", IP, ":", p, "for", ep);
pid = p;
nodeIp = IP;
}
// private void onBeforeProcessCASHandler(UimaASProcessStatus status, String nodeIP, String pid)
// {
// }
public void initializationComplete(EntityProcessStatus aStatus)
{
}
public void entityProcessComplete(CAS aCas, EntityProcessStatus aStatus)
{
}
public void collectionProcessComplete(EntityProcessStatus aStatus)
{
}
}
}