/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.logging;

import static org.apache.geode.internal.logging.LogWriterLevel.ALL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.LogWriter;
import org.apache.geode.SystemFailure;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.junit.categories.LoggingTest;

/**
 * Integration tests for (new multi-threaded) {@link MergeLogFiles} utility.
 */
@Category(LoggingTest.class)
public class MergeLogFilesIntegrationTest {

  /** The next integer to be written to the log */
  private int next;

  /**
   * A bunch of threads write a strictly increasing integer to log "files" stored in byte arrays.
   * The files are merged and the order is verified.
   */
  @Test
  public void testMultipleThreads() throws Exception {
    // Spawn a bunch of threads that write to a log
    WorkerGroup group = new WorkerGroup("Workers");
    List workers = new ArrayList();
    for (int i = 0; i < 10; i++) {
      Worker worker = new Worker("Worker " + i, group);
      workers.add(worker);
      worker.start();
    }

    for (Iterator iter = workers.iterator(); iter.hasNext();) {
      Worker worker = (Worker) iter.next();
      ThreadUtils.join(worker, 120 * 1000);
    }

    if (group.exceptionOccurred()) {
      fail(group.getExceptionString());
    }

    // Merge the log files together
    Map<String, InputStream> logs = new HashMap<>();
    for (int i = 0; i < workers.size(); i++) {
      Worker worker = (Worker) workers.get(i);
      logs.put(worker.getName(), worker.getInputStream());
    }
    StringWriter sw = new StringWriter();
    PrintWriter pw = new PrintWriter(sw, true);
    MergeLogFiles.mergeLogFiles(logs, pw);

    // System.out.println(sw.toString());

    // Verify that the entries are sorted
    BufferedReader br = new BufferedReader(new StringReader(sw.toString()));
    Pattern pattern = Pattern.compile("^Worker \\d+: .* VALUE: (\\d+)");
    int lastValue = -1;
    while (br.ready()) {
      String line = br.readLine();
      if (line == null) {
        break;
      }

      Matcher matcher = pattern.matcher(line);
      if (matcher.matches()) {
        int value = Integer.parseInt(matcher.group(1));
        assertTrue(lastValue + " <= " + value, value > lastValue);
        lastValue = value;
      }
    }

    assertEquals(999, lastValue);
  }

  /**
   * A <code>ThreadGroup</code> for workers
   */
  private static class WorkerGroup extends ThreadGroup {
    /**
     * Did an uncaught exception occur in one of this group's threads?
     */
    private boolean exceptionOccurred;

    /**
     * A <code>StringBuffer</code> containing a description of the uncaught exceptions thrown by the
     * worker threads.
     */
    private final StringBuffer sb;

    /**
     * Creates a new <code>WorkerGroup</code> with the given name
     */
    WorkerGroup(String name) {
      super(name);
      sb = new StringBuffer();
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
      if (e instanceof VirtualMachineError) {
        SystemFailure.setFailure((VirtualMachineError) e); // don't throw
      }
      sb.append("Uncaught exception in thread ");
      sb.append(t.getName());
      sb.append("\n");
      StringWriter sw = new StringWriter();
      e.printStackTrace(new PrintWriter(sw, true));
      sb.append(sw.toString());
      sb.append("\n");

      exceptionOccurred = true;
    }

    /**
     * Returns whether or not an uncaught exception occurred in one of the worker threads.
     */
    public boolean exceptionOccurred() {
      return exceptionOccurred;
    }

    /**
     * Returns a string describing the uncaught exception(s) that occurred in the worker threads.
     */
    String getExceptionString() {
      return sb.toString();
    }
  }

  /**
   * Writes a strictly increasing number to a log "file" stored in a byte array. Waits a random
   * amount of time between writing entries.
   */
  private class Worker extends Thread {
    /** The input stream for reading from the log file */
    private InputStream in;

    /** A random number generator */
    private final Random random;

    /**
     * Creates a new <code>Worker</code> with the given name
     */
    public Worker(String name, ThreadGroup group) {
      super(group, name);
      random = new Random();
    }

    @Override
    public void run() {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      LogWriter logger =
          new LocalLogWriter(ALL.intLevel(), new PrintStream(baos, true));
      for (int i = 0; i < 100; i++) {
        int n;
        synchronized (MergeLogFilesIntegrationTest.this) {
          n = next++;

          // Have to log with the lock to guarantee ordering
          logger.info("VALUE: " + n);

          try {
            // Make sure that no two entries are at the same
            // millisecond. Since Windows doesn't have millisecond
            // granularity, we have to wait at least ten. Sigh.
            // DJM - even at 10, some entries have the same timestamp
            // on some PCs.
            Thread.sleep(15);

          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            break; // TODO
          }
        }

        try {
          Thread.sleep(random.nextInt(250));

        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
          break; // TODO
        }
      }

      // System.out.println(baos.toString());
      in = new ByteArrayInputStream(baos.toByteArray());
    }

    /**
     * Returns an <code>InputStream</code> for reading from the log that this worker wrote.
     */
    InputStream getInputStream() {
      return in;
    }
  }
}
