blob: 35953e26b0cb587556af1c370057f5ce27c2eeba [file] [log] [blame]
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.master;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.coordination.DataSegmentChangeRequest;
import com.metamx.druid.coordination.SegmentChangeRequestDrop;
import com.metamx.druid.coordination.SegmentChangeRequestLoad;
import com.metamx.phonebook.PhoneBook;
import com.metamx.phonebook.PhoneBookPeon;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Queues segment load and drop requests for one server (identified by {@code basePath}) and works
 * through them one at a time.
 *
 * <p>At most one request is "in flight" ({@code currentlyLoading}) at any moment. When nothing is in
 * flight, {@link #doNext()} picks the first pending drop, or the first pending load if there are no
 * drops, and posts it through {@link PhoneBook#postEphemeral}. The request is considered finished
 * when {@link #entryRemoved(String)} is invoked with the in-flight segment's identifier; its
 * callbacks are then executed and the next request is started.
 *
 * <p>Every write to {@code currentlyLoading} and every structural change to the two queues happens
 * while holding {@code lock}. Callbacks are executed while that lock is held.
 */
public class LoadQueuePeon implements PhoneBookPeon<Map>
{
  private static final Logger log = new Logger(LoadQueuePeon.class);

  private static final int DROP = 0;
  private static final int LOAD = 1;

  /** Orders holders by the inverse of {@link DataSegment#bucketMonthComparator()} applied to their segments. */
  private static final Comparator<SegmentHolder> segmentHolderComparator = new Comparator<SegmentHolder>()
  {
    private final Comparator<DataSegment> comparator = Comparators.inverse(DataSegment.bucketMonthComparator());

    @Override
    public int compare(SegmentHolder lhs, SegmentHolder rhs)
    {
      return comparator.compare(lhs.getSegment(), rhs.getSegment());
    }
  };

  /** Unwraps the segment carried by a holder; shared by the two queue snapshot getters. */
  private static final Function<SegmentHolder, DataSegment> holderToSegment =
      new Function<SegmentHolder, DataSegment>()
      {
        @Override
        public DataSegment apply(SegmentHolder input)
        {
          return input.getSegment();
        }
      };

  private final Object lock = new Object();

  private final PhoneBook yp;
  private final String basePath;
  private final ScheduledExecutorService zkWritingExecutor;

  /** Sum of the sizes of load requests that have been queued and not yet completed. */
  private final AtomicLong queuedSize = new AtomicLong(0);

  private final ConcurrentSkipListSet<SegmentHolder> segmentsToLoad = new ConcurrentSkipListSet<SegmentHolder>(
      segmentHolderComparator
  );
  private final ConcurrentSkipListSet<SegmentHolder> segmentsToDrop = new ConcurrentSkipListSet<SegmentHolder>(
      segmentHolderComparator
  );

  /** The request currently in flight, or null when idle. It stays in its queue until it completes. */
  private volatile SegmentHolder currentlyLoading = null;

  /**
   * @param yp                phone book used to post requests and to look up whether they still exist
   * @param basePath          path under which this peon's requests are posted; also used as the server
   *                          label in log messages
   * @param zkWritingExecutor executor on which requests are posted and the periodic re-check is scheduled
   */
  LoadQueuePeon(
      PhoneBook yp,
      String basePath,
      ScheduledExecutorService zkWritingExecutor
  )
  {
    this.yp = yp;
    this.basePath = basePath;
    this.zkWritingExecutor = zkWritingExecutor;
  }

  @Override
  public Class<Map> getObjectClazz()
  {
    return Map.class;
  }

  /**
   * Notification that an entry appeared. This only logs: a warning when nothing is in flight or when
   * the name differs from the in-flight segment identifier, an info message otherwise.
   *
   * @param name       name of the entry that appeared
   * @param properties payload of the entry; not used
   */
  @Override
  public void newEntry(String name, Map properties)
  {
    synchronized (lock) {
      if (currentlyLoading == null) {
        log.warn(
            "Server[%s] a new entry[%s] appeared, even though nothing is currently loading[%s]",
            basePath,
            name,
            currentlyLoading
        );
      } else {
        if (!name.equals(currentlyLoading.getSegmentIdentifier())) {
          log.warn(
              "Server[%s] a new entry[%s] appeared that is not the currently loading entry[%s]",
              basePath,
              name,
              currentlyLoading
          );
        } else {
          log.info("Server[%s]'s currently loading entry[%s] appeared.", basePath, name);
        }
      }
    }
  }

  /**
   * Notification that an entry disappeared. If the name matches the in-flight request, that request is
   * completed (removed from its queue, callbacks executed) and the next pending request is started.
   * Otherwise a warning is logged and nothing changes.
   *
   * @param name name of the entry that was removed
   */
  @Override
  public void entryRemoved(String name)
  {
    synchronized (lock) {
      if (currentlyLoading == null) {
        log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, name);
        return;
      }
      if (!name.equals(currentlyLoading.getSegmentIdentifier())) {
        log.warn(
            "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
            basePath,
            name,
            currentlyLoading
        );
        return;
      }
      actionCompleted();
      log.info("Server[%s] done processing [%s]", basePath, name);
    }

    doNext();
  }

  /**
   * @return a new set holding the segments of all pending load requests (including one in flight)
   */
  public Set<DataSegment> getSegmentsToLoad()
  {
    return new ConcurrentSkipListSet<DataSegment>(Collections2.transform(segmentsToLoad, holderToSegment));
  }

  /**
   * @return a new set holding the segments of all pending drop requests (including one in flight)
   */
  public Set<DataSegment> getSegmentsToDrop()
  {
    return new ConcurrentSkipListSet<DataSegment>(Collections2.transform(segmentsToDrop, holderToSegment));
  }

  /**
   * @return total size of the load requests that are queued and not yet completed
   */
  public long getLoadQueueSize()
  {
    return queuedSize.get();
  }

  /**
   * Requests that the segment be loaded. If a request for the same segment is already in flight or
   * already queued for loading, no new request is created and the callback is attached to the
   * existing one.
   *
   * @param segment  segment to load
   * @param callback executed once the request completes or the peon is stopped; may be null
   */
  public void loadSegment(
      DataSegment segment,
      LoadPeonCallback callback
  )
  {
    enqueue(segment, LOAD, callback);
  }

  /**
   * Requests that the segment be dropped. If a request for the same segment is already in flight or
   * already queued for dropping, no new request is created and the callback is attached to the
   * existing one.
   *
   * @param segment  segment to drop
   * @param callback executed once the request completes or the peon is stopped; may be null
   */
  public void dropSegment(
      DataSegment segment,
      LoadPeonCallback callback
  )
  {
    enqueue(segment, DROP, callback);
  }

  /**
   * Shared implementation of {@link #loadSegment} and {@link #dropSegment}.
   *
   * <p>The duplicate checks and the insertion happen under a single hold of {@code lock}, so two
   * concurrent requests for the same segment cannot both be inserted (which would double-count
   * {@code queuedSize} and lose the second callback).
   *
   * <p>The in-flight check compares segment identifiers only, not the request type, so a request whose
   * segment is currently in flight merely attaches its callback to the in-flight request.
   */
  private void enqueue(DataSegment segment, int type, LoadPeonCallback callback)
  {
    final boolean isLoad = (type == LOAD);
    final ConcurrentSkipListSet<SegmentHolder> queue = isLoad ? segmentsToLoad : segmentsToDrop;

    synchronized (lock) {
      final SegmentHolder inFlight = currentlyLoading;
      if ((inFlight != null) && inFlight.getSegmentIdentifier().equals(segment.getIdentifier())) {
        if (callback != null) {
          inFlight.addCallback(callback);
        }
        return;
      }

      final SegmentHolder holder = new SegmentHolder(segment, type, Arrays.asList(callback));

      // Attach the callback to the holder that is actually queued. The in-flight holder may be null or
      // may belong to a different segment at this point.
      final SegmentHolder alreadyQueued = findQueued(queue, holder);
      if (alreadyQueued != null) {
        if (callback != null) {
          alreadyQueued.addCallback(callback);
        }
        return;
      }

      if (isLoad) {
        log.info("Asking server peon[%s] to load segment[%s]", basePath, segment);
        queuedSize.addAndGet(segment.getSize());
      } else {
        log.info("Asking server peon[%s] to drop segment[%s]", basePath, segment);
      }
      queue.add(holder);
    }

    doNext();
  }

  /**
   * Returns the element of {@code queue} that compares equal to {@code probe}, or null if there is none.
   */
  private static SegmentHolder findQueued(ConcurrentSkipListSet<SegmentHolder> queue, SegmentHolder probe)
  {
    // floor() yields the greatest element <= probe; it is the match only when it compares equal.
    final SegmentHolder candidate = queue.floor(probe);
    if ((candidate != null) && (segmentHolderComparator.compare(candidate, probe) == 0)) {
      return candidate;
    }
    return null;
  }

  /**
   * Starts the next pending request if nothing is in flight. Drops take precedence over loads. The
   * chosen holder is left in its queue; it is removed when the request completes.
   */
  private void doNext()
  {
    synchronized (lock) {
      if (currentlyLoading == null) {
        if (!segmentsToDrop.isEmpty()) {
          currentlyLoading = segmentsToDrop.first();
          log.info("Server[%s] dropping [%s]", basePath, currentlyLoading);
        } else if (!segmentsToLoad.isEmpty()) {
          currentlyLoading = segmentsToLoad.first();
          log.info("Server[%s] loading [%s]", basePath, currentlyLoading);
        } else {
          return;
        }

        submitExecutable();
      } else {
        log.info(
            "Server[%s] skipping doNext() because something is currently loading[%s].", basePath, currentlyLoading
        );
      }
    }
  }

  /**
   * Submits two tasks to the executor for the request that is in flight when this method is called:
   * one that posts the request, and one that runs after 60 seconds to check on it and keeps
   * rescheduling itself every 60 seconds while the request is still outstanding.
   */
  private void submitExecutable()
  {
    final SegmentHolder currentlyLoadingRef = currentlyLoading;
    final AtomicBoolean postedEphemeral = new AtomicBoolean(false);

    zkWritingExecutor.execute(
        new Runnable()
        {
          @Override
          public void run()
          {
            synchronized (lock) {
              try {
                if (currentlyLoading == null) {
                  log.error("Crazy race condition! server[%s]", basePath);
                  postedEphemeral.set(true);
                  actionCompleted();
                  doNext();
                  return;
                }

                log.info("Server[%s] adding segment[%s]", basePath, currentlyLoading.getSegmentIdentifier());
                yp.postEphemeral(
                    basePath,
                    currentlyLoading.getSegmentIdentifier(),
                    currentlyLoading.getChangeRequest()
                );
                postedEphemeral.set(true);
              }
              catch (Throwable e) {
                log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading);
                // Act like it was completed so that the master gives it to someone else
                postedEphemeral.set(true);
                actionCompleted();
                doNext();
              }
            }
          }
        }
    );

    zkWritingExecutor.schedule(
        new Runnable()
        {
          @Override
          public void run()
          {
            synchronized (lock) {
              if (currentlyLoadingRef != currentlyLoading) {
                // The request this task was watching is no longer in flight; nothing left to check.
                return;
              }

              final String path = yp.combineParts(
                  Arrays.asList(basePath, currentlyLoadingRef.getSegmentIdentifier())
              );

              if (!postedEphemeral.get()) {
                log.info("Ephemeral hasn't been posted yet for [%s], rescheduling.", path);
                zkWritingExecutor.schedule(this, 60, TimeUnit.SECONDS);
                // Must not fall through: before the post, a missing path does not mean the request
                // finished, and falling through could also schedule this task a second time.
                return;
              }

              if (yp.lookup(path, Object.class) == null) {
                log.info("Looks like [%s] was created and deleted without the watchers finding out.", path);
                entryRemoved(currentlyLoadingRef.getSegmentIdentifier());
              } else {
                log.info("Path[%s] still out on ZK, rescheduling.", path);
                zkWritingExecutor.schedule(this, 60, TimeUnit.SECONDS);
              }
            }
          }
        },
        60,
        TimeUnit.SECONDS
    );
  }

  /**
   * Finishes the in-flight request, if any: removes it from its queue (subtracting its size from
   * {@code queuedSize} for loads), executes its callbacks and clears {@code currentlyLoading}.
   * Every caller in this class invokes it while holding {@code lock}.
   */
  private void actionCompleted()
  {
    if (currentlyLoading != null) {
      switch (currentlyLoading.getType()) {
        case LOAD:
          segmentsToLoad.remove(currentlyLoading);
          queuedSize.addAndGet(-currentlyLoading.getSegmentSize());
          break;
        case DROP:
          segmentsToDrop.remove(currentlyLoading);
          break;
        default:
          throw new UnsupportedOperationException();
      }
      currentlyLoading.executeCallbacks();
      currentlyLoading = null;
    }
  }

  /**
   * Abandons all work: executes the callbacks of the in-flight request and of every queued request,
   * empties both queues and resets the queued size to zero. A holder's callbacks are cleared once
   * executed, so the in-flight holder (which is still in its queue) does not run them twice.
   */
  public void stop()
  {
    synchronized (lock) {
      if (currentlyLoading != null) {
        currentlyLoading.executeCallbacks();
        currentlyLoading = null;
      }

      for (SegmentHolder holder : segmentsToDrop) {
        holder.executeCallbacks();
      }
      segmentsToDrop.clear();

      for (SegmentHolder holder : segmentsToLoad) {
        holder.executeCallbacks();
      }
      segmentsToLoad.clear();

      queuedSize.set(0L);
    }
  }

  /**
   * One pending request: the segment, whether it is a load or a drop, the change request to post, and
   * the callbacks to execute when it is finished. Uses no state of the enclosing peon.
   */
  private static class SegmentHolder
  {
    private final DataSegment segment;
    private final DataSegmentChangeRequest changeRequest;
    private final int type;
    private final List<LoadPeonCallback> callbacks = Lists.newArrayList();

    /**
     * @param segment   segment the request is about
     * @param type      {@code LOAD} for a load request; any other value produces a drop change request
     * @param callbacks initial callbacks; null elements are tolerated and skipped on execution
     */
    private SegmentHolder(
        DataSegment segment,
        int type,
        Collection<LoadPeonCallback> callbacks
    )
    {
      this.segment = segment;
      this.type = type;
      this.changeRequest = (type == LOAD)
                           ? new SegmentChangeRequestLoad(segment)
                           : new SegmentChangeRequestDrop(segment);
      this.callbacks.addAll(callbacks);
    }

    public DataSegment getSegment()
    {
      return segment;
    }

    public int getType()
    {
      return type;
    }

    public String getSegmentIdentifier()
    {
      return segment.getIdentifier();
    }

    public long getSegmentSize()
    {
      return segment.getSize();
    }

    public void addCallbacks(Collection<LoadPeonCallback> newCallbacks)
    {
      synchronized (callbacks) {
        callbacks.addAll(newCallbacks);
      }
    }

    public void addCallback(LoadPeonCallback newCallback)
    {
      synchronized (callbacks) {
        callbacks.add(newCallback);
      }
    }

    /** Executes every non-null callback once, then clears the list so a later call is a no-op. */
    public void executeCallbacks()
    {
      synchronized (callbacks) {
        for (LoadPeonCallback callback : callbacks) {
          if (callback != null) {
            callback.execute();
          }
        }
        callbacks.clear();
      }
    }

    public DataSegmentChangeRequest getChangeRequest()
    {
      return changeRequest;
    }

    @Override
    public String toString()
    {
      return changeRequest.toString();
    }
  }
}