blob: 63ca28bb691a77127a96d9299a7e4b47e9eecf10 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.yarn;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.twill.api.AbstractTwillRunnable;
import org.apache.twill.api.Configs;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.Constants;
import org.apache.twill.internal.io.LocationCache;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import java.io.PrintWriter;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Unit test for {@link LocationCache} usage in {@link YarnTwillRunnerService}.
*/
public class LocationCacheTest {
// Create a TwillTester with LocationCache enabled
@ClassRule
public static final TwillTester TWILL_TESTER = new TwillTester(Configs.Keys.LOCATION_CACHE_DIR, ".cache");
@Test(timeout = 120000L)
public void testLocationCache() throws Exception {
TwillRunner twillRunner = TWILL_TESTER.getTwillRunner();
// Start the runnable
TwillController controller = twillRunner.prepare(new BlockingTwillRunnable())
.addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.start();
// Wait until the runnable is runnable
String runnableName = BlockingTwillRunnable.class.getSimpleName();
ResourceReport resourceReport = controller.getResourceReport();
while (resourceReport == null || resourceReport.getRunnableResources(runnableName).isEmpty()) {
TimeUnit.SECONDS.sleep(1);
resourceReport = controller.getResourceReport();
}
long startTime = System.currentTimeMillis();
// Inspect the cache directory, there should be a directory, which is the current session
// inside that directory, there should be three files, launcher.jar, twill.jar and an application jar
LocationFactory locationFactory = TWILL_TESTER.createLocationFactory();
Location cacheBase = locationFactory.create(".cache");
List<Location> cacheDirs = cacheBase.list();
Assert.assertEquals(1, cacheDirs.size());
Location currentSessionCache = cacheDirs.get(0);
Assert.assertEquals(3, currentSessionCache.list().size());
// Force a cleanup of cache. The first call is to collect the locations to be cleanup.
// The second call is the actual cleanup.
((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime);
((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime +
Configs.Defaults.LOCATION_CACHE_EXPIRY_MS);
// Since the app is still runnable, no files in the cache should get removed.
Assert.assertEquals(3, currentSessionCache.list().size());
// Stop the app
controller.terminate().get();
// Force a cleanup of cache. The first call is to collect the locations to be cleanup.
// The second call is the actual cleanup.
((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime);
((YarnTwillRunnerService) twillRunner).forceLocationCacheCleanup(startTime +
Configs.Defaults.LOCATION_CACHE_EXPIRY_MS);
// Since the app is stopped, there should only be two files, the launcher.jar and twill.jar, as they
// will never get removed for the current session.
Set<Location> cachedLocations = new HashSet<>(currentSessionCache.list());
Assert.assertEquals(2, cachedLocations.size());
Assert.assertTrue(cachedLocations.contains(currentSessionCache.append(Constants.Files.LAUNCHER_JAR)));
Assert.assertTrue(cachedLocations.contains(currentSessionCache.append(Constants.Files.TWILL_JAR)));
// Start another YarnTwillRunnerService
TwillRunnerService newTwillRunner = TWILL_TESTER.createTwillRunnerService();
newTwillRunner.start();
// Force a cleanup using the antique expiry. The list of locations that need to be cleanup was already
// collected when the new twill runner was started.
// Need to add some time in addition to the antique expiry time because the cache cleaner collects
// pending list asynchronously, which the "current" time it uses to calculate the expiration time might be
// later than the System.currentTimeMillis() call in the next line.
((YarnTwillRunnerService) newTwillRunner)
.forceLocationCacheCleanup(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(30) +
Configs.Defaults.LOCATION_CACHE_ANTIQUE_EXPIRY_MS);
// Now there shouldn't be any file under the current session cache directory
List<Location> locations = currentSessionCache.list();
Assert.assertTrue("Location is not empty " + locations, locations.isEmpty());
}
/**
* A runnable that blocks until stopped explicitly.
*/
public static final class BlockingTwillRunnable extends AbstractTwillRunnable {
private final CountDownLatch stopLatch = new CountDownLatch(1);
@Override
public void run() {
Uninterruptibles.awaitUninterruptibly(stopLatch);
}
@Override
public void stop() {
stopLatch.countDown();
}
}
}