| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.nemo.runtime.common.metric; |
| |
| import com.google.common.collect.HashBiMap; |
| import org.apache.commons.lang3.SerializationUtils; |
| import org.apache.nemo.common.Pair; |
| import org.apache.nemo.common.Util; |
| import org.apache.nemo.common.coder.DecoderFactory; |
| import org.apache.nemo.common.coder.EncoderFactory; |
| import org.apache.nemo.common.exception.MetricException; |
| import org.apache.nemo.common.ir.IRDAG; |
| import org.apache.nemo.common.ir.executionproperty.ExecutionProperty; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.Serializable; |
| import java.io.UnsupportedEncodingException; |
| import java.lang.reflect.Method; |
| import java.sql.*; |
| import java.util.Arrays; |
| import java.util.Base64; |
| import java.util.Optional; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.function.Supplier; |
| import java.util.stream.IntStream; |
| |
| /** |
| * Utility class for metrics. |
| * TODO #372: This class should later be refactored into a separate metric package. |
| */ |
| public final class MetricUtils { |
| private static final Logger LOG = LoggerFactory.getLogger(MetricUtils.class.getName()); |
| |
| private static final CountDownLatch METADATA_LOADED = new CountDownLatch(1); |
| private static final CountDownLatch MUST_UPDATE_EP_KEY_METADATA = new CountDownLatch(1); |
| private static final CountDownLatch MUST_UPDATE_EP_METADATA = new CountDownLatch(1); |
| |
| // BiMap of (1) INDEX and (2) the Execution Property class and the value type class. |
| static final HashBiMap<Integer, Pair<Class<? extends ExecutionProperty>, Class<? extends Serializable>>> |
| EP_KEY_METADATA = HashBiMap.create(); |
| // BiMap of (1) the Execution Property class INDEX and the value INDEX pair and (2) the Execution Property value. |
| private static final HashBiMap<Pair<Integer, Integer>, ExecutionProperty<? extends Serializable>> |
| EP_METADATA = HashBiMap.create(); |
| |
| static { |
| try { |
| Class.forName("org.postgresql.Driver"); |
| } catch (ClassNotFoundException e) { |
| throw new MetricException("PostgreSQL Driver not found: " + e); |
| } |
| } |
| |
| public static final String SQLITE_DB_NAME = |
| "jdbc:sqlite:" + Util.fetchProjectRootPath() + "/optimization_db.sqlite3"; |
| public static final String POSTGRESQL_METADATA_DB_NAME = |
| "jdbc:postgresql://nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com:5432/nemo_optimization"; |
| private static final String METADATA_TABLE_NAME = "nemo_optimization_meta"; |
| private static final String SAVING_METADATA_FAIL_MSG = "Saving of Metadata to DB failed: "; |
| |
| /** |
| * Private constructor. |
| */ |
| private MetricUtils() { |
| } |
| |
| /** |
| * Method to derive db credentials. This method is not for real authentication. Please change the method accordingly. |
| * @return db credentials. |
| */ |
| private static String getCreds() { |
| try { |
| final String str = "ZmFrZV9wYXNzd29yZA=="; |
| byte[] decodedBytes = Base64.getDecoder().decode(str.getBytes("UTF-8")); |
| return new String(decodedBytes); |
| } catch (UnsupportedEncodingException e) { |
| throw new MetricException(e); |
| } |
| } |
| |
| /** |
| * Load the BiMaps (lightweight) Metadata from the DB. |
| * |
| * @return Whether or not the metadata has been successfully loaded from the DB. |
| */ |
| public static Boolean loadMetaData() { |
| try (Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME, |
| "postgres", getCreds())) { |
| try (Statement statement = c.createStatement()) { |
| statement.setQueryTimeout(30); // set timeout to 30 sec. |
| |
| statement.executeUpdate( |
| "CREATE TABLE IF NOT EXISTS " + METADATA_TABLE_NAME |
| + " (type TEXT NOT NULL, key INT NOT NULL UNIQUE, value BYTEA NOT NULL, " |
| + "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP);"); |
| |
| try (ResultSet rsl = statement.executeQuery( |
| "SELECT * FROM " + METADATA_TABLE_NAME + " WHERE type='EP_KEY_METADATA';")) { |
| LOG.info("Metadata can be successfully loaded."); |
| while (rsl.next()) { |
| EP_KEY_METADATA.put(rsl.getInt("key"), |
| SerializationUtils.deserialize(rsl.getBytes("value"))); |
| } |
| } |
| |
| try (ResultSet rsr = statement.executeQuery( |
| "SELECT * FROM " + METADATA_TABLE_NAME + " WHERE type='EP_METADATA';")) { |
| while (rsr.next()) { |
| final Integer l = rsr.getInt("key"); |
| EP_METADATA.put(Pair.of(l / 10000, 1 % 10000), |
| SerializationUtils.deserialize(rsr.getBytes("value"))); |
| } |
| } |
| METADATA_LOADED.countDown(); |
| LOG.info("Metadata successfully loaded from DB."); |
| } catch (Exception e) { |
| LOG.warn("Loading metadata from DB failed: ", e); |
| } |
| } catch (Exception e) { |
| LOG.warn("Loading metadata from DB failed : ", e); |
| } |
| return metaDataLoaded(); |
| } |
| |
| public static Boolean metaDataLoaded() { |
| return METADATA_LOADED.getCount() == 0; |
| } |
| |
| /** |
| * Save the BiMaps to DB if changes are necessary (rarely executed). |
| */ |
| private static void updateMetaData() { |
| if (!metaDataLoaded() |
| || (MUST_UPDATE_EP_METADATA.getCount() + MUST_UPDATE_EP_KEY_METADATA.getCount() == 2)) { |
| // no need to update |
| LOG.info("Not saving Metadata: metadata loaded: {}, Index-EP data: {}, Index-EP Key data: {}", |
| metaDataLoaded(), MUST_UPDATE_EP_METADATA.getCount() == 0, MUST_UPDATE_EP_KEY_METADATA.getCount() == 0); |
| return; |
| } |
| LOG.info("Saving Metadata.."); |
| |
| try (Connection c = DriverManager.getConnection(MetricUtils.POSTGRESQL_METADATA_DB_NAME, |
| "postgres", getCreds())) { |
| try (Statement statement = c.createStatement()) { |
| statement.setQueryTimeout(30); // set timeout to 30 sec. |
| |
| if (MUST_UPDATE_EP_KEY_METADATA.getCount() == 0) { |
| EP_KEY_METADATA.forEach((l, r) -> { |
| try { |
| insertOrUpdateMetadata(c, "EP_KEY_METADATA", l, r); |
| } catch (SQLException e) { |
| LOG.warn(SAVING_METADATA_FAIL_MSG, e); |
| } |
| }); |
| LOG.info("EP Key Metadata saved to DB."); |
| } |
| |
| if (MUST_UPDATE_EP_METADATA.getCount() == 0) { |
| EP_METADATA.forEach((l, r) -> { |
| try { |
| insertOrUpdateMetadata(c, "EP_METADATA", l.left() * 10000 + l.right(), r); |
| } catch (SQLException e) { |
| LOG.warn(SAVING_METADATA_FAIL_MSG, e); |
| } |
| }); |
| LOG.info("EP Metadata saved to DB."); |
| } |
| } |
| } catch (SQLException e) { |
| LOG.warn(SAVING_METADATA_FAIL_MSG, e); |
| } |
| } |
| |
| /** |
| * Utility method to save key, value to the metadata table. |
| * |
| * @param c the connection to the DB. |
| * @param type the key to write to the DB metadata table. |
| * @param key the key to write to the DB metadata table (integer). |
| * @param value the value to write to the DB metadata table (object). |
| * @throws SQLException SQLException on the way. |
| */ |
| private static void insertOrUpdateMetadata(final Connection c, final String type, |
| final Integer key, final Serializable value) throws SQLException { |
| try (PreparedStatement pstmt = c.prepareStatement( |
| "INSERT INTO " + METADATA_TABLE_NAME + " (type, key, value) " |
| + "VALUES ('" + type + "', " + key + ", ?) ON CONFLICT (key) DO UPDATE SET value = excluded.value;")) { |
| pstmt.setBinaryStream(1, new ByteArrayInputStream(SerializationUtils.serialize(value))); |
| pstmt.executeUpdate(); |
| } |
| } |
| |
| /** |
| * Stringify execution properties of an IR DAG. |
| * |
| * @param irdag IR DAG to observe. |
| * @return the pair of stringified execution properties. Left is for vertices, right is for edges. |
| */ |
| public static Pair<String, String> stringifyIRDAGProperties(final IRDAG irdag) { |
| final StringBuilder vStringBuilder = new StringBuilder(); |
| final StringBuilder eStringBuilder = new StringBuilder(); |
| |
| irdag.getVertices().forEach(v -> |
| v.getExecutionProperties().forEachProperties(ep -> |
| epFormatter(vStringBuilder, 1, v.getNumericId(), ep))); |
| |
| irdag.getVertices().forEach(v -> |
| irdag.getIncomingEdgesOf(v).forEach(e -> |
| e.getExecutionProperties().forEachProperties(ep -> |
| epFormatter(eStringBuilder, 2, e.getNumericId(), ep)))); |
| |
| // Update the metric metadata if new execution property key / values have been discovered and updates are required. |
| updateMetaData(); |
| return Pair.of(vStringBuilder.toString().trim(), eStringBuilder.toString().trim()); |
| } |
| |
| /** |
| * Formatter for execution properties. It updates the metadata for the metrics if new EP key / values are discovered. |
| * |
| * @param builder string builder to append the metrics to. |
| * @param idx index specifying whether it's a vertex or an edge. This should be one digit. |
| * @param numericId numeric ID of the vertex or the edge. |
| * @param ep the execution property. |
| */ |
| private static void epFormatter(final StringBuilder builder, final int idx, |
| final Integer numericId, final ExecutionProperty<?> ep) { |
| // Formatted into 9 digits: 0:vertex/edge 1-5:ID 5-9:EP Index. |
| builder.append(idx); |
| builder.append(String.format("%04d", numericId)); |
| final Integer epKeyIndex = getEpKeyIndex(ep); |
| builder.append(String.format("%04d", epKeyIndex)); |
| |
| // Format value to an index. |
| builder.append(":"); |
| final Integer epIndex = valueToIndex(epKeyIndex, ep); |
| builder.append(epIndex); |
| builder.append(" "); |
| } |
| |
| /** |
| * Get the EP Key index from the metadata. |
| * |
| * @param ep the EP to retrieve the Key index of. |
| * @return the Key index. |
| */ |
| static Integer getEpKeyIndex(final ExecutionProperty<?> ep) { |
| return EP_KEY_METADATA.inverse() |
| .computeIfAbsent(Pair.of(ep.getClass(), getParameterType(ep.getClass(), ep.getValue().getClass())), |
| epClassPair -> { |
| final Integer idx = EP_KEY_METADATA.keySet().stream().mapToInt(i -> i).max().orElse(0) + 1; |
| LOG.info("New EP Key Index: {} for {}", idx, epClassPair.left().getSimpleName()); |
| // Update the metadata if new EP key has been discovered. |
| MUST_UPDATE_EP_KEY_METADATA.countDown(); |
| return idx; |
| }); |
| } |
| |
| /** |
| * Recursive method for getting the parameter type of the execution property. |
| * This can be used, for example, to get DecoderFactory, instead of BeamDecoderFactory. |
| * |
| * @param epClass execution property class to observe. |
| * @param valueClass the value class of the execution property. |
| * @return the parameter type. |
| */ |
| private static Class<? extends Serializable> getParameterType(final Class<? extends ExecutionProperty> epClass, |
| final Class<? extends Serializable> valueClass) { |
| if (!getMethodFor(epClass, "of", valueClass.getSuperclass()).isPresent() |
| || !(Serializable.class.isAssignableFrom(valueClass.getSuperclass()))) { |
| final Class<? extends Serializable> candidate = Arrays.stream(valueClass.getInterfaces()) |
| .filter(vc -> Serializable.class.isAssignableFrom(vc) && getMethodFor(epClass, "of", vc).isPresent()) |
| .map(vc -> getParameterType(epClass, ((Class<? extends Serializable>) vc))).findFirst().orElse(null); |
| return candidate == null ? valueClass : candidate; |
| } else { |
| return getParameterType(epClass, ((Class<? extends Serializable>) valueClass.getSuperclass())); |
| } |
| } |
| |
| /** |
| * Utility method to getting an optional method called 'name' for the class. |
| * |
| * @param clazz class to get the method of. |
| * @param name the name of the method. |
| * @param valueTypes the value types of the method. |
| * @return optional of the method. It returns Optional.empty() if the method could not be found. |
| */ |
| public static Optional<Method> getMethodFor(final Class<? extends ExecutionProperty> clazz, |
| final String name, final Class<?>... valueTypes) { |
| try { |
| final Method mthd = clazz.getMethod(name, valueTypes); |
| return Optional.of(mthd); |
| } catch (final Exception e) { |
| return Optional.empty(); |
| } |
| } |
| |
| /** |
| * Inverse method of the #getEpKeyIndex method. |
| * |
| * @param index the index of the EP Key. |
| * @return the class of the execution property (EP), as well as the type of the value of the EP. |
| */ |
| private static Pair<Class<? extends ExecutionProperty>, Class<? extends Serializable>> getEpPairFromKeyIndex( |
| final Integer index) { |
| return EP_KEY_METADATA.get(index); |
| } |
| |
| /** |
| * Helper method to convert Execution Property value objects to an integer index. |
| * It updates the metadata for the metrics if new EP values are discovered. |
| * |
| * @param epKeyIndex the index of the execution property key. |
| * @param ep the execution property containing the value. |
| * @return the converted value index. |
| */ |
| static Integer valueToIndex(final Integer epKeyIndex, final ExecutionProperty<?> ep) { |
| final Object o = ep.getValue(); |
| |
| if (o instanceof Enum) { |
| return ((Enum) o).ordinal(); |
| } else if (o instanceof Integer) { |
| return (int) o; |
| } else if (o instanceof Boolean) { |
| return ((Boolean) o) ? 1 : 0; |
| } else { |
| final ExecutionProperty<? extends Serializable> ep1; |
| if (o instanceof EncoderFactory || o instanceof DecoderFactory) { |
| ep1 = EP_METADATA.values().stream() |
| .filter(ep2 -> ep2.getValue().toString().equals(o.toString()) || ep2.getValue().equals(o)) |
| .findFirst().orElse(null); |
| } else { |
| ep1 = EP_METADATA.values().stream() |
| .filter(ep2 -> ep2.getValue().equals(o)) |
| .findFirst().orElse(null); |
| } |
| |
| if (ep1 != null) { |
| return EP_METADATA.inverse().get(ep1).right(); |
| } else { |
| final Integer valueIndex = EP_METADATA.keySet().stream() |
| .filter(pair -> pair.left().equals(epKeyIndex)) |
| .mapToInt(Pair::right).max().orElse(0) + 1; |
| // Update the metadata if new EP value has been discovered. |
| EP_METADATA.put(Pair.of(epKeyIndex, valueIndex), ep); |
| LOG.info("New EP Index: ({}, {}) for {}", epKeyIndex, valueIndex, ep); |
| MUST_UPDATE_EP_METADATA.countDown(); |
| return valueIndex; |
| } |
| } |
| } |
| |
| /** |
| * Helper method to do the opposite of the #valueToIndex method. |
| * It receives the split, and the direction of the tweak value (which show the target index value), |
| * and returns the actual value which the execution property uses. |
| * |
| * @param split the split value, from which to start from. |
| * @param tweak the tweak value, to which we should tweak the split value. |
| * @param epKeyIndex the EP Key index to retrieve information from. |
| * @return the project root path. |
| */ |
| static Serializable indexToValue(final Double split, final Double tweak, final Integer epKeyIndex) { |
| final Class<? extends Serializable> targetObjectClass = getEpPairFromKeyIndex(epKeyIndex).right(); |
| final boolean splitIsInteger = split.compareTo((double) split.intValue()) == 0; |
| final Pair<Integer, Integer> splitIntVal = splitIsInteger |
| ? Pair.of(split.intValue() - 1, split.intValue() + 1) |
| : Pair.of(split.intValue(), split.intValue() + 1); |
| |
| if (targetObjectClass.isEnum()) { |
| final int ordinal; |
| if (split < 0) { |
| ordinal = 0; |
| } else { |
| final int maxOrdinal = targetObjectClass.getFields().length - 1; |
| final int left = splitIntVal.left() <= 0 ? 0 : splitIntVal.left(); |
| final int right = splitIntVal.right() >= maxOrdinal ? maxOrdinal : splitIntVal.right(); |
| ordinal = tweak < 0 ? left : right; |
| } |
| LOG.info("Translated: {} into ENUM with ordinal {}", split, ordinal); |
| return targetObjectClass.getEnumConstants()[ordinal]; |
| } else if (targetObjectClass.isAssignableFrom(Integer.class)) { |
| final Double val = split + tweak + 0.5; |
| final Integer res = val.intValue(); |
| LOG.info("Translated: {} into INTEGER of {}", split, res); |
| return res; |
| } else if (targetObjectClass.isAssignableFrom(Boolean.class)) { |
| final Boolean res; |
| if (split < 0) { |
| res = false; |
| } else if (split > 1) { |
| res = true; |
| } else { |
| final Boolean left = splitIntVal.left() >= 1; // false by default, true if >= 1 |
| final Boolean right = splitIntVal.right() > 0; // true by default, false if <= 0 |
| res = tweak < 0 ? left : right; |
| } |
| LOG.info("Translated: {} into BOOLEAN of {}", split, res); |
| return res; |
| } else { |
| final Supplier<IntStream> valueCandidates = () -> EP_METADATA.keySet().stream() |
| .filter(p -> p.left().equals(epKeyIndex)) |
| .mapToInt(Pair::right); |
| final Integer left = valueCandidates.get() |
| .filter(n -> n < split) |
| .map(n -> -n).sorted().map(n -> -n) // maximum among smaller values |
| .findFirst().orElse(valueCandidates.get().min().getAsInt()); |
| final Integer right = valueCandidates.get() |
| .filter(n -> n > split) |
| .sorted() // minimum among larger values |
| .findFirst().orElse(valueCandidates.get().max().getAsInt()); |
| final Integer targetValue = tweak < 0 ? left : right; |
| final Serializable res = EP_METADATA.get(Pair.of(epKeyIndex, targetValue)).getValue(); |
| LOG.info("Translated: {} into VALUE of {}", split, res); |
| return res; |
| } |
| } |
| |
| /** |
| * Receives the pair of execution property and value classes, and returns the optimized value of the EP. |
| * |
| * @param epKeyIndex the EP Key index to retrieve the new EP from. |
| * @param split the split point. |
| * @param tweak the direction in which to tweak the execution property value. |
| * @return The execution property constructed from the key index and the split value. |
| */ |
| public static ExecutionProperty<? extends Serializable> keyAndValueToEP( |
| final Integer epKeyIndex, |
| final Double split, |
| final Double tweak) { |
| |
| final Serializable value = indexToValue(split, tweak, epKeyIndex); |
| final Class<? extends ExecutionProperty> epClass = getEpPairFromKeyIndex(epKeyIndex).left(); |
| |
| final ExecutionProperty<? extends Serializable> ep; |
| try { |
| final Method staticConstructor = getMethodFor(epClass, "of", getParameterType(epClass, value.getClass())) |
| .orElseThrow(NoSuchMethodException::new); |
| ep = (ExecutionProperty<? extends Serializable>) staticConstructor.invoke(null, value); |
| } catch (final NoSuchMethodException e) { |
| throw new MetricException("Class " + epClass.getName() |
| + " does not have a static method exposing the constructor 'of' with value type " + value.getClass().getName() |
| + ": " + e); |
| } catch (final Exception e) { |
| throw new MetricException(e); |
| } |
| return ep; |
| } |
| } |