| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.yarn; |
| |
| import org.apache.flink.api.common.time.Deadline; |
| import org.apache.flink.client.cli.CliFrontend; |
| import org.apache.flink.configuration.ConfigConstants; |
| import org.apache.flink.configuration.GlobalConfiguration; |
| import org.apache.flink.configuration.JobManagerOptions; |
| import org.apache.flink.runtime.clusterframework.BootstrapTools; |
| import org.apache.flink.test.util.TestBaseUtils; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.Preconditions; |
| import org.apache.flink.util.TestLogger; |
| import org.apache.flink.util.function.RunnableWithException; |
| import org.apache.flink.yarn.cli.FlinkYarnSessionCli; |
| import org.apache.flink.yarn.util.TestUtils; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonConfigurationKeysPublic; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.service.Service; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ApplicationReport; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.YarnApplicationState; |
| import org.apache.hadoop.yarn.client.api.YarnClient; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.exceptions.YarnException; |
| import org.apache.hadoop.yarn.server.MiniYARNCluster; |
| import org.apache.hadoop.yarn.server.nodemanager.NodeManager; |
| import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.rules.TemporaryFolder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.slf4j.Marker; |
| import org.slf4j.MarkerFactory; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.FileWriter; |
| import java.io.FilenameFilter; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.PipedInputStream; |
| import java.io.PipedOutputStream; |
| import java.io.PrintStream; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Scanner; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.function.Supplier; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import java.util.stream.StreamSupport; |
| |
| import static org.apache.flink.util.Preconditions.checkState; |
| import static org.junit.Assert.assertEquals; |
| |
| /** |
| * This base class allows to use the MiniYARNCluster. The cluster is re-used for all tests. |
| * |
| * <p>This class is located in a different package which is build after flink-dist. This way, we can |
| * use the YARN uberjar of flink to start a Flink YARN session. |
| * |
| * <p>The test is not thread-safe. Parallel execution of tests is not possible! |
| */ |
| public abstract class YarnTestBase extends TestLogger { |
| private static final Logger LOG = LoggerFactory.getLogger(YarnTestBase.class); |
| |
| protected static final PrintStream ORIGINAL_STDOUT = System.out; |
| protected static final PrintStream ORIGINAL_STDERR = System.err; |
| private static final InputStream ORIGINAL_STDIN = System.in; |
| |
| protected static final String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name"; |
| |
| protected static final int NUM_NODEMANAGERS = 2; |
| |
| /** The tests are scanning for these strings in the final output. */ |
| protected static final String[] PROHIBITED_STRINGS = { |
| "Exception", // we don't want any exceptions to happen |
| "Started SelectChannelConnector@0.0.0.0:8081" // Jetty should start on a random port in YARN |
| // mode. |
| }; |
| |
| /** These strings are white-listed, overriding the prohibited strings. */ |
| protected static final Pattern[] WHITELISTED_STRINGS = { |
| // happens if yarn does not support external resources |
| Pattern.compile( |
| "ClassNotFoundException: org.apache.hadoop.yarn.api.records.ResourceInformation"), |
| // occurs if a TM disconnects from a JM because it is no longer hosting any slots |
| Pattern.compile("has no more allocated slots for job"), |
| // can happen if another process hasn't fully started yet |
| Pattern.compile("akka.actor.ActorNotFound: Actor not found for"), |
| // can happen if another process hasn't fully started yet |
| Pattern.compile("RpcConnectionException: Could not connect to rpc endpoint under address"), |
| // rest handler whose registration is logged on DEBUG level |
| Pattern.compile("JobExceptionsHandler"), |
| Pattern.compile("akka\\.remote\\.RemoteTransportExceptionNoStackTrace"), |
| // workaround for annoying InterruptedException logging: |
| // https://issues.apache.org/jira/browse/YARN-1022 |
| Pattern.compile("java\\.lang\\.InterruptedException"), |
| // very specific on purpose; whitelist meaningless exceptions that occur during akka |
| // shutdown: |
| Pattern.compile( |
| "Remote connection to \\[null\\] failed with java.net.ConnectException: Connection refused"), |
| Pattern.compile( |
| "Remote connection to \\[null\\] failed with java.nio.channels.NotYetConnectedException"), |
| Pattern.compile("java\\.io\\.IOException: Connection reset by peer"), |
| Pattern.compile( |
| "Association with remote system \\[akka.tcp://flink@[^]]+\\] has failed, address is now gated for \\[50\\] ms. Reason: \\[Association failed with \\[akka.tcp://flink@[^]]+\\]\\] Caused by: \\[java.net.ConnectException: Connection refused: [^]]+\\]"), |
| |
| // filter out expected ResourceManagerException caused by intended shutdown request |
| Pattern.compile(YarnResourceManagerDriver.ERROR_MESSAGE_ON_SHUTDOWN_REQUEST), |
| |
| // this can happen in Akka 2.4 on shutdown. |
| Pattern.compile( |
| "java\\.util\\.concurrent\\.RejectedExecutionException: Worker has already been shutdown"), |
| Pattern.compile("org\\.apache\\.flink.util\\.FlinkException: Stopping JobMaster"), |
| Pattern.compile( |
| "org\\.apache\\.flink.util\\.FlinkException: JobManager is shutting down\\."), |
| Pattern.compile("lost the leadership."), |
| Pattern.compile( |
| "akka.remote.transport.netty.NettyTransport.*Remote connection to \\[[^]]+\\] failed with java.io.IOException: Broken pipe"), |
| |
| // this can happen during cluster shutdown, if AMRMClient happens to be heartbeating |
| Pattern.compile("Exception on heartbeat"), |
| Pattern.compile("java\\.io\\.InterruptedIOException: Call interrupted") |
| }; |
| |
| // Temp directory which is deleted after the unit test. |
| @ClassRule public static TemporaryFolder tmp = new TemporaryFolder(); |
| |
| // Temp directory for mini hdfs |
| @ClassRule public static TemporaryFolder tmpHDFS = new TemporaryFolder(); |
| |
| protected static MiniYARNCluster yarnCluster = null; |
| |
| protected static MiniDFSCluster miniDFSCluster = null; |
| |
| /** Uberjar (fat jar) file of Flink. */ |
| protected static File flinkUberjar; |
| |
| protected static final YarnConfiguration YARN_CONFIGURATION; |
| |
| /** lib/ folder of the flink distribution. */ |
| protected static File flinkLibFolder; |
| |
| /** Temporary folder where Flink configurations will be kept for secure run. */ |
| protected static File tempConfPathForSecureRun = null; |
| |
| protected static File yarnSiteXML = null; |
| protected static File hdfsSiteXML = null; |
| |
| private YarnClient yarnClient = null; |
| |
| private static org.apache.flink.configuration.Configuration globalConfiguration; |
| |
| protected org.apache.flink.configuration.Configuration flinkConfiguration; |
| |
| static { |
| YARN_CONFIGURATION = new YarnConfiguration(); |
| YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32); |
| YARN_CONFIGURATION.setInt( |
| YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, |
| 4096); // 4096 is the available memory anyways |
| YARN_CONFIGURATION.setBoolean( |
| YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); |
| YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); |
| YARN_CONFIGURATION.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2); |
| YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4); |
| YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); |
| YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); |
| YARN_CONFIGURATION.setInt( |
| YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster. |
| // so we have to change the number of cores for testing. |
| YARN_CONFIGURATION.setFloat( |
| YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 99.0F); |
| YARN_CONFIGURATION.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, getYarnClasspath()); |
| } |
| |
| /** |
| * Searches for the yarn.classpath file generated by the "dependency:build-classpath" maven |
| * plugin in "flink-yarn-tests". |
| * |
| * @return a classpath suitable for running all YARN-launched JVMs |
| */ |
| private static String getYarnClasspath() { |
| final String start = "../flink-yarn-tests"; |
| try { |
| File classPathFile = |
| TestUtils.findFile(start, (dir, name) -> name.equals("yarn.classpath")); |
| return FileUtils.readFileToString( |
| classPathFile); // potential NPE is supposed to be fatal |
| } catch (Throwable t) { |
| LOG.error( |
| "Error while getting YARN classpath in {}", |
| new File(start).getAbsoluteFile(), |
| t); |
| throw new RuntimeException("Error while getting YARN classpath", t); |
| } |
| } |
| |
| public static void populateYarnSecureConfigurations( |
| Configuration conf, String principal, String keytab) { |
| |
| conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); |
| conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); |
| |
| conf.set(YarnConfiguration.RM_KEYTAB, keytab); |
| conf.set(YarnConfiguration.RM_PRINCIPAL, principal); |
| conf.set(YarnConfiguration.NM_KEYTAB, keytab); |
| conf.set(YarnConfiguration.NM_PRINCIPAL, principal); |
| |
| conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, principal); |
| conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab); |
| conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, principal); |
| conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab); |
| |
| conf.set("hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]"); |
| } |
| |
| @Before |
| public void setupYarnClient() { |
| if (yarnClient == null) { |
| yarnClient = YarnClient.createYarnClient(); |
| yarnClient.init(getYarnConfiguration()); |
| yarnClient.start(); |
| } |
| |
| flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration); |
| } |
| |
| /** Sleep a bit between the tests (we are re-using the YARN cluster for the tests). */ |
| @After |
| public void shutdownYarnClient() { |
| yarnClient.stop(); |
| } |
| |
| protected void runTest(RunnableWithException test) throws Exception { |
| // wrapping the cleanup logic in an AutoClosable automatically suppresses additional |
| // exceptions |
| try (final CleanupYarnApplication ignored = new CleanupYarnApplication()) { |
| test.run(); |
| } |
| } |
| |
| private class CleanupYarnApplication implements AutoCloseable { |
| @Override |
| public void close() throws Exception { |
| Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10)); |
| |
| boolean isAnyJobRunning = |
| getApplicationReportWithRetryOnNPE(yarnClient).stream() |
| .anyMatch(YarnTestBase::isApplicationRunning); |
| |
| while (deadline.hasTimeLeft() && isAnyJobRunning) { |
| try { |
| Thread.sleep(500); |
| } catch (InterruptedException e) { |
| Assert.fail("Should not happen"); |
| } |
| isAnyJobRunning = |
| getApplicationReportWithRetryOnNPE(yarnClient).stream() |
| .anyMatch(YarnTestBase::isApplicationRunning); |
| } |
| |
| if (isAnyJobRunning) { |
| final List<String> runningApps = |
| getApplicationReportWithRetryOnNPE(yarnClient).stream() |
| .filter(YarnTestBase::isApplicationRunning) |
| .map( |
| app -> |
| "App " |
| + app.getApplicationId() |
| + " is in state " |
| + app.getYarnApplicationState() |
| + '.') |
| .collect(Collectors.toList()); |
| if (!runningApps.isEmpty()) { |
| Assert.fail( |
| "There is at least one application on the cluster that is not finished." |
| + runningApps); |
| } |
| } |
| } |
| } |
| |
| static List<ApplicationReport> getApplicationReportWithRetryOnNPE(final YarnClient yarnClient) |
| throws IOException, YarnException { |
| return getApplicationReportWithRetryOnNPE(yarnClient, null); |
| } |
| |
| static List<ApplicationReport> getApplicationReportWithRetryOnNPE( |
| final YarnClient yarnClient, @Nullable EnumSet<YarnApplicationState> states) |
| throws IOException, YarnException { |
| final int maxRetryCount = 10; |
| NullPointerException mostRecentNPE = null; |
| for (int i = 0; i < maxRetryCount; i++) { |
| try { |
| return yarnClient.getApplications(states); |
| } catch (NullPointerException e) { |
| String npeStr = ExceptionUtils.stringifyException(e); |
| if (!npeStr.contains("RMAppAttemptMetrics.getAggregateAppResourceUsage")) { |
| // unrelated NullPointerExceptions should be forwarded to the calling method |
| throw e; |
| } |
| |
| mostRecentNPE = e; |
| final String logMessage = |
| "NullPointerException was caught most likely being related to YARN-7007. The related discussion is happening in FLINK-15534. The exception is going to be ignored."; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(logMessage, mostRecentNPE); |
| } else { |
| LOG.warn(logMessage); |
| } |
| } |
| } |
| |
| throw new IllegalStateException( |
| "YarnClient.getApplications command failed " |
| + maxRetryCount |
| + " times to gather the application report. Check FLINK-15534 for further details.", |
| mostRecentNPE); |
| } |
| |
| private static boolean isApplicationRunning(ApplicationReport app) { |
| final YarnApplicationState yarnApplicationState = app.getYarnApplicationState(); |
| return yarnApplicationState != YarnApplicationState.FINISHED |
| && app.getYarnApplicationState() != YarnApplicationState.KILLED |
| && app.getYarnApplicationState() != YarnApplicationState.FAILED; |
| } |
| |
| @Nullable |
| protected YarnClient getYarnClient() { |
| return yarnClient; |
| } |
| |
| protected static YarnConfiguration getYarnConfiguration() { |
| return YARN_CONFIGURATION; |
| } |
| |
| @Nonnull |
| YarnClusterDescriptor createYarnClusterDescriptor( |
| org.apache.flink.configuration.Configuration flinkConfiguration) { |
| final YarnClusterDescriptor yarnClusterDescriptor = |
| createYarnClusterDescriptorWithoutLibDir(flinkConfiguration); |
| yarnClusterDescriptor.addShipFiles(Collections.singletonList(flinkLibFolder)); |
| return yarnClusterDescriptor; |
| } |
| |
| YarnClusterDescriptor createYarnClusterDescriptorWithoutLibDir( |
| org.apache.flink.configuration.Configuration flinkConfiguration) { |
| final YarnClusterDescriptor yarnClusterDescriptor = |
| YarnTestUtils.createClusterDescriptorWithLogging( |
| tempConfPathForSecureRun.getAbsolutePath(), |
| flinkConfiguration, |
| YARN_CONFIGURATION, |
| yarnClient, |
| true); |
| yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.toURI())); |
| return yarnClusterDescriptor; |
| } |
| |
| /** |
| * A simple {@link FilenameFilter} that only accepts files if their name contains every string |
| * in the array passed to the constructor. |
| */ |
| public static class ContainsName implements FilenameFilter { |
| private String[] names; |
| private String excludeInPath = null; |
| |
| /** @param names which have to be included in the filename. */ |
| public ContainsName(String[] names) { |
| this.names = names; |
| } |
| |
| public ContainsName(String[] names, String excludeInPath) { |
| this.names = names; |
| this.excludeInPath = excludeInPath; |
| } |
| |
| @Override |
| public boolean accept(File dir, String name) { |
| if (excludeInPath == null) { |
| for (String n : names) { |
| if (!name.contains(n)) { |
| return false; |
| } |
| } |
| return true; |
| } else { |
| for (String n : names) { |
| if (!name.contains(n)) { |
| return false; |
| } |
| } |
| return !dir.toString().contains(excludeInPath); |
| } |
| } |
| } |
| |
| // write yarn-site.xml to target/test-classes so that flink pick can pick up this when |
| // initializing YarnClient properly from classpath |
| public static void writeYarnSiteConfigXML(Configuration yarnConf, File targetFolder) |
| throws IOException { |
| yarnSiteXML = new File(targetFolder, "/yarn-site.xml"); |
| try (FileWriter writer = new FileWriter(yarnSiteXML)) { |
| yarnConf.writeXml(writer); |
| writer.flush(); |
| } |
| } |
| |
| private static void writeHDFSSiteConfigXML(Configuration coreSite, File targetFolder) |
| throws IOException { |
| hdfsSiteXML = new File(targetFolder, "/hdfs-site.xml"); |
| try (FileWriter writer = new FileWriter(hdfsSiteXML)) { |
| coreSite.writeXml(writer); |
| writer.flush(); |
| } |
| } |
| |
| /** |
| * This method checks the written TaskManager and JobManager log files for exceptions. |
| * |
| * <p>WARN: Please make sure the tool doesn't find old logfiles from previous test runs. So |
| * always run "mvn clean" before running the tests here. |
| */ |
| public static void ensureNoProhibitedStringInLogFiles( |
| final String[] prohibited, final Pattern[] whitelisted) { |
| File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY)); |
| Assert.assertTrue( |
| "Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists()); |
| Assert.assertTrue( |
| "Expecting directory " + cwd.getAbsolutePath() + " to be a directory", |
| cwd.isDirectory()); |
| |
| List<String> prohibitedExcerpts = new ArrayList<>(); |
| File foundFile = |
| TestUtils.findFile( |
| cwd.getAbsolutePath(), |
| new FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| // scan each file for prohibited strings. |
| File logFile = new File(dir.getAbsolutePath() + "/" + name); |
| try { |
| BufferingScanner scanner = |
| new BufferingScanner(new Scanner(logFile), 10); |
| while (scanner.hasNextLine()) { |
| final String lineFromFile = scanner.nextLine(); |
| for (String aProhibited : prohibited) { |
| if (lineFromFile.contains(aProhibited)) { |
| |
| boolean whitelistedFound = false; |
| for (Pattern whitelistPattern : whitelisted) { |
| Matcher whitelistMatch = |
| whitelistPattern.matcher(lineFromFile); |
| if (whitelistMatch.find()) { |
| whitelistedFound = true; |
| break; |
| } |
| } |
| |
| if (!whitelistedFound) { |
| // logging in FATAL to see the actual message in |
| // CI tests. |
| Marker fatal = MarkerFactory.getMarker("FATAL"); |
| LOG.error( |
| fatal, |
| "Prohibited String '{}' in '{}:{}'", |
| aProhibited, |
| logFile.getAbsolutePath(), |
| lineFromFile); |
| |
| StringBuilder logExcerpt = new StringBuilder(); |
| |
| logExcerpt.append(System.lineSeparator()); |
| |
| // include some previous lines in case of |
| // irregular formatting |
| for (String previousLine : |
| scanner.getPreviousLines()) { |
| logExcerpt.append(previousLine); |
| logExcerpt.append(System.lineSeparator()); |
| } |
| |
| logExcerpt.append(lineFromFile); |
| logExcerpt.append(System.lineSeparator()); |
| // extract potential stack trace from log |
| while (scanner.hasNextLine()) { |
| String line = scanner.nextLine(); |
| logExcerpt.append(line); |
| logExcerpt.append(System.lineSeparator()); |
| if (line.isEmpty() |
| || (!Character.isWhitespace( |
| line.charAt(0)) |
| && !line.startsWith( |
| "Caused by"))) { |
| // the cause has been printed, now add a |
| // few more lines in case of irregular |
| // formatting |
| for (int x = 0; |
| x < 10 && scanner.hasNextLine(); |
| x++) { |
| logExcerpt.append( |
| scanner.nextLine()); |
| logExcerpt.append( |
| System.lineSeparator()); |
| } |
| break; |
| } |
| } |
| prohibitedExcerpts.add(logExcerpt.toString()); |
| |
| return true; |
| } |
| } |
| } |
| } |
| } catch (FileNotFoundException e) { |
| LOG.warn( |
| "Unable to locate file: " |
| + e.getMessage() |
| + " file: " |
| + logFile.getAbsolutePath()); |
| } |
| |
| return false; |
| } |
| }); |
| if (foundFile != null) { |
| Scanner scanner = null; |
| try { |
| scanner = new Scanner(foundFile); |
| } catch (FileNotFoundException e) { |
| Assert.fail( |
| "Unable to locate file: " |
| + e.getMessage() |
| + " file: " |
| + foundFile.getAbsolutePath()); |
| } |
| LOG.warn("Found a file with a prohibited string. Printing contents:"); |
| while (scanner.hasNextLine()) { |
| LOG.warn("LINE: " + scanner.nextLine()); |
| } |
| Assert.fail( |
| "Found a file " |
| + foundFile |
| + " with a prohibited string (one of " |
| + Arrays.toString(prohibited) |
| + "). " |
| + "Excerpts:" |
| + System.lineSeparator() |
| + prohibitedExcerpts); |
| } |
| } |
| |
| public static boolean verifyStringsInNamedLogFiles( |
| final String[] mustHave, final ApplicationId applicationId, final String fileName) { |
| final List<String> mustHaveList = Arrays.asList(mustHave); |
| final File cwd = new File("target", YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY)); |
| if (!cwd.exists() || !cwd.isDirectory()) { |
| return false; |
| } |
| |
| final File foundFile = |
| TestUtils.findFile( |
| cwd.getAbsolutePath(), |
| new FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| if (fileName != null && !name.equals(fileName)) { |
| return false; |
| } |
| final File f = new File(dir.getAbsolutePath(), name); |
| // Only check the specified application logs |
| if (StreamSupport.stream(f.toPath().spliterator(), false) |
| .noneMatch(p -> p.endsWith(applicationId.toString()))) { |
| return false; |
| } |
| LOG.info("Searching in {}", f.getAbsolutePath()); |
| try (Scanner scanner = new Scanner(f)) { |
| final Set<String> foundSet = new HashSet<>(mustHave.length); |
| while (scanner.hasNextLine()) { |
| final String lineFromFile = scanner.nextLine(); |
| for (String str : mustHave) { |
| if (lineFromFile.contains(str)) { |
| foundSet.add(str); |
| } |
| } |
| if (foundSet.containsAll(mustHaveList)) { |
| return true; |
| } |
| } |
| } catch (FileNotFoundException e) { |
| LOG.warn( |
| "Unable to locate file: " |
| + e.getMessage() |
| + " file: " |
| + f.getAbsolutePath()); |
| } |
| return false; |
| } |
| }); |
| |
| if (foundFile != null) { |
| LOG.info( |
| "Found string {} in {}.", |
| Arrays.toString(mustHave), |
| foundFile.getAbsolutePath()); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| public static boolean verifyTokenKindInContainerCredentials( |
| final Collection<String> tokens, final String containerId) throws IOException { |
| File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY)); |
| if (!cwd.exists() || !cwd.isDirectory()) { |
| return false; |
| } |
| |
| File containerTokens = |
| TestUtils.findFile( |
| cwd.getAbsolutePath(), |
| new FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| return name.equals(containerId + ".tokens"); |
| } |
| }); |
| |
| if (containerTokens != null) { |
| LOG.info("Verifying tokens in {}", containerTokens.getAbsolutePath()); |
| |
| Credentials tmCredentials = |
| Credentials.readTokenStorageFile(containerTokens, new Configuration()); |
| |
| Collection<Token<? extends TokenIdentifier>> userTokens = tmCredentials.getAllTokens(); |
| Set<String> tokenKinds = new HashSet<>(4); |
| for (Token<? extends TokenIdentifier> token : userTokens) { |
| tokenKinds.add(token.getKind().toString()); |
| } |
| |
| return tokenKinds.containsAll(tokens); |
| } else { |
| LOG.warn("Unable to find credential file for container {}", containerId); |
| return false; |
| } |
| } |
| |
| public static String getContainerIdByLogName(String logName) { |
| File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY)); |
| File containerLog = |
| TestUtils.findFile( |
| cwd.getAbsolutePath(), |
| new FilenameFilter() { |
| @Override |
| public boolean accept(File dir, String name) { |
| return name.equals(logName); |
| } |
| }); |
| if (containerLog != null) { |
| return containerLog.getParentFile().getName(); |
| } else { |
| throw new IllegalStateException("No container has log named " + logName); |
| } |
| } |
| |
| public static void sleep(int time) { |
| try { |
| Thread.sleep(time); |
| } catch (InterruptedException e) { |
| LOG.warn("Interruped", e); |
| } |
| } |
| |
| public static int getRunningContainers() { |
| int count = 0; |
| for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) { |
| NodeManager nm = yarnCluster.getNodeManager(nmId); |
| ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers(); |
| count += containers.size(); |
| } |
| return count; |
| } |
| |
| protected ApplicationReport getOnlyApplicationReport() throws IOException, YarnException { |
| final YarnClient yarnClient = getYarnClient(); |
| checkState(yarnClient != null); |
| |
| final List<ApplicationReport> apps = |
| getApplicationReportWithRetryOnNPE( |
| yarnClient, EnumSet.of(YarnApplicationState.RUNNING)); |
| assertEquals(1, apps.size()); // Only one running |
| return apps.get(0); |
| } |
| |
| public static void startYARNSecureMode( |
| YarnConfiguration conf, String principal, String keytab) { |
| start(conf, principal, keytab, false); |
| } |
| |
| public static void startYARNWithConfig(YarnConfiguration conf) { |
| startYARNWithConfig(conf, false); |
| } |
| |
| public static void startYARNWithConfig(YarnConfiguration conf, boolean withDFS) { |
| start(conf, null, null, withDFS); |
| } |
| |
| private static void start( |
| YarnConfiguration conf, String principal, String keytab, boolean withDFS) { |
| // set the home directory to a temp directory. Flink on YARN is using the home dir to |
| // distribute the file |
| File homeDir = null; |
| try { |
| homeDir = tmp.newFolder(); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| Assert.fail(e.getMessage()); |
| } |
| System.setProperty("user.home", homeDir.getAbsolutePath()); |
| String uberjarStartLoc = ".."; |
| LOG.info("Trying to locate uberjar in {}", new File(uberjarStartLoc).getAbsolutePath()); |
| flinkUberjar = TestUtils.findFile(uberjarStartLoc, new TestUtils.RootDirFilenameFilter()); |
| Assert.assertNotNull("Flink uberjar not found", flinkUberjar); |
| String flinkDistRootDir = flinkUberjar.getParentFile().getParent(); |
| flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/ |
| Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder); |
| Assert.assertTrue("lib folder not found", flinkLibFolder.exists()); |
| Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory()); |
| |
| if (!flinkUberjar.exists()) { |
| Assert.fail("Unable to locate yarn-uberjar.jar"); |
| } |
| |
| try { |
| LOG.info("Starting up MiniYARNCluster"); |
| if (yarnCluster == null) { |
| final String testName = conf.get(YarnTestBase.TEST_CLUSTER_NAME_KEY); |
| yarnCluster = |
| new MiniYARNCluster( |
| testName == null ? "YarnTest_" + UUID.randomUUID() : testName, |
| NUM_NODEMANAGERS, |
| 1, |
| 1); |
| |
| yarnCluster.init(conf); |
| yarnCluster.start(); |
| } |
| |
| Map<String, String> map = new HashMap<String, String>(System.getenv()); |
| |
| File flinkConfDirPath = |
| TestUtils.findFile( |
| flinkDistRootDir, new ContainsName(new String[] {"flink-conf.yaml"})); |
| Assert.assertNotNull(flinkConfDirPath); |
| |
| final String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath(); |
| globalConfiguration = GlobalConfiguration.loadConfiguration(confDirPath); |
| globalConfiguration.set( |
| JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofSeconds(30)); |
| |
| // copy conf dir to test temporary workspace location |
| tempConfPathForSecureRun = tmp.newFolder("conf"); |
| |
| FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun); |
| |
| BootstrapTools.writeConfiguration( |
| globalConfiguration, new File(tempConfPathForSecureRun, "flink-conf.yaml")); |
| |
| String configDir = tempConfPathForSecureRun.getAbsolutePath(); |
| |
| LOG.info( |
| "Temporary Flink configuration directory to be used for secure test: {}", |
| configDir); |
| |
| Assert.assertNotNull(configDir); |
| |
| map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir); |
| |
| File targetTestClassesFolder = new File("target/test-classes"); |
| writeYarnSiteConfigXML(conf, targetTestClassesFolder); |
| |
| if (withDFS) { |
| LOG.info("Starting up MiniDFSCluster"); |
| setMiniDFSCluster(targetTestClassesFolder); |
| } |
| |
| map.put( |
| "IN_TESTS", |
| "yes we are in tests"); // see YarnClusterDescriptor() for more infos |
| map.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath()); |
| map.put("MAX_LOG_FILE_NUMBER", "10"); |
| TestBaseUtils.setEnv(map); |
| |
| Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED); |
| |
| // wait for the nodeManagers to connect |
| while (!yarnCluster.waitForNodeManagersToConnect(500)) { |
| LOG.info("Waiting for Nodemanagers to connect"); |
| } |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| LOG.error("setup failure", ex); |
| Assert.fail(); |
| } |
| } |
| |
| private static void setMiniDFSCluster(File targetTestClassesFolder) throws Exception { |
| if (miniDFSCluster == null) { |
| Configuration hdfsConfiguration = new Configuration(); |
| hdfsConfiguration.set( |
| MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpHDFS.getRoot().getAbsolutePath()); |
| miniDFSCluster = new MiniDFSCluster.Builder(hdfsConfiguration).numDataNodes(2).build(); |
| miniDFSCluster.waitClusterUp(); |
| |
| hdfsConfiguration = miniDFSCluster.getConfiguration(0); |
| writeHDFSSiteConfigXML(hdfsConfiguration, targetTestClassesFolder); |
| YARN_CONFIGURATION.addResource(hdfsConfiguration); |
| } |
| } |
| |
| /** Default @BeforeClass impl. Overwrite this for passing a different configuration */ |
| @BeforeClass |
| public static void setup() throws Exception { |
| startYARNWithConfig(YARN_CONFIGURATION, false); |
| } |
| |
| // -------------------------- Runner -------------------------- // |
| |
| protected static ByteArrayOutputStream outContent; |
| protected static ByteArrayOutputStream errContent; |
| |
| enum RunTypes { |
| YARN_SESSION, |
| CLI_FRONTEND |
| } |
| |
| /** This method returns once the "startedAfterString" has been seen. */ |
| protected Runner startWithArgs(String[] args, String startedAfterString, RunTypes type) |
| throws IOException { |
| LOG.info("Running with args {}", Arrays.toString(args)); |
| |
| outContent = new ByteArrayOutputStream(); |
| errContent = new ByteArrayOutputStream(); |
| PipedOutputStream out = new PipedOutputStream(); |
| PipedInputStream in = new PipedInputStream(out); |
| PrintStream stdinPrintStream = new PrintStream(out); |
| |
| System.setOut(new PrintStream(outContent)); |
| System.setErr(new PrintStream(errContent)); |
| System.setIn(in); |
| |
| final int startTimeoutSeconds = 60; |
| |
| Runner runner = |
| new Runner( |
| args, |
| flinkConfiguration, |
| CliFrontend.getConfigurationDirectoryFromEnv(), |
| type, |
| 0, |
| stdinPrintStream); |
| runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs())."); |
| runner.start(); |
| |
| for (int second = 0; second < startTimeoutSeconds; second++) { |
| sleep(1000); |
| // check output for correct TaskManager startup. |
| if (outContent.toString().contains(startedAfterString) |
| || errContent.toString().contains(startedAfterString)) { |
| LOG.info("Found expected output in redirected streams"); |
| return runner; |
| } |
| // check if thread died |
| if (!runner.isAlive()) { |
| resetStreamsAndSendOutput(); |
| if (runner.getRunnerError() != null) { |
| throw new RuntimeException( |
| "Runner failed with exception.", runner.getRunnerError()); |
| } |
| Assert.fail("Runner thread died before the test was finished."); |
| } |
| } |
| |
| resetStreamsAndSendOutput(); |
| Assert.fail( |
| "During the timeout period of " |
| + startTimeoutSeconds |
| + " seconds the " |
| + "expected string did not show up"); |
| return null; |
| } |
| |
| protected void runWithArgs( |
| String[] args, |
| String terminateAfterString, |
| String[] failOnStrings, |
| RunTypes type, |
| int returnCode) |
| throws IOException { |
| runWithArgs( |
| args, |
| terminateAfterString, |
| failOnStrings, |
| type, |
| returnCode, |
| Collections::emptyList); |
| } |
| |
| /** |
| * The test has been passed once the "terminateAfterString" has been seen. |
| * |
| * @param args Command line arguments for the runner |
| * @param terminateAfterString the runner is searching the stdout and stderr for this string. as |
| * soon as it appears, the test has passed |
| * @param failOnPatterns The runner is searching stdout and stderr for the pattern (regexp) |
| * specified here. If one appears, the test has failed |
| * @param type Set the type of the runner |
| * @param expectedReturnValue Expected return code from the runner. |
| * @param logMessageSupplier Supplier for log messages |
| */ |
| protected void runWithArgs( |
| String[] args, |
| String terminateAfterString, |
| String[] failOnPatterns, |
| RunTypes type, |
| int expectedReturnValue, |
| Supplier<Collection<String>> logMessageSupplier) |
| throws IOException { |
| LOG.info("Running with args {}", Arrays.toString(args)); |
| |
| outContent = new ByteArrayOutputStream(); |
| errContent = new ByteArrayOutputStream(); |
| PipedOutputStream out = new PipedOutputStream(); |
| PipedInputStream in = new PipedInputStream(out); |
| PrintStream stdinPrintStream = new PrintStream(out); |
| System.setOut(new PrintStream(outContent)); |
| System.setErr(new PrintStream(errContent)); |
| System.setIn(in); |
| |
| // we wait for at most three minutes |
| final int startTimeoutSeconds = 180; |
| final long deadline = System.currentTimeMillis() + (startTimeoutSeconds * 1000); |
| |
| Runner runner = |
| new Runner( |
| args, |
| flinkConfiguration, |
| CliFrontend.getConfigurationDirectoryFromEnv(), |
| type, |
| expectedReturnValue, |
| stdinPrintStream); |
| runner.start(); |
| |
| boolean expectedStringSeen = false; |
| boolean testPassedFromLog4j = false; |
| long shutdownTimeout = 30000L; |
| do { |
| sleep(1000); |
| String outContentString = outContent.toString(); |
| String errContentString = errContent.toString(); |
| if (failOnPatterns != null) { |
| for (String failOnString : failOnPatterns) { |
| Pattern pattern = Pattern.compile(failOnString); |
| if (pattern.matcher(outContentString).find() |
| || pattern.matcher(errContentString).find()) { |
| LOG.warn( |
| "Failing test. Output contained illegal string '" |
| + failOnString |
| + "'"); |
| resetStreamsAndSendOutput(); |
| // stopping runner. |
| runner.sendStop(); |
| // wait for the thread to stop |
| try { |
| runner.join(shutdownTimeout); |
| } catch (InterruptedException e) { |
| LOG.warn("Interrupted while stopping runner", e); |
| } |
| Assert.fail("Output contained illegal string '" + failOnString + "'"); |
| } |
| } |
| } |
| |
| for (String logMessage : logMessageSupplier.get()) { |
| if (logMessage.contains(terminateAfterString)) { |
| testPassedFromLog4j = true; |
| LOG.info("Found expected output in logging event {}", logMessage); |
| } |
| } |
| |
| if (outContentString.contains(terminateAfterString) |
| || errContentString.contains(terminateAfterString) |
| || testPassedFromLog4j) { |
| expectedStringSeen = true; |
| LOG.info("Found expected output in redirected streams"); |
| // send "stop" command to command line interface |
| LOG.info("RunWithArgs: request runner to stop"); |
| runner.sendStop(); |
| // wait for the thread to stop |
| try { |
| runner.join(shutdownTimeout); |
| } catch (InterruptedException e) { |
| LOG.warn("Interrupted while stopping runner", e); |
| } |
| LOG.warn("RunWithArgs runner stopped."); |
| } else { |
| // check if thread died |
| if (!runner.isAlive()) { |
| // leave loop: the runner died, so we can not expect new strings to show up. |
| break; |
| } |
| } |
| } while (runner.getRunnerError() == null |
| && !expectedStringSeen |
| && System.currentTimeMillis() < deadline); |
| |
| resetStreamsAndSendOutput(); |
| |
| if (runner.getRunnerError() != null) { |
| // this lets the test fail. |
| throw new RuntimeException("Runner failed", runner.getRunnerError()); |
| } |
| Assert.assertTrue( |
| "During the timeout period of " |
| + startTimeoutSeconds |
| + " seconds the " |
| + "expected string \"" |
| + terminateAfterString |
| + "\" did not show up.", |
| expectedStringSeen); |
| |
| LOG.info("Test was successful"); |
| } |
| |
| protected static void resetStreamsAndSendOutput() { |
| System.setOut(ORIGINAL_STDOUT); |
| System.setErr(ORIGINAL_STDERR); |
| System.setIn(ORIGINAL_STDIN); |
| |
| LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString()); |
| LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString()); |
| } |
| |
| /** Utility class to run yarn jobs. */ |
| protected static class Runner extends Thread { |
| private final String[] args; |
| private final org.apache.flink.configuration.Configuration configuration; |
| private final String configurationDirectory; |
| private final int expectedReturnValue; |
| |
| private final PrintStream stdinPrintStream; |
| |
| private RunTypes type; |
| private FlinkYarnSessionCli yCli; |
| private Throwable runnerError; |
| |
| public Runner( |
| String[] args, |
| org.apache.flink.configuration.Configuration configuration, |
| String configurationDirectory, |
| RunTypes type, |
| int expectedReturnValue, |
| PrintStream stdinPrintStream) { |
| |
| this.args = args; |
| this.configuration = Preconditions.checkNotNull(configuration); |
| this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory); |
| this.type = type; |
| this.expectedReturnValue = expectedReturnValue; |
| this.stdinPrintStream = Preconditions.checkNotNull(stdinPrintStream); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| int returnValue; |
| switch (type) { |
| case YARN_SESSION: |
| yCli = |
| new FlinkYarnSessionCli( |
| configuration, configurationDirectory, "", "", true); |
| returnValue = yCli.run(args); |
| break; |
| case CLI_FRONTEND: |
| try { |
| CliFrontend cli = |
| new CliFrontend( |
| configuration, |
| CliFrontend.loadCustomCommandLines( |
| configuration, configurationDirectory)); |
| returnValue = cli.parseAndRun(args); |
| } catch (Exception e) { |
| throw new RuntimeException( |
| "Failed to execute the following args with CliFrontend: " |
| + Arrays.toString(args), |
| e); |
| } |
| break; |
| default: |
| throw new RuntimeException("Unknown type " + type); |
| } |
| |
| if (returnValue != this.expectedReturnValue) { |
| Assert.fail( |
| "The YARN session returned with unexpected value=" |
| + returnValue |
| + " expected=" |
| + expectedReturnValue); |
| } |
| } catch (Throwable t) { |
| LOG.info("Runner stopped with exception", t); |
| // save error. |
| this.runnerError = t; |
| } |
| } |
| |
| /** Stops the Yarn session. */ |
| public void sendStop() { |
| stdinPrintStream.println("stop"); |
| } |
| |
| public Throwable getRunnerError() { |
| return runnerError; |
| } |
| } |
| |
| // -------------------------- Tear down -------------------------- // |
| |
| @AfterClass |
| public static void teardown() throws Exception { |
| |
| if (yarnCluster != null) { |
| LOG.info("Stopping MiniYarn Cluster"); |
| yarnCluster.stop(); |
| yarnCluster = null; |
| } |
| |
| if (miniDFSCluster != null) { |
| LOG.info("Stopping MiniDFS Cluster"); |
| miniDFSCluster.shutdown(); |
| miniDFSCluster = null; |
| } |
| |
| // Unset FLINK_CONF_DIR, as it might change the behavior of other tests |
| Map<String, String> map = new HashMap<>(System.getenv()); |
| map.remove(ConfigConstants.ENV_FLINK_CONF_DIR); |
| map.remove("YARN_CONF_DIR"); |
| map.remove("IN_TESTS"); |
| TestBaseUtils.setEnv(map); |
| |
| if (tempConfPathForSecureRun != null) { |
| FileUtil.fullyDelete(tempConfPathForSecureRun); |
| tempConfPathForSecureRun = null; |
| } |
| |
| if (yarnSiteXML != null) { |
| yarnSiteXML.delete(); |
| } |
| |
| if (hdfsSiteXML != null) { |
| hdfsSiteXML.delete(); |
| } |
| |
| // When we are on CI, we copy the temp files of JUnit (containing the MiniYARNCluster log |
| // files) |
| // to <flinkRoot>/target/flink-yarn-tests-*. |
| // The files from there are picked up by the tools/ci/* scripts to upload them. |
| if (isOnCI()) { |
| File target = new File("../target" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY)); |
| if (!target.mkdirs()) { |
| LOG.warn("Error creating dirs to {}", target); |
| } |
| File src = tmp.getRoot(); |
| LOG.info( |
| "copying the final files from {} to {}", |
| src.getAbsolutePath(), |
| target.getAbsolutePath()); |
| try { |
| FileUtils.copyDirectoryToDirectory(src, target); |
| } catch (IOException e) { |
| LOG.warn( |
| "Error copying the final files from {} to {}: msg: {}", |
| src.getAbsolutePath(), |
| target.getAbsolutePath(), |
| e.getMessage(), |
| e); |
| } |
| } |
| } |
| |
| public static boolean isOnCI() { |
| return System.getenv("IS_CI") != null && System.getenv("IS_CI").equals("true"); |
| } |
| |
| protected void waitApplicationFinishedElseKillIt( |
| ApplicationId applicationId, |
| Duration timeout, |
| YarnClusterDescriptor yarnClusterDescriptor, |
| int sleepIntervalInMS) |
| throws Exception { |
| Deadline deadline = Deadline.now().plus(timeout); |
| YarnApplicationState state = |
| getYarnClient().getApplicationReport(applicationId).getYarnApplicationState(); |
| |
| while (state != YarnApplicationState.FINISHED) { |
| if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { |
| Assert.fail("Application became FAILED or KILLED while expecting FINISHED"); |
| } |
| |
| if (deadline.isOverdue()) { |
| yarnClusterDescriptor.killCluster(applicationId); |
| Assert.fail("Application didn't finish before timeout"); |
| } |
| |
| sleep(sleepIntervalInMS); |
| state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState(); |
| } |
| } |
| |
| /** Wrapper around a {@link Scanner} that buffers the last N lines read. */ |
| private static class BufferingScanner { |
| |
| private final Scanner scanner; |
| private final int numLinesBuffered; |
| private final List<String> bufferedLines; |
| |
| BufferingScanner(Scanner scanner, int numLinesBuffered) { |
| this.scanner = scanner; |
| this.numLinesBuffered = numLinesBuffered; |
| this.bufferedLines = new ArrayList<>(numLinesBuffered); |
| } |
| |
| public boolean hasNextLine() { |
| return scanner.hasNextLine(); |
| } |
| |
| public String nextLine() { |
| if (bufferedLines.size() == numLinesBuffered) { |
| bufferedLines.remove(0); |
| } |
| String line = scanner.nextLine(); |
| bufferedLines.add(line); |
| return line; |
| } |
| |
| public List<String> getPreviousLines() { |
| return new ArrayList<>(bufferedLines); |
| } |
| } |
| } |