blob: 261672df4300992d589d2d768a4b5ee70a402421 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.maven.mercury.spi.http.client.deploy;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;
import javax.servlet.http.HttpServletResponse;
import org.apache.maven.mercury.crypto.api.StreamObserver;
import org.apache.maven.mercury.logging.IMercuryLogger;
import org.apache.maven.mercury.logging.MercuryLoggerManager;
import org.apache.maven.mercury.spi.http.client.FileExchange;
import org.apache.maven.mercury.spi.http.client.HttpClientException;
import org.apache.maven.mercury.spi.http.client.ObservableInputStream;
import org.apache.maven.mercury.transport.api.Binding;
import org.apache.maven.mercury.transport.api.Server;
import org.mortbay.io.Buffer;
import org.mortbay.jetty.HttpMethods;
import org.mortbay.jetty.client.HttpClient;
/**
* FilePutExchange
* <p/>
* Asynchronously PUT a file to a remote server. The file that is being uploaded can also
* have it's SHA-1 digest calculated as it is being streamed up.
*/
public abstract class FilePutExchange extends FileExchange
{
private static final int __readLimit = 1024;
private static final IMercuryLogger log = MercuryLoggerManager.getLogger(FilePutExchange.class);
private String _batchId;
private InputStream _inputStream;
private String _remoteRepoUrl;
private String _remoteBatchId;
private Set<StreamObserver> _observers = new HashSet<StreamObserver>();
public abstract void onFileComplete( String url, File localFile );
public abstract void onFileError( String url, Exception e );
public FilePutExchange( Server server, String batchId, Binding binding, File localFile, Set<StreamObserver> observers, HttpClient client )
{
super( server, binding, localFile, client );
_observers.addAll(observers);
_batchId = batchId;
}
/**
* Start the upload. Ensure that the id of the mercury is set as a request header
* so all files part of the same mercury can be identified as an atomic unit.
*/
public void send()
{
try
{
setMethod( HttpMethods.PUT );
setRequestHeader( "Content-Type", "application/octet-stream" );
if (_binding.isFile())
{
setRequestHeader( "Content-Length", String.valueOf( _localFile.length() ) );
if (log.isDebugEnabled())
log.debug("PUT of "+_localFile.length()+" bytes");
for (StreamObserver o: _observers)
o.setLength(_localFile.length());
}
if (log.isDebugEnabled())
log.debug("PUT: "+getURI());
setRequestContentSource( getInputStream() );
setRequestHeader( __BATCH_HEADER, _batchId );
super.send();
}
catch ( Exception e )
{
onFileError( _url, e );
}
}
public boolean isRemoteJetty()
{
return _remoteRepoUrl != null;
}
public String getRemoteJettyUrl()
{
return _remoteRepoUrl;
}
protected void onResponseHeader( Buffer name, Buffer value )
{
if ( name.toString().equalsIgnoreCase( __BATCH_SUPPORTED_HEADER ) )
{
_remoteRepoUrl = value.toString();
}
else if ( name.toString().equalsIgnoreCase( __BATCH_HEADER ) )
{
_remoteBatchId = value.toString();
}
}
protected void onResponseComplete()
{
try
{
if (_inputStream != null)
_inputStream.close();
if ( _status != HttpServletResponse.SC_OK && _status != HttpServletResponse.SC_CREATED && _status != HttpServletResponse.SC_NO_CONTENT )
{
onFileError( _url, new HttpClientException( _binding, "Http status code=" + _status ) );
return;
}
if ( _remoteBatchId != null && !_batchId.equals( _remoteBatchId ) )
{
onFileError( _url, new HttpClientException( _binding,
"Non matching mercury ids. Sent=" + _batchId + " received=" + _remoteBatchId ) );
return;
}
//we've uploaded the file
onFileComplete( _url, _localFile );
}
catch ( Exception e )
{
onFileError( _url, new HttpClientException( _binding, e.getLocalizedMessage() ) );
}
}
private InputStream getInputStream()
throws IOException
{
if ( _inputStream == null )
{
InputStream is = null;
if (_binding.isFile())
is = new FileInputStream( _localFile );
else if (_binding.isInMemory())
{
is = _binding.getLocalInputStream();
if (!getRetryStatus())
{
if (is.markSupported())
is.mark(__readLimit);
}
else
{
if (is.markSupported())
is.reset();
}
}
//if this request is being retried, then don't set up the observers a second
//time?
if (!getRetryStatus())
{
ObservableInputStream ois = new ObservableInputStream( is );
_inputStream = ois;
ois.addObservers(_observers);
}
else
_inputStream = is;
}
return _inputStream;
}
protected void onRetry() throws IOException
{
super.onRetry();
if (_inputStream != null)
_inputStream.close();
_inputStream = null;
setRequestContent(null);
setRequestContentSource(getInputStream());
}
}