/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.sql.calcite.schema;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.ServerView.CallbackAction;
import org.apache.druid.client.ServerView.SegmentCallback;
import org.apache.druid.client.ServerView.ServerRemovedCallback;
import org.apache.druid.client.SingleServerInventoryView.FilteringSegmentCallback;
import org.apache.druid.client.TimelineServerView.TimelineCallback;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.table.DruidTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

public class DruidSchemaConcurrencyTest extends DruidSchemaTestCommon
{
  private static final String DATASOURCE = "datasource";

  private File tmpDir;
  private SpecificSegmentsQuerySegmentWalker walker;
  private TestServerInventoryView inventoryView;
  private BrokerServerView serverView;
  private DruidSchema schema;
  private ExecutorService exec;

  @Before
public void setUp() throws Exception
{
tmpDir = temporaryFolder.newFolder();
walker = new SpecificSegmentsQuerySegmentWalker(conglomerate);
inventoryView = new TestServerInventoryView();
serverView = newBrokerServerView(inventoryView);
inventoryView.init();
serverView.awaitInitialization();
exec = Execs.multiThreaded(4, "DruidSchemaConcurrencyTest-%d");
  }

  @After
public void tearDown() throws Exception
{
exec.shutdownNow();
walker.close();
  }

  /**
   * This tests the contention between three components: DruidSchema, InventoryView, and BrokerServerView.
   * It first triggers a refresh of DruidSchema. To mimic heavy work done while holding {@link DruidSchema#lock},
   * {@link DruidSchema#buildDruidTable} is overridden to sleep before doing the real work. While DruidSchema is
   * refreshing, new segments are added to InventoryView, which triggers updates of BrokerServerView. Finally, while
   * BrokerServerView is being updated, {@link BrokerServerView#getTimeline} is called continuously to mimic user
   * query processing. All these calls must return without heavy contention.
   */
@Test(timeout = 30000L)
public void testDruidSchemaRefreshAndInventoryViewAddSegmentAndBrokerServerViewGetTimeline()
throws InterruptedException, ExecutionException, TimeoutException
{
schema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator()
)
{
@Override
DruidTable buildDruidTable(final String dataSource)
{
doInLock(() -> {
try {
            // Mimic some heavy work done while holding the lock in DruidSchema
Thread.sleep(5000);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
return super.buildDruidTable(dataSource);
}
    };

int numExistingSegments = 100;
int numServers = 19;
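    // The latch counts down once per segment-load callback, letting the test wait until BrokerServerView
    // has seen every initial segment before the refresh starts.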
CountDownLatch segmentLoadLatch = new CountDownLatch(numExistingSegments);
serverView.registerTimelineCallback(
Execs.directExecutor(),
new TimelineCallback()
{
@Override
public CallbackAction timelineInitialized()
{
return CallbackAction.CONTINUE;
}
@Override
public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
segmentLoadLatch.countDown();
return CallbackAction.CONTINUE;
}
@Override
public CallbackAction segmentRemoved(DataSegment segment)
{
return CallbackAction.CONTINUE;
}
@Override
public CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment)
{
return CallbackAction.CONTINUE;
}
}
);
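    // Load the initial set of segments, spread round-robin across the servers.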
addSegmentsToCluster(0, numServers, numExistingSegments);
// Wait for all segments to be loaded in BrokerServerView
Assert.assertTrue(segmentLoadLatch.await(5, TimeUnit.SECONDS));
    // Trigger a refresh of DruidSchema. This will internally run the heavy work mimicked by the
    // overridden buildDruidTable.
    Future<?> refreshFuture = exec.submit(() -> {
schema.refresh(
walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()),
Sets.newHashSet(DATASOURCE)
);
return null;
});
    // Trigger updates of BrokerServerView. These updates run while the refresh above is still in progress.
    addSegmentsToCluster(numExistingSegments, numServers, 50); // add completely new segments
    addReplicasToCluster(1, numServers, 30); // add replicas of the first 30 segments
    // After this removal, the first 30 segments still have replicas on other servers,
    // while the remaining 20 segments are removed from the cluster entirely.
    removeSegmentsFromCluster(numServers, 50);
Assert.assertFalse(refreshFuture.isDone());
for (int i = 0; i < 1000; i++) {
boolean hasTimeline = exec.submit(
() -> serverView.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(DATASOURCE)))
.isPresent()
).get(100, TimeUnit.MILLISECONDS);
Assert.assertTrue(hasTimeline);
// We want to call getTimeline while BrokerServerView is being updated. Sleep might help with timing.
Thread.sleep(2);
}
refreshFuture.get(10, TimeUnit.SECONDS);
  }

  /**
   * This tests the contention between two methods of DruidSchema: {@link DruidSchema#refresh} and
   * {@link DruidSchema#getSegmentMetadataSnapshot()}. It first triggers a refresh of DruidSchema. To mimic heavy
   * work done while holding {@link DruidSchema#lock}, {@link DruidSchema#buildDruidTable} is overridden to sleep
   * before doing the real work. While DruidSchema is refreshing, getSegmentMetadataSnapshot() is called
   * continuously to mimic reading the segments table of SystemSchema. All these calls must return without heavy
   * contention.
   */
@Test(timeout = 30000L)
public void testDruidSchemaRefreshAndDruidSchemaGetSegmentMetadata()
throws InterruptedException, ExecutionException, TimeoutException
{
schema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
segmentManager,
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
PLANNER_CONFIG_DEFAULT,
new NoopEscalator()
)
{
@Override
DruidTable buildDruidTable(final String dataSource)
{
doInLock(() -> {
try {
            // Mimic some heavy work done while holding the lock in DruidSchema
Thread.sleep(5000);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
return super.buildDruidTable(dataSource);
}
    };

int numExistingSegments = 100;
int numServers = 19;
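    // The latch counts down once per segment-load callback, letting the test wait until BrokerServerView
    // has seen every initial segment before the refresh starts.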
CountDownLatch segmentLoadLatch = new CountDownLatch(numExistingSegments);
serverView.registerTimelineCallback(
Execs.directExecutor(),
new TimelineCallback()
{
@Override
public CallbackAction timelineInitialized()
{
return CallbackAction.CONTINUE;
}
@Override
public CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
segmentLoadLatch.countDown();
return CallbackAction.CONTINUE;
}
@Override
public CallbackAction segmentRemoved(DataSegment segment)
{
return CallbackAction.CONTINUE;
}
@Override
public CallbackAction serverSegmentRemoved(DruidServerMetadata server, DataSegment segment)
{
return CallbackAction.CONTINUE;
}
}
);
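    // Load the initial set of segments, spread round-robin across the servers.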
addSegmentsToCluster(0, numServers, numExistingSegments);
// Wait for all segments to be loaded in BrokerServerView
Assert.assertTrue(segmentLoadLatch.await(5, TimeUnit.SECONDS));
    // Trigger a refresh of DruidSchema. This will internally run the heavy work mimicked by the
    // overridden buildDruidTable.
    Future<?> refreshFuture = exec.submit(() -> {
schema.refresh(
walker.getSegments().stream().map(DataSegment::getId).collect(Collectors.toSet()),
Sets.newHashSet(DATASOURCE)
);
return null;
});
Assert.assertFalse(refreshFuture.isDone());
for (int i = 0; i < 1000; i++) {
Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = exec.submit(
() -> schema.getSegmentMetadataSnapshot()
).get(100, TimeUnit.MILLISECONDS);
Assert.assertFalse(segmentsMetadata.isEmpty());
      // We want to call getSegmentMetadataSnapshot while refreshing. Sleep might help with timing.
Thread.sleep(2);
}
refreshFuture.get(10, TimeUnit.SECONDS);
}
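
  // Adds numSegments new segments, with partition ids starting at partitionIdStart, to the walker, and announces
  // each one on a server chosen round-robin from server_0 .. server_(numServers - 1).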
private void addSegmentsToCluster(int partitionIdStart, int numServers, int numSegments)
{
for (int i = 0; i < numSegments; i++) {
DataSegment segment = newSegment(i + partitionIdStart);
QueryableIndex index = newQueryableIndex(i + partitionIdStart);
walker.add(segment, index);
int serverIndex = i % numServers;
inventoryView.addServerSegment(newServer("server_" + serverIndex), segment);
}
}
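
  // Announces a replica of each of the first numSegments segments on a server shifted by serverIndexOffFrom
  // from the segment's original server, wrapping around at numServers.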
private void addReplicasToCluster(int serverIndexOffFrom, int numServers, int numSegments)
{
for (int i = 0; i < numSegments; i++) {
DataSegment segment = newSegment(i);
int serverIndex = i % numServers + serverIndexOffFrom;
serverIndex = serverIndex < numServers ? serverIndex : serverIndex - numServers;
inventoryView.addServerSegment(newServer("server_" + serverIndex), segment);
}
}
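
  // Removes each of the first numSegments segments from the server it was originally assigned to.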
private void removeSegmentsFromCluster(int numServers, int numSegments)
{
for (int i = 0; i < numSegments; i++) {
DataSegment segment = newSegment(i);
int serverIndex = i % numServers;
inventoryView.removeServerSegment(newServer("server_" + serverIndex), segment);
}
}
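
  // Builds a BrokerServerView on top of the given inventory view, with mocked query-layer dependencies.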
private static BrokerServerView newBrokerServerView(FilteredServerInventoryView baseView)
{
return new BrokerServerView(
EasyMock.createMock(QueryToolChestWarehouse.class),
EasyMock.createMock(QueryWatcher.class),
new DefaultObjectMapper(),
EasyMock.createMock(HttpClient.class),
baseView,
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
new NoopServiceEmitter(),
new BrokerSegmentWatcherConfig()
);
}
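
  // Creates a historical server descriptor; only the name varies between servers in these tests.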
private static DruidServer newServer(String name)
{
return new DruidServer(
name,
"host:8083",
"host:8283",
1000L,
ServerType.HISTORICAL,
"tier",
0
);
}
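
  // Creates a segment of DATASOURCE with a fixed interval and version; segments differ only by partition id.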
private DataSegment newSegment(int partitionId)
{
return new DataSegment(
DATASOURCE,
Intervals.of("2012/2013"),
"version1",
null,
ImmutableList.of(),
ImmutableList.of(),
new NumberedShardSpec(partitionId, 0),
null,
1,
100L,
PruneSpecsHolder.DEFAULT
);
}
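
  // Builds a small queryable index for the given partition, backed by a per-partition temp directory.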
private QueryableIndex newQueryableIndex(int partitionId)
{
return IndexBuilder.create()
.tmpDir(new File(tmpDir, "" + partitionId))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1")
)
.withRollup(false)
.build()
)
.rows(ROWS1)
.buildMMappedIndex();
}
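
  // A simple in-memory inventory view. The add/remove methods below mutate the in-memory state and push
  // the corresponding updates to all registered callbacks via their executors.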
private static class TestServerInventoryView implements FilteredServerInventoryView
{
private final Map<String, DruidServer> serverMap = new HashMap<>();
private final Map<String, Set<DataSegment>> segmentsMap = new HashMap<>();
private final List<NonnullPair<SegmentCallback, Executor>> segmentCallbacks = new ArrayList<>();
private final List<NonnullPair<ServerRemovedCallback, Executor>> serverRemovedCallbacks = new ArrayList<>();
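
    // Signals segmentViewInitialized to every registered segment callback.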
private void init()
{
segmentCallbacks.forEach(pair -> pair.rhs.execute(pair.lhs::segmentViewInitialized));
}
private void addServerSegment(DruidServer server, DataSegment segment)
{
serverMap.put(server.getName(), server);
segmentsMap.computeIfAbsent(server.getName(), k -> new HashSet<>()).add(segment);
segmentCallbacks.forEach(pair -> pair.rhs.execute(() -> pair.lhs.segmentAdded(server.getMetadata(), segment)));
}
private void removeServerSegment(DruidServer server, DataSegment segment)
{
segmentsMap.computeIfAbsent(server.getName(), k -> new HashSet<>()).remove(segment);
segmentCallbacks.forEach(pair -> pair.rhs.execute(() -> pair.lhs.segmentRemoved(server.getMetadata(), segment)));
}
private void removeServer(DruidServer server)
{
serverMap.remove(server.getName());
segmentsMap.remove(server.getName());
serverRemovedCallbacks.forEach(pair -> pair.rhs.execute(() -> pair.lhs.serverRemoved(server)));
}
@Override
public void registerSegmentCallback(
Executor exec,
SegmentCallback callback,
Predicate<Pair<DruidServerMetadata, DataSegment>> filter
)
{
segmentCallbacks.add(new NonnullPair<>(new FilteringSegmentCallback(callback, filter), exec));
}
@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
serverRemovedCallbacks.add(new NonnullPair<>(callback, exec));
}
@Nullable
@Override
public DruidServer getInventoryValue(String serverKey)
{
return serverMap.get(serverKey);
}
@Override
public Collection<DruidServer> getInventory()
{
return serverMap.values();
}
@Override
public boolean isStarted()
{
return true;
}
@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
Set<DataSegment> segments = segmentsMap.get(serverKey);
return segments != null && segments.contains(segment);
}
}
}