blob: 538cc2f526f3935cc07afce630abef07783b56b4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.client;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DataSource;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
/**
* ServerView of coordinator for the state of segments being loaded in the cluster.
*/
@ManageLifecycle
public class CoordinatorServerView implements InventoryView
{
private static final Logger log = new Logger(CoordinatorServerView.class);
private final Object lock = new Object();
private final Map<SegmentId, SegmentLoadInfo> segmentLoadInfos;
private final Map<String, VersionedIntervalTimeline<String, SegmentLoadInfo>> timelines;
private final ServerInventoryView baseView;
private final CoordinatorSegmentWatcherConfig segmentWatcherConfig;
private final CountDownLatch initialized = new CountDownLatch(1);
@Inject
public CoordinatorServerView(
ServerInventoryView baseView,
CoordinatorSegmentWatcherConfig segmentWatcherConfig
)
{
this.baseView = baseView;
this.segmentWatcherConfig = segmentWatcherConfig;
this.segmentLoadInfos = new HashMap<>();
this.timelines = new HashMap<>();
ExecutorService exec = Execs.singleThreaded("CoordinatorServerView-%s");
baseView.registerSegmentCallback(
exec,
new ServerView.SegmentCallback()
{
@Override
public ServerView.CallbackAction segmentAdded(DruidServerMetadata server, DataSegment segment)
{
serverAddedSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentRemoved(final DruidServerMetadata server, DataSegment segment)
{
serverRemovedSegment(server, segment);
return ServerView.CallbackAction.CONTINUE;
}
@Override
public ServerView.CallbackAction segmentViewInitialized()
{
initialized.countDown();
return ServerView.CallbackAction.CONTINUE;
}
}
);
baseView.registerServerRemovedCallback(
exec,
new ServerView.ServerRemovedCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
removeServer(server);
return ServerView.CallbackAction.CONTINUE;
}
}
);
}
@LifecycleStart
public void start() throws InterruptedException
{
if (segmentWatcherConfig.isAwaitInitializationOnStart()) {
final long startMillis = System.currentTimeMillis();
log.info("%s waiting for initialization.", getClass().getSimpleName());
initialized.await();
log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), System.currentTimeMillis() - startMillis);
}
}
private void removeServer(DruidServer server)
{
for (DataSegment segment : server.iterateAllSegments()) {
serverRemovedSegment(server.getMetadata(), segment);
}
}
private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
{
SegmentId segmentId = segment.getId();
synchronized (lock) {
log.debug("Adding segment[%s] for server[%s]", segment, server);
SegmentLoadInfo segmentLoadInfo = segmentLoadInfos.get(segmentId);
if (segmentLoadInfo == null) {
// servers escape the scope of this object so use ConcurrentSet
segmentLoadInfo = new SegmentLoadInfo(segment);
VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = timelines.get(segment.getDataSource());
if (timeline == null) {
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
timelines.put(segment.getDataSource(), timeline);
}
timeline.add(
segment.getInterval(),
segment.getVersion(),
segment.getShardSpec().createChunk(segmentLoadInfo)
);
segmentLoadInfos.put(segmentId, segmentLoadInfo);
}
segmentLoadInfo.addServer(server);
}
}
private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
{
SegmentId segmentId = segment.getId();
synchronized (lock) {
log.debug("Removing segment[%s] from server[%s].", segmentId, server);
final SegmentLoadInfo segmentLoadInfo = segmentLoadInfos.get(segmentId);
if (segmentLoadInfo == null) {
log.warn("Told to remove non-existant segment[%s]", segmentId);
return;
}
segmentLoadInfo.removeServer(server);
if (segmentLoadInfo.isEmpty()) {
VersionedIntervalTimeline<String, SegmentLoadInfo> timeline = timelines.get(segment.getDataSource());
segmentLoadInfos.remove(segmentId);
final PartitionChunk<SegmentLoadInfo> removedPartition = timeline.remove(
segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(
new SegmentLoadInfo(
segment
)
)
);
if (removedPartition == null) {
log.warn(
"Asked to remove timeline entry[interval: %s, version: %s] that doesn't exist",
segment.getInterval(),
segment.getVersion()
);
}
}
}
}
public VersionedIntervalTimeline<String, SegmentLoadInfo> getTimeline(DataSource dataSource)
{
String table = Iterables.getOnlyElement(dataSource.getTableNames());
synchronized (lock) {
return timelines.get(table);
}
}
public Map<SegmentId, SegmentLoadInfo> getSegmentLoadInfos()
{
return segmentLoadInfos;
}
@Override
public DruidServer getInventoryValue(String serverKey)
{
return baseView.getInventoryValue(serverKey);
}
@Override
public Collection<DruidServer> getInventory()
{
return baseView.getInventory();
}
@Override
public boolean isStarted()
{
return baseView.isStarted();
}
@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
return baseView.isSegmentLoadedByServer(serverKey, segment);
}
}