Skip to content

[MRESOLVER-32] Parallel deploy and more #237

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.aether.ConfigurationProperties;
Expand Down Expand Up @@ -71,7 +67,7 @@
import org.eclipse.aether.util.ConfigUtils;
import org.eclipse.aether.util.FileUtils;
import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
import org.eclipse.aether.util.concurrency.ThreadsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -84,6 +80,8 @@ final class BasicRepositoryConnector

private static final String CONFIG_PROP_THREADS = "aether.connector.basic.threads";

private static final String CONFIG_PROP_PARALLEL_PUT = "aether.connector.basic.parallelPut";

private static final String CONFIG_PROP_SMART_CHECKSUMS = "aether.connector.smartChecksums";

private static final Logger LOGGER = LoggerFactory.getLogger( BasicRepositoryConnector.class );
Expand All @@ -104,6 +102,8 @@ final class BasicRepositoryConnector

private final int maxThreads;

private final boolean parallelPut;

private final boolean smartChecksums;

private final boolean persistedChecksums;
Expand Down Expand Up @@ -145,31 +145,28 @@ final class BasicRepositoryConnector
this.providedChecksumsSources = providedChecksumsSources;
this.closed = new AtomicBoolean( false );

maxThreads = ConfigUtils.getInteger( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
maxThreads = ThreadsUtils.threadCount( session, 5, CONFIG_PROP_THREADS, "maven.artifact.threads" );
parallelPut = ConfigUtils.getBoolean( session, false, CONFIG_PROP_PARALLEL_PUT );
smartChecksums = ConfigUtils.getBoolean( session, true, CONFIG_PROP_SMART_CHECKSUMS );
persistedChecksums =
ConfigUtils.getBoolean( session, ConfigurationProperties.DEFAULT_PERSISTED_CHECKSUMS,
ConfigurationProperties.PERSISTED_CHECKSUMS );
}

private Executor getExecutor( Collection<?> artifacts, Collection<?> metadatas )
private Executor getExecutor( int tasks )
{
if ( maxThreads <= 1 )
{
return DirectExecutor.INSTANCE;
return ThreadsUtils.DIRECT_EXECUTOR;
}
int tasks = safe( artifacts ).size() + safe( metadatas ).size();
if ( tasks <= 1 )
{
return DirectExecutor.INSTANCE;
return ThreadsUtils.DIRECT_EXECUTOR;
}
if ( executor == null )
{
executor =
new ThreadPoolExecutor( maxThreads, maxThreads, 3L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new WorkerThreadFactory( getClass().getSimpleName() + '-'
+ repository.getHost() + '-' ) );
executor = ThreadsUtils.threadPool( maxThreads,
getClass().getSimpleName() + '-' + repository.getHost() + '-' );
}
return executor;
}
Expand All @@ -193,10 +190,7 @@ public void close()
{
if ( closed.compareAndSet( false, true ) )
{
if ( executor instanceof ExecutorService )
{
( (ExecutorService) executor ).shutdown();
}
ThreadsUtils.shutdown( executor );
transporter.close();
}
}
Expand All @@ -215,11 +209,14 @@ public void get( Collection<? extends ArtifactDownload> artifactDownloads,
{
failIfClosed();

Executor executor = getExecutor( artifactDownloads, metadataDownloads );
Collection<? extends ArtifactDownload> safeArtifactDownloads = safe( artifactDownloads );
Collection<? extends MetadataDownload> safeMetadataDownloads = safe( metadataDownloads );

Executor executor = getExecutor( safeArtifactDownloads.size() + safeMetadataDownloads.size() );
RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();

for ( MetadataDownload transfer : safe( metadataDownloads ) )
for ( MetadataDownload transfer : safeMetadataDownloads )
{
URI location = layout.getLocation( transfer.getMetadata(), false );

Expand All @@ -239,7 +236,7 @@ public void get( Collection<? extends ArtifactDownload> artifactDownloads,
executor.execute( errorForwarder.wrap( task ) );
}

for ( ArtifactDownload transfer : safe( artifactDownloads ) )
for ( ArtifactDownload transfer : safeArtifactDownloads )
{
Map<String, String> providedChecksums = Collections.emptyMap();
for ( ProvidedChecksumsSource providedChecksumsSource : providedChecksumsSources.values() )
Expand Down Expand Up @@ -289,7 +286,13 @@ public void put( Collection<? extends ArtifactUpload> artifactUploads,
{
failIfClosed();

for ( ArtifactUpload transfer : safe( artifactUploads ) )
Collection<? extends ArtifactUpload> safeArtifactUploads = safe( artifactUploads );
Collection<? extends MetadataUpload> safeMetadataUploads = safe( metadataUploads );

Executor executor = getExecutor( parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1 );
RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();

for ( ArtifactUpload transfer : safeArtifactUploads )
{
URI location = layout.getLocation( transfer.getArtifact(), true );

Expand All @@ -302,10 +305,13 @@ public void put( Collection<? extends ArtifactUpload> artifactUploads,

Runnable task = new PutTaskRunner( location, transfer.getFile(), transfer.getFileTransformer(),
checksumLocations, listener );
task.run();

executor.execute( errorForwarder.wrap( task ) );
}

for ( MetadataUpload transfer : safe( metadataUploads ) )
errorForwarder.await();

for ( MetadataUpload transfer : safeMetadataUploads )
{
URI location = layout.getLocation( transfer.getMetadata(), true );

Expand All @@ -317,8 +323,11 @@ public void put( Collection<? extends ArtifactUpload> artifactUploads,
layout.getChecksumLocations( transfer.getMetadata(), true, location );

Runnable task = new PutTaskRunner( location, transfer.getFile(), checksumLocations, listener );
task.run();

executor.execute( errorForwarder.wrap( task ) );
}

errorForwarder.await();
}

private static <T> Collection<T> safe( Collection<T> items )
Expand Down Expand Up @@ -622,18 +631,4 @@ private void uploadChecksum( URI location, Object checksum )
}

}

private static class DirectExecutor
implements Executor
{

static final Executor INSTANCE = new DirectExecutor();

@Override
public void execute( Runnable command )
{
command.run();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@
import java.util.Map;
import static java.util.Objects.requireNonNull;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.inject.Named;
Expand Down Expand Up @@ -71,9 +67,8 @@
import org.eclipse.aether.transfer.MetadataTransferException;
import org.eclipse.aether.transfer.NoRepositoryConnectorException;
import org.eclipse.aether.transfer.RepositoryOfflineException;
import org.eclipse.aether.util.ConfigUtils;
import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
import org.eclipse.aether.util.concurrency.WorkerThreadFactory;
import org.eclipse.aether.util.concurrency.ThreadsUtils;

/**
*/
Expand Down Expand Up @@ -374,8 +369,9 @@ else if ( exception == null )

if ( !tasks.isEmpty() )
{
int threads = ConfigUtils.getInteger( session, 4, CONFIG_PROP_THREADS );
Executor executor = getExecutor( Math.min( tasks.size(), threads ) );
int threads = ThreadsUtils.threadCount( session, 4, CONFIG_PROP_THREADS );
Executor executor = ThreadsUtils.executor(
Math.min( tasks.size(), threads ), getClass().getSimpleName() + '-' );
try
{
RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
Expand Down Expand Up @@ -407,7 +403,7 @@ else if ( exception == null )
}
finally
{
shutdown( executor );
ThreadsUtils.shutdown( executor );
}
for ( ResolveTask task : tasks )
{
Expand Down Expand Up @@ -531,27 +527,6 @@ private void metadataDownloaded( RepositorySystemSession session, RequestTrace t
repositoryEventDispatcher.dispatch( event.build() );
}

private Executor getExecutor( int threads )
{
if ( threads <= 1 )
{
return command -> command.run();
}
else
{
return new ThreadPoolExecutor( threads, threads, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new WorkerThreadFactory( null ) );
}
}

private void shutdown( Executor executor )
{
if ( executor instanceof ExecutorService )
{
( (ExecutorService) executor ).shutdown();
}
}

class ResolveTask
implements Runnable
{
Expand Down
Loading