/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.giraph.conf;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
import org.apache.giraph.factories.DefaultMessageValueFactory;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import com.google.common.base.Preconditions;

/**
 * Default implementation of MessageClasses
 *
 * @param <I> Vertex id type
 * @param <M> Message type
 */
public class DefaultMessageClasses
    <I extends WritableComparable, M extends Writable>
    implements MessageClasses<I, M> {
  /** message class */
  private Class<M> messageClass;
  /** message value factory class */
  private Class<? extends MessageValueFactory<M>>
  messageValueFactoryClass;
  /** message combiner class */
  private Class<? extends MessageCombiner<? super I, M>> messageCombinerClass;
  /** whether message class was manually modified in this superstep */
  private boolean messageClassModified;
  /** message encode and store type */
  private MessageEncodeAndStoreType messageEncodeAndStoreType;

  /** Constructor */
  public DefaultMessageClasses() {
  }

  /**
   * Constructor
   * @param messageClass message class
   * @param messageValueFactoryClass message value factory class
   * @param messageCombinerClass message combiner class
   * @param messageEncodeAndStoreType message encode and store type
   */
  public DefaultMessageClasses(
      Class<M> messageClass,
      Class<? extends MessageValueFactory<M>> messageValueFactoryClass,
      Class<? extends MessageCombiner<? super I, M>> messageCombinerClass,
        MessageEncodeAndStoreType messageEncodeAndStoreType) {
    this.messageClass = messageClass;
    this.messageValueFactoryClass = messageValueFactoryClass;
    this.messageCombinerClass = messageCombinerClass;
    this.messageClassModified = false;
    this.messageEncodeAndStoreType = messageEncodeAndStoreType;
  }

  @Override
  public Class<M> getMessageClass() {
    return messageClass;
  }

  @Override
  public MessageValueFactory<M> createMessageValueFactory(
      ImmutableClassesGiraphConfiguration conf) {
    if (messageValueFactoryClass.equals(DefaultMessageValueFactory.class)) {
      return new DefaultMessageValueFactory<>(messageClass, conf);
    }

    MessageValueFactory factory = ReflectionUtils.newInstance(
        messageValueFactoryClass, conf);
    if (!factory.newInstance().getClass().equals(messageClass)) {
      throw new IllegalStateException("Message factory " +
        messageValueFactoryClass + " creates " +
        factory.newInstance().getClass().getName() + ", but message type is " +
        messageClass.getName());
    }
    return factory;
  }

  @Override
  public MessageCombiner<? super I, M> createMessageCombiner(
      ImmutableClassesGiraphConfiguration conf) {
    if (messageCombinerClass == null) {
      return null;
    }

    MessageCombiner combiner =
        ReflectionUtils.newInstance(messageCombinerClass, conf);
    if (combiner != null) {
      Preconditions.checkState(
          combiner.createInitialMessage().getClass().equals(messageClass));
    }
    return combiner;
  }

  @Override
  public boolean useMessageCombiner() {
    return messageCombinerClass != null;
  }

  @Override
  public boolean ignoreExistingVertices() {
    return false;
  }

  @Override
  public MessageEncodeAndStoreType getMessageEncodeAndStoreType() {
    return messageEncodeAndStoreType;
  }

  @Override
  public MessageClasses<I, M> createCopyForNewSuperstep() {
    return new DefaultMessageClasses<>(messageClass, messageValueFactoryClass,
        messageCombinerClass, messageEncodeAndStoreType);
  }

  @Override
  public void verifyConsistent(
      ImmutableClassesGiraphConfiguration conf) {
    Class<?>[] factoryTypes = ReflectionUtils.getTypeArguments(
        MessageValueFactory.class, messageValueFactoryClass);
    ReflectionUtils.verifyTypes(messageClass, factoryTypes[0],
        "Message factory", messageValueFactoryClass);

    if (messageCombinerClass != null) {
      Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
          MessageCombiner.class, messageCombinerClass);
      ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
          "Vertex id", messageCombinerClass);
      ReflectionUtils.verifyTypes(messageClass, combinerTypes[1],
          "Outgoing message", messageCombinerClass);
    }
  }

  /**
   * Set message class
   * @param messageClass message classs
   */
  public void setMessageClass(Class<M> messageClass) {
    this.messageClassModified = true;
    this.messageClass = messageClass;
  }

  /**
   * Set message class if not set already in this superstep
   * @param messageClass message class
   */
  public void setIfNotModifiedMessageClass(Class<M> messageClass) {
    if (!messageClassModified) {
      this.messageClass = messageClass;
    }
  }

  public void setMessageCombinerClass(
      Class<? extends MessageCombiner<? super I, M>> messageCombinerClass) {
    this.messageCombinerClass = messageCombinerClass;
  }

  public void setMessageValueFactoryClass(
      Class<? extends MessageValueFactory<M>> messageValueFactoryClass) {
    this.messageValueFactoryClass = messageValueFactoryClass;
  }

  public void setMessageEncodeAndStoreType(
      MessageEncodeAndStoreType messageEncodeAndStoreType) {
    this.messageEncodeAndStoreType = messageEncodeAndStoreType;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    WritableUtils.writeClass(messageClass, out);
    WritableUtils.writeClass(messageValueFactoryClass, out);
    WritableUtils.writeClass(messageCombinerClass, out);
    out.writeBoolean(messageClassModified);
    out.writeByte(messageEncodeAndStoreType.ordinal());
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    messageClass = WritableUtils.readClass(in);
    messageValueFactoryClass = WritableUtils.readClass(in);
    messageCombinerClass = WritableUtils.readClass(in);
    messageClassModified = in.readBoolean();
    messageEncodeAndStoreType =
        messageEncodeAndStoreType.values()[in.readByte()];
  }
}
