blob: 30b7439bb7c0b5132416f5ea8a3bd34d56edbae6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.tests.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.OverlordResourceTestClient;
import org.apache.druid.testing.utils.RetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
public abstract class AbstractIndexerTest
{
@Inject
protected CoordinatorResourceTestClient coordinator;
@Inject
protected OverlordResourceTestClient indexer;
@Inject
@Json
protected ObjectMapper jsonMapper;
@Inject
@Smile
protected ObjectMapper smileMapper;
@Inject
protected TestQueryHelper queryHelper;
@Inject
private IntegrationTestingConfig config;
protected Closeable unloader(final String dataSource)
{
return () -> unloadAndKillData(dataSource);
}
protected void unloadAndKillData(final String dataSource)
{
List<String> intervals = coordinator.getSegmentIntervals(dataSource);
// each element in intervals has this form:
// 2015-12-01T23:15:00.000Z/2015-12-01T23:16:00.000Z
// we'll sort the list (ISO dates have lexicographic order)
// then delete segments from the 1st date in the first string
// to the 2nd date in the last string
Collections.sort(intervals);
String first = intervals.get(0).split("/")[0];
String last = intervals.get(intervals.size() - 1).split("/")[1];
unloadAndKillData(dataSource, first, last);
}
private void unloadAndKillData(final String dataSource, String start, String end)
{
// Wait for any existing index tasks to complete before disabling the datasource otherwise
// realtime tasks can get stuck waiting for handoff. https://github.com/apache/incubator-druid/issues/1729
waitForAllTasksToComplete();
Interval interval = Intervals.of(start + "/" + end);
coordinator.unloadSegmentsForDataSource(dataSource);
RetryUtil.retryUntilFalse(
new Callable<Boolean>()
{
@Override
public Boolean call()
{
return coordinator.areSegmentsLoaded(dataSource);
}
}, "Segment Unloading"
);
coordinator.deleteSegmentsDataSource(dataSource, interval);
waitForAllTasksToComplete();
}
protected void waitForAllTasksToComplete()
{
RetryUtil.retryUntilTrue(
() -> {
int numTasks = indexer.getPendingTasks().size() +
indexer.getRunningTasks().size() +
indexer.getWaitingTasks().size();
return numTasks == 0;
},
"Waiting for Tasks Completion"
);
}
protected String getResourceAsString(String file) throws IOException
{
final InputStream inputStream = ITRealtimeIndexTaskTest.class.getResourceAsStream(file);
try {
return IOUtils.toString(inputStream, "UTF-8");
}
finally {
IOUtils.closeQuietly(inputStream);
}
}
}