/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.common.client;

import com.google.protobuf.ByteString;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.client.CompletedJob;
import org.apache.reef.client.FailedJob;
import org.apache.reef.client.JobMessage;
import org.apache.reef.client.RunningJob;
import org.apache.reef.client.parameters.JobCompletedHandler;
import org.apache.reef.client.parameters.JobFailedHandler;
import org.apache.reef.client.parameters.JobMessageHandler;
import org.apache.reef.client.parameters.JobRunningHandler;
import org.apache.reef.driver.parameters.DriverIdentifier;
import org.apache.reef.proto.ClientRuntimeProtocol.JobControlProto;
import org.apache.reef.proto.ClientRuntimeProtocol.Signal;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.proto.ReefServiceProtos.JobStatusProto;
import org.apache.reef.runtime.common.utils.ExceptionCodec;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;

import javax.inject.Inject;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Implementation of RunningJob.
 */
@ClientSide
@Private
public final class RunningJobImpl implements RunningJob, EventHandler<JobStatusProto> {

  private static final Logger LOG = Logger.getLogger(RunningJob.class.getName());

  private final String jobId;

  private final EventHandler<JobControlProto> jobControlHandler;
  private final EventHandler<RunningJob> runningJobEventHandler;
  private final EventHandler<CompletedJob> completedJobEventHandler;
  private final EventHandler<FailedJob> failedJobEventHandler;
  private final EventHandler<JobMessage> jobMessageEventHandler;
  private final ExceptionCodec exceptionCodec;

  @Inject
  RunningJobImpl(final RemoteManager remoteManager,
                 @Parameter(DriverIdentifier.class) final String driverIdentifier,
                 @Parameter(REEFImplementation.DriverRemoteIdentifier.class) final String driverRID,
                 @Parameter(JobRunningHandler.class) final EventHandler<RunningJob> runningJobEventHandler,
                 @Parameter(JobCompletedHandler.class) final EventHandler<CompletedJob> completedJobEventHandler,
                 @Parameter(JobFailedHandler.class) final EventHandler<FailedJob> failedJobEventHandler,
                 @Parameter(JobMessageHandler.class) final EventHandler<JobMessage> jobMessageEventHandler,
                 final ExceptionCodec exceptionCodec) {

    this.jobId = driverIdentifier;
    this.runningJobEventHandler = runningJobEventHandler;
    this.completedJobEventHandler = completedJobEventHandler;
    this.failedJobEventHandler = failedJobEventHandler;
    this.jobMessageEventHandler = jobMessageEventHandler;
    this.exceptionCodec = exceptionCodec;
    this.jobControlHandler = remoteManager.getHandler(driverRID, JobControlProto.class);

    this.runningJobEventHandler.onNext(this);
    LOG.log(Level.FINE, "Instantiated 'RunningJobImpl'");
  }

  @Override
  public synchronized void close() {
    this.jobControlHandler.onNext(
        JobControlProto.newBuilder()
            .setIdentifier(this.jobId)
            .setSignal(Signal.SIG_TERMINATE)
            .build()
    );
  }

  @Override
  public synchronized void close(final byte[] message) {
    this.jobControlHandler.onNext(
        JobControlProto.newBuilder()
            .setIdentifier(this.jobId)
            .setSignal(Signal.SIG_TERMINATE)
            .setMessage(ByteString.copyFrom(message))
            .build()
    );
  }

  @Override
  public String getId() {
    return this.jobId;
  }

  @Override
  public synchronized void send(final byte[] message) {
    this.jobControlHandler.onNext(
        JobControlProto.newBuilder()
            .setIdentifier(this.jobId)
            .setMessage(ByteString.copyFrom(message))
            .build()
    );
  }

  @Override
  public synchronized void onNext(final JobStatusProto value) {

    final ReefServiceProtos.State state = value.getState();
    LOG.log(Level.FINEST, "Received job status: {0} from {1}",
        new Object[]{state, value.getIdentifier()});

    if (value.hasMessage()) {
      this.jobMessageEventHandler.onNext(
          new JobMessage(getId(), value.getMessage().toByteArray()));
    }
    if (state == ReefServiceProtos.State.DONE) {
      this.completedJobEventHandler.onNext(new CompletedJobImpl(this.getId()));
    } else if (state == ReefServiceProtos.State.FAILED) {
      this.onJobFailure(value);
    }
  }

  /**
   * Inform the client of a failed job.
   *
   * @param jobStatusProto status of the failed job
   */
  private synchronized void onJobFailure(final JobStatusProto jobStatusProto) {
    assert jobStatusProto.getState() == ReefServiceProtos.State.FAILED;

    final String id = this.jobId;
    final Optional<byte[]> data = jobStatusProto.hasException() ?
        Optional.of(jobStatusProto.getException().toByteArray()) :
        Optional.<byte[]>empty();
    final Optional<Throwable> cause = this.exceptionCodec.fromBytes(data);

    final String message;
    if (cause.isPresent() && cause.get().getMessage() != null) {
      message = cause.get().getMessage();
    } else {
      message = "No Message sent by the Job in exception " + cause.get();
      LOG.log(Level.WARNING, message, cause.get());
    }
    final Optional<String> description = Optional.of(message);

    final FailedJob failedJob = new FailedJob(id, message, description, cause, data);
    this.failedJobEventHandler.onNext(failedJob);
  }

  @Override
  public String toString() {
    return "RunningJob{'" + this.jobId + "'}";
  }
}
