2012-03-27 9 views
12

He intentado que funcione lo que creo que es la forma más simple posible de subprocesamiento en mi aplicación, pero simplemente no puedo hacerlo.C# Threading - Lectura y hash de múltiples archivos al mismo tiempo, ¿método más fácil?

Lo que quiero hacer: tengo un formulario principal con una barra de estado y una barra de progreso. Tengo que leer algo entre 3 y 99 archivos y agregar sus hashes a una cadena [] que quiero agregar a una lista de todos los archivos con sus hashes respectivos. Luego tengo que comparar los elementos de esa lista con una base de datos (que viene en archivos de texto). Una vez hecho todo esto, tengo que actualizar un cuadro de texto en el formulario principal y la barra de progreso al 33%; sobre todo, simplemente no quiero que la forma principal se congele durante el procesamiento.

Los archivos con los que trabajo siempre suman 1.2GB (+/- unos pocos MB), lo que significa que debería poder leerlos en bytes [] sy procesarlos desde allí (tengo que calcular CRC32 , MD5 y SHA1 de cada uno de esos archivos, por lo que debería ser más rápido que leerlos todos desde un HDD 3 veces).

También debo tener en cuenta que algunos archivos pueden ser de 1 MB mientras que otro puede ser de 1 GB. Inicialmente, quería crear 99 hilos para 99 archivos, pero parece no ser el mejor, supongo que sería mejor reutilizar los hilos de los archivos pequeños, mientras que los hilos de los archivos aún se están ejecutando. Pero eso me suena bastante complicado, así que no estoy seguro de que sea sabio tampoco.

Hasta ahora he probado workerThreads y backgroundworkers, pero ninguno parece funcionar demasiado bien para mí; al menos los backgroundWorkers funcionaron ALGUNAS veces, pero ni siquiera puedo entender por qué no lo harían las otras veces ... de cualquier manera la forma principal aún se congeló. Ahora que he leído sobre la Biblioteca de tareas paralelas en .NET 4.0, pero pensé que debería preguntarle a alguien que sabe lo que está haciendo antes de perder más tiempo en esto.

Lo que quiero hacer se ve algo como esto (sin enhebrar):

List<string[]> fileSpecifics = new List<string[]>(); 

int fileMaxNumber = 42; // something between 3 and 99, depending on file set 

for (int i = 1; i <= fileMaxNumber; i++) 
{ 
    string fileName = "C:\\path\\to\\file" + i.ToString("D2") + ".ext"; // file01.ext - file99.ext 
    string fileSize = new FileInfo(fileName).Length.ToString(); 
    byte[] file = File.ReadAllBytes(fileName); 
    // hash calculations (using SHA1CryptoServiceProvider() etc., no problems with that so I'll spare you that, return strings) 
    file = null; // I didn't yet check if this made any actual difference but I figured it couldn't hurt 
    fileSpecifics.Add(new string[] { fileName, fileSize, fileCRC, fileMD5, fileSHA1 }); 
} 

// look for files in text database mentioned above, i.e. first check for "file bundles" with the same amount of files I have here; then compare file sizes, then hashes 
// again, no problems with that so I'll spare you that; the database text files are pretty small so parsing them doesn't need to be done in an extra thread. 

qué alguien ser tan amable de apuntar en la dirección correcta? Estoy buscando la forma más fácil de leer y copiar esos archivos rápidamente (creo que el hash toma algún tiempo en el que otros archivos ya puedan leerse) y guardar el resultado en una cadena [], sin congelar la forma principal, nada más , nada menos.

Estoy agradecido por cualquier entrada.

EDITAR para aclarar: por "backgroundWorkers working some of time" Quise decir que (para el mismo conjunto de archivos), tal vez la primera y cuarta ejecución de mi código produce la salida correcta y la UI se descongela en 5 segundos para la segunda, tercera y quinta ejecución congela el formulario (y después de 60 segundos recibo un mensaje de error que dice que un hilo no respondió dentro de ese marco de tiempo) y tengo que detener la ejecución a través de VS.

Gracias por todas sus sugerencias y sugerencias, ya que todos han adivinado correctamente que soy completamente nuevo para enhebrar y tendré que leer en los excelentes enlaces que publicaron. Luego probaré esos métodos y marcaré la respuesta que más me ayudó. ¡Gracias de nuevo!

+1

¿Qué quiere decir por el BackgroundWorker trabajando parte del tiempo? Si se implementa correctamente, el procesamiento realizado en BackgroundWorker no debe provocar que el formulario se congele. – evasilchenko

+0

Si están en 1 disco, solo necesita 1 (extra) subproceso. –

+1

Este artículo puede ser de ayuda para usted: http://www.hanselman.com/blog/BackToParallelBasicsDontBlockYourThreadsMakeAsyncIOWorkForYou.aspx –

Respuesta

18

Con .NET Framework 4.X

  1. Uso Directory.EnumerateFiles Método para archivos eficientes/perezosos enumeración
  2. Uso Parallel.For() a delegar el trabajo paralelismo con el marco PLINQ o utilizar TPL delegar sola tarea por oleoducto Etapa
  3. Uso Pipelines pattern a etapas de canalización siguiente: hashcodes cálculo , comparar con el patrón, actualizar la interfaz de usuario
  4. para evitar UI congelación uso de técnicas apropiadas: para WPF usar Dispatcher.BeginInvoke(), para WinForms utilizan Invoke(), see this SO answer
  5. Teniendo en cuenta que todo esto tiene UI, puede ser útil agregar alguna característica de cancelación para abandonar la operación larga si es necesario, eche un vistazo a la clase CreateLinkedTokenSource que permite activar CancellationToken del "alcance externo" Puedo intentar agregar un ejemplo pero es vale la pena hacerlo tú mismo para que aprendas todo esto en lugar de simplemente copiar/pegar -> funcionó -> lo olvidaste.

PS: lectura obligada - Pipelines paper en MSDN


implementación TPL tubería específica

  • implementación patrón de Pipeline: tres etapas: el cálculo de hash, partido, actualización de la interfaz
  • tres tareas , uno por etapa
  • Dos colas de bloqueo

//

// 1) CalculateHashesImpl() should store all calculated hashes here 
// 2) CompareMatchesImpl() should read input hashes from this queue 
// Tuple.Item1 - hash, Typle.Item2 - file path 
var calculatedHashes = new BlockingCollection<Tuple<string, string>>(); 


// 1) CompareMatchesImpl() should store all pattern matching results here 
// 2) SyncUiImpl() method should read from this collection and update 
// UI with available results 
var comparedMatches = new BlockingCollection<string>(); 

var factory = new TaskFactory(TaskCreationOptions.LongRunning, 
           TaskContinuationOptions.None); 


var calculateHashesWorker = factory.StartNew(() => CalculateHashesImpl(...)); 
var comparedMatchesWorker = factory.StartNew(() => CompareMatchesImpl(...)); 
var syncUiWorker= factory.StartNew(() => SyncUiImpl(...)); 

Task.WaitAll(calculateHashesWorker, comparedMatchesWorker, syncUiWorker); 

CalculateHashesImpl():

private void CalculateHashesImpl(string directoryPath) 
{ 
    foreach (var file in Directory.EnumerateFiles(directoryPath)) 
    { 
     var hash = CalculateHashTODO(file); 
     calculatedHashes.Add(new Tuple<string, string>(hash, file.Path)); 
    } 
} 

CompareMatchesImpl():

private void CompareMatchesImpl() 
{ 
    foreach (var hashEntry in calculatedHashes.GetConsumingEnumerable()) 
    { 
     // TODO: obviously return type is up to you 
     string matchResult = GetMathResultTODO(hashEntry.Item1, hashEntry.Item2); 
     comparedMatches.Add(matchResult); 
    } 
} 

SyncUiImpl():

private void UpdateUiImpl() 
{ 
    foreach (var matchResult in comparedMatches.GetConsumingEnumerable()) 
    { 
     // TODO: track progress in UI using UI framework specific features 
     // to do not freeze it 
    } 
} 

TODO: Considere el uso de CancellationToken como un parámetro para todas las llamadas GetConsumingEnumerable() por lo que fácilmente puede detener una ejecución tubería cuando sea necesario.

+0

Voy a echar un vistazo a eso, muchas gracias! –

+0

Pipeline paper es una muy buena lectura! :) –

17

En primer lugar, debe utilizar un mayor nivel de abstracción para resolver este problema. Tienes que completar un montón de tareas, por lo tanto, utiliza la abstracción de "tareas". Debería utilizar la Biblioteca de tareas paralelas para hacer este tipo de cosas. Deje que el TPL se ocupe de la cuestión de cuántos subprocesos de trabajo crear: la respuesta podría ser tan baja como uno si el trabajo está cerrado en E/S.

Si quiere hacer su propia rosca, un buen consejo:

  • No vuelvas bloque en el hilo de interfaz de usuario. Eso es lo que está congelando su aplicación.Propóngase un protocolo mediante el cual los hilos de trabajo pueden comunicarse con su subproceso de interfaz de usuario, que luego no hace nada excepto para responder a los eventos de la interfaz de usuario. Recuerde que los métodos de controles de interfaz de usuario como barras de finalización de tareas nunca deben ser llamados por ningún otro hilo que no sea el de la interfaz de usuario.

  • No cree 99 hilos para leer 99 archivos. Es como obtener 99 piezas de correo y contratar 99 asistentes para escribir respuestas: una solución extraordinariamente costosa para un problema simple. Si su trabajo es intensivo de CPU entonces no tiene sentido "contratar" más hilos que las CPU para darles servicio. (Es como contratar a 99 asistentes en una oficina que solo tiene cuatro escritorios. Los asistentes pasan la mayor parte de su tiempo esperando a que se siente un escritorio, en lugar de leer su correo.) Si su trabajo es intensivo en disco, la mayoría de esos hilos van estar inactivo la mayor parte del tiempo esperando el disco, que es una pérdida de recursos aún mayor.

+0

+1. Con una tarea no vinculada a SSD/disco flash como esta, es ** probable ** (prototipo y medida) que la adición de subprocesos realmente disminuya la velocidad debido a restricciones de hardware. –

0

Echa un vistazo TPL Dataflow. Puedes usar un ActionBlock acelerado que administrará la parte difícil para ti.

0

Si tengo entendido que está buscando realizar algunas tareas en segundo plano y no bloquear su interfaz de usuario, entonces UI BackgroundWorker sería una opción adecuada. Usted mencionó que lo hizo funcionar algunas veces, por lo que mi recomendación sería tomar lo que tenía en un estado semielaborado y mejorarlo rastreando los fallos. Si mi corazonada es correcta, tu empleado estaba lanzando una excepción, que no parece que estés manejando en tu código. Las excepciones no controladas que brotan de sus hilos que contienen hacen que sucedan cosas malas.

2

En primer lugar, espero que esté utilizando una biblioteca incorporada para calcular hashes. Es posible escribir uno propio, pero es mucho más seguro usar algo que ha existido por un tiempo.

Puede que solo necesite crear tantos subprocesos como CPU si su proceso consume mucha CPU. Si está vinculado por E/S, es posible que pueda salirse con más hilos.

No recomiendo cargar todo el archivo en la memoria. Su biblioteca hash debe permitir la actualización de un fragmento a la vez. Lea un fragmento en la memoria, úselo para actualizar los hash de cada algoritmo, lea el siguiente fragmento y repita hasta el final del archivo. El enfoque fragmentado ayudará a reducir las demandas de memoria de su programa.

Como han sugerido otros, mira en el Task Parallel Library, particularmente Data Parallelism. Podría ser tan fácil como esto:

Parallel.ForEach(fileSpecifics, item => CalculateHashes(item)); 
+0

Su solución de un trazador de líneas es, por supuesto, más intrigante :) Pero, por supuesto, tendré que leer mucho más. Para MD5 y SHA1 utilizo System.Security.Cryptography pero no pude encontrar lo mismo para CRC32, así que utilicé la clase CRC32 de dominio público de Damien Guard. Tendré que verificar si eso admite el cálculo con fragmentos. ¡Gracias! –

0

Este código hash de un archivo (corriente) utilizando dos tareas - una para la lectura, la segunda para hash, de manera más robusta debe leer más trozos adelante.

Dado que el ancho de banda del procesador es mucho mayor que el del disco, a menos que utilice una unidad flash de alta velocidad, no obtendrá nada si mezcla más archivos al mismo tiempo.

public void TransformStream(Stream a_stream, long a_length = -1) 
{ 
    Debug.Assert((a_length == -1 || a_length > 0)); 

    if (a_stream.CanSeek) 
    { 
     if (a_length > -1) 
     { 
      if (a_stream.Position + a_length > a_stream.Length) 
       throw new IndexOutOfRangeException(); 
     } 

     if (a_stream.Position >= a_stream.Length) 
      return; 
    } 

    System.Collections.Concurrent.ConcurrentQueue<byte[]> queue = 
     new System.Collections.Concurrent.ConcurrentQueue<byte[]>(); 
    System.Threading.AutoResetEvent data_ready = new System.Threading.AutoResetEvent(false); 
    System.Threading.AutoResetEvent prepare_data = new System.Threading.AutoResetEvent(false); 

    Task reader = Task.Factory.StartNew(() => 
    { 
     long total = 0; 

     for (; ;) 
     { 
      byte[] data = new byte[BUFFER_SIZE]; 
      int readed = a_stream.Read(data, 0, data.Length); 

      if ((a_length == -1) && (readed != BUFFER_SIZE)) 
       data = data.SubArray(0, readed); 
      else if ((a_length != -1) && (total + readed >= a_length)) 
       data = data.SubArray(0, (int)(a_length - total)); 

      total += data.Length; 

      queue.Enqueue(data); 
      data_ready.Set(); 

      if (a_length == -1) 
      { 
       if (readed != BUFFER_SIZE) 
        break; 
      } 
      else if (a_length == total) 
       break; 
      else if (readed != BUFFER_SIZE) 
       throw new EndOfStreamException(); 

      prepare_data.WaitOne(); 
     } 
    }); 

    Task hasher = Task.Factory.StartNew((obj) => 
    { 
     IHash h = (IHash)obj; 
     long total = 0; 

     for (; ;) 
     { 
      data_ready.WaitOne(); 

      byte[] data; 
      queue.TryDequeue(out data); 

      prepare_data.Set(); 

      total += data.Length; 

      if ((a_length == -1) || (total < a_length)) 
      { 
       h.TransformBytes(data, 0, data.Length); 
      } 
      else 
      { 
       int readed = data.Length; 
       readed = readed - (int)(total - a_length); 
       h.TransformBytes(data, 0, data.Length); 
      } 

      if (a_length == -1) 
      { 
       if (data.Length != BUFFER_SIZE) 
        break; 
      } 
      else if (a_length == total) 
       break; 
      else if (data.Length != BUFFER_SIZE) 
       throw new EndOfStreamException(); 
     } 
    }, this); 

    reader.Wait(); 
    hasher.Wait(); 
} 

Resto de código aquí: http://hashlib.codeplex.com/SourceControl/changeset/view/71730#514336

Cuestiones relacionadas