blob: 2c0fc9928f6175441c72647082d8881422e2c844 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.ConsoleAppender;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch;
import org.apache.drill.exec.testing.Controls;
import org.apache.drill.exec.testing.ControlsInjectionUtil;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.LogFixture;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertTrue;
public class TestOperatorDump extends ClusterTest {
private static final String ENTRY_DUMP_COMPLETED = "Batch dump completed";
private static final String ENTRY_DUMP_STARTED = "Batch dump started";
private LogFixture logFixture;
private EventAwareContextAppender appender;
@BeforeClass
public static void setupFiles() {
dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
}
@Before
public void setup() throws Exception {
ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
appender = new EventAwareContextAppender();
logFixture = LogFixture.builder()
.toConsole(appender, LogFixture.DEFAULT_CONSOLE_FORMAT)
.build();
builder.configBuilder().put(BaseRootExec.ENABLE_BATCH_DUMP_CONFIG, "true");
startCluster(builder);
}
@After
public void tearDown(){
logFixture.close();
}
@Test(expected = UserRemoteException.class)
public void testScanBatchChecked() throws Exception {
String exceptionDesc = "next-allocate";
final String controls = Controls.newBuilder()
.addException(ScanBatch.class, exceptionDesc, OutOfMemoryException.class, 0, 1)
.build();
ControlsInjectionUtil.setControls(client.client(), controls);
String query = "select * from dfs.`multilevel/parquet` limit 100";
try {
client.queryBuilder().sql(query).run();
} catch (UserRemoteException e) {
assertTrue(e.getMessage().contains(exceptionDesc));
String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED};
validateContainsEntries(expectedEntries, ScanBatch.class.getName());
throw e;
}
}
@Test(expected = UserRemoteException.class)
public void testExternalSortUnchecked() throws Exception {
Class<?> siteClass = org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch.class;
final String controls = Controls.newBuilder()
.addException(siteClass, ExternalSortBatch.INTERRUPTION_AFTER_SORT, RuntimeException.class)
.build();
ControlsInjectionUtil.setControls(client.client(), controls);
String query = "select n_name from cp.`tpch/lineitem.parquet` order by n_name";
try {
client.queryBuilder().sql(query).run();
} catch (UserRemoteException e) {
assertTrue(e.getMessage().contains(ExternalSortBatch.INTERRUPTION_AFTER_SORT));
String[] expectedEntries = new String[] {ENTRY_DUMP_STARTED, ENTRY_DUMP_COMPLETED};
validateContainsEntries(expectedEntries, ExternalSortBatch.class.getName());
throw e;
}
}
private void validateContainsEntries(String[] entries, String expectedClassName) {
if (entries == null) {
entries = new String[0];
}
List<String> messages = appender.getMessages();
List<String> entryList = new ArrayList<>(entries.length);
Collections.addAll(entryList, entries);
Iterator<String> it = entryList.iterator();
while (it.hasNext()) {
String entry = it.next();
for (String message : messages) {
if (message.contains(entry)) {
it.remove();
break;
}
}
}
assertTrue(String.format("Entries %s were not found in logs.", entryList), entryList.isEmpty());
Set<String> loggerNames = appender.getLoggerNames();
assertTrue(String.format("Entry for class %s was not found", expectedClassName),
loggerNames.contains(expectedClassName));
}
// ConsoleAppender which stores logged events
private static class EventAwareContextAppender extends ConsoleAppender<ILoggingEvent> {
private List<ILoggingEvent> events = new ArrayList<>();
@Override
protected void append(ILoggingEvent e) {
events.add(e);
}
List<String> getMessages() {
return events.stream()
.map(ILoggingEvent::getMessage)
.collect(Collectors.toList());
}
Set<String> getLoggerNames() {
return events.stream()
.map(ILoggingEvent::getLoggerName)
.collect(Collectors.toSet());
}
}
}