/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.partitioned.rebalance;

import java.util.Map;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.control.PartitionRebalanceDetailsImpl;
import org.apache.geode.internal.cache.control.ResourceManagerStats;
import org.apache.geode.logging.internal.log4j.api.LogService;

public class BucketOperatorWrapper implements BucketOperator {
  private static final Logger logger = LogService.getLogger();

  private final BucketOperator delegate;
  private final Set<PartitionRebalanceDetailsImpl> detailSet;
  private final int regionCount;
  private final ResourceManagerStats stats;
  private final PartitionedRegion leaderRegion;

  public BucketOperatorWrapper(BucketOperator delegate,
      Set<PartitionRebalanceDetailsImpl> rebalanceDetails, ResourceManagerStats stats,
      PartitionedRegion leaderRegion) {
    this.delegate = delegate;
    this.detailSet = rebalanceDetails;
    this.regionCount = detailSet.size();
    this.stats = stats;
    this.leaderRegion = leaderRegion;
  }

  @Override
  public boolean moveBucket(InternalDistributedMember sourceMember,
      InternalDistributedMember targetMember, int id, Map<String, Long> colocatedRegionBytes) {
    long start = System.nanoTime();
    boolean result = false;
    long elapsed = 0;
    long totalBytes = 0;

    if (stats != null) {
      stats.startBucketTransfer(regionCount);
    }
    try {
      result = delegate.moveBucket(sourceMember, targetMember, id, colocatedRegionBytes);
      elapsed = System.nanoTime() - start;
      if (result) {
        if (logger.isDebugEnabled()) {
          logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id,
              sourceMember, targetMember);
        }
        for (PartitionRebalanceDetailsImpl details : detailSet) {
          String regionPath = details.getRegionPath();
          Long regionBytes = colocatedRegionBytes.get(regionPath);
          if (regionBytes != null) {
            // only increment the elapsed time for the leader region
            details.incTransfers(regionBytes.longValue(),
                details.getRegion().equals(leaderRegion) ? elapsed : 0);
            totalBytes += regionBytes.longValue();
          }
        }
      } else {
        if (logger.isDebugEnabled()) {
          logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id,
              sourceMember, targetMember);
        }
      }
    } finally {
      if (stats != null) {
        stats.endBucketTransfer(regionCount, result, totalBytes, elapsed);
      }
    }

    return result;
  }

  @Override
  public void createRedundantBucket(final InternalDistributedMember targetMember, final int i,
      final Map<String, Long> colocatedRegionBytes, final Completion completion) {

    if (stats != null) {
      stats.startBucketCreate(regionCount);
    }

    final long start = System.nanoTime();
    delegate.createRedundantBucket(targetMember, i, colocatedRegionBytes, new Completion() {

      @Override
      public void onSuccess() {
        long totalBytes = 0;
        long elapsed = System.nanoTime() - start;
        if (logger.isDebugEnabled()) {
          logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i,
              targetMember);
        }
        for (PartitionRebalanceDetailsImpl details : detailSet) {
          String regionPath = details.getRegionPath();
          Long lrb = colocatedRegionBytes.get(regionPath);
          if (lrb != null) { // region could have gone away - esp during shutdow
            long regionBytes = lrb.longValue();
            // Only add the elapsed time to the leader region.
            details.incCreates(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0);
            totalBytes += regionBytes;
          }
        }

        if (stats != null) {
          stats.endBucketCreate(regionCount, true, totalBytes, elapsed);
        }

        // invoke onSuccess on the received completion callback
        completion.onSuccess();
      }

      @Override
      public void onFailure() {
        long elapsed = System.nanoTime() - start;

        if (logger.isDebugEnabled()) {
          logger.info("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i,
              targetMember);
        }

        if (stats != null) {
          stats.endBucketCreate(regionCount, false, 0, elapsed);
        }

        // invoke onFailure on the received completion callback
        completion.onFailure();
      }
    });
  }

  @Override
  public boolean removeBucket(InternalDistributedMember targetMember, int i,
      Map<String, Long> colocatedRegionBytes) {
    boolean result = false;
    long elapsed = 0;
    long totalBytes = 0;

    if (stats != null) {
      stats.startBucketRemove(regionCount);
    }
    try {
      long start = System.nanoTime();
      result = delegate.removeBucket(targetMember, i, colocatedRegionBytes);
      elapsed = System.nanoTime() - start;
      if (result) {
        if (logger.isDebugEnabled()) {
          logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i,
              targetMember);
        }
        for (PartitionRebalanceDetailsImpl details : detailSet) {
          String regionPath = details.getRegionPath();
          Long lrb = colocatedRegionBytes.get(regionPath);
          if (lrb != null) { // region could have gone away - esp during shutdow
            long regionBytes = lrb.longValue();
            // Only add the elapsed time to the leader region.
            details.incRemoves(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0);
            totalBytes += regionBytes;
          }
        }
      } else {
        if (logger.isDebugEnabled()) {
          logger.debug("Rebalancing {} redundant bucket {} failed removal o{}", leaderRegion, i,
              targetMember);
        }
      }
    } finally {
      if (stats != null) {
        stats.endBucketRemove(regionCount, result, totalBytes, elapsed);
      }
    }

    return result;
  }

  @Override
  public boolean movePrimary(InternalDistributedMember source, InternalDistributedMember target,
      int bucketId) {
    boolean result = false;
    long elapsed = 0;

    if (stats != null) {
      stats.startPrimaryTransfer(regionCount);
    }

    try {
      long start = System.nanoTime();
      result = delegate.movePrimary(source, target, bucketId);
      elapsed = System.nanoTime() - start;
      if (result) {
        if (logger.isDebugEnabled()) {
          logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion,
              bucketId, source, target);
        }
        for (PartitionRebalanceDetailsImpl details : detailSet) {
          details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0);
        }
      } else {
        if (logger.isDebugEnabled()) {
          logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}",
              leaderRegion, bucketId, source, target);
        }
      }
    } finally {
      if (stats != null) {
        stats.endPrimaryTransfer(regionCount, result, elapsed);
      }
    }

    return result;
  }

  @Override
  public void waitForOperations() {
    delegate.waitForOperations();
  }

  public Set<PartitionRebalanceDetailsImpl> getDetailSet() {
    return this.detailSet;
  }
}
