blob: 160e8bfb5b775283108b6b2e4341372749e68ae2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.master.metric;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nemo.common.exception.MetricException;
import org.apache.nemo.common.exception.UnsupportedMetricException;
import org.apache.nemo.runtime.common.metric.*;
import org.apache.nemo.runtime.common.state.PlanState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.lang.management.ManagementFactory;
import java.sql.*;
import java.util.*;
/**
* MetricStore stores metric data which will be used by the web visualization interface, logging, and so on.
* All metric classes should be JSON-serializable by {@link ObjectMapper}.
*/
public final class MetricStore {
// Logger for this class.
private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class.getName());
// Stored metrics, keyed first by metric class and then by metric id.
// NOTE(review): this is a plain HashMap and only some methods of this class are synchronized;
// confirm the threading assumptions of the callers.
private final Map<Class<? extends Metric>, Map<String, Object>> metricMap = new HashMap<>();
// Registry of the supported metric classes, keyed by name.
// You can support more metrics by adding an entry to this map (see the constructor).
private final Map<String, Class<? extends Metric>> metricList = new HashMap<>();
/**
 * Private constructor.
 * Registers every metric class that this store accepts, keyed by its name.
 */
private MetricStore() {
  this.metricList.put("JobMetric", JobMetric.class);
  this.metricList.put("StageMetric", StageMetric.class);
  this.metricList.put("TaskMetric", TaskMetric.class);
}
/**
 * Returns the singleton instance held by {@link InstanceHolder}.
 * @return the shared MetricStore object.
 */
public static MetricStore getStore() {
  final MetricStore singleton = InstanceHolder.INSTANCE;
  return singleton;
}
/**
 * Holder class for the single MetricStore instance.
 * The instance is created by this class's static initializer.
 */
private static final class InstanceHolder {
  private static final MetricStore INSTANCE = new MetricStore();
}
/**
 * Look up a supported metric class by the name it was registered under.
 * @param className registered name of the metric class, e.g. "JobMetric".
 * @param <T> class of metric
 * @return the metric class registered under the given name.
 * @throws NoSuchElementException if no metric class is registered under the given name.
 */
@SuppressWarnings("unchecked") // The caller chooses T; the registry only holds Metric subclasses.
public <T extends Metric> Class<T> getMetricClassByName(final String className) {
  // The registry never stores null values, so a null result means "not registered".
  final Class<? extends Metric> metricClass = metricList.get(className);
  if (metricClass == null) {
    throw new NoSuchElementException("No metric class registered with name: " + className);
  }
  return (Class<T>) metricClass;
}
/**
 * Store a metric object. The metric's class must be one of the registered metric classes.
 * The metric is kept in a per-class {@link Map} whose key is the metric's id; if a metric with
 * the same id is already stored for that class, the existing one is kept.
 * @param metric metric object.
 * @param <T> class of metric
 */
public <T extends Metric> void putMetric(final T metric) {
  final Class<? extends Metric> type = metric.getClass();
  if (metricList.containsValue(type)) {
    final Map<String, Object> metricsById = metricMap.computeIfAbsent(type, unused -> new HashMap<>());
    metricsById.putIfAbsent(metric.getId(), metric);
    return;
  }
  throw new UnsupportedMetricException(new Throwable("Unsupported metric"));
}
/**
 * Fetch a stored metric by its metric class and its id.
 * @param metricClass class instance of metric.
 * @param id metric id.
 * @param <T> class of metric
 * @return the stored metric object.
 * @throws NoSuchElementException if no metric with the given id is stored for the class.
 */
public <T extends Metric> T getMetricWithId(final Class<T> metricClass, final String id) {
  final Map<String, Object> metricsById = metricMap.computeIfAbsent(metricClass, unused -> new HashMap<>());
  final Object stored = metricsById.get(id);
  if (stored != null) {
    return (T) stored;
  }
  throw new NoSuchElementException("No metric found");
}
/**
 * Fetch the id-to-metric map of a metric class. An empty map is created and stored
 * if the class has no map yet. The returned map is the one held by this store, not a copy.
 * @param metricClass class instance of metric.
 * @param <T> class of metric
 * @return the map from metric id to metric object for the given class.
 */
public <T extends Metric> Map<String, Object> getMetricMap(final Class<T> metricClass) {
  Map<String, Object> metricsById = metricMap.get(metricClass);
  if (metricsById == null) {
    metricsById = new HashMap<>();
    metricMap.put(metricClass, metricsById);
  }
  return metricsById;
}
/**
 * Like getMetricWithId(), except that a missing metric is created instead of being reported.
 * The new metric is built with the constructor of the metric class that takes a single
 * {@link String} id, and is stored before being returned.
 * @param metricClass class of metric.
 * @param id metric id.
 * @param <T> class of metric
 * @return the stored metric object, or a newly created and stored one if there was none.
 */
public <T extends Metric> T getOrCreateMetric(final Class<T> metricClass, final String id) {
  final Object existing = metricMap.computeIfAbsent(metricClass, unused -> new HashMap<>()).get(id);
  if (existing != null) {
    return (T) existing;
  }
  try {
    final T created = metricClass.getConstructor(String.class).newInstance(id);
    putMetric(created);
    return created;
  } catch (final Exception e) {
    // Any failure while constructing or storing the metric is reported as a MetricException.
    throw new MetricException(e);
  }
}
/**
 * Write one metric entry as a JSON field named after the metric id, whose value is an object
 * with an "id" field (the metric id) and a "data" field (the serialized metric).
 * @param idToMetricEntry entry mapping a metric id to its metric object.
 * @param jsonGenerator generator to write to.
 * @param objectMapper mapper used to convert the metric object to a JSON tree.
 * @throws IOException when failed to write json.
 */
private void generatePreprocessedJsonFromMetricEntry(final Map.Entry<String, Object> idToMetricEntry,
                                                     final JsonGenerator jsonGenerator,
                                                     final ObjectMapper objectMapper) throws IOException {
  // Convert the metric first, so that nothing is written if the conversion fails.
  final JsonNode metricTree = objectMapper.valueToTree(idToMetricEntry.getValue());
  final String metricId = idToMetricEntry.getKey();
  jsonGenerator.writeObjectFieldStart(metricId);
  jsonGenerator.writeStringField("id", metricId);
  jsonGenerator.writeFieldName("data");
  jsonGenerator.writeTree(metricTree);
  jsonGenerator.writeEndObject();
}
/**
 * Dumps a JSON-serialized string of all stored metrics of a specific metric class.
 * The top-level object has a single field named after the simple name of the metric class,
 * which holds one entry per metric id.
 * @param metricClass class of metric.
 * @param <T> type of the metric to dump
 * @return dumped JSON string of all metrics of the given class.
 * @throws IOException when failed to write json.
 */
public <T extends Metric> String dumpMetricToJson(final Class<T> metricClass) throws IOException {
  final ObjectMapper objectMapper = new ObjectMapper();
  final JsonFactory jsonFactory = new JsonFactory();
  final ByteArrayOutputStream stream = new ByteArrayOutputStream();

  try (final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8)) {
    jsonGenerator.setCodec(objectMapper);

    jsonGenerator.writeStartObject();
    jsonGenerator.writeFieldName(metricClass.getSimpleName());
    jsonGenerator.writeStartObject();
    for (final Map.Entry<String, Object> idToMetricEntry : getMetricMap(metricClass).entrySet()) {
      generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, objectMapper);
    }
    jsonGenerator.writeEndObject();
    jsonGenerator.writeEndObject();
  }

  // The generator wrote UTF-8 bytes, so decode them as UTF-8 instead of the platform default charset.
  return stream.toString("UTF-8");
}
/**
 * Dumps a JSON-serialized string of all stored metrics.
 * The top-level object has one field per stored metric class, named after the simple name of
 * the class, and each of them holds one entry per metric id. This method is synchronized.
 * @return dumped JSON string of all metrics.
 * @throws IOException when failed to write json.
 */
public synchronized String dumpAllMetricToJson() throws IOException {
  final ObjectMapper objectMapper = new ObjectMapper();
  final JsonFactory jsonFactory = new JsonFactory();
  final ByteArrayOutputStream stream = new ByteArrayOutputStream();

  try (final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8)) {
    jsonGenerator.setCodec(objectMapper);

    jsonGenerator.writeStartObject();
    for (final Map.Entry<Class<? extends Metric>, Map<String, Object>> metricMapEntry : metricMap.entrySet()) {
      jsonGenerator.writeFieldName(metricMapEntry.getKey().getSimpleName());
      jsonGenerator.writeStartObject();
      for (final Map.Entry<String, Object> idToMetricEntry : metricMapEntry.getValue().entrySet()) {
        generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, objectMapper);
      }
      jsonGenerator.writeEndObject();
    }
    jsonGenerator.writeEndObject();
  }

  // The generator wrote UTF-8 bytes, so decode them as UTF-8 instead of the platform default charset.
  return stream.toString("UTF-8");
}
/**
 * Writes the output of dumpAllMetricToJson() to a file.
 * The file is opened before the JSON is generated.
 * NOTE(review): FileWriter encodes with the platform default charset; confirm whether the
 * file is expected to be UTF-8.
 * @param filePath path of the file to write the JSON dump to.
 */
public void dumpAllMetricToFile(final String filePath) {
  try (final FileWriter fileWriter = new FileWriter(filePath);
       final BufferedWriter bufferedWriter = new BufferedWriter(fileWriter)) {
    bufferedWriter.write(dumpAllMetricToJson());
  } catch (final IOException ioException) {
    throw new MetricException(ioException);
  }
}
/**
 * Save the job metrics for the optimization, in the form of LibSVM, to a local SQLite DB.
 * The metrics are as follows: the JCT (duration), and the IR DAG execution properties.
 * A {@link SQLException} raised while connecting or saving is logged and not rethrown.
 */
private void saveOptimizationMetricsToLocal() {
  // DB-specific column definition for the auto-incrementing primary key.
  final String[] syntax = {"INTEGER PRIMARY KEY AUTOINCREMENT"};

  try (final Connection c = DriverManager.getConnection(MetricUtils.SQLITE_DB_NAME)) {
    LOG.info("Opened database successfully at {}", MetricUtils.SQLITE_DB_NAME);
    saveOptimizationMetrics(c, syntax);
  } catch (final SQLException e) {
    // A trailing Throwable is logged as the exception, not substituted into a placeholder,
    // so the message carries no "{}".
    LOG.error("Error while saving optimization metrics to SQLite", e);
  }
}
/**
 * Save the job metrics for the optimization, in the form of LibSVM, to a remote DB, if applicable.
 * The metrics are as follows: the JCT (duration), and the IR DAG execution properties.
 * The metrics are saved to the local SQLite DB instead when MetricUtils.metaDataLoaded() is false,
 * or when a {@link SQLException} is raised while connecting to or closing the remote DB.
 * @param address JDBC address of the remote DB.
 * @param id user id for the remote DB.
 * @param passwd password for the remote DB.
 */
public void saveOptimizationMetricsToDB(final String address, final String id, final String passwd) {
  // DB-specific column definition for the auto-incrementing primary key.
  final String[] syntax = {"SERIAL PRIMARY KEY"};

  if (!MetricUtils.metaDataLoaded()) {
    saveOptimizationMetricsToLocal();
    return;
  }

  try (final Connection c = DriverManager.getConnection(address, id, passwd)) {
    LOG.info("Opened database successfully at {}", MetricUtils.POSTGRESQL_METADATA_DB_NAME);
    saveOptimizationMetrics(c, syntax);
  } catch (final SQLException e) {
    // A trailing Throwable is logged as the exception, not substituted into a placeholder,
    // so the message carries no "{}".
    LOG.error("Error while saving optimization metrics to PostgreSQL", e);
    LOG.info("Saving metrics on the local SQLite DB");
    saveOptimizationMetricsToLocal();
  }
}
/**
 * Save the job metrics for the optimization to the DB, in the form of LibSVM.
 * For every stored {@link JobMetric}, a table named after its IR DAG summary is created if absent,
 * and one row with the job duration, input size, memory sizes and vertex/edge properties is inserted.
 * A {@link SQLException} is logged and not rethrown.
 * @param c the connection to the DB.
 * @param syntax the db-specific syntax; its first element is the column definition of the id column.
 * @throws MetricException if a stored job has no READY-to-EXECUTING or no COMPLETE transition event.
 */
private void saveOptimizationMetrics(final Connection c, final String[] syntax) {
  try (final Statement statement = c.createStatement()) {
    statement.setQueryTimeout(30); // set timeout to 30 sec.

    getMetricMap(JobMetric.class).values().forEach(o -> {
      final JobMetric jobMetric = (JobMetric) o;
      final String tableName = jobMetric.getIrDagSummary();

      final long startTime = jobMetric.getStateTransitionEvents().stream()
        .filter(ste -> ste.getPrevState().equals(PlanState.State.READY)
          && ste.getNewState().equals(PlanState.State.EXECUTING))
        .findFirst().orElseThrow(() -> new MetricException("job has never started"))
        .getTimestamp();
      final long endTime = jobMetric.getStateTransitionEvents().stream()
        .filter(ste -> ste.getNewState().equals(PlanState.State.COMPLETE))
        .findFirst().orElseThrow(() -> new MetricException("job has never completed"))
        .getTimestamp();
      final long duration = endTime - startTime; // ms

      final String vertexProperties = jobMetric.getVertexProperties();
      final String edgeProperties = jobMetric.getEdgeProperties();
      final Long inputSize = jobMetric.getInputSize();
      final long jvmMemSize = Runtime.getRuntime().maxMemory();
      final long memSize = ((com.sun.management.OperatingSystemMXBean) ManagementFactory
        .getOperatingSystemMXBean()).getTotalPhysicalMemorySize();

      try {
        // Identifiers cannot be bound as statement parameters, so the table name is concatenated.
        // NOTE(review): this presumes the IR DAG summary is a valid SQL identifier; confirm.
        statement.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName
          + " (id " + syntax[0] + ", duration INTEGER NOT NULL, inputsize INTEGER NOT NULL, "
          + "jvmmemsize BIGINT NOT NULL, memsize BIGINT NOT NULL, "
          + "vertex_properties TEXT NOT NULL, edge_properties TEXT NOT NULL, "
          + "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);");
        LOG.info("CREATED TABLE For {} IF NOT PRESENT", tableName);

        // The values are bound as parameters, so quotes inside the property strings
        // can neither break nor alter the statement.
        try (final PreparedStatement insert = c.prepareStatement("INSERT INTO " + tableName
          + " (duration, inputsize, jvmmemsize, memsize, vertex_properties, edge_properties) "
          + "VALUES (?, ?, ?, ?, ?, ?)")) {
          insert.setQueryTimeout(30); // same timeout as the shared statement.
          insert.setLong(1, duration);
          if (inputSize == null) {
            insert.setNull(2, Types.BIGINT);
          } else {
            insert.setLong(2, inputSize);
          }
          insert.setLong(3, jvmMemSize);
          insert.setLong(4, memSize);
          // String.valueOf keeps the text that string concatenation produced for a null value.
          insert.setString(5, String.valueOf(vertexProperties));
          insert.setString(6, String.valueOf(edgeProperties));
          insert.executeUpdate();
        }
        LOG.info("Recorded metrics on the table for {}", tableName);
      } catch (final SQLException e) {
        // A trailing Throwable is logged as the exception, so the message carries no "{}".
        LOG.error("Error while saving optimization metrics", e);
      }
    });
  } catch (final SQLException e) {
    LOG.error("Error while saving optimization metrics", e);
  }
}
/**
 * Send changed metric data to {@link MetricBroadcaster}, which will broadcast it to
 * all active WebSocket sessions. This method should be called manually if you want to
 * send changed metric data to the frontend client. Also this method is synchronized.
 * The message is a JSON object with a "metricType" field (simple name of the metric class)
 * and a "data" field (the serialized metric).
 * @param metricClass class of the metric.
 * @param id id of the metric.
 * @param <T> type of the metric to broadcast
 * @throws NoSuchElementException if no metric with the given id is stored for the class.
 * @throws MetricException if writing the JSON fails.
 */
public synchronized <T extends Metric> void triggerBroadcast(final Class<T> metricClass, final String id) {
  final MetricBroadcaster metricBroadcaster = MetricBroadcaster.getInstance();
  final ObjectMapper objectMapper = new ObjectMapper();
  final T metric = getMetricWithId(metricClass, id);
  final JsonFactory jsonFactory = new JsonFactory();
  final ByteArrayOutputStream stream = new ByteArrayOutputStream();

  try (final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8)) {
    jsonGenerator.setCodec(objectMapper);

    jsonGenerator.writeStartObject();
    jsonGenerator.writeFieldName("metricType");
    jsonGenerator.writeString(metricClass.getSimpleName());

    jsonGenerator.writeFieldName("data");
    jsonGenerator.writeObject(metric);
    jsonGenerator.writeEndObject();

    // The generator is still open here, so flush its buffer first; otherwise the tail of the
    // output (such as the closing brace) may not have reached the stream yet.
    jsonGenerator.flush();

    // The generator wrote UTF-8 bytes, so decode them as UTF-8 instead of the platform default charset.
    metricBroadcaster.broadcast(stream.toString("UTF-8"));
  } catch (final IOException e) {
    throw new MetricException(e);
  }
}
}