/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.tier.sockets;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;

import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.internal.serialization.Version;
import org.apache.geode.test.junit.categories.ClientServerTest;

@Category({ClientServerTest.class})
public class MessageJUnitTest {

  private Message message;

  @Before
  public void setUp() throws Exception {
    Socket mockSocket = mock(Socket.class);
    this.message = new Message(2, Version.CURRENT);
    assertEquals(2, this.message.getNumberOfParts());
    MessageStats mockStats = mock(MessageStats.class);
    ByteBuffer msgBuffer = ByteBuffer.allocate(1000);
    ServerConnection mockServerConnection = mock(ServerConnection.class);
    this.message.setComms(mockServerConnection, mockSocket, msgBuffer, mockStats);
  }

  @Test
  public void clearDoesNotThrowNPE() throws Exception {
    // unsetComms clears the message's ByteBuffer, which was causing an NPE during shutdown
    // when clear() was invoked
    this.message.unsetComms();
    this.message.clear();
  }

  @Test
  public void numberOfPartsIsAdjusted() {
    int numParts = this.message.getNumberOfParts();
    this.message.setNumberOfParts(2 * numParts + 1);
    assertEquals(2 * numParts + 1, this.message.getNumberOfParts());
    this.message.addBytesPart(new byte[1]);
    this.message.addIntPart(2);
    this.message.addLongPart(3);
    this.message.addObjPart("4");
    this.message.addStringPart("5");
    assertEquals(5, this.message.getNextPartNumber());
  }

  @Test
  public void messageLongerThanMaxIntIsRejected() throws Exception {
    Part mockPart1 = mock(Part.class);
    when(mockPart1.getLength()).thenReturn(Integer.MAX_VALUE / 2);
    Part[] parts = new Part[2];
    parts[0] = mockPart1;
    parts[1] = mockPart1;
    this.message.setParts(parts);
    try {
      this.message.send();
      fail("expected an exception but none was thrown");
    } catch (MessageTooLargeException e) {
      assertTrue(e.getMessage().contains("exceeds maximum integer value"));
    }
  }

  @Test
  public void maxMessageSizeIsRespected() throws Exception {
    Part mockPart1 = mock(Part.class);
    when(mockPart1.getLength()).thenReturn(Message.DEFAULT_MAX_MESSAGE_SIZE / 2);
    Part[] parts = new Part[2];
    parts[0] = mockPart1;
    parts[1] = mockPart1;
    this.message.setParts(parts);
    try {
      this.message.send();
      fail("expected an exception but none was thrown");
    } catch (MessageTooLargeException e) {
      assertFalse(e.getMessage().contains("exceeds maximum integer value"));
    }
  }

  /**
   * geode-1468: Message should clear the chunks in its Parts when performing cleanup.
   */
  @Test
  public void streamBuffersAreClearedDuringCleanup() throws Exception {
    Part mockPart1 = mock(Part.class);
    when(mockPart1.getLength()).thenReturn(100);
    Part[] parts = new Part[2];
    parts[0] = mockPart1;
    parts[1] = mockPart1;
    this.message.setParts(parts);
    this.message.clearParts();
    verify(mockPart1, times(2)).clear();
  }

  /**
   * Client subscription threads establish a timeout when reading a message header in order to avoid
   * hanging should the server's machine fail, or should the network path to the server have
   * problems. This test ensures that the method Message.receiveWithHeaderReadTimeout correctly
   * times out when trying to read a message header.
   *
   * @See ClientServerMiscDUnitTest#testClientReceivesPingIntervalSetting
   */
  @Test(expected = SocketTimeoutException.class)
  public void messageWillTimeoutDuringRecvOnInactiveSocket() throws Exception {
    final ServerSocket serverSocket = new ServerSocket();
    serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
    Thread serverThread = new Thread("acceptor thread") {
      @Override
      public void run() {
        Socket client = null;
        try {
          client = serverSocket.accept();
          Thread.sleep(12000);
        } catch (InterruptedException e) {

        } catch (IOException e) {

        } finally {
          if (client != null && !client.isClosed()) {
            try {
              client.close();
            } catch (IOException e) {
            }
          }
        }
      }
    };
    serverThread.setDaemon(true);
    serverThread.start();

    try {
      Socket socket = new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort());
      MessageStats messageStats = mock(MessageStats.class);

      message.setComms(socket, ByteBuffer.allocate(100), messageStats);
      message.receiveWithHeaderReadTimeout(500);

    } finally {
      serverThread.interrupt();
      if (serverSocket != null && !serverSocket.isClosed()) {
        serverSocket.close();
      }
    }
  }

  @Test(expected = SocketTimeoutException.class)
  public void messageWillTimeoutDuringRecvOnInactiveSocketWithoutExplicitTimeoutSetting()
      throws Exception {
    final ServerSocket serverSocket = new ServerSocket();
    serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
    Thread serverThread = new Thread("acceptor thread") {
      @Override
      public void run() {
        Socket client = null;
        try {
          client = serverSocket.accept();
          Thread.sleep(12000);
        } catch (InterruptedException e) {

        } catch (IOException e) {

        } finally {
          if (client != null && !client.isClosed()) {
            try {
              client.close();
            } catch (IOException e) {
            }
          }
        }
      }
    };
    serverThread.setDaemon(true);
    serverThread.start();

    try {
      Socket socket = new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort());
      socket.setSoTimeout(500);
      MessageStats messageStats = mock(MessageStats.class);

      message.setComms(socket, ByteBuffer.allocate(100), messageStats);
      message.receive();

    } finally {
      serverThread.interrupt();
      if (serverSocket != null && !serverSocket.isClosed()) {
        serverSocket.close();
      }
    }
  }

}
