/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.core.appender;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.core.net.Protocol;
import org.apache.logging.log4j.core.util.Constants;
import org.apache.logging.log4j.core.util.Throwables;
import org.apache.logging.log4j.jackson.json.Log4jJsonObjectMapper;
import org.apache.logging.log4j.jackson.json.layout.JsonLayout;
import org.apache.logging.log4j.test.AvailablePortFinder;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 *
 */
public class SocketAppenderTest {

    private static final int PORT = AvailablePortFinder.getNextAvailable();
    private static final int DYN_PORT = AvailablePortFinder.getNextAvailable();
    private static final int ERROR_PORT = AvailablePortFinder.getNextAvailable();

    private static TcpSocketTestServer tcpServer;
    private static UdpSocketTestServer udpServer;

    private final LoggerContext context = LoggerContext.getContext();
    private final Logger logger = context.getLogger(SocketAppenderTest.class.getName());

    @BeforeClass
    public static void setupClass() throws Exception {
        tcpServer = new TcpSocketTestServer(PORT);
        tcpServer.start();
        udpServer = new UdpSocketTestServer();
        udpServer.start();
        LoggerContext.getContext().reconfigure();
        ThreadContext.clearAll();
    }

    @AfterClass
    public static void cleanupClass() {
        tcpServer.shutdown();
        udpServer.shutdown();
        ThreadContext.clearAll();
    }

    @After
    public void teardown() {
        ThreadContext.clearAll();
        removeAndStopAppenders();
        reset();
    }

    void removeAndStopAppenders() {
        final Map<String, Appender> map = logger.getAppenders();
        for (final Map.Entry<String, Appender> entry : map.entrySet()) {
            final Appender appender = entry.getValue();
            logger.removeAppender(appender);
            appender.stop();
        }
    }

    static void reset() {
        tcpServer.reset();
        udpServer.reset();
    }

    @Test
    public void testTcpAppender1() throws Exception {
        testTcpAppender(tcpServer, logger, Constants.ENCODER_BYTE_BUFFER_SIZE);
    }

    @Test
    @Ignore("WIP Bug when this method runs after testTcpAppender1()")
    public void testTcpAppender2() throws Exception {
        testTcpAppender(tcpServer, logger, Constants.ENCODER_BYTE_BUFFER_SIZE);
    }

    static void testTcpAppender(final TcpSocketTestServer tcpTestServer, final Logger logger, final int bufferSize)
            throws Exception {
        // @formatter:off
        final SocketAppender appender = SocketAppender.newBuilder()
                .withHost("localhost")
                .withPort(tcpTestServer.getLocalPort())
                .withReconnectDelayMillis(-1)
                .withName("test")
                .withImmediateFail(false)
                .withBufferSize(bufferSize)
                .withLayout(JsonLayout.newBuilder().setProperties(true).build())
                .build();
        // @formatter:on
        appender.start();
        Assert.assertEquals(bufferSize, appender.getManager().getByteBuffer().capacity());

        // set appender on root and set level to debug
        logger.addAppender(appender);
        logger.setAdditive(false);
        logger.setLevel(Level.DEBUG);
        final String tcKey = "UUID";
        final String expectedUuidStr = UUID.randomUUID().toString();
        ThreadContext.put(tcKey, expectedUuidStr);
        ThreadContext.push(expectedUuidStr);
        final String expectedExMsg = "This is a test";
        try {
            logger.debug("This is a test message");
            final Throwable child = new LoggingException(expectedExMsg);
            logger.error("Throwing an exception", child);
            logger.debug("This is another test message");
        } finally {
            ThreadContext.remove(tcKey);
            ThreadContext.pop();
        }
        Thread.sleep(250);
        LogEvent event = tcpTestServer.getQueue().poll(3, TimeUnit.SECONDS);
        assertNotNull("No event retrieved", event);
        assertTrue("Incorrect event", event.getMessage().getFormattedMessage().equals("This is a test message"));
        assertTrue("Message not delivered via TCP", tcpTestServer.getCount() > 0);
        assertEquals(expectedUuidStr, event.getContextData().getValue(tcKey));
        event = tcpTestServer.getQueue().poll(3, TimeUnit.SECONDS);
        assertNotNull("No event retrieved", event);
        assertTrue("Incorrect event", event.getMessage().getFormattedMessage().equals("Throwing an exception"));
        assertTrue("Message not delivered via TCP", tcpTestServer.getCount() > 1);
        assertEquals(expectedUuidStr, event.getContextStack().pop());
        assertNotNull(event.getThrownProxy());
        assertEquals(expectedExMsg, event.getThrownProxy().getMessage());
    }

    @Test
    public void testDefaultProtocol() throws Exception {
        // @formatter:off
        final SocketAppender appender = SocketAppender.newBuilder()
                .withPort(tcpServer.getLocalPort())
                .withReconnectDelayMillis(-1)
                .withName("test")
                .withImmediateFail(false)
                .withLayout(JsonLayout.newBuilder().setProperties(true).build())
                .build();
        // @formatter:on
        assertNotNull(appender);
        appender.stop();
    }

    @Test
    public void testUdpAppender() throws Exception {
        try {
            udpServer.latch.await();
        } catch (final InterruptedException ex) {
            ex.printStackTrace();
        }

        // @formatter:off
        final SocketAppender appender = SocketAppender.newBuilder()
                .withProtocol(Protocol.UDP)
                .withPort(tcpServer.getLocalPort())
                .withReconnectDelayMillis(-1)
                .withName("test")
                .withImmediateFail(false)
                .withLayout(JsonLayout.newBuilder().setProperties(true).build())
                .build();
        // @formatter:on
        appender.start();

        // set appender on root and set level to debug
        logger.addAppender(appender);
        logger.setAdditive(false);
        logger.setLevel(Level.DEBUG);
        logger.debug("This is a udp message");
        final LogEvent event = udpServer.getQueue().poll(3, TimeUnit.SECONDS);
        assertNotNull("No event retrieved", event);
        assertTrue("Incorrect event", event.getMessage().getFormattedMessage().equals("This is a udp message"));
        assertTrue("Message not delivered via UDP", udpServer.getCount() > 0);
    }

    @Test
    public void testTcpAppenderDeadlock() throws Exception {

        // @formatter:off
        final SocketAppender appender = SocketAppender.newBuilder()
                .withHost("localhost")
                .withPort(DYN_PORT)
                .withReconnectDelayMillis(100)
                .withName("test")
                .withImmediateFail(false)
                .withLayout(JsonLayout.newBuilder().setProperties(true).build())
                .build();
        // @formatter:on
        appender.start();
        // set appender on root and set level to debug
        logger.addAppender(appender);
        logger.setAdditive(false);
        logger.setLevel(Level.DEBUG);

        final TcpSocketTestServer tcpSocketServer = new TcpSocketTestServer(DYN_PORT);
        try {
            tcpSocketServer.start();

            logger.debug("This message is written because a deadlock never.");

            final LogEvent event = tcpSocketServer.getQueue().poll(3, TimeUnit.SECONDS);
            assertNotNull("No event retrieved", event);
        } finally {
            tcpSocketServer.shutdown();
        }
    }

    @Test
    public void testTcpAppenderNoWait() throws Exception {
        // @formatter:off
        final SocketAppender appender = SocketAppender.newBuilder()
                .withHost("localhost")
                .withPort(ERROR_PORT)
                .withReconnectDelayMillis(100)
                .withName("test")
                .withImmediateFail(false)
                .withIgnoreExceptions(false)
                .withLayout(JsonLayout.newBuilder().setProperties(true).build())
                .build();
        // @formatter:on
        appender.start();
        // set appender on root and set level to debug
        logger.addAppender(appender);
        logger.setAdditive(false);
        logger.setLevel(Level.DEBUG);

        try {
            logger.debug("This message is written because a deadlock never.");
            fail("No Exception was thrown");
        } catch (final Exception ex) {
            // TODO: move exception to @Test(expect = Exception.class)
            // Failure is expected.
            // ex.printStackTrace();
        }
    }

    public static class UdpSocketTestServer extends Thread {

        private final DatagramSocket sock;
        private boolean shutdown = false;
        private Thread thread;
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile int count = 0;
        private final BlockingQueue<LogEvent> queue;
        private final ObjectMapper objectMapper = new Log4jJsonObjectMapper();

        public UdpSocketTestServer() throws IOException {
            this.sock = new DatagramSocket(PORT);
            this.queue = new ArrayBlockingQueue<>(10);
        }

        public void reset() {
            queue.clear();
            count = 0;
        }

        public void shutdown() {
            this.shutdown = true;
            thread.interrupt();
            try {
                thread.join(100);
            } catch (InterruptedException ie) {
                System.out.println("Unable to stop server");
            }
        }

        @Override
        public void run() {
            this.thread = Thread.currentThread();
            final byte[] bytes = new byte[4096];
            final DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
            try {
                while (!shutdown) {
                    latch.countDown();
                    sock.receive(packet);
                    ++count;
                    final LogEvent event = objectMapper.readValue(packet.getData(), Log4jLogEvent.class);
                    queue.add(event);
                }
            } catch (final Throwable e) {
                e.printStackTrace();
                if (!shutdown) {
                    Throwables.rethrow(e);
                }
            }
        }

        public int getCount() {
            return count;
        }

        public BlockingQueue<LogEvent> getQueue() {
            return queue;
        }
    }

    public static class TcpSocketTestServer extends Thread {

        private final ServerSocket serverSocket;
        private volatile boolean shutdown = false;
        private volatile int count = 0;
        private final BlockingQueue<LogEvent> queue;
        private final ObjectMapper objectMapper = new Log4jJsonObjectMapper();

        @SuppressWarnings("resource")
        public TcpSocketTestServer(final int port) throws IOException {
            this(new ServerSocket(port));
        }

        public TcpSocketTestServer(final ServerSocket serverSocket) {
            this.serverSocket = serverSocket;
            this.queue = new ArrayBlockingQueue<>(10);
        }

        public int getLocalPort() {
            return serverSocket.getLocalPort();
        }

        public void reset() {
            queue.clear();
            count = 0;
        }

        public void shutdown() {
            this.shutdown = true;
            interrupt();
            try {
                this.join(100);
            } catch (InterruptedException ie) {
                System.out.println("Unable to stop server");
            }
        }

        @Override
        public void run() {
            try {
                try (final Socket socket = serverSocket.accept()) {
                    if (socket != null) {
                        final InputStream is = socket.getInputStream();
                        while (!shutdown) {
                            final MappingIterator<LogEvent> mappingIterator = objectMapper.readerFor(Log4jLogEvent.class).readValues(is);
                            while (mappingIterator.hasNextValue()) {
                                queue.add(mappingIterator.nextValue());
                                ++count;
                            }
                        }
                    }
                }
            } catch (final EOFException eof) {
                // Socket is closed.
            } catch (final Exception e) {
                if (!shutdown) {
                    Throwables.rethrow(e);
                }
            }
        }

        public BlockingQueue<LogEvent> getQueue() {
            return queue;
        }

        public int getCount() {
            return count;
        }
    }

}
