/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.worker;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.ResourceProtos.FetchProto;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.querymaster.Task;
import org.apache.tajo.util.TUtil;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;

/**
 * <code>FetchImpl</code> information to indicate the locations of intermediate data.
 */
public class FetchImpl implements ProtoObject<FetchProto>, Cloneable {
  private Task.PullHost host;             // The pull server host information
  private ShuffleType type; // hash or range partition method.
  private ExecutionBlockId executionBlockId;   // The executionBlock id
  private int partitionId;                     // The hash partition id
  private String name;                         // The intermediate source name
  private String rangeParams;                  // optional, the http parameters of range partition. (e.g., start=xx&end=yy)
  private boolean hasNext = false;             // optional, if true, has more taskIds

  private List<Integer> taskIds;               // repeated, the task ids
  private List<Integer> attemptIds;            // repeated, the attempt ids

  private long offset = -1;
  private long length = -1;

  public FetchImpl() {
    taskIds = new ArrayList<Integer>();
    attemptIds = new ArrayList<Integer>();
  }

  public FetchImpl(FetchProto proto) {
    this(new Task.PullHost(proto.getHost(), proto.getPort()),
        proto.getType(),
        new ExecutionBlockId(proto.getExecutionBlockId()),
        proto.getPartitionId(),
        proto.getRangeParams(),
        proto.getHasNext(),
        proto.getName(),
        proto.getTaskIdList(), proto.getAttemptIdList());

    if (proto.hasOffset()) {
      this.offset = proto.getOffset();
    }

    if (proto.hasLength()) {
      this.length = proto.getLength();
    }
  }

  public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
                   int partitionId) {
    this(host, type, executionBlockId, partitionId, null, false, null,
        new ArrayList<Integer>(), new ArrayList<Integer>());
  }

  public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
                   int partitionId, List<Task.IntermediateEntry> intermediateEntryList) {
    this(host, type, executionBlockId, partitionId, null, false, null,
        new ArrayList<Integer>(), new ArrayList<Integer>());
    for (Task.IntermediateEntry entry : intermediateEntryList){
      addPart(entry.getTaskId(), entry.getAttemptId());
    }
  }

  public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId,
                   int partitionId, String rangeParams, boolean hasNext, String name,
                   List<Integer> taskIds, List<Integer> attemptIds) {
    this.host = host;
    this.type = type;
    this.executionBlockId = executionBlockId;
    this.partitionId = partitionId;
    this.rangeParams = rangeParams;
    this.hasNext = hasNext;
    this.name = name;
    this.taskIds = taskIds;
    this.attemptIds = attemptIds;
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams,
        hasNext, taskIds, attemptIds, offset, length);
  }

  @Override
  public FetchProto getProto() {
    FetchProto.Builder builder = FetchProto.newBuilder();

    builder.setHost(host.getHost());
    builder.setPort(host.getPort());
    builder.setType(type);
    builder.setExecutionBlockId(executionBlockId.getProto());
    builder.setPartitionId(partitionId);
    builder.setHasNext(hasNext);
    builder.setName(name);

    if (rangeParams != null && !rangeParams.isEmpty()) {
      builder.setRangeParams(rangeParams);
    }

    Preconditions.checkArgument(taskIds.size() == attemptIds.size());
    builder.addAllTaskId(taskIds);
    builder.addAllAttemptId(attemptIds);

    builder.setOffset(offset);
    builder.setLength(length);
    return builder.build();
  }

  public void addPart(int taskId, int attemptId) {
    this.taskIds.add(taskId);
    this.attemptIds.add(attemptId);
  }

  public Task.PullHost getPullHost() {
    return this.host;
  }

  public ExecutionBlockId getExecutionBlockId() {
    return executionBlockId;
  }

  public void setExecutionBlockId(ExecutionBlockId executionBlockId) {
    this.executionBlockId = executionBlockId;
  }

  public int getPartitionId() {
    return partitionId;
  }

  public void setPartitionId(int partitionId) {
    this.partitionId = partitionId;
  }

  public String getRangeParams() {
    return rangeParams;
  }

  public void setRangeParams(String rangeParams) {
    this.rangeParams = rangeParams;
  }

  public boolean hasNext() {
    return hasNext;
  }

  public void setHasNext(boolean hasNext) {
    this.hasNext = hasNext;
  }

  public ShuffleType getType() {
    return type;
  }

  public void setType(ShuffleType type) {
    this.type = type;
  }

  /**
   * Get the pull server URIs.
   */
  public List<URI> getURIs(){
    return Repartitioner.createFetchURL(this, true);
  }

  /**
   * Get the pull server URIs without repeated parameters.
   */
  public List<URI> getSimpleURIs(){
    return Repartitioner.createFetchURL(this, false);
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public List<Integer> getTaskIds() {
    return taskIds;
  }

  public List<Integer> getAttemptIds() {
    return attemptIds;
  }

  public long getOffset() {
    return offset;
  }

  public void setOffset(long offset) {
    this.offset = offset;
  }

  public long getLength() {
    return length;
  }

  public void setLength(long length) {
    this.length = length;
  }

  public FetchImpl clone() throws CloneNotSupportedException {
    FetchImpl newFetchImpl = (FetchImpl) super.clone();

    newFetchImpl.host = host.clone();
    newFetchImpl.type = type;
    newFetchImpl.executionBlockId = executionBlockId;
    newFetchImpl.partitionId = partitionId;
    newFetchImpl.name = name;
    newFetchImpl.rangeParams = rangeParams;
    newFetchImpl.hasNext = hasNext;
    if (taskIds != null) {
      newFetchImpl.taskIds = Lists.newArrayList(taskIds);
    }
    if (attemptIds != null) {
      newFetchImpl.attemptIds = Lists.newArrayList(attemptIds);
    }
    newFetchImpl.offset = offset;
    newFetchImpl.length = length;
    return newFetchImpl;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    FetchImpl fetch = (FetchImpl) o;

    return TUtil.checkEquals(hasNext, fetch.hasNext) &&
        TUtil.checkEquals(partitionId, fetch.partitionId) &&
        TUtil.checkEquals(attemptIds, fetch.attemptIds) &&
        TUtil.checkEquals(executionBlockId, fetch.executionBlockId) &&
        TUtil.checkEquals(host, fetch.host) &&
        TUtil.checkEquals(name, fetch.name) &&
        TUtil.checkEquals(rangeParams, fetch.rangeParams) &&
        TUtil.checkEquals(taskIds, fetch.taskIds) &&
        TUtil.checkEquals(type, fetch.type) &&
        TUtil.checkEquals(offset, fetch.offset) &&
        TUtil.checkEquals(length, fetch.length);
  }
}
