blob: dab6d26715813768b57e375d9ad037b8b06c8066 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.control.common.job.profiling.om;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.job.profiling.IOperatorStats;
import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.control.common.job.profiling.StatsCollector;
import org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
 * Profile of a single task attempt.
 * <p>
 * Holds the task attempt id, the per-partition send profiles, the operator statistics collector, the
 * warnings attached to the attempt and the total warnings count. The profile can be rendered as JSON
 * via {@link #toJSON()} and written to / read from a {@link DataOutput} / {@link DataInput} via
 * {@link #writeFields(DataOutput)} / {@link #readFields(DataInput)}.
 */
public class TaskProfile extends AbstractProfile {
    private static final long serialVersionUID = 1L;

    /** Shared mapper, used only as a factory for empty object/array nodes. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Rounds the reported operator time to at most four fraction digits. */
    private static final String TIME_PATTERN = "#.####";

    /**
     * Divisor applied to the raw time counter before it is reported.
     * NOTE(review): presumably converts nanoseconds to milliseconds -- confirm against the counter's unit.
     */
    private static final double TIME_COUNTER_DIVISOR = 1000000;

    private TaskAttemptId taskAttemptId;
    private Map<PartitionId, PartitionProfile> partitionSendProfile;
    private IStatsCollector statsCollector;
    private Set<Warning> warnings;
    private long totalWarningsCount;

    /**
     * Creates a profile by reading its fields from the given input.
     *
     * @param dis the input to read the serialized profile from
     * @return the deserialized profile
     * @throws IOException if reading from {@code dis} fails
     */
    public static TaskProfile create(DataInput dis) throws IOException {
        TaskProfile taskProfile = new TaskProfile();
        taskProfile.readFields(dis);
        return taskProfile;
    }

    /** Used by {@link #create(DataInput)}; all fields are populated by {@link #readFields(DataInput)}. */
    private TaskProfile() {
    }

    /**
     * Creates a profile from the given values.
     *
     * @param taskAttemptId the id of the profiled task attempt
     * @param partitionSendProfile the per-partition send profiles; the map is copied, must not be {@code null}
     * @param statsCollector the collector holding the operator statistics
     * @param warnings the warnings of the task attempt; stored by reference (not copied)
     * @param totalWarningsCount the total number of warnings
     */
    public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile,
            IStatsCollector statsCollector, Set<Warning> warnings, long totalWarningsCount) {
        this.taskAttemptId = taskAttemptId;
        this.partitionSendProfile = new HashMap<>(partitionSendProfile);
        this.statsCollector = statsCollector;
        this.warnings = warnings;
        this.totalWarningsCount = totalWarningsCount;
    }

    /**
     * @return the id of the profiled task attempt
     */
    public TaskAttemptId getTaskId() {
        return taskAttemptId;
    }

    /**
     * @return the internal (mutable) map of per-partition send profiles
     */
    public Map<PartitionId, PartitionProfile> getPartitionSendProfile() {
        return partitionSendProfile;
    }

    /**
     * Renders this profile as JSON.
     * <p>
     * The result contains {@code "activity-id"}, {@code "partition"} and {@code "attempt"}, followed by
     * {@code "partition-send-profile"} (only when the send-profile map is non-null) and the
     * {@code "counters"} array added by {@link #populateCounters(ObjectNode)}.
     *
     * @return the JSON representation of this profile
     */
    @Override
    public ObjectNode toJSON() {
        ObjectNode json = OBJECT_MAPPER.createObjectNode();
        json.put("activity-id", taskAttemptId.getTaskId().getActivityId().toString());
        json.put("partition", taskAttemptId.getTaskId().getPartition());
        json.put("attempt", taskAttemptId.getAttempt());
        if (partitionSendProfile != null) {
            ArrayNode pspArray = OBJECT_MAPPER.createArrayNode();
            for (PartitionProfile pp : partitionSendProfile.values()) {
                pspArray.add(partitionProfileToJson(pp));
            }
            json.set("partition-send-profile", pspArray);
        }
        populateCounters(json);
        return json;
    }

    /** Builds the JSON object describing one partition send profile. */
    private static ObjectNode partitionProfileToJson(PartitionProfile pp) {
        ObjectNode ppObj = OBJECT_MAPPER.createObjectNode();

        PartitionId pid = pp.getPartitionId();
        ObjectNode pidObj = OBJECT_MAPPER.createObjectNode();
        pidObj.put("job-id", pid.getJobId().toString());
        pidObj.put("connector-id", pid.getConnectorDescriptorId().toString());
        pidObj.put("sender-index", pid.getSenderIndex());
        pidObj.put("receiver-index", pid.getReceiverIndex());
        ppObj.set("partition-id", pidObj);

        ppObj.put("open-time", pp.getOpenTime());
        ppObj.put("close-time", pp.getCloseTime());

        MultiResolutionEventProfiler samples = pp.getSamples();
        ppObj.put("offset", samples.getOffset());
        int resolution = samples.getResolution();
        int sampleCount = samples.getCount();
        ArrayNode ftA = OBJECT_MAPPER.createArrayNode();
        int[] ft = samples.getSamples();
        // Only the first sampleCount entries of the sample array are emitted.
        for (int i = 0; i < sampleCount; ++i) {
            ftA.add(ft[i]);
        }
        ppObj.set("frame-times", ftA);
        ppObj.put("resolution", resolution);
        return ppObj;
    }

    /**
     * Adds a {@code "counters"} array to {@code json} with one entry per operator known to the stats
     * collector. Each entry holds the operator {@code "name"}, its {@code "time"} (time counter divided by
     * 1,000,000 and rounded to at most four fraction digits) and its {@code "disk-io"} counter.
     *
     * @param json the object the {@code "counters"} array is attached to
     */
    @Override
    protected void populateCounters(ObjectNode json) {
        // The formatted text is fed back into Double.parseDouble, which only understands '.' as the
        // decimal separator and ASCII digits. Pinning the symbols to Locale.ROOT keeps this working when
        // the JVM default locale uses ',' (or non-ASCII digits); the default-locale format used to throw
        // NumberFormatException there. DecimalFormat is not thread-safe, so it is created per call.
        DecimalFormat timeFormat = new DecimalFormat(TIME_PATTERN, DecimalFormatSymbols.getInstance(Locale.ROOT));
        Map<String, IOperatorStats> opTimes = statsCollector.getAllOperatorStats();
        ArrayNode countersObj = OBJECT_MAPPER.createArrayNode();
        opTimes.forEach((key, value) -> {
            ObjectNode jpe = OBJECT_MAPPER.createObjectNode();
            jpe.put("name", key);
            jpe.put("time", Double
                    .parseDouble(timeFormat.format((double) value.getTimeCounter().get() / TIME_COUNTER_DIVISOR)));
            jpe.put("disk-io", value.getDiskIoCounter().get());
            countersObj.add(jpe);
        });
        json.set("counters", countersObj);
    }

    /**
     * @return the collector holding the operator statistics of this task attempt
     */
    public IStatsCollector getStatsCollector() {
        return statsCollector;
    }

    /**
     * @return the internal (mutable) set of warnings of this task attempt
     */
    public Set<Warning> getWarnings() {
        return warnings;
    }

    /**
     * @return the total warnings count recorded for this task attempt
     */
    public long getTotalWarningsCount() {
        return totalWarningsCount;
    }

    /**
     * Reads this profile from {@code input}. After the superclass fields, the order is: task attempt id,
     * number of partition send profiles followed by that many (partition id, partition profile) pairs,
     * stats collector, warnings (count followed by the warnings), total warnings count. This mirrors
     * {@link #writeFields(DataOutput)}.
     *
     * @param input the input to read from
     * @throws IOException if reading from {@code input} fails
     */
    @Override
    public void readFields(DataInput input) throws IOException {
        super.readFields(input);
        taskAttemptId = TaskAttemptId.create(input);
        int size = input.readInt();
        partitionSendProfile = new HashMap<>();
        for (int i = 0; i < size; i++) {
            PartitionId key = PartitionId.create(input);
            PartitionProfile value = PartitionProfile.create(input);
            partitionSendProfile.put(key, value);
        }
        statsCollector = StatsCollector.create(input);
        warnings = new HashSet<>();
        deserializeWarnings(input, warnings);
        totalWarningsCount = input.readLong();
    }

    /**
     * Writes this profile to {@code output} in the order expected by {@link #readFields(DataInput)}.
     *
     * @param output the output to write to
     * @throws IOException if writing to {@code output} fails
     */
    @Override
    public void writeFields(DataOutput output) throws IOException {
        super.writeFields(output);
        taskAttemptId.writeFields(output);
        output.writeInt(partitionSendProfile.size());
        for (Entry<PartitionId, PartitionProfile> entry : partitionSendProfile.entrySet()) {
            entry.getKey().writeFields(output);
            entry.getValue().writeFields(output);
        }
        statsCollector.writeFields(output);
        serializeWarnings(output);
        output.writeLong(totalWarningsCount);
    }

    /** Writes the number of warnings followed by each warning. */
    private void serializeWarnings(DataOutput output) throws IOException {
        output.writeInt(warnings.size());
        for (Warning warning : warnings) {
            warning.writeFields(output);
        }
    }

    /** Reads a warning count followed by that many warnings and adds them to {@code warnings}. */
    private static void deserializeWarnings(DataInput input, Set<Warning> warnings) throws IOException {
        int warnCount = input.readInt();
        for (int i = 0; i < warnCount; i++) {
            warnings.add(Warning.create(input));
        }
    }
}