blob: 0f70fb3d69831b433c7c7ab3a2042883b1d33c6b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.frame.file;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
/**
* An {@link HttpResponseHandler} that streams data into a {@link ReadableByteChunksFrameChannel}.
*
* It is not required that the request actually fetches an entire frame file. The return object for this handler,
* {@link FrameFilePartialFetch}, allows callers to tell how much of the file was read in a particular request.
*
* This handler implements backpressure through {@link FrameFilePartialFetch#backpressureFuture()}. This allows callers
* to back off from issuing the next request, if appropriate. However: the handler does not implement backpressure
* through the {@link HttpResponseHandler.TrafficCop} mechanism. Therefore, it is important that each request retrieve
* a modest amount of data.
*
* The last fetch must be empty (zero content bytes) and must have the header {@link #HEADER_LAST_FETCH_NAME} set to
* {@link #HEADER_LAST_FETCH_VALUE}. Under these conditions, {@link FrameFilePartialFetch#isLastFetch()} returns true.
*/
public class FrameFileHttpResponseHandler implements HttpResponseHandler<FrameFilePartialFetch, FrameFilePartialFetch>
{
  /**
   * Name of the response header that, when set to {@link #HEADER_LAST_FETCH_VALUE}, marks a response as the last fetch.
   */
  public static final String HEADER_LAST_FETCH_NAME = "X-Druid-Frame-Last-Fetch";

  /**
   * Value that {@link #HEADER_LAST_FETCH_NAME} must carry for the response to be treated as the last fetch.
   */
  public static final String HEADER_LAST_FETCH_VALUE = "yes";

  /**
   * Destination for the content bytes of successful responses.
   */
  private final ReadableByteChunksFrameChannel channel;

  /**
   * Creates a handler that appends response content to the given channel.
   *
   * @param channel channel that receives response bytes; must not be null
   */
  public FrameFileHttpResponseHandler(final ReadableByteChunksFrameChannel channel)
  {
    this.channel = Preconditions.checkNotNull(channel, "channel");
  }

  /**
   * Handles the initial response. On HTTP 200, the response content is appended to the channel and the returned
   * fetch object records whether the last-fetch header was present with the expected value. On any other status,
   * an {@link ISE} naming the channel ID and the status is recorded on the fetch object and no content is appended.
   *
   * @param response   the initial HTTP response
   * @param trafficCop not used by this handler
   *
   * @return an unfinished response wrapping the {@link FrameFilePartialFetch} for this request
   */
  @Override
  public ClientResponse<FrameFilePartialFetch> handleResponse(final HttpResponse response, final TrafficCop trafficCop)
  {
    final boolean statusOk = response.getStatus().getCode() == HttpResponseStatus.OK.getCode();

    if (statusOk) {
      final boolean isLastFetch = HEADER_LAST_FETCH_VALUE.equals(response.headers().get(HEADER_LAST_FETCH_NAME));
      final FrameFilePartialFetch fetch = new FrameFilePartialFetch(isLastFetch);
      return appendContent(ClientResponse.unfinished(fetch), response.getContent());
    }

    // Error status. If the error body is chunked, all later chunks are discarded, because the exception recorded
    // below makes "appendContent" exit early. That is fine: only the HTTP status code is needed, not the body.
    final ClientResponse<FrameFilePartialFetch> errorResponse =
        ClientResponse.unfinished(new FrameFilePartialFetch(false));
    exceptionCaught(errorResponse, new ISE("Server for [%s] returned [%s]", channel.getId(), response.getStatus()));
    return errorResponse;
  }

  /**
   * Handles a subsequent chunk of a chunked response by appending its content to the channel.
   *
   * @param clientResponse response object returned by the previous handler call
   * @param chunk          the chunk whose content should be appended
   * @param chunkNum       not used by this handler
   *
   * @return the response produced by appending the chunk content
   */
  @Override
  public ClientResponse<FrameFilePartialFetch> handleChunk(
      final ClientResponse<FrameFilePartialFetch> clientResponse,
      final HttpChunk chunk,
      final long chunkNum
  )
  {
    final ChannelBuffer chunkContent = chunk.getContent();
    return appendContent(clientResponse, chunkContent);
  }

  /**
   * Marks the response as complete.
   *
   * @param clientResponse response object returned by the previous handler call
   *
   * @return a finished response wrapping the same {@link FrameFilePartialFetch}
   */
  @Override
  public ClientResponse<FrameFilePartialFetch> done(final ClientResponse<FrameFilePartialFetch> clientResponse)
  {
    final FrameFilePartialFetch fetch = clientResponse.getObj();
    return ClientResponse.finished(fetch);
  }

  /**
   * Records an exception on the {@link FrameFilePartialFetch} held by the given response.
   *
   * @param clientResponse response object whose fetch should record the exception
   * @param e              the exception to record
   */
  @Override
  public void exceptionCaught(
      final ClientResponse<FrameFilePartialFetch> clientResponse,
      final Throwable e
  )
  {
    final FrameFilePartialFetch fetch = clientResponse.getObj();
    fetch.exceptionCaught(e);
  }

  /**
   * Copies the readable bytes of {@code content} into the channel and updates the fetch object: the backpressure
   * future (when the channel returns a non-null one) and the count of bytes read. Any exception thrown while doing so
   * is recorded on the fetch object instead of being propagated.
   *
   * @param clientResponse response object carrying the fetch to update
   * @param content        buffer holding the bytes to append; its reader index is not advanced
   *
   * @return a finished response if the fetch already had an exception recorded on entry; otherwise an unfinished
   * response wrapping the same fetch
   */
  private ClientResponse<FrameFilePartialFetch> appendContent(
      final ClientResponse<FrameFilePartialFetch> clientResponse,
      final ChannelBuffer content
  )
  {
    final FrameFilePartialFetch fetch = clientResponse.getObj();

    if (fetch.isExceptionCaught()) {
      // Exit early without touching "channel". This matters because "handleChunk" can still be called after an
      // exception has been recorded: either following an error response whose body is chunked, or following a call
      // to "exceptionCaught" partway through the stream. In neither case should the extra chunk reach the channel.
      return ClientResponse.finished(fetch);
    }

    // Absolute-index copy: takes all readable bytes without moving the buffer's reader index.
    final int numBytes = content.readableBytes();
    final byte[] bytes = new byte[numBytes];
    content.getBytes(content.readerIndex(), bytes);

    try {
      final ListenableFuture<?> future = channel.addChunk(bytes);

      if (future != null) {
        fetch.setBackpressureFuture(future);
      }

      fetch.addBytesRead(bytes.length);
    }
    catch (Exception e) {
      fetch.exceptionCaught(e);
    }

    return ClientResponse.unfinished(fetch);
  }
}