blob: 87a324f48e5a22b237c7c32254212d7e25ac1c45 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.agg;
import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.List;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.ClientFixture;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.ProfileParser;
import org.apache.drill.test.QueryBuilder;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test spilling for the Hash Aggr operator (using the mock reader)
*/
@Category({SlowTest.class, OperatorTest.class})
public class TestHashAggrSpill extends DrillTest {
// Matches the "2400K" in the table name.
public static final int DEFAULT_ROW_COUNT = 2_400_000;
@Rule
public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
/**
* A template for Hash Aggr spilling tests
*/
private void testSpill(long maxMem, long numPartitions, long minBatches, int maxParallel, boolean fallback, boolean predict,
String sql, long expectedRows, int cycle, int fromPart, int toPart) throws Exception {
ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
.sessionOption(ExecConstants.HASHAGG_MAX_MEMORY_KEY,maxMem)
.sessionOption(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY,numPartitions)
.sessionOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_KEY,minBatches)
.configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
.sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
.sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, fallback)
.sessionOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_KEY,predict)
.maxParallelization(maxParallel)
.saveProfiles();
String sqlStr = sql != null ? sql : // if null then use this default query
"SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_2400K` GROUP BY empid_s17, dept_i, branch_i";
try (ClusterFixture cluster = builder.build();
ClientFixture client = cluster.clientFixture()) {
runAndDump(client, sqlStr, expectedRows, cycle, fromPart, toPart);
}
}
/**
* Test "normal" spilling: Only 2 (or 3) partitions (out of 4) would require spilling
* ("normal spill" means spill-cycle = 1 )
*/
@Test
public void testSimpleHashAggrSpill() throws Exception {
testSpill(68_000_000, 16, 2, 2, false, true, null,
DEFAULT_ROW_COUNT, 1,2, 3);
}
/**
* Test with "needed memory" prediction turned off
* (i.e., exercise code paths that catch OOMs from the Hash Table and recover)
*/
@Test
@Ignore("DRILL-7301")
public void testNoPredictHashAggrSpill() throws Exception {
testSpill(135_000_000, 16, 2, 2, false, false /* no prediction */, null,
DEFAULT_ROW_COUNT, 1, 1, 1);
}
private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle, long fromSpilledPartitions, long toSpilledPartitions) throws Exception {
QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
if (expectedRows > 0) {
assertEquals(expectedRows, summary.recordCount());
}
ProfileParser profile = client.parseProfile(summary.queryIdString());
List<ProfileParser.OperatorProfile> ops = profile.getOpsOfType(UserBitShared.CoreOperatorType.HASH_AGGREGATE_VALUE);
assertFalse(ops.isEmpty());
// check for the first op only
ProfileParser.OperatorProfile hag0 = ops.get(0);
long opCycle = hag0.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
assertEquals(spillCycle, opCycle);
long op_spilled_partitions = hag0.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
assertTrue(op_spilled_partitions >= fromSpilledPartitions);
assertTrue(op_spilled_partitions <= toSpilledPartitions);
}
/**
* Test Secondary and Tertiary spill cycles - Happens when some of the spilled
* partitions cause more spilling as they are read back
*/
@Test
public void testHashAggrSecondaryTertiarySpill() throws Exception {
testSpill(58_000_000, 16, 3, 1, false, true,
"SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K` GROUP BY empid_s44, dept_i, branch_i",
1_100_000, 3, 2, 2);
}
/**
* Test with the "fallback" option disabled: When not enough memory available
* to allow spilling, then fail (Resource error) !!
*/
@Test
public void testHashAggrFailWithFallbackDisabed() throws Exception {
try {
testSpill(34_000_000, 4, 5, 2, false /* no fallback */, true, null,
DEFAULT_ROW_COUNT, 0 /* no spill due to fallback to pre-1.11 */, 0, 0);
fail(); // in case the above test did not throw
} catch (Exception ex) {
assertTrue(ex instanceof UserRemoteException);
assertTrue(((UserRemoteException) ex).getErrorType() == UserBitShared.DrillPBError.ErrorType.RESOURCE);
// must get here for the test to succeed ...
}
}
/**
* Test with the "fallback" option ON: When not enough memory is available to
* allow spilling (internally need enough memory to create multiple
* partitions), then behave like the pre-1.11 Hash Aggregate: Allocate
* unlimited memory, no spill.
*/
@Test
public void testHashAggrSuccessWithFallbackEnabled() throws Exception {
testSpill(34_000_000, 4, 5, 2, true /* do fallback */, true, null,
DEFAULT_ROW_COUNT, 0 /* no spill due to fallback to pre-1.11 */, 0, 0);
}
}