| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.jobmetrics; |
| |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.processors.GridProcessorAdapter; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteReducer; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_JOBS_METRICS_CONCURRENCY_LEVEL; |
| |
| /** |
| * Processes job metrics. |
| */ |
| public class GridJobMetricsProcessor extends GridProcessorAdapter { |
| /** */ |
| private static final int CONCURRENCY_LEVEL = Integer.getInteger(IGNITE_JOBS_METRICS_CONCURRENCY_LEVEL, 64); |
| |
| /** Time to live. */ |
| private final long expireTime; |
| |
| /** Maximum size. */ |
| private final int histSize; |
| |
| /** */ |
| private final int queSize; |
| |
| /** */ |
| private volatile long idleTimer = U.currentTimeMillis(); |
| |
| /** */ |
| private final AtomicBoolean isIdle = new AtomicBoolean(true); |
| |
| /** */ |
| private volatile InternalMetrics metrics; |
| |
| /** |
| * @param ctx Grid kernal context. |
| */ |
| public GridJobMetricsProcessor(GridKernalContext ctx) { |
| super(ctx); |
| |
| expireTime = ctx.config().getMetricsExpireTime(); |
| histSize = ctx.config().getMetricsHistorySize(); |
| |
| assert histSize > 0 : histSize; |
| |
| final int minSize = histSize / CONCURRENCY_LEVEL + 1; |
| int size = 1; |
| |
| // Need to find power of 2 size. |
| while (size < minSize) |
| size <<= 1; |
| |
| queSize = size; |
| |
| reset(); |
| } |
| |
| /** |
| * Internal metrics object for atomic replacement. |
| */ |
| private class InternalMetrics { |
| /** */ |
| private volatile long totalIdleTime; |
| |
| /** */ |
| private volatile long curIdleTime; |
| |
| /** */ |
| private final SnapshotsQueue[] snapshotsQueues; |
| |
| /** |
| * |
| */ |
| InternalMetrics() { |
| if (CONCURRENCY_LEVEL < 0) |
| snapshotsQueues = null; |
| else { |
| snapshotsQueues = new SnapshotsQueue[CONCURRENCY_LEVEL]; |
| |
| for (int i = 0; i < snapshotsQueues.length; i++) |
| snapshotsQueues[i] = new SnapshotsQueue(queSize); |
| } |
| } |
| } |
| |
| /** |
| * Resets metrics. |
| */ |
| public void reset() { |
| InternalMetrics prevMetrics = metrics; |
| |
| metrics = new InternalMetrics(); |
| |
| // Preserve totalIdleTime, because it is used for busy / idle time calculations. |
| if (prevMetrics != null) |
| metrics.totalIdleTime = prevMetrics.totalIdleTime; |
| } |
| |
| /** |
| * Gets metrics history size. |
| * |
| * @return Maximum metrics queue size. |
| */ |
| int getHistorySize() { |
| return histSize; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| assertParameter(histSize > 0, "metricsHistorySize > 0"); |
| assertParameter(expireTime > 0, "metricsExpireTime > 0"); |
| |
| if (metrics.snapshotsQueues == null) |
| throw new IgniteCheckedException("Invalid concurrency level configured " + |
| "(is 'IGNITE_JOBS_METRICS_CONCURRENCY_LEVEL' system property properly set?)."); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Job metrics processor started [histSize=" + histSize + |
| ", concurLvl=" + CONCURRENCY_LEVEL + |
| ", expireTime=" + expireTime + ']'); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop(boolean cancel) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug("Job metrics processor stopped."); |
| } |
| |
| /** |
| * Gets latest metrics. |
| * |
| * @return Latest metrics. |
| */ |
| public GridJobMetrics getJobMetrics() { |
| long now = U.currentTimeMillis(); |
| |
| SnapshotReducer rdc = new SnapshotReducer(); |
| |
| InternalMetrics im = metrics; |
| |
| for (SnapshotsQueue q : im.snapshotsQueues) |
| q.reduce(rdc, now); |
| |
| GridJobMetrics m = rdc.reduce(); |
| |
| // Set idle times. |
| m.setCurrentIdleTime(im.curIdleTime); |
| m.setTotalIdleTime(im.totalIdleTime); |
| |
| return m; |
| } |
| |
| /** |
| * @param metrics New metrics. |
| */ |
| public void addSnapshot(GridJobMetricsSnapshot metrics) { |
| assert metrics != null; |
| |
| InternalMetrics m = this.metrics; |
| |
| m.snapshotsQueues[ThreadLocalRandom.current().nextInt(m.snapshotsQueues.length)].add(metrics); |
| |
| // Handle current and total idle times. |
| long idleTimer0 = idleTimer; |
| |
| if (metrics.getActiveJobs() > 0) { |
| if (isIdle.get() && isIdle.compareAndSet(true, false)) { |
| long now = U.currentTimeMillis(); |
| |
| // Node started to execute jobs after being idle. |
| m.totalIdleTime += now - idleTimer0; |
| |
| m.curIdleTime = 0; |
| } |
| } |
| else { |
| long now = U.currentTimeMillis(); |
| |
| if (!isIdle.compareAndSet(false, true)) { |
| // Node is still idle. |
| m.curIdleTime += now - idleTimer0; |
| |
| m.totalIdleTime += now - idleTimer0; |
| } |
| |
| // Reset timer. |
| idleTimer = now; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void printMemoryStats() { |
| X.println(">>>"); |
| X.println(">>> Job metrics processor processor memory stats [igniteInstanceName=" + |
| ctx.igniteInstanceName() + ']'); |
| } |
| |
| /** |
| * |
| */ |
| private class SnapshotsQueue { |
| /** */ |
| private int idx; |
| |
| /** */ |
| private final GridJobMetricsSnapshot[] snapshots; |
| |
| /** */ |
| private final int mask; |
| |
| /** */ |
| private int totalFinishedJobs; |
| |
| /** */ |
| private int totalCancelledJobs; |
| |
| /** */ |
| private int totalRejectedJobs; |
| |
| /** */ |
| private long totalExecTime; |
| |
| /** |
| * @param size Size (should be power of 2). |
| */ |
| private SnapshotsQueue(int size) { |
| assert size > 0 : size; |
| |
| snapshots = new GridJobMetricsSnapshot[size]; |
| |
| mask = size - 1; |
| } |
| |
| /** |
| * @param s Snapshot to add. |
| */ |
| synchronized void add(GridJobMetricsSnapshot s) { |
| snapshots[idx++ & mask] = s; |
| |
| totalFinishedJobs += s.getFinishedJobs(); |
| totalCancelledJobs += s.getCancelJobs(); |
| totalRejectedJobs += s.getRejectJobs(); |
| totalExecTime += s.getExecutionTime(); |
| } |
| |
| /** |
| * @param rdc Reducer. |
| * @param now Timestamp. |
| */ |
| synchronized void reduce(SnapshotReducer rdc, long now) { |
| assert rdc != null; |
| |
| for (GridJobMetricsSnapshot s : snapshots) { |
| if (s == null) |
| break; |
| |
| if (now - s.getTimestamp() > expireTime) |
| continue; |
| |
| rdc.collect(s); |
| } |
| |
| rdc.collectTotals(totalFinishedJobs, totalCancelledJobs, totalRejectedJobs, totalExecTime); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class SnapshotReducer implements IgniteReducer<GridJobMetricsSnapshot, GridJobMetrics> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private final GridJobMetrics m = new GridJobMetrics(); |
| |
| /** */ |
| private int cnt; |
| |
| /** */ |
| private int totalActiveJobs; |
| |
| /** */ |
| private int totalWaitingJobs; |
| |
| /** */ |
| private int totalStartedJobs; |
| |
| /** */ |
| private int totalCancelledJobs; |
| |
| /** */ |
| private int totalRejectedJobs; |
| |
| /** */ |
| private int totalFinishedJobs; |
| |
| /** */ |
| private long totalExecTime; |
| |
| /** */ |
| private long totalWaitTime; |
| |
| /** */ |
| private double totalCpuLoad; |
| |
| /** */ |
| private GridJobMetricsSnapshot lastSnapshot; |
| |
| /** {@inheritDoc} */ |
| @Override public boolean collect(GridJobMetricsSnapshot s) { |
| assert s != null; |
| |
| cnt++; |
| |
| if (lastSnapshot == null || lastSnapshot.getTimestamp() < s.getTimestamp()) |
| lastSnapshot = s; |
| |
| // Maximums. |
| if (m.getMaximumActiveJobs() < s.getActiveJobs()) |
| m.setMaximumActiveJobs(s.getActiveJobs()); |
| |
| if (m.getMaximumWaitingJobs() < s.getPassiveJobs()) |
| m.setMaximumWaitingJobs(s.getPassiveJobs()); |
| |
| if (m.getMaximumCancelledJobs() < s.getCancelJobs()) |
| m.setMaximumCancelledJobs(s.getCancelJobs()); |
| |
| if (m.getMaximumRejectedJobs() < s.getRejectJobs()) |
| m.setMaximumRejectedJobs(s.getRejectJobs()); |
| |
| if (m.getMaximumJobWaitTime() < s.getMaximumWaitTime()) |
| m.setMaximumJobWaitTime(s.getMaximumWaitTime()); |
| |
| if (m.getMaximumJobExecuteTime() < s.getMaximumExecutionTime()) |
| m.setMaxJobExecutionTime(s.getMaximumExecutionTime()); |
| |
| // Totals. |
| totalActiveJobs += s.getActiveJobs(); |
| totalCancelledJobs += s.getCancelJobs(); |
| totalWaitingJobs += s.getPassiveJobs(); |
| totalRejectedJobs += s.getRejectJobs(); |
| totalWaitTime += s.getWaitTime(); |
| totalExecTime += s.getExecutionTime(); |
| totalStartedJobs += s.getStartedJobs(); |
| totalFinishedJobs += s.getFinishedJobs(); |
| totalCpuLoad += s.getCpuLoad(); |
| |
| return true; |
| } |
| |
| /** |
| * @param totalFinishedJobs Finished jobs. |
| * @param totalCancelledJobs Cancelled jobs. |
| * @param totalRejectedJobs Rejected jobs. |
| */ |
| void collectTotals(int totalFinishedJobs, int totalCancelledJobs, int totalRejectedJobs, long totalExecTime) { |
| // Totals. |
| m.setTotalExecutedJobs(m.getTotalExecutedJobs() + totalFinishedJobs); |
| m.setTotalCancelledJobs(m.getTotalCancelledJobs() + totalCancelledJobs); |
| m.setTotalRejectedJobs(m.getTotalRejectedJobs() + totalRejectedJobs); |
| m.setTotalJobsExecutionTime(m.getTotalJobsExecutionTime() + totalExecTime); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridJobMetrics reduce() { |
| // Current metrics. |
| if (lastSnapshot != null) { |
| m.setCurrentActiveJobs(lastSnapshot.getActiveJobs()); |
| m.setCurrentWaitingJobs(lastSnapshot.getPassiveJobs()); |
| m.setCurrentCancelledJobs(lastSnapshot.getCancelJobs()); |
| m.setCurrentRejectedJobs(lastSnapshot.getRejectJobs()); |
| m.setCurrentJobExecutionTime(lastSnapshot.getMaximumExecutionTime()); |
| m.setCurrentJobWaitTime(lastSnapshot.getMaximumWaitTime()); |
| } |
| |
| // Averages. |
| if (cnt > 0) { |
| m.setAverageActiveJobs((float)totalActiveJobs / cnt); |
| m.setAverageWaitingJobs((float)totalWaitingJobs / cnt); |
| m.setAverageCancelledJobs((float)totalCancelledJobs / cnt); |
| m.setAverageRejectedJobs((float)totalRejectedJobs / cnt); |
| m.setAverageCpuLoad(totalCpuLoad / cnt); |
| } |
| |
| m.setAverageJobExecutionTime(totalFinishedJobs > 0 ? (double)totalExecTime / totalFinishedJobs : 0); |
| m.setAverageJobWaitTime(totalStartedJobs > 0 ? (double)totalWaitTime / totalStartedJobs : 0); |
| |
| return m; |
| } |
| } |
| } |