/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.twill.yarn;

import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.Configs;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.Constants;
import org.apache.twill.internal.io.LocationCache;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

import java.io.PrintWriter;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Unit test for {@link LocationCache} usage in {@link YarnTwillRunnerService}.
 */
public class LocationCacheTest {

  // Create a TwillTester with LocationCache enabled
  @ClassRule
  public static final TwillTester TWILL_TESTER = new TwillTester(Configs.Keys.LOCATION_CACHE_DIR, ".cache");

  @Test(timeout = 120000L)
  public void testLocationCache() throws Exception {
    TwillRunner twillRunner = TWILL_TESTER.getTwillRunner();

    // Start the runnable
    TwillController controller = twillRunner.prepare(new BlockingTwillRunnable())
      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
      .start();

    // Wait until the runnable is runnable
    String runnableName = BlockingTwillRunnable.class.getSimpleName();
    ResourceReport resourceReport = controller.getResourceReport();
    while (resourceReport == null || resourceReport.getRunnableResources(runnableName).isEmpty()) {
      TimeUnit.SECONDS.sleep(1);
      resourceReport = controller.getResourceReport();
    }

    long startTime = System.currentTimeMillis();

    // Inspect the cache directory, there should be a directory, which is the current session
    // inside that directory, there should be three files, launcher.jar, twill.jar and an application jar
    LocationFactory locationFactory = TWILL_TESTER.createLocationFactory();
    Location cacheBase = locationFactory.create(".cache");

    List<Location> cacheDirs = cacheBase.list();
    Assert.assertEquals(1, cacheDirs.size());

    Location currentSessionCache = cacheDirs.get(0);
    Assert.assertEquals(3, currentSessionCache.list().size());

    // Force a cleanup of cache. The first call is to collect the locations to be cleanup.
    // The second call is the actual cleanup.
    ((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime);
    ((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime +
                                                                       Configs.Defaults.LOCATION_CACHE_EXPIRY_MS);

    // Since the app is still runnable, no files in the cache should get removed.
    Assert.assertEquals(3, currentSessionCache.list().size());

    // Stop the app
    controller.terminate().get();

    // Force a cleanup of cache. The first call is to collect the locations to be cleanup.
    // The second call is the actual cleanup.
    ((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime);
    ((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime +
                                                                       Configs.Defaults.LOCATION_CACHE_EXPIRY_MS);

    // Since the app is stopped, there should only be two files, the launcher.jar and twill.jar, as they
    // will never get removed for the current session.
    Set<Location> cachedLocations = new HashSet<>(currentSessionCache.list());
    Assert.assertEquals(2, cachedLocations.size());
    Assert.assertTrue(cachedLocations.contains(currentSessionCache.append(Constants.Files.LAUNCHER_JAR)));
    Assert.assertTrue(cachedLocations.contains(currentSessionCache.append(Constants.Files.TWILL_JAR)));

    // Start another YarnTwillRunnerService
    TwillRunnerService newTwillRunner = TWILL_TESTER.createTwillRunnerService();
    newTwillRunner.start();

    // Force a cleanup using the antique expiry. The list of locations that need to be cleanup was already
    // collected when the new twill runner was started.
    // Need to add some time in addition to the antique expiry time because the cache cleaner collects
    // pending list asynchronously, which the "current" time it uses to calculate the expiration time might be
    // later than the System.currentTimeMillis() call in the next line.
    ((YarnTwillRunnerService) newTwillRunner)
      .forceLocationCacheCleanup(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30) +
                                   Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);

    // Now there shouldn't be any file under the current session cache directory
    List<Location> locations = currentSessionCache.list();
    Assert.assertTrue("Location is not empty " + locations, locations.isEmpty());
  }

  /**
   * A runnable that blocks until stopped explicitly.
   */
  public static final class BlockingTwillRunnable extends AbstractTwillRunnable {

    private final CountDownLatch stopLatch = new CountDownLatch(1);

    @Override
    public void run() {
      Uninterruptibles.awaitUninterruptibly(stopLatch);
    }

    @Override
    public void stop() {
      stopLatch.countDown();
    }
  }
}
