| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.server.coordinator; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.ObjectWriter; |
| import com.google.common.collect.Iterators; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.RE; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.http.client.HttpClient; |
| import org.apache.druid.java.util.http.client.Request; |
| import org.apache.druid.server.coordination.DataSegmentChangeCallback; |
| import org.apache.druid.server.coordination.DataSegmentChangeHandler; |
| import org.apache.druid.server.coordination.DataSegmentChangeRequest; |
| import org.apache.druid.server.coordination.SegmentChangeRequestDrop; |
| import org.apache.druid.server.coordination.SegmentChangeRequestLoad; |
| import org.apache.druid.server.coordination.SegmentLoadDropHandler; |
| import org.apache.druid.timeline.DataSegment; |
| import org.jboss.netty.handler.codec.http.HttpHeaders; |
| import org.jboss.netty.handler.codec.http.HttpMethod; |
| import org.joda.time.Duration; |
| |
| import javax.servlet.http.HttpServletResponse; |
| import javax.ws.rs.core.MediaType; |
| import java.io.InputStream; |
| import java.net.MalformedURLException; |
| import java.net.URL; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| /** |
| */ |
| public class HttpLoadQueuePeon extends LoadQueuePeon |
| { |
| public static final TypeReference<List<DataSegmentChangeRequest>> REQUEST_ENTITY_TYPE_REF = |
| new TypeReference<List<DataSegmentChangeRequest>>() |
| { |
| }; |
| |
| public static final TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> RESPONSE_ENTITY_TYPE_REF = |
| new TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>() |
| { |
| }; |
| |
| private static final EmittingLogger log = new EmittingLogger(HttpLoadQueuePeon.class); |
| |
| private final AtomicLong queuedSize = new AtomicLong(0); |
| private final AtomicInteger failedAssignCount = new AtomicInteger(0); |
| |
| private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>( |
| DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST |
| ); |
| private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>( |
| DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST |
| ); |
| private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>( |
| DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST |
| ); |
| |
| private final ScheduledExecutorService processingExecutor; |
| |
| private volatile boolean stopped = false; |
| |
| private final Object lock = new Object(); |
| |
| private final DruidCoordinatorConfig config; |
| |
| private final ObjectMapper jsonMapper; |
| private final HttpClient httpClient; |
| private final URL changeRequestURL; |
| private final String serverId; |
| |
| private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false); |
| private final ExecutorService callBackExecutor; |
| |
| private final ObjectWriter requestBodyWriter; |
| |
| public HttpLoadQueuePeon( |
| String baseUrl, |
| ObjectMapper jsonMapper, |
| HttpClient httpClient, |
| DruidCoordinatorConfig config, |
| ScheduledExecutorService processingExecutor, |
| ExecutorService callBackExecutor |
| ) |
| { |
| this.jsonMapper = jsonMapper; |
| this.requestBodyWriter = jsonMapper.writerWithType(REQUEST_ENTITY_TYPE_REF); |
| this.httpClient = httpClient; |
| this.config = config; |
| this.processingExecutor = processingExecutor; |
| this.callBackExecutor = callBackExecutor; |
| |
| this.serverId = baseUrl; |
| try { |
| this.changeRequestURL = new URL( |
| new URL(baseUrl), |
| StringUtils.nonStrictFormat( |
| "druid-internal/v1/segments/changeRequests?timeout=%d", |
| config.getHttpLoadQueuePeonHostTimeout().getMillis() |
| ) |
| ); |
| } |
| catch (MalformedURLException ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| private void doSegmentManagement() |
| { |
| if (stopped || !mainLoopInProgress.compareAndSet(false, true)) { |
| log.trace("[%s]Ignoring tick. Either in-progress already or stopped.", serverId); |
| return; |
| } |
| |
| final int batchSize = config.getHttpLoadQueuePeonBatchSize(); |
| |
| final List<DataSegmentChangeRequest> newRequests = new ArrayList<>(batchSize); |
| |
| synchronized (lock) { |
| Iterator<Map.Entry<DataSegment, SegmentHolder>> iter = Iterators.concat( |
| segmentsToDrop.entrySet().iterator(), |
| segmentsToLoad.entrySet().iterator() |
| ); |
| |
| while (newRequests.size() < batchSize && iter.hasNext()) { |
| Map.Entry<DataSegment, SegmentHolder> entry = iter.next(); |
| if (entry.getValue().hasTimedOut()) { |
| entry.getValue().requestFailed("timed out"); |
| iter.remove(); |
| } else { |
| newRequests.add(entry.getValue().getChangeRequest()); |
| } |
| } |
| } |
| |
| if (newRequests.size() == 0) { |
| log.trace( |
| "[%s]Found no load/drop requests. SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].", |
| serverId, |
| segmentsToLoad.size(), |
| segmentsToDrop.size(), |
| config.getHttpLoadQueuePeonBatchSize() |
| ); |
| mainLoopInProgress.set(false); |
| return; |
| } |
| |
| try { |
| log.trace("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId); |
| BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler(); |
| ListenableFuture<InputStream> future = httpClient.go( |
| new Request(HttpMethod.POST, changeRequestURL) |
| .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON) |
| .addHeader(HttpHeaders.Names.CONTENT_TYPE, MediaType.APPLICATION_JSON) |
| .setContent(requestBodyWriter.writeValueAsBytes(newRequests)), |
| responseHandler, |
| new Duration(config.getHttpLoadQueuePeonHostTimeout().getMillis() + 5000) |
| ); |
| |
| Futures.addCallback( |
| future, |
| new FutureCallback<InputStream>() |
| { |
| @Override |
| public void onSuccess(InputStream result) |
| { |
| boolean scheduleNextRunImmediately = true; |
| try { |
| if (responseHandler.getStatus() == HttpServletResponse.SC_NO_CONTENT) { |
| log.trace("Received NO CONTENT reseponse from [%s]", serverId); |
| } else if (HttpServletResponse.SC_OK == responseHandler.getStatus()) { |
| try { |
| List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = |
| jsonMapper.readValue(result, RESPONSE_ENTITY_TYPE_REF); |
| log.trace("Server[%s] returned status response [%s].", serverId, statuses); |
| synchronized (lock) { |
| if (stopped) { |
| log.trace("Ignoring response from Server[%s]. We are already stopped.", serverId); |
| scheduleNextRunImmediately = false; |
| return; |
| } |
| |
| for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e : statuses) { |
| switch (e.getStatus().getState()) { |
| case SUCCESS: |
| case FAILED: |
| handleResponseStatus(e.getRequest(), e.getStatus()); |
| break; |
| case PENDING: |
| log.trace("Request[%s] is still pending on server[%s].", e.getRequest(), serverId); |
| break; |
| default: |
| scheduleNextRunImmediately = false; |
| log.error("Server[%s] returned unknown state in status[%s].", serverId, e.getStatus()); |
| } |
| } |
| } |
| } |
| catch (Exception ex) { |
| scheduleNextRunImmediately = false; |
| logRequestFailure(ex); |
| } |
| } else { |
| scheduleNextRunImmediately = false; |
| logRequestFailure(new RE("Unexpected Response Status.")); |
| } |
| } |
| finally { |
| mainLoopInProgress.set(false); |
| |
| if (scheduleNextRunImmediately) { |
| processingExecutor.execute(HttpLoadQueuePeon.this::doSegmentManagement); |
| } |
| } |
| } |
| |
| @Override |
| public void onFailure(Throwable t) |
| { |
| try { |
| logRequestFailure(t); |
| } |
| finally { |
| mainLoopInProgress.set(false); |
| } |
| } |
| |
| private void logRequestFailure(Throwable t) |
| { |
| log.error( |
| t, |
| "Request[%s] Failed with status[%s]. Reason[%s].", |
| changeRequestURL, |
| responseHandler.getStatus(), |
| responseHandler.getDescription() |
| ); |
| } |
| }, |
| processingExecutor |
| ); |
| } |
| catch (Throwable th) { |
| log.error(th, "Error sending load/drop request to [%s].", serverId); |
| mainLoopInProgress.set(false); |
| } |
| } |
| |
| private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentLoadDropHandler.Status status) |
| { |
| changeRequest.go( |
| new DataSegmentChangeHandler() |
| { |
| @Override |
| public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) |
| { |
| updateSuccessOrFailureInHolder(segmentsToLoad.remove(segment), status); |
| } |
| |
| @Override |
| public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) |
| { |
| updateSuccessOrFailureInHolder(segmentsToDrop.remove(segment), status); |
| } |
| |
| private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentLoadDropHandler.Status status) |
| { |
| if (holder == null) { |
| return; |
| } |
| |
| if (status.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) { |
| holder.requestFailed(status.getFailureCause()); |
| } else { |
| holder.requestSucceeded(); |
| } |
| } |
| }, null |
| ); |
| } |
| |
| @Override |
| public void start() |
| { |
| synchronized (lock) { |
| if (stopped) { |
| throw new ISE("Can't start."); |
| } |
| |
| ScheduledExecutors.scheduleAtFixedRate( |
| processingExecutor, |
| config.getHttpLoadQueuePeonRepeatDelay(), |
| () -> { |
| if (!stopped) { |
| doSegmentManagement(); |
| } |
| |
| if (stopped) { |
| return ScheduledExecutors.Signal.STOP; |
| } else { |
| return ScheduledExecutors.Signal.REPEAT; |
| } |
| } |
| ); |
| } |
| } |
| |
| @Override |
| public void stop() |
| { |
| synchronized (lock) { |
| if (stopped) { |
| return; |
| } |
| |
| stopped = true; |
| |
| for (SegmentHolder holder : segmentsToDrop.values()) { |
| holder.requestFailed("Stopping load queue peon."); |
| } |
| |
| for (SegmentHolder holder : segmentsToLoad.values()) { |
| holder.requestFailed("Stopping load queue peon."); |
| } |
| |
| segmentsToDrop.clear(); |
| segmentsToLoad.clear(); |
| queuedSize.set(0L); |
| failedAssignCount.set(0); |
| } |
| } |
| |
| @Override |
| public void loadSegment(DataSegment segment, LoadPeonCallback callback) |
| { |
| synchronized (lock) { |
| if (stopped) { |
| log.warn( |
| "Server[%s] cannot load segment[%s] because load queue peon is stopped.", |
| serverId, |
| segment.getId() |
| ); |
| callback.execute(); |
| return; |
| } |
| |
| SegmentHolder holder = segmentsToLoad.get(segment); |
| |
| if (holder == null) { |
| log.trace("Server[%s] to load segment[%s] queued.", serverId, segment.getId()); |
| segmentsToLoad.put(segment, new LoadSegmentHolder(segment, callback)); |
| processingExecutor.execute(this::doSegmentManagement); |
| } else { |
| holder.addCallback(callback); |
| } |
| } |
| } |
| |
| @Override |
| public void dropSegment(DataSegment segment, LoadPeonCallback callback) |
| { |
| synchronized (lock) { |
| if (stopped) { |
| log.warn( |
| "Server[%s] cannot drop segment[%s] because load queue peon is stopped.", |
| serverId, |
| segment.getId() |
| ); |
| callback.execute(); |
| return; |
| } |
| SegmentHolder holder = segmentsToDrop.get(segment); |
| |
| if (holder == null) { |
| log.trace("Server[%s] to drop segment[%s] queued.", serverId, segment.getId()); |
| segmentsToDrop.put(segment, new DropSegmentHolder(segment, callback)); |
| processingExecutor.execute(this::doSegmentManagement); |
| } else { |
| holder.addCallback(callback); |
| } |
| } |
| } |
| |
| @Override |
| public Set<DataSegment> getSegmentsToLoad() |
| { |
| return Collections.unmodifiableSet(segmentsToLoad.keySet()); |
| } |
| |
| @Override |
| public Set<DataSegment> getSegmentsToDrop() |
| { |
| return Collections.unmodifiableSet(segmentsToDrop.keySet()); |
| } |
| |
| @Override |
| public Set<DataSegment> getTimedOutSegments() |
| { |
| return Collections.emptySet(); |
| } |
| |
| @Override |
| public long getLoadQueueSize() |
| { |
| return queuedSize.get(); |
| } |
| |
| @Override |
| public int getAndResetFailedAssignCount() |
| { |
| return failedAssignCount.getAndSet(0); |
| } |
| |
| @Override |
| public void markSegmentToDrop(DataSegment dataSegment) |
| { |
| segmentsMarkedToDrop.add(dataSegment); |
| } |
| |
| @Override |
| public void unmarkSegmentToDrop(DataSegment dataSegment) |
| { |
| segmentsMarkedToDrop.remove(dataSegment); |
| } |
| |
| @Override |
| public int getNumberOfSegmentsInQueue() |
| { |
| return segmentsToLoad.size(); |
| } |
| |
| @Override |
| public Set<DataSegment> getSegmentsMarkedToDrop() |
| { |
| return Collections.unmodifiableSet(segmentsMarkedToDrop); |
| } |
| |
| private abstract class SegmentHolder |
| { |
| private final DataSegment segment; |
| private final DataSegmentChangeRequest changeRequest; |
| private final List<LoadPeonCallback> callbacks = new ArrayList<>(); |
| |
| // Time when this request was sent to target server the first time. |
| private volatile long scheduleTime = -1; |
| |
| private SegmentHolder( |
| DataSegment segment, |
| DataSegmentChangeRequest changeRequest, |
| LoadPeonCallback callback |
| ) |
| { |
| this.segment = segment; |
| this.changeRequest = changeRequest; |
| |
| if (callback != null) { |
| this.callbacks.add(callback); |
| } |
| } |
| |
| public void addCallback(LoadPeonCallback newCallback) |
| { |
| synchronized (callbacks) { |
| if (newCallback != null) { |
| callbacks.add(newCallback); |
| } |
| } |
| } |
| |
| public DataSegment getSegment() |
| { |
| return segment; |
| } |
| |
| public DataSegmentChangeRequest getChangeRequest() |
| { |
| return changeRequest; |
| } |
| |
| public boolean hasTimedOut() |
| { |
| if (scheduleTime < 0) { |
| scheduleTime = System.currentTimeMillis(); |
| return false; |
| } else if (System.currentTimeMillis() - scheduleTime > config.getLoadTimeoutDelay().getMillis()) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| public void requestSucceeded() |
| { |
| log.trace( |
| "Server[%s] Successfully processed segment[%s] request[%s].", |
| serverId, |
| segment.getId(), |
| changeRequest.getClass().getSimpleName() |
| ); |
| |
| callBackExecutor.execute(() -> { |
| for (LoadPeonCallback callback : callbacks) { |
| if (callback != null) { |
| callback.execute(); |
| } |
| } |
| }); |
| } |
| |
| public void requestFailed(String failureCause) |
| { |
| log.error( |
| "Server[%s] Failed segment[%s] request[%s] with cause [%s].", |
| serverId, |
| segment.getId(), |
| changeRequest.getClass().getSimpleName(), |
| failureCause |
| ); |
| |
| failedAssignCount.getAndIncrement(); |
| |
| callBackExecutor.execute(() -> { |
| for (LoadPeonCallback callback : callbacks) { |
| if (callback != null) { |
| callback.execute(); |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return changeRequest.toString(); |
| } |
| } |
| |
| private class LoadSegmentHolder extends SegmentHolder |
| { |
| public LoadSegmentHolder(DataSegment segment, LoadPeonCallback callback) |
| { |
| super(segment, new SegmentChangeRequestLoad(segment), callback); |
| queuedSize.addAndGet(segment.getSize()); |
| } |
| |
| @Override |
| public void requestSucceeded() |
| { |
| queuedSize.addAndGet(-getSegment().getSize()); |
| super.requestSucceeded(); |
| } |
| |
| @Override |
| public void requestFailed(String failureCause) |
| { |
| queuedSize.addAndGet(-getSegment().getSize()); |
| super.requestFailed(failureCause); |
| } |
| } |
| |
| private class DropSegmentHolder extends SegmentHolder |
| { |
| public DropSegmentHolder(DataSegment segment, LoadPeonCallback callback) |
| { |
| super(segment, new SegmentChangeRequestDrop(segment), callback); |
| } |
| } |
| |
| } |