/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.client.internal;

import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.List;

import org.apache.logging.log4j.Logger;

import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.ChunkedMessage;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.cache.wan.BatchException70;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderEventRemoteDispatcher.GatewayAck;
import org.apache.geode.logging.internal.log4j.api.LogService;

@SuppressWarnings("unchecked")
public class GatewaySenderBatchOp {

  private static final Logger logger = LogService.getLogger();

  /**
   * Builds a batch operation from the supplied gateway events and executes it on the given
   * connection through the pool.
   *
   * @param con the connection to send the message on; its distributed system id is written into
   *        the batch message
   * @param pool the pool used to execute the operation on {@code con}
   * @param events list of gateway events; every element is cast to
   *        {@link GatewaySenderEventImpl} when the message is built
   * @param batchId the ID of this batch
   * @param removeFromQueueOnException true if the events should be processed even after some
   *        exception
   * @param isRetry true to flag the outgoing message as a retry
   */
  public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId,
      boolean removeFromQueueOnException, boolean isRetry) {
    final int distributedSystemId = con.getDistributedSystemId();
    final AbstractOp batchOp = new GatewaySenderGFEBatchOpImpl(events, batchId,
        removeFromQueueOnException, distributedSystemId, isRetry);
    pool.executeOn(con, batchOp, true/* timeoutFatal */);
  }


  /**
   * Executes an event-less batch operation on the given connection. The operation is created
   * with a zero-part message, which {@code GatewaySenderGFEBatchOpImpl#attempt} handles by reading
   * a response from the connection instead of sending anything.
   *
   * @param con the connection to read from
   * @param pool the pool used to execute the operation on {@code con}
   * @return whatever {@code pool.executeOn} returns for the operation (presumably the processed
   *         acknowledgement; confirm against the pool implementation)
   */
  public static Object executeOn(Connection con, ExecutablePool pool) {
    final AbstractOp readOnlyOp = new GatewaySenderGFEBatchOpImpl();
    final Object outcome = pool.executeOn(con, readOnlyOp, true/* timeoutFatal */);
    return outcome;
  }

  /**
   * Private constructor: this class only exposes static {@code executeOn} entry points and is not
   * meant to be instantiated.
   */
  private GatewaySenderBatchOp() {
    // no instances allowed
  }

  static class GatewaySenderGFEBatchOpImpl extends AbstractOp {

    /**
     * Builds the batch message: a four-part header (event count, batch id, distributed system id
     * and the remove-from-queue flag) followed by the parts of every event in iteration order.
     *
     * @param events the events to send; every element must be a {@link GatewaySenderEventImpl}
     * @param batchId the ID of this batch
     * @param removeFromQueueOnException written to the header as a single byte, 1 for true and 0
     *        for false
     * @param dsId the distributed system id written to the header
     * @param isRetry when true the message is flagged as a retry before any part is added
     * @throws org.apache.geode.SerializationException if serialization fails
     */
    public GatewaySenderGFEBatchOpImpl(List events, int batchId, boolean removeFromQueueOnException,
        int dsId, boolean isRetry) {
      super(MessageType.GATEWAY_RECEIVER_COMMAND, calcPartCount(events));
      final Message message = getMessage();
      if (isRetry) {
        message.setIsRetry();
      }

      // Batch header
      message.addIntPart(events.size());
      message.addIntPart(batchId);
      message.addIntPart(dsId);
      final byte removeFlag = removeFromQueueOnException ? (byte) 1 : (byte) 0;
      message.addBytesPart(new byte[] {removeFlag});

      for (Object element : events) {
        final GatewaySenderEventImpl gatewayEvent = (GatewaySenderEventImpl) element;

        // Every event starts with its action followed by the posDup flag
        final int action = gatewayEvent.getAction();
        message.addIntPart(action);
        final byte posDupFlag = (byte) (gatewayEvent.getPossibleDuplicate() ? 0x01 : 0x00);
        message.addBytesPart(new byte[] {posDupFlag});

        // 0 = create, 1 = update, 2 = destroy; 3 is accepted as well (its meaning is not
        // documented here). Any other action contributes no further parts.
        if (action < 0 || action > 3) {
          continue;
        }

        final String regionPath = gatewayEvent.getRegionPath();
        final EventID id = gatewayEvent.getEventId();
        final Object eventKey = gatewayEvent.getKey();
        final Object senderCallbackArg = gatewayEvent.getSenderCallbackArgument();

        message.addStringPart(regionPath, true);
        message.addObjPart(id);
        message.addStringOrObjPart(eventKey);

        // Only create (0) and update (1) carry a value, which is already a serialized byte[]
        if (action < 2) {
          final byte[] serializedValue = gatewayEvent.getSerializedValue();
          final byte valueIsObject = gatewayEvent.getValueIsObject();
          final boolean isObject = valueIsObject == 0x01;
          message.addRawPart(serializedValue, isObject);
        }

        // A one-byte marker says whether a callback argument part follows
        if (senderCallbackArg != null) {
          message.addBytesPart(new byte[] {0x01});
          message.addObjPart(senderCallbackArg);
        } else {
          message.addBytesPart(new byte[] {0x00});
        }

        message.addLongPart(gatewayEvent.getVersionTimeStamp());
      }
    }

    /**
     * Creates an operation whose message is declared with zero parts and carries no events.
     * {@link #attempt(Connection)} checks the message's part count and, when it is zero, reads a
     * response instead of sending.
     */
    public GatewaySenderGFEBatchOpImpl() {
      super(MessageType.GATEWAY_RECEIVER_COMMAND, 0);
    }

    /**
     * Runs one attempt of this operation on the given connection. A message without parts means
     * there is nothing to send, so a response is read instead; otherwise the batch is sent and the
     * gateway-batch statistics are updated, without waiting for a response here.
     *
     * @param cnx the connection to use
     * @return the result of the read when the message has no parts; otherwise the value of the
     *         {@code failed} flag, which is {@code false} whenever the send completed normally
     * @throws Exception if sending or reading fails
     */
    @Override
    public Object attempt(Connection cnx) throws Exception {
      final boolean nothingToSend = getMessage().getNumberOfParts() == 0;
      if (nothingToSend) {
        return attemptRead(cnx);
      }

      // Assume failure until the send has gone through; the end* hooks read these flags
      failed = true;
      timedOut = false;
      final long batchStart = startAttempt(cnx.getStats());
      try {
        attemptSend(cnx);
        failed = false;
      } finally {
        // Both hooks always run, in this order, even if the send or the first hook throws
        try {
          endSendAttempt(cnx.getStats(), batchStart);
        } finally {
          endAttempt(cnx.getStats(), batchStart);
        }
      }
      return failed;
    }

    /**
     * Reads and processes a response from the connection, maintaining the {@code failed} and
     * {@code timedOut} flags. A socket timeout is not counted as a failure: it clears
     * {@code failed}, sets {@code timedOut} and is rethrown. Any other exception leaves
     * {@code failed} set.
     *
     * @param cnx the connection to read the response from
     * @return the result of {@link #attemptReadResponse(Connection)}
     * @throws Exception if reading or processing the response fails
     */
    private Object attemptRead(Connection cnx) throws Exception {
      failed = true;
      final Object response;
      try {
        response = attemptReadResponse(cnx);
      } catch (SocketTimeoutException timeout) {
        failed = false;
        timedOut = true;
        throw timeout;
      }
      failed = false;
      return response;
    }


    /**
     * Creates the response message, binds it to the connection's socket, streams, comm buffer and
     * stats, and turns it into this operation's result.
     * <p>
     * A {@link ChunkedMessage} is handed to {@code processResponse} while its comms are still set.
     * Any other message is received in full first and processed after its comms have been unset.
     * In both cases unsetting the comms and processing the secure bytes happen in a
     * {@code finally} block.
     *
     * @param cnx the connection to read the response from; it is cast to {@link ConnectionImpl} to
     *        obtain the comm buffer
     * @return the processed response, or <code>null</code> when no response message is created
     * @throws Exception if the execute failed
     */
    @Override
    protected Object attemptReadResponse(Connection cnx) throws Exception {
      final Message response = createResponseMessage();
      if (response == null) {
        return null;
      }

      response.setComms(cnx.getSocket(), cnx.getInputStream(), cnx.getOutputStream(),
          ((ConnectionImpl) cnx).getCommBufferForAsyncRead(), cnx.getStats());

      if (!(response instanceof ChunkedMessage)) {
        try {
          response.receive();
        } finally {
          response.unsetComms();
          processSecureBytes(cnx, response);
        }
        return processResponse(response, cnx);
      }

      try {
        return processResponse(response, cnx);
      } finally {
        response.unsetComms();
        // TODO (ashetkar) Handle the case when we fail to read the
        // connection id.
        processSecureBytes(cnx, response);
      }
    }


    /**
     * Computes the number of message parts needed for a batch: four fixed header parts plus
     * whatever each event reports through {@code getNumberOfParts()}.
     *
     * @param events the events of the batch; every element must be a
     *        {@link GatewaySenderEventImpl}
     * @return the total part count
     */
    private static int calcPartCount(List events) {
      // Header parts added by the constructor: number of events, batchId, dsId and the
      // removeFromQueueOnException flag
      int total = 4;
      for (Object element : events) {
        total += ((GatewaySenderEventImpl) element).getNumberOfParts();
      }
      return total;
    }

    /**
     * Always answers {@code false} for this operation.
     * NOTE(review): presumably this tells the base class not to attach a user id to the message;
     * confirm against AbstractOp.
     *
     * @return {@code false}
     */
    @Override
    protected boolean needsUserId() {
      return false;
    }

    /**
     * Sends this operation's message after clearing its "has secure part" flag. The connection
     * argument is not used by this implementation.
     * NOTE(review): the meaning of the {@code false} argument passed to {@code send} is defined by
     * {@code Message}; confirm there.
     *
     * @param cnx the connection the operation is executing on (unused here)
     * @throws Exception if sending the message fails
     */
    @Override
    protected void sendMessage(Connection cnx) throws Exception {
      final Message outgoing = getMessage();
      outgoing.clearMessageHasSecurePartFlag();
      outgoing.send(false);
    }

    /**
     * Converts a response message from the receiver into a {@link GatewayAck}. The message is
     * always cleared before this method returns or throws.
     * <ul>
     * <li>{@code REPLY} whose first part is a single zero byte (REPLY_OKAY from a CloseConnection):
     * no ack is created and {@code null} is returned.</li>
     * <li>Any other {@code REPLY}: part 0 is the batch id and part 1 the number of events.</li>
     * <li>{@code EXCEPTION} carrying a {@code List} of {@link BatchException70}: the exceptions are
     * wrapped in a new {@link BatchException70} and stored in the ack rather than thrown; the batch
     * id is taken from the first list element.</li>
     * <li>{@code EXCEPTION} carrying a {@code Throwable}: rethrown wrapped in a
     * {@link ServerOperationException}.</li>
     * <li>{@code EXCEPTION} carrying anything else: {@code null} is returned.</li>
     * </ul>
     *
     * @param msg the response message to interpret
     * @return the acknowledgement, or {@code null} in the cases listed above
     * @throws ServerOperationException if the receiver sent back a {@code Throwable}
     * @throws InternalGemFireError if the message type is neither {@code REPLY} nor
     *         {@code EXCEPTION}
     * @throws Exception if reading a message part fails
     */
    @Override
    protected Object processResponse(Message msg) throws Exception {
      GatewayAck ack = null;
      try {
        // Read the header which describes the type of message following
        switch (msg.getMessageType()) {
          case MessageType.REPLY: {
            // Read the chunk
            final Part part0 = msg.getPart(0);
            if (part0.isBytes() && part0.getLength() == 1 && part0.getSerializedForm()[0] == 0) {
              // REPLY_OKAY from a CloseConnection
              break;
            }
            final int batchId = part0.getInt();
            final int numEvents = msg.getPart(1).getInt();
            ack = new GatewayAck(batchId, numEvents);
            break;
          }
          case MessageType.EXCEPTION: {
            // Decode the payload once and reuse it, so the type check and the cast below are
            // guaranteed to look at the same object
            final Object obj = msg.getPart(0).getObject();
            if (obj instanceof List) {
              final List<BatchException70> exceptions = (List<BatchException70>) obj;

              if (logger.isDebugEnabled()) {
                logger.debug(
                    "We got an exception from the GatewayReceiver. MessageType : {} obj :{}",
                    msg.getMessageType(), obj);
              }
              // don't throw Exception but set it in the Ack
              // NOTE(review): an empty list makes get(0) throw IndexOutOfBoundsException
              final BatchException70 be = new BatchException70(exceptions);
              ack = new GatewayAck(be, exceptions.get(0).getBatchId());

            } else if (obj instanceof Throwable) {
              final Throwable cause = (Throwable) obj;
              String s = ": While reading Ack from receiver " + cause.getMessage();
              throw new ServerOperationException(s, cause);
            }
            // Any other payload type falls through with a null ack
            break;
          }
          default:
            throw new InternalGemFireError(
                String.format("Unknown message type %s", msg.getMessageType()));
        }
      } finally {
        msg.clear();
      }
      return ack;
    }

    /**
     * Reports that no message type is treated as an error response by this operation; the
     * {@code EXCEPTION} message type is handled inside {@link #processResponse(Message)} instead.
     *
     * @param msgType the type of the response message (ignored)
     * @return {@code false} for every message type
     */
    @Override
    protected boolean isErrorResponse(int msgType) {
      return false;
    }

    /**
     * Marks the start of a gateway batch in the connection statistics.
     *
     * @param stats the statistics of the connection the batch is sent on
     * @return the value returned by {@code stats.startGatewayBatch()}; {@link #attempt(Connection)}
     *         passes it back as {@code start} to the end-of-attempt hooks
     */
    @Override
    protected long startAttempt(ConnectionStats stats) {
      return stats.startGatewayBatch();
    }

    /**
     * Records the end of the send phase of a gateway batch in the connection statistics.
     *
     * @param stats the statistics of the connection the batch was sent on
     * @param start the value obtained from {@link #startAttempt(ConnectionStats)}
     */
    @Override
    protected void endSendAttempt(ConnectionStats stats, long start) {
      final boolean sendFailed = hasFailed();
      stats.endGatewayBatchSend(start, sendFailed);
    }

    /**
     * Records the end of a gateway batch attempt in the connection statistics, together with
     * whether the attempt timed out and whether it failed.
     *
     * @param stats the statistics of the connection the batch was sent on
     * @param start the value obtained from {@link #startAttempt(ConnectionStats)}
     */
    @Override
    protected void endAttempt(ConnectionStats stats, long start) {
      final boolean attemptTimedOut = hasTimedOut();
      final boolean attemptFailed = hasFailed();
      stats.endGatewayBatch(start, attemptTimedOut, attemptFailed);
    }

    /**
     * Identifies this operation as a gateway sender operation.
     *
     * @return {@code true}
     */
    @Override
    public boolean isGatewaySenderOp() {
      return true;
    }
  }
}
