blob: 23fdef70e25d75471268ae8864c9064d903dfb66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.common.metric;
import com.google.common.collect.HashBiMap;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.coder.DecoderFactory;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.common.exception.MetricException;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.executionproperty.ExecutionProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.*;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
/**
* Utility class for metrics.
*/
public final class MetricUtils {
private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class.getName());
private static final CountDownLatch METADATA_LOADED = new CountDownLatch(1);
private static final CountDownLatch MUST_UPDATE_EP_KEY_METADATA = new CountDownLatch(1);
private static final CountDownLatch MUST_UPDATE_EP_METADATA = new CountDownLatch(1);
private static final Pair<HashBiMap<Integer, Class<? extends ExecutionProperty>>,
HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>> METADATA = loadMetaData();
// BiMap of (1) INDEX and (2) the Execution Property class
private static final HashBiMap<Integer, Class<? extends ExecutionProperty>>
EP_KEY_METADATA = METADATA.left();
// BiMap of (1) the Execution Property class INDEX and the value INDEX pair and (2) the Execution Property.
private static final HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>
EP_METADATA = METADATA.right();
private static final int VERTEX = 1;
private static final int EDGE = 2;
public static final String SQLITE_DB_NAME =
"jdbc:sqlite:" + MetricUtils.fetchProjectRootPath() + "/optimization_db.sqlite3";
public static final String POSTGRESQL_METADATA_DB_NAME =
"jdbc:postgresql://nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com:5432/nemo_optimization";
private static final String METADATA_TABLE_NAME = "nemo_optimization_meta";
/**
* Private constructor.
*/
private MetricUtils() {
}
/**
* Load the BiMaps (lightweight) Metadata from the DB.
* @return the loaded BiMaps, or initialized ones.
*/
private static Pair<HashBiMap<Integer, Class<? extends ExecutionProperty>>,
HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>>> loadMetaData() {
try (final Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME,
"postgres", "fake_password")) {
try (final Statement statement = c.createStatement()) {
statement.setQueryTimeout(30); // set timeout to 30 sec.
statement.executeUpdate(
"CREATE TABLE IF NOT EXISTS " + METADATA_TABLE_NAME
+ " (key TEXT NOT NULL UNIQUE, data BYTEA NOT NULL);");
final ResultSet rsl = statement.executeQuery(
"SELECT * FROM " + METADATA_TABLE_NAME + " WHERE key='EP_KEY_METADATA';");
LOG.info("Metadata can be loaded.");
if (rsl.next()) {
final HashBiMap<Integer, Class<? extends ExecutionProperty>> indexEpKeyBiMap =
SerializationUtils.deserialize(rsl.getBytes("Data"));
rsl.close();
final ResultSet rsr = statement.executeQuery(
"SELECT * FROM " + METADATA_TABLE_NAME + " WHERE key='EP_METADATA';");
if (rsr.next()) {
final HashBiMap<Pair<Integer, Integer>, ExecutionProperty<?>> indexEpBiMap =
SerializationUtils.deserialize(rsr.getBytes("Data"));
rsr.close();
METADATA_LOADED.countDown();
LOG.info("Metadata successfully loaded from DB.");
return Pair.of(indexEpKeyBiMap, indexEpBiMap);
} else {
METADATA_LOADED.countDown();
LOG.info("No initial metadata for EP.");
return Pair.of(indexEpKeyBiMap, HashBiMap.create());
}
} else {
METADATA_LOADED.countDown();
LOG.info("No initial metadata.");
return Pair.of(HashBiMap.create(), HashBiMap.create());
}
} catch (Exception e) {
LOG.warn("Loading metadata from DB failed: ", e);
return Pair.of(HashBiMap.create(), HashBiMap.create());
}
} catch (Exception e) {
LOG.warn("Loading metadata from DB failed : ", e);
return Pair.of(HashBiMap.create(), HashBiMap.create());
}
}
public static Boolean metaDataLoaded() {
return METADATA_LOADED.getCount() == 0;
}
/**
* Save the BiMaps to DB if changes are necessary (rarely executed).
*/
private static void updateMetaData() {
if (!metaDataLoaded()
|| (MUST_UPDATE_EP_METADATA.getCount() + MUST_UPDATE_EP_KEY_METADATA.getCount() == 2)) {
// no need to update
LOG.info("Not saving Metadata: metadata loaded: {}, Index-EP data: {}, Index-EP Key data: {}",
metaDataLoaded(), MUST_UPDATE_EP_METADATA.getCount() == 0, MUST_UPDATE_EP_KEY_METADATA.getCount() == 0);
return;
}
LOG.info("Saving Metadata..");
try (final Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME,
"postgres", "fake_password")) {
try (final Statement statement = c.createStatement()) {
statement.setQueryTimeout(30); // set timeout to 30 sec.
if (MUST_UPDATE_EP_KEY_METADATA.getCount() == 0) {
try (final PreparedStatement pstmt = c.prepareStatement(
"INSERT INTO " + METADATA_TABLE_NAME + " (key, data) "
+ "VALUES ('EP_KEY_METADATA', ?) ON CONFLICT (key) DO UPDATE SET data = excluded.data;")) {
pstmt.setBinaryStream(1,
new ByteArrayInputStream(SerializationUtils.serialize(EP_KEY_METADATA)));
pstmt.executeUpdate();
LOG.info("EP Key Metadata saved to DB.");
}
}
if (MUST_UPDATE_EP_METADATA.getCount() == 0) {
try (final PreparedStatement pstmt =
c.prepareStatement("INSERT INTO " + METADATA_TABLE_NAME + "(key, data) "
+ "VALUES ('EP_METADATA', ?) ON CONFLICT (key) DO UPDATE SET data = excluded.data;")) {
pstmt.setBinaryStream(1,
new ByteArrayInputStream(SerializationUtils.serialize(EP_METADATA)));
pstmt.executeUpdate();
LOG.info("EP Metadata saved to DB.");
}
}
}
} catch (SQLException e) {
LOG.warn("Saving of Metadata to DB failed: ", e);
}
}
/**
* Stringify execution properties of an IR DAG.
* @param irdag IR DAG to observe.
* @return the pair of stringified execution properties. Left is for vertices, right is for edges.
*/
static Pair<String, String> stringifyIRDAGProperties(final IRDAG irdag) {
final StringBuilder vStringBuilder = new StringBuilder();
final StringBuilder eStringBuilder = new StringBuilder();
irdag.getVertices().forEach(v ->
v.getExecutionProperties().forEachProperties(ep ->
epFormatter(vStringBuilder, VERTEX, v.getNumericId(), ep)));
irdag.getVertices().forEach(v ->
irdag.getIncomingEdgesOf(v).forEach(e ->
e.getExecutionProperties().forEachProperties(ep ->
epFormatter(eStringBuilder, EDGE, e.getNumericId(), ep))));
// Update the metric metadata if new execution property key / values have been discovered and updates are required.
updateMetaData();
return Pair.of(vStringBuilder.toString().trim(), eStringBuilder.toString().trim());
}
/**
* Formatter for execution properties. It updates the metadata for the metrics if new EP key / values are discovered.
* @param builder string builder to append the metrics to.
* @param idx index specifying whether it's a vertex or an edge. This should be one digit.
* @param numericId numeric ID of the vertex or the edge.
* @param ep the execution property.
*/
private static void epFormatter(final StringBuilder builder, final int idx,
final Integer numericId, final ExecutionProperty<?> ep) {
builder.append(idx);
builder.append(numericId);
builder.append("0");
final Integer epKeyIndex = EP_KEY_METADATA.inverse().computeIfAbsent(ep.getClass(), epClass -> {
// Update the metadata if new EP key has been discovered.
LOG.info("New EP Key Index: {} for {}", EP_KEY_METADATA.size() + 1, epClass.getSimpleName());
MUST_UPDATE_EP_KEY_METADATA.countDown();
return EP_KEY_METADATA.size() + 1;
});
builder.append(epKeyIndex);
builder.append(":");
final Integer epIndex = valueToIndex(epKeyIndex, ep);
builder.append(epIndex);
builder.append(" ");
}
/**
* Helper method to convert Execution Property value objects to an integer index.
* It updates the metadata for the metrics if new EP values are discovered.
* @param epKeyIndex the index of the execution property key.
* @param ep the execution property containing the value.
* @return the converted value index.
*/
private static Integer valueToIndex(final Integer epKeyIndex, final ExecutionProperty<?> ep) {
final Object o = ep.getValue();
if (o instanceof Enum) {
return ((Enum) o).ordinal();
} else if (o instanceof Integer) {
return (int) o;
} else if (o instanceof Boolean) {
return ((Boolean) o) ? 1 : 0;
} else {
final ExecutionProperty<?> ep1;
if (o instanceof EncoderFactory || o instanceof DecoderFactory) {
ep1 = EP_METADATA.values().stream()
.filter(ep2 -> ep2.getValue().toString().equals(o.toString()))
.findFirst().orElse(null);
} else {
ep1 = EP_METADATA.values().stream()
.filter(ep2 -> ep2.getValue().equals(o))
.findFirst().orElse(null);
}
if (ep1 != null) {
return EP_METADATA.inverse().get(ep1).right();
} else {
final Integer valueIndex = Math.toIntExact(EP_METADATA.keySet().stream()
.filter(pair -> pair.left().equals(epKeyIndex))
.count()) + 1;
// Update the metadata if new EP value has been discovered.
EP_METADATA.put(Pair.of(epKeyIndex, valueIndex), ep);
LOG.info("New EP Index: ({}, {}) for {}", epKeyIndex, valueIndex, ep);
MUST_UPDATE_EP_METADATA.countDown();
return valueIndex;
}
}
}
/**
* Finds the project root path.
* @return the project root path.
*/
private static String fetchProjectRootPath() {
return recursivelyFindLicense(Paths.get(System.getProperty("user.dir")));
}
/**
* Helper method to recursively find the LICENSE file.
* @param path the path to search for.
* @return the path containing the LICENSE file.
*/
private static String recursivelyFindLicense(final Path path) {
try (final Stream stream = Files.find(path, 1, (p, attributes) -> p.endsWith("LICENSE"))) {
if (stream.count() > 0) {
return path.toAbsolutePath().toString();
} else {
return recursivelyFindLicense(path.getParent());
}
} catch (IOException e) {
throw new MetricException(e);
}
}
}