blob: e7eb91cfd8297b4c287d41c634fd97d596472229 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.maven.mercury.repository.local.flat;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import org.apache.maven.mercury.artifact.Artifact;
import org.apache.maven.mercury.crypto.api.StreamVerifierFactory;
import org.apache.maven.mercury.logging.IMercuryLogger;
import org.apache.maven.mercury.logging.MercuryLoggerManager;
import org.apache.maven.mercury.repository.api.AbstractRepository;
import org.apache.maven.mercury.repository.api.AbstractRepositoryWriter;
import org.apache.maven.mercury.repository.api.LocalRepository;
import org.apache.maven.mercury.repository.api.Repository;
import org.apache.maven.mercury.repository.api.RepositoryException;
import org.apache.maven.mercury.repository.api.RepositoryWriter;
import org.apache.maven.mercury.transport.api.Server;
import org.apache.maven.mercury.util.FileLockBundle;
import org.apache.maven.mercury.util.FileUtil;
import org.apache.maven.mercury.util.Util;
import org.codehaus.plexus.lang.DefaultLanguage;
import org.codehaus.plexus.lang.Language;
public class LocalRepositoryWriterFlat
extends AbstractRepositoryWriter
implements RepositoryWriter
{
public static final String SYSTEM_PROPERTY_PARALLEL_WORKERS = "mercury.local.repo.workers";
public static final int PARALLEL_WORKERS = Integer.parseInt( System.getProperty( SYSTEM_PROPERTY_PARALLEL_WORKERS, "4" ) );
public static final long SLEEP_FOR_WORKERS_TICK = 20l;
public static final String SYSTEM_PROPERTY_SLEEP_FOR_LOCK = "mercury.local.lock.wait.millis";
public static final long SLEEP_FOR_LOCK = Long.parseLong( System.getProperty( SYSTEM_PROPERTY_SLEEP_FOR_LOCK, "5000" ) );
public static final long SLEEP_FOR_LOCK_TICK = 5l;
private static final IMercuryLogger _log = MercuryLoggerManager.getLogger( LocalRepositoryWriterFlat.class );
private static final Language _lang = new DefaultLanguage( LocalRepositoryWriterFlat.class );
//---------------------------------------------------------------------------------------------------------------
private static final String [] _protocols = new String [] { "file" };
private final LocalRepository _repo;
private final File _repoDir;
private final ArtifactQueue _aq;
private static final ArifactWriteData LAST_ARTIFACT = new ArifactWriteData( null, null );
//---------------------------------------------------------------------------------------------------------------
public LocalRepositoryWriterFlat( LocalRepository repo )
{
if( repo == null )
throw new IllegalArgumentException("localRepo cannot be null");
_repoDir = repo.getDirectory();
if( _repoDir == null )
throw new IllegalArgumentException("localRepo directory cannot be null");
if( !_repoDir.exists() )
throw new IllegalArgumentException("localRepo directory \""+_repoDir.getAbsolutePath()+"\" should exist");
_repo = repo;
_aq = null;
}
//---------------------------------------------------------------------------------------------------------------
private LocalRepositoryWriterFlat( LocalRepository repo, File repoDir, ArtifactQueue aq )
{
_repo = repo;
_repoDir = repoDir;
_aq = aq;
}
//---------------------------------------------------------------------------------------------------------------
public Repository getRepository()
{
return _repo;
}
//---------------------------------------------------------------------------------------------------------------
public boolean canHandle( String protocol )
{
return AbstractRepository.DEFAULT_LOCAL_READ_PROTOCOL.equals( protocol );
}
//---------------------------------------------------------------------------------------------------------------
public String[] getProtocols()
{
return _protocols;
}
//---------------------------------------------------------------------------------------------------------------
public void close()
{
}
//---------------------------------------------------------------------------------------------------------------
public void writeArtifacts( Collection<Artifact> artifacts )
throws RepositoryException
{
if( artifacts == null || artifacts.size() < 1 )
return;
int nWorkers = PARALLEL_WORKERS;
if( artifacts.size() < nWorkers )
nWorkers = artifacts.size();
ArtifactQueue aq = new ArtifactQueue();
LocalRepositoryWriterFlat [] workers = new LocalRepositoryWriterFlat[ nWorkers ];
for( int i=0; i<nWorkers; i++ )
workers[ i ] = new LocalRepositoryWriterFlat( _repo, _repoDir, aq );
for( Artifact artifact : artifacts )
{
Set<StreamVerifierFactory> vFacs = null;
Server server = _repo.getServer();
if( server != null && server.hasWriterStreamVerifierFactories() )
vFacs = server.getWriterStreamVerifierFactories();
if( vFacs == null ) // let it be empty, but not null
vFacs = new HashSet<StreamVerifierFactory>(1);
aq.addArtifact( new ArifactWriteData( artifact, vFacs ) );
}
aq.addArtifact( LAST_ARTIFACT );
for( int i=0; i<nWorkers; i++ )
workers[ i ].start();
boolean alive = true;
while( alive )
{
alive = false;
for( int i=0; i<nWorkers; i++ )
if( workers[ i ].isAlive() )
{
alive = true;
try { sleep( SLEEP_FOR_WORKERS_TICK ); } catch( InterruptedException ie ) {}
}
}
}
//---------------------------------------------------------------------------------------------------------------
/* (non-Javadoc)
* @see java.lang.Thread#run()
*/
@Override
public void run()
{
try
{
for(;;)
{
ArifactWriteData awd = _aq.getArtifact();
if( awd == null || awd.artifact == null )
break;
writeArtifact( awd.artifact, awd.vFacs );
}
}
catch (InterruptedException e)
{
}
catch( RepositoryException e )
{
throw new RuntimeException(e);
}
}
//---------------------------------------------------------------------------------------------------------------
public void writeArtifact( final Artifact artifact, final Set<StreamVerifierFactory> vFacs )
throws RepositoryException
{
if( artifact == null )
return;
boolean isPom = "pom".equals( artifact.getType() );
byte [] pomBlob = artifact.getPomBlob();
boolean hasPomBlob = pomBlob != null && pomBlob.length > 0;
InputStream in = artifact.getStream();
if( in == null )
{
File aFile = artifact.getFile();
if( aFile == null && !isPom )
{
throw new RepositoryException( _lang.getMessage( "artifact.no.stream", artifact.toString() ) );
}
try
{
in = new FileInputStream( aFile );
}
catch( FileNotFoundException e )
{
if( !isPom )
throw new RepositoryException( e );
}
}
String relGroupPath = ((LocalRepositoryFlat)_repo).isCreateGroupFolders() ? artifact.getGroupId() : "";
String versionPath = _repoDir.getAbsolutePath() + (Util.isEmpty( relGroupPath ) ? "" : "/"+relGroupPath);
String lockDir = null;
FileLockBundle fLock = null;
try
{
if( isPom )
{
if( in == null && !hasPomBlob )
throw new RepositoryException( _lang.getMessage( "pom.artifact.no.stream", artifact.toString() ) );
if( in != null )
{
byte [] pomBlobBytes = FileUtil.readRawData( in );
hasPomBlob = pomBlobBytes != null && pomBlobBytes.length > 0;
if( hasPomBlob )
pomBlob = pomBlobBytes;
}
}
// create folders
lockDir = versionPath;
File gav = new File( lockDir );
gav.mkdirs();
fLock = FileUtil.lockDir( lockDir, SLEEP_FOR_LOCK, SLEEP_FOR_LOCK_TICK );
if( fLock == null )
throw new RepositoryException( _lang.getMessage( "cannot.lock.gav", lockDir, ""+SLEEP_FOR_LOCK ) );
String fName = versionPath+'/'+artifact.getBaseName()+'.'+artifact.getType();
if( !isPom ) // first - take care of the binary
FileUtil.writeAndSign( fName, in, vFacs );
// if classier - nothing else to do :)
if( artifact.hasClassifier() )
return;
if( ((LocalRepositoryFlat)_repo).isCreatePoms() && hasPomBlob )
{
FileUtil.writeAndSign( versionPath
+'/'+artifact.getArtifactId()+'-'+artifact.getVersion()+".pom", pomBlob, vFacs
);
}
}
catch( Exception e )
{
throw new RepositoryException( e );
}
finally
{
if( fLock != null )
fLock.release();
}
}
//---------------------------------------------------------------------------------------------------------------
//---------------------------------------------------------------------------------------------------------------
}
//=================================================================================================================
class ArifactWriteData
{
Artifact artifact;
Set<StreamVerifierFactory> vFacs;
public ArifactWriteData(Artifact artifact, Set<StreamVerifierFactory> vFacs)
{
this.artifact = artifact;
this.vFacs = vFacs;
}
}
//=================================================================================================================
class ArtifactQueue
{
LinkedList<ArifactWriteData> queue = new LinkedList<ArifactWriteData>();
boolean empty = false;
public synchronized void addArtifact( ArifactWriteData awd )
{
queue.addLast( awd );
empty = false;
notify();
}
public synchronized ArifactWriteData getArtifact()
throws InterruptedException
{
if( empty )
return null;
while( queue.isEmpty() )
wait();
ArifactWriteData res = queue.removeFirst();
if( res.artifact == null )
{
empty = true;
return null;
}
return res;
}
}
//=================================================================================================================