| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.s4.deploy; |
| |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.InetSocketAddress; |
| import java.net.URI; |
| import java.util.concurrent.Executors; |
| |
| import org.jboss.netty.bootstrap.ClientBootstrap; |
| import org.jboss.netty.buffer.ChannelBuffer; |
| import org.jboss.netty.buffer.ChannelBufferInputStream; |
| import org.jboss.netty.channel.Channel; |
| import org.jboss.netty.channel.ChannelFuture; |
| import org.jboss.netty.channel.ChannelHandlerContext; |
| import org.jboss.netty.channel.ChannelPipeline; |
| import org.jboss.netty.channel.ChannelPipelineFactory; |
| import org.jboss.netty.channel.Channels; |
| import org.jboss.netty.channel.MessageEvent; |
| import org.jboss.netty.channel.SimpleChannelUpstreamHandler; |
| import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; |
| import org.jboss.netty.handler.codec.http.DefaultHttpRequest; |
| import org.jboss.netty.handler.codec.http.HttpChunk; |
| import org.jboss.netty.handler.codec.http.HttpClientCodec; |
| import org.jboss.netty.handler.codec.http.HttpContentDecompressor; |
| import org.jboss.netty.handler.codec.http.HttpHeaders; |
| import org.jboss.netty.handler.codec.http.HttpMethod; |
| import org.jboss.netty.handler.codec.http.HttpRequest; |
| import org.jboss.netty.handler.codec.http.HttpResponse; |
| import org.jboss.netty.handler.codec.http.HttpVersion; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.io.ByteStreams; |
| |
| /** |
| * <p> |
| * Fetches S4R archive through HTTP. |
| * </p> |
| * <p> |
| * The underlying implementation uses Netty, and borrows code from the Netty snoop example.</br> |
| * |
| * @see <a href="http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/package-summary.html">Netty |
| * snoop example</a> |
| * |
| * </p> |
| */ |
| public class HttpS4RFetcher implements S4RFetcher { |
| |
| private static Logger logger = LoggerFactory.getLogger(HttpS4RFetcher.class); |
| |
| @Override |
| public InputStream fetch(URI uri) throws DeploymentFailedException { |
| logger.debug("Fetching file through http: {}", uri.toString()); |
| |
| String host = uri.getHost(); |
| int port = uri.getPort(); |
| if (port == -1) { |
| if (uri.getScheme().equalsIgnoreCase("http")) { |
| port = 80; |
| } else if (uri.getScheme().equalsIgnoreCase("https")) { |
| port = 443; |
| } |
| } |
| |
| ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory( |
| Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); |
| File tmpFile; |
| try { |
| tmpFile = File.createTempFile("http", "download"); |
| } catch (IOException e) { |
| throw new DeploymentFailedException("Cannot create temporary file for fetching s4r data from http server", |
| e); |
| } |
| clientBootstrap.setPipelineFactory(new HttpClientPipelineFactory(tmpFile)); |
| ChannelFuture channelFuture = clientBootstrap.connect(new InetSocketAddress(host, port)); |
| // TODO timeout? |
| Channel channel = channelFuture.awaitUninterruptibly().getChannel(); |
| if (!channelFuture.isSuccess()) { |
| clientBootstrap.releaseExternalResources(); |
| throw new DeploymentFailedException("Cannot connect to http uri [" + uri.toString() + "]", |
| channelFuture.getCause()); |
| } |
| |
| HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getPath()); |
| request.setHeader(HttpHeaders.Names.HOST, host); |
| request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); |
| request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); |
| |
| channel.write(request); |
| |
| channel.getCloseFuture().awaitUninterruptibly(); |
| |
| clientBootstrap.releaseExternalResources(); |
| |
| logger.debug("Finished downloading s4r file through http {}", uri.toString()); |
| try { |
| return new FileInputStream(tmpFile); |
| } catch (FileNotFoundException e) { |
| throw new DeploymentFailedException("Cannot get input stream from temporary file with s4r data [" |
| + tmpFile.getAbsolutePath() + "]"); |
| } |
| } |
| |
| private class HttpClientPipelineFactory implements ChannelPipelineFactory { |
| |
| File tmpFile; |
| |
| public HttpClientPipelineFactory(File tmpFile) { |
| this.tmpFile = tmpFile; |
| } |
| |
| @Override |
| public ChannelPipeline getPipeline() throws Exception { |
| // Create a default pipeline implementation. |
| ChannelPipeline pipeline = Channels.pipeline(); |
| |
| pipeline.addLast("codec", new HttpClientCodec()); |
| |
| // Remove the following line if you don't want automatic content decompression. |
| pipeline.addLast("inflater", new HttpContentDecompressor()); |
| |
| pipeline.addLast("handler", new HttpResponseHandler(tmpFile)); |
| return pipeline; |
| } |
| } |
| |
| // see http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/http/snoop/HttpResponseHandler.html |
| private class HttpResponseHandler extends SimpleChannelUpstreamHandler { |
| |
| private boolean readingChunks; |
| FileOutputStream fos; |
| |
| public HttpResponseHandler(File tmpFile) throws FileNotFoundException { |
| this.fos = new FileOutputStream(tmpFile); |
| } |
| |
| @Override |
| public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { |
| if (!readingChunks) { |
| HttpResponse response = (HttpResponse) e.getMessage(); |
| |
| if (response.isChunked()) { |
| readingChunks = true; |
| } else { |
| copyContentToTmpFile(response.getContent()); |
| } |
| } else { |
| HttpChunk chunk = (HttpChunk) e.getMessage(); |
| if (chunk.isLast()) { |
| readingChunks = false; |
| fos.close(); |
| } else { |
| copyContentToTmpFile(chunk.getContent()); |
| } |
| } |
| |
| } |
| |
| private void copyContentToTmpFile(ChannelBuffer content) throws IOException, FileNotFoundException { |
| ChannelBufferInputStream cbis = new ChannelBufferInputStream(content); |
| ByteStreams.copy(cbis, fos); |
| } |
| } |
| } |