blob: 6301bbefcaf3309df81649b534341bed9296a53a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.ipc.stats;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.avro.Protocol.Message;
import org.apache.avro.ipc.RPCContext;
import org.apache.avro.ipc.RPCPlugin;
import org.apache.avro.ipc.stats.Histogram.Segmenter;
import org.apache.avro.ipc.stats.Stopwatch.Ticks;
/**
 * Collects count and latency statistics about RPC calls. Keeps
 * data for every method. Can be added to a Requestor (client)
 * or Responder (server).
 *
 * This uses milliseconds as the standard unit of measure
 * throughout the class, stored in floats. Payload sizes are
 * recorded as ints, computed from the limits of the payload buffers.
 */
public class StatsPlugin extends RPCPlugin {
  /** Static declaration of latency histogram buckets (milliseconds). */
  static final Segmenter<String, Float> LATENCY_SEGMENTER =
    new Histogram.TreeMapSegmenter<Float>(new TreeSet<Float>(Arrays.asList(
            0f,
           25f,
           50f,
           75f,
          100f,
          200f,
          300f,
          500f,
          750f,
         1000f, // 1 second
         2000f,
         5000f,
        10000f,
        60000f, // 1 minute
       600000f)));

  /** Static declaration of payload-size histogram buckets. */
  static final Segmenter<String, Integer> PAYLOAD_SEGMENTER =
    new Histogram.TreeMapSegmenter<Integer>(new TreeSet<Integer>(Arrays.asList(
            0,
           25,
           50,
           75,
          100,
          200,
          300,
          500,
          750,
         1000, // 1 k
         2000,
         5000,
        10000,
        50000,
       100000)));

  /** Per-method latency histograms.
   * Must be accessed while holding this map's own monitor. */
  Map<Message, FloatHistogram<?>> methodTimings =
    new HashMap<Message, FloatHistogram<?>>();

  /** Per-method histograms of the sizes of payloads sent.
   * Must be accessed while holding this map's own monitor. */
  Map<Message, IntegerHistogram<?>> sendPayloads =
    new HashMap<Message, IntegerHistogram<?>>();

  /** Per-method histograms of the sizes of payloads received.
   * Must be accessed while holding this map's own monitor. */
  Map<Message, IntegerHistogram<?>> receivePayloads =
    new HashMap<Message, IntegerHistogram<?>>();

  /** RPCs in flight, each with the stopwatch started when it was first seen. */
  ConcurrentMap<RPCContext, Stopwatch> activeRpcs =
    new ConcurrentHashMap<RPCContext, Stopwatch>();

  /** Time source handed to every stopwatch this plugin creates. */
  private final Ticks ticks;

  /** How long I've been alive */
  public Date startupTime = new Date();

  /** Segmenter used for every latency histogram this plugin creates. */
  private final Segmenter<?, Float> floatSegmenter;

  /** Segmenter used for every payload-size histogram this plugin creates. */
  private final Segmenter<?, Integer> integerSegmenter;

  /** Construct a plugin with custom Ticks and Segmenter implementations. */
  StatsPlugin(Ticks ticks, Segmenter<?, Float> floatSegmenter,
      Segmenter<?, Integer> integerSegmenter) {
    this.floatSegmenter = floatSegmenter;
    this.integerSegmenter = integerSegmenter;
    this.ticks = ticks;
  }

  /** Construct a plugin with default (system) ticks, and default
   * histogram segmentation. */
  public StatsPlugin() {
    this(Stopwatch.SYSTEM_TICKS, LATENCY_SEGMENTER, PAYLOAD_SEGMENTER);
  }

  /**
   * Helper to get the size of an RPC payload.
   *
   * @param payload the buffers making up the payload; may be null
   * @return the sum of the buffers' limits, or 0 if payload is null
   */
  private int getPayloadSize(List<ByteBuffer> payload) {
    if (payload == null) {
      return 0;
    }

    int size = 0;
    // NOTE(review): limit() is used rather than remaining(), which presumably
    // assumes each buffer's content starts at position 0 -- confirm.
    for (ByteBuffer bb: payload) {
      size = size + bb.limit();
    }

    return size;
  }

  /** Server side: starts timing the call and records the request size
   * as a received payload. */
  @Override
  public void serverReceiveRequest(RPCContext context) {
    startTiming(context);
    recordPayload(receivePayloads, context.getMessage(),
        context.getRequestPayload());
  }

  /** Server side: publishes the call's latency and records the response
   * size as a sent payload. */
  @Override
  public void serverSendResponse(RPCContext context) {
    finishTiming(context);
    recordPayload(sendPayloads, context.getMessage(),
        context.getResponsePayload());
  }

  /** Client side: starts timing the call and records the request size
   * as a sent payload. */
  @Override
  public void clientSendRequest(RPCContext context) {
    startTiming(context);
    recordPayload(sendPayloads, context.getMessage(),
        context.getRequestPayload());
  }

  /** Client side: publishes the call's latency and records the response
   * size as a received payload. */
  @Override
  public void clientReceiveResponse(RPCContext context) {
    finishTiming(context);
    // What the client receives is the response, so measure the response
    // payload here (not the request payload that was already counted as sent).
    recordPayload(receivePayloads, context.getMessage(),
        context.getResponsePayload());
  }

  /** Starts a new stopwatch and registers it as the in-flight timer
   * for the given context. */
  private void startTiming(RPCContext context) {
    Stopwatch t = new Stopwatch(ticks);
    t.start();
    this.activeRpcs.put(context, t);
  }

  /** Removes the in-flight timer for the given context, stops it and
   * publishes the elapsed time. Does nothing if no timer was registered. */
  private void finishTiming(RPCContext context) {
    Stopwatch t = this.activeRpcs.remove(context);
    if (t == null) {
      // No request was recorded for this context, so there is no latency
      // to report; avoid failing the RPC with a NullPointerException.
      return;
    }
    t.stop();
    publish(context, t);
  }

  /**
   * Adds the size of a payload to the histogram kept for a message,
   * creating that histogram on first use.
   *
   * @param histograms the per-message map to update; its monitor is held
   *                   for the duration of the update
   * @param message    the message the payload belongs to
   * @param payload    the payload buffers; may be null (counted as size 0)
   */
  private void recordPayload(Map<Message, IntegerHistogram<?>> histograms,
      Message message, List<ByteBuffer> payload) {
    synchronized(histograms) {
      IntegerHistogram<?> h = histograms.get(message);
      if (h == null) {
        h = createNewIntegerHistogram();
        histograms.put(message, h);
      }
      h.add(getPayloadSize(payload));
    }
  }

  /**
   * Adds timing to the histograms.
   *
   * @throws IllegalArgumentException if the context has no message
   */
  private void publish(RPCContext context, Stopwatch t) {
    Message message = context.getMessage();
    if (message == null) {
      throw new IllegalArgumentException("RPCContext has no message");
    }
    synchronized(methodTimings) {
      FloatHistogram<?> h = methodTimings.get(message);
      if (h == null) {
        h = createNewFloatHistogram();
        methodTimings.put(message, h);
      }
      h.add(nanosToMillis(t.elapsedNanos()));
    }
  }

  /** Creates an empty latency histogram using the configured segmenter. */
  @SuppressWarnings("unchecked")
  private FloatHistogram<?> createNewFloatHistogram() {
    return new FloatHistogram(floatSegmenter);
  }

  /** Creates an empty payload-size histogram using the configured segmenter. */
  @SuppressWarnings("unchecked")
  private IntegerHistogram<?> createNewIntegerHistogram() {
    return new IntegerHistogram(integerSegmenter);
  }

  /** Converts nanoseconds to milliseconds. */
  static float nanosToMillis(long elapsedNanos) {
    return elapsedNanos / 1000000.0f;
  }
}