2012-01-13 34 views
21

Estoy escribiendo un programa C# para generar y cargar medio millón de archivos a través de FTP. Quiero procesar 4 archivos en paralelo ya que la máquina tiene 4 núcleos y la generación de archivos lleva mucho más tiempo. ¿Es posible convertir el siguiente ejemplo de Powershell en C#? ¿O hay algún marco mejor, como el marco Actor en C# (como F # MailboxProcessor)?Limite el número de hilos paralelos en C#

Powershell example

$maxConcurrentJobs = 3; 

# Read the input and queue it up 
$jobInput = get-content .\input.txt 
$queue = [System.Collections.Queue]::Synchronized((New-Object System.Collections.Queue)) 
foreach($item in $jobInput) 
{ 
    $queue.Enqueue($item) 
} 

# Function that pops input off the queue and starts a job with it 
function RunJobFromQueue 
{ 
    if($queue.Count -gt 0) 
    { 
     $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue() 
     Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null 
    } 
} 

# Start up to the max number of concurrent jobs 
# Each job will take care of running the rest 
for($i = 0; $i -lt $maxConcurrentJobs; $i++) 
{ 
    RunJobFromQueue 
} 

Actualización:
La conexión con el servidor FTP remoto puede ser lento por lo que quieren limitar el procesamiento de envío por FTP.

+0

Si desea limitar el número de tareas paralelas, ¿por qué no utilizar el TPL? –

+1

El grupo de subprocesos debe ser lo suficientemente inteligente como para manejar esto por usted. ¿Por qué intentar administrarlo usted mismo? –

+3

Puede usar [PLINQ] (http://msdn.microsoft.com/en-us/library/dd460688.aspx) y establecer [WithDegreeOfParallelism] (http://msdn.microsoft.com/en-us/library/ dd383719.aspx) en consecuencia. –

Respuesta

5

Si está utilizando .Net 4.0 se puede utilizar el Parallel library

Suponiendo que estás iterando throug el medio millón de archivos que puede "paralelo" la iteración utilizando un Parallel Foreach for instance o puede have a look to PLinq Aquí un comparison between the two

+0

Por favor, justifique el -1. –

+0

Esta pregunta está etiquetada con C# -4.0, es obvio que está familiarizado con las extensiones y usa .NET 4. Una sola oración NO responde a su pregunta. –

+0

Es obvio que está usando C# 4.0 pero NO es obvio que esté familiarizado con la biblioteca de Parallel, pero no hará una pregunta. Además, mi respuesta contiene más o menos la misma información de la otra. Por favor, justifique el -1 por favor. –

16

Biblioteca de tareas paralelas es tu amigo aquí. Vea el enlace this que describe lo que está disponible para usted. Básicamente, el framework 4 viene con él, que optimiza estos subprocesos agrupados de hilos esencialmente en segundo plano para la cantidad de procesadores en la máquina en ejecución.

Tal vez algo en la línea de:

ParallelOptions options = new ParallelOptions(); 

options.MaxDegreeOfParallelism = 4; 

A continuación, en su bucle algo como:

Parallel.Invoke(options, 
() => new WebClient().Upload("http://www.linqpad.net", "lp.html"), 
() => new WebClient().Upload("http://www.jaoo.dk", "jaoo.html")); 
2

Esencialmente, usted va a querer crear una acción o tarea para cada archivo para cargar , colóquelos en una lista y luego procese esa lista, limitando el número que se puede procesar en paralelo.

My blog post muestra cómo hacerlo con Tareas y con Acciones, y proporciona un ejemplo de proyecto que puede descargar y ejecutar para ver ambos en acción.

Con acciones

Si Uso de acciones, puede utilizar la función incorporada en .Net Parallel.Invoke. Aquí lo limitamos a ejecutar como máximo 4 hilos en paralelo.

var listOfActions = new List<Action>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(() => UploadFile(localFile))); 
} 

var options = new ParallelOptions {MaxDegreeOfParallelism = 4}; 
Parallel.Invoke(options, listOfActions.ToArray()); 

Esta opción no soporta asíncrono sin embargo, y estoy asumiendo que usted está FileUpload función será, por lo que es posible que desee utilizar el ejemplo de tareas a continuación.

con tareas

con las tareas no existe una función incorporada. Sin embargo, puede usar el que brindo en mi blog.

/// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); 
    } 

    /// <summary> 
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. 
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para> 
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> 
    /// </summary> 
    /// <param name="tasksToRun">The tasks to run.</param> 
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> 
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) 
    { 
     // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. 
     var tasks = tasksToRun.ToList(); 

     using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) 
     { 
      var postTaskTasks = new List<Task>(); 

      // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. 
      tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); 

      // Start running each task. 
      foreach (var task in tasks) 
      { 
       // Increment the number of tasks currently running and wait if too many are running. 
       await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); 

       cancellationToken.ThrowIfCancellationRequested(); 
       task.Start(); 
      } 

      // Wait for all of the provided tasks to complete. 
      // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. 
      await Task.WhenAll(postTaskTasks.ToArray()); 
     } 
    } 

Y a continuación, crear su lista de tareas y llamar a la función a que se ejecuten, con decir un máximo de 4 simultánea a la vez, usted puede hacer esto:

var listOfTasks = new List<Task>(); 
foreach (var file in files) 
{ 
    var localFile = file; 
    // Note that we create the Task here, but do not start it. 
    listOfTasks.Add(new Task(async() => await UploadFile(localFile))); 
} 
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4); 

Además, debido a esta método admite asincrónico, no bloqueará el subproceso de la interfaz de usuario como usar Parallel.Invoke o Parallel.ForEach.

0

He codificado debajo de la técnica donde uso BlockingCollection como administrador de conteo de hilos. Es bastante simple de implementar y maneja el trabajo. Acepta simplemente Objetos de tarea y agrega un valor entero a la lista de bloqueo, aumentando el conteo de subprocesos en ejecución en 1. Cuando el subproceso finaliza, dequeue el objeto y libera el bloque en la operación de agregar para las próximas tareas.

 public class BlockingTaskQueue 
     { 
      private BlockingCollection<int> threadManager { get; set; } = null; 
      public bool IsWorking 
      { 
       get 
       { 
        return threadManager.Count > 0 ? true : false; 
       } 
      } 

      public BlockingTaskQueue(int maxThread) 
      { 
       threadManager = new BlockingCollection<int>(maxThread); 
      } 

      public async Task AddTask(Task task) 
      { 
       Task.Run(() => 
       { 
        Run(task); 
       }); 
      } 

      private bool Run(Task task) 
      { 
       try 
       { 
        threadManager.Add(1); 
        task.Start(); 
        task.Wait(); 
        return true; 

       } 
       catch (Exception ex) 
       { 
        return false; 
       } 
       finally 
       { 
        threadManager.Take(); 
       } 

      } 

     } 
Cuestiones relacionadas