| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker; |
| |
| import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; |
| |
| /** |
| * Data class comprising the average message data over a fixed period of time. |
| */ |
| public class TimeAverageMessageData { |
| // The maximum number of samples this data will consider. |
| private int maxSamples; |
| |
| // The number of samples that are currently available for this data. Always |
| // at most maxSamples. |
| private int numSamples; |
| |
| // The average throughput-in in bytes per second. |
| private double msgThroughputIn; |
| |
| // The average throughput-out in bytes per second. |
| private double msgThroughputOut; |
| |
| // The average message rate in per second. |
| private double msgRateIn; |
| |
| // The average message rate out per second. |
| private double msgRateOut; |
| |
| // For JSON only. |
| public TimeAverageMessageData() { |
| } |
| |
| /** |
| * Initialize this TimeAverageData to 0 values. |
| * |
| * @param maxSamples |
| * The maximum number of samples with which to maintain the average. |
| */ |
| public TimeAverageMessageData(final int maxSamples) { |
| this.maxSamples = maxSamples; |
| } |
| |
| /** |
| * Initialize this TimeAverageData using default stats. |
| * |
| * @param maxSamples |
| * The maximum number of samples with which to maintain the average. |
| * @param defaultStats |
| * The stats to default to. These are overwritten after the first update. |
| */ |
| public TimeAverageMessageData(final int maxSamples, final NamespaceBundleStats defaultStats) { |
| this.maxSamples = maxSamples; |
| msgThroughputIn = defaultStats.msgThroughputIn; |
| msgThroughputOut = defaultStats.msgThroughputOut; |
| msgRateIn = defaultStats.msgRateIn; |
| msgRateOut = defaultStats.msgRateOut; |
| } |
| |
| /** |
| * Update using new samples for the message data. |
| * |
| * @param newMsgThroughputIn |
| * Most recently observed throughput in. |
| * @param newMsgThroughputOut |
| * Most recently observed throughput out. |
| * @param newMsgRateIn |
| * Most recently observed message rate in. |
| * @param newMsgRateOut |
| * Most recently observed message rate out. |
| */ |
| public void update(final double newMsgThroughputIn, final double newMsgThroughputOut, final double newMsgRateIn, |
| final double newMsgRateOut) { |
| // If max samples has been reached, don't increase numSamples. |
| numSamples = Math.min(numSamples + 1, maxSamples); |
| msgThroughputIn = getUpdatedValue(msgThroughputIn, newMsgThroughputIn); |
| msgThroughputOut = getUpdatedValue(msgThroughputOut, newMsgThroughputOut); |
| msgRateIn = getUpdatedValue(msgRateIn, newMsgRateIn); |
| msgRateOut = getUpdatedValue(msgRateOut, newMsgRateOut); |
| } |
| |
| /** |
| * Update using a new bundle sample. |
| * |
| * @param newSample |
| * Most recently observed bundle stats. |
| */ |
| public void update(final NamespaceBundleStats newSample) { |
| update(newSample.msgThroughputIn, newSample.msgThroughputOut, newSample.msgRateIn, newSample.msgRateOut); |
| } |
| |
| // Update the average of a sample using the number of samples, the previous |
| // average, and a new sample. |
| private double getUpdatedValue(final double oldAverage, final double newSample) { |
| // Note that for numSamples == 1, this returns newSample. |
| // This ensures that default stats get overwritten after the first |
| // update. |
| return ((numSamples - 1) * oldAverage + newSample) / numSamples; |
| } |
| |
| public int getMaxSamples() { |
| return maxSamples; |
| } |
| |
| public void setMaxSamples(int maxSamples) { |
| this.maxSamples = maxSamples; |
| } |
| |
| public int getNumSamples() { |
| return numSamples; |
| } |
| |
| public void setNumSamples(int numSamples) { |
| this.numSamples = numSamples; |
| } |
| |
| public double getMsgThroughputIn() { |
| return msgThroughputIn; |
| } |
| |
| public void setMsgThroughputIn(double msgThroughputIn) { |
| this.msgThroughputIn = msgThroughputIn; |
| } |
| |
| public double getMsgThroughputOut() { |
| return msgThroughputOut; |
| } |
| |
| public void setMsgThroughputOut(double msgThroughputOut) { |
| this.msgThroughputOut = msgThroughputOut; |
| } |
| |
| public double getMsgRateIn() { |
| return msgRateIn; |
| } |
| |
| public void setMsgRateIn(double msgRateIn) { |
| this.msgRateIn = msgRateIn; |
| } |
| |
| public double getMsgRateOut() { |
| return msgRateOut; |
| } |
| |
| public void setMsgRateOut(double msgRateOut) { |
| this.msgRateOut = msgRateOut; |
| } |
| |
| /** |
| * Get the total message rate. |
| * |
| * @return Message rate in + message rate out. |
| */ |
| public double totalMsgRate() { |
| return msgRateIn + msgRateOut; |
| } |
| |
| /** |
| * Get the total message throughput. |
| * |
| * @return Message throughput in + message throughput out. |
| */ |
| public double totalMsgThroughput() { |
| return msgThroughputIn + msgThroughputOut; |
| } |
| } |