2010-12-02 13 views
11

Antes de usar la Biblioteca de tareas paralelas, a menudo he usado CorrelationManager.ActivityId para realizar un seguimiento del informe de seguimiento/error con varios subprocesos.¿Cómo afectan las Tareas en la Biblioteca paralela de tareas a ActivityID?

ActivityId se almacena en Thread Local Storage, por lo que cada thread obtiene su propia copia. La idea es que cuando inicie un hilo (actividad), asigne un nuevo ActivityId. El ActivityId se escribirá en los registros con cualquier otra información de seguimiento, lo que permite seleccionar la información de seguimiento para una sola 'Actividad'. Esto es realmente útil con WCF ya que el ActivityId se puede trasladar al componente de servicio.

Aquí es un ejemplo de lo que estoy hablando:

static void Main(string[] args) 
{ 
    ThreadPool.QueueUserWorkItem(new WaitCallback((o) => 
    { 
     DoWork(); 
    })); 
} 

static void DoWork() 
{ 
    try 
    { 
     Trace.CorrelationManager.ActivityId = Guid.NewGuid(); 
     //The functions below contain tracing which logs the ActivityID. 
     CallFunction1(); 
     CallFunction2(); 
     CallFunction3(); 
    } 
    catch (Exception ex) 
    { 
     Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString()); 
    } 
} 

Ahora, con el TPL, mi entendimiento es que múltiples hilos compartir tareas. ¿Significa esto que ActivityId es propenso a ser reiniciado a mitad de la tarea (por otra tarea)? ¿Hay un nuevo mecanismo para tratar el seguimiento de actividad?

+0

No tengo nada que ofrecer, pero también estoy interesado en este tema. Parece que la misma pregunta también se aplica al conjunto de información que utiliza CallContext.LogicalSetData en general ya que esa es la tecnología que Trace.CorrelationManager utiliza para almacenar el ActivityId y el LogicalOperationStack. – wageoghe

+0

@wageohe - Finalmente llegué a probar esto hoy, he publicado mis resultados :) –

+0

Publiqué algunos más detalles en mi respuesta. También publiqué un enlace a otra respuesta aquí en SO, una nueva pregunta que hice aquí en SO, así como una pregunta que hice (pero aún no se ha respondido el 1 de enero de 2011) en el foro de extensiones paralelas de Microsoft. . Quizás encuentres la información útil, tal vez no. – wageoghe

Respuesta

6

Realicé algunos experimentos y resulta que la suposición en mi pregunta es incorrecta: las tareas múltiples creadas con el TPL no se ejecutan en el mismo subproceso al mismo tiempo.

ThreadLocalStorage es seguro de usar con TPL en .NET 4.0, ya que un hilo solo puede ser utilizado por una tarea a la vez.

La suposición de que las tareas se pueden compartir hilos simultáneamente se basó en una entrevista que oí hablar C# 5.0 en DotNetRocks (lo siento, no puedo recordar que muestran que era) - así que mi pregunta puede (o no) ser relevante pronto

Mi experimento inicia una serie de tareas y registra cuántas tareas se ejecutaron, cuánto tiempo tomaron y cuántos subprocesos se consumieron. El código está debajo si alguien quisiera repetirlo.

class Program 
{ 
    static void Main(string[] args) 
    { 
     int totalThreads = 100; 
     TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; 
     Task task = null; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 
     Task[] allTasks = new Task[totalThreads]; 
     for (int i = 0; i < totalThreads; i++) 
     { 
      task = Task.Factory.StartNew(() => 
      { 
       DoLongRunningWork(); 
      }, taskCreationOpt); 

      allTasks[i] = task; 
     } 

     Task.WaitAll(allTasks); 
     stopwatch.Stop(); 

     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.ReadKey(); 
    } 


    private static List<int> threadIds = new List<int>(); 
    private static object locker = new object(); 
    private static void DoLongRunningWork() 
    { 
     lock (locker) 
     { 
      //Keep a record of the managed thread used. 
      if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) 
       threadIds.Add(Thread.CurrentThread.ManagedThreadId); 
     } 
     Guid g1 = Guid.NewGuid(); 
     Trace.CorrelationManager.ActivityId = g1; 
     Thread.Sleep(3000); 
     Guid g2 = Trace.CorrelationManager.ActivityId; 
     Debug.Assert(g1.Equals(g2)); 
    } 
} 

La salida (por supuesto, esto dependerá de la máquina) era:

Completed 100 tasks in 23097 milliseconds 
Used 23 threads 

Cambio taskCreationOpt a TaskCreationOptions.LongRunning dieron resultados diferentes:

Completed 100 tasks in 3458 milliseconds 
Used 100 threads 
+0

Interesantes resultados. He encontrado algo interesante sobre Trace.CorrelationManager.ActivityId usando un programa de prueba basado en tu código. Usando su código como está (más o menos), y usando Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation, puedo obtener resultados "buenos". Es decir, al usar Tareas de la misma manera en que usted demuestra y coloca entre corchetes cada tarea (dentro del delegado) con StartLogicalOperation/StopLogicalOperation, LogicalOperationStack siempre parece estar sincronizado. Sin embargo, el uso de Parallel.For puede generar malos resultados. Voy a publicar mi prueba como una respuesta, ya que tiene el código – wageoghe

+0

Gran respuesta; gracias por publicar. Sin embargo, una obviedad pedante: el campo 'List threadIds' probablemente debería ser un ConcurrentBag o ConcurrentDictionary en lugar de una lista genérica no sincronizada (el diccionario, por ejemplo, si desea asociar el taskId con el threadId, o simplemente usarlo como un hashtable concurrente & ignorar el valor.) –

3

Por favor, perdona mi fijación de este como respuesta, ya que no es realmente una respuesta a su pregunta, sin embargo, está relacionada con su pregunta, ya que se trata del comportamiento de CorrelationManager y de los hilos/tareas/etc. He estado buscando utilizando los métodos LogicalOperationStack de CorrelationManager (y StartLogicalOperation/StopLogicalOperation) para proporcionar contexto adicional en escenarios de subprocesamiento múltiple.

Tomé su ejemplo y lo modifiqué ligeramente para agregar la capacidad de realizar trabajos en paralelo usando Parallel.For. Además, uso StartLogicalOperation/StopLogicalOperation para colocar (internamente) DoLongRunningWork. Conceptualmente, DoLongRunningWork hace algo como esto cada vez que se ejecuta:

DoLongRunningWork 
    StartLogicalOperation 
    Thread.Sleep(3000) 
    StopLogicalOperation 

He encontrado que si añado estas operaciones lógicas a su código (más o menos como es), todos los operatins lógicas permanecen en sincronía (siempre el número esperado de operaciones en la pila y los valores de las operaciones en la pila son siempre los esperados).

En algunas de mis propias pruebas encontré que esto no fue siempre el caso. La pila de operaciones lógicas se estaba "corrompiendo". La mejor explicación que podría surgir es que la "fusión" de la información de CallContext en el contexto de subproceso "principal" cuando el subproceso "secundario" sale causaba que la "vieja" información de contexto de subproceso secundario (operación lógica) " heredado "por otro hilo de niño hermano.

El problema también podría estar relacionado con el hecho de que Parallel.For aparentemente usa el hilo principal (al menos en el código de ejemplo, como está escrito) como uno de los "hilos de trabajo" (o lo que sea que se les llame en el dominio paralelo). Cada vez que se ejecuta DoLongRunningWork, se inicia una nueva operación lógica (al principio) y se detiene (al final) (es decir, se inserta en LogicalOperationStack y se abre de nuevo). Si el hilo principal ya tiene una operación lógica, en efecto, y si DoLongRunningWork se ejecuta en el hilo principal, a continuación, se inicia una nueva operación lógica por lo LogicalOperationStack del hilo principal tiene ahora dos operaciones. Todas las ejecuciones posteriores de DoLongRunningWork (siempre y cuando este "iteración" de DoLongRunningWork está ejecutando en el hilo principal) serán (aparentemente) heredar LogicalOperationStack del hilo principal (que ahora tiene dos operaciones en él, en lugar de sólo la operación de espera).

Me tomó mucho tiempo para averiguar por qué el comportamiento de la LogicalOperationStack era diferente en mi ejemplo que en mi versión modificada de su ejemplo. Finalmente vi que en mi código había puesto entre corchetes todo el programa en una operación lógica, mientras que en mi versión modificada de su programa de prueba no lo hice. La implicación es que en mi programa de prueba, cada vez que se realizaba mi "trabajo" (análogo a DoLongRunningWork), ya había una operación lógica en vigor. En mi versión modificada de su programa de prueba, no había puesto entre corchetes todo el programa en una operación lógica.

Así, cuando lo modifico su programa de pruebas para poner entre paréntesis el programa completo en una operación lógica Y si estoy usando Parallel.For, me encontré con exactamente el mismo problema.

Usando el modelo conceptual anterior, este se ejecutará correctamente:

Parallel.For 
    DoLongRunningWork 
    StartLogicalOperation 
    Sleep(3000) 
    StopLogicalOperation 

Aunque esto va a valer el tiempo debido a una aparentemente por LogicalOperationStack sincronización:

StartLogicalOperation 
Parallel.For 
    DoLongRunningWork 
    StartLogicalOperation 
    Sleep(3000) 
    StopLogicalOperation 
StopLogicalOperation 

Aquí es mi programa de ejemplo. Es similar a la tuya ya que tiene un método DoLongRunningWork que manipula ActivityId y LogicalOperationStack. También tengo dos sabores de patear DoLongRunningWork. Un sabor usa Tareas que uno usa Paralelo.Para. Cada sabor también puede ejecutarse de manera que toda la operación paralelizada se encerra en una operación lógica o no. Entonces, hay un total de 4 formas de ejecutar la operación paralela. Para probar cada uno, simplemente elimine el comentario del método "Usar ..." deseado, recompile y ejecute. UseTasks, UseTasks(true) y UseParallelFor todos deben ejecutarse hasta su finalización. UseParallelFor(true) se confirmará en algún momento porque LogicalOperationStack no tiene el número esperado de entradas.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Diagnostics; 
using System.Threading; 
using System.Threading.Tasks; 

namespace CorrelationManagerParallelTest 
{ 
    class Program 
    {  
    static void Main(string[] args)  
    { 
     //UseParallelFor(true) will assert because LogicalOperationStack will not have expected 
     //number of entries, all others will run to completion. 

     UseTasks(); //Equivalent to original test program with only the parallelized 
         //operation bracketed in logical operation. 
     ////UseTasks(true); //Bracket entire UseTasks method in logical operation 
     ////UseParallelFor(); //Equivalent to original test program, but use Parallel.For 
          //rather than Tasks. Bracket only the parallelized 
          //operation in logical operation. 
     ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation 
    }  

    private static List<int> threadIds = new List<int>();  
    private static object locker = new object();  

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId; 

    private static int mainThreadUsedInDelegate = 0; 

    // baseCount is the expected number of entries in the LogicalOperationStack 
    // at the time that DoLongRunningWork starts. If the entire operation is bracketed 
    // externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise, 
    // it will be 0. 
    private static void DoLongRunningWork(int baseCount)  
    { 
     lock (locker) 
     { 
     //Keep a record of the managed thread used.    
     if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId)) 
      threadIds.Add(Thread.CurrentThread.ManagedThreadId); 

     if (Thread.CurrentThread.ManagedThreadId == mainThreadId) 
     { 
      mainThreadUsedInDelegate++; 
     } 
     }   

     Guid lo1 = Guid.NewGuid(); 
     Trace.CorrelationManager.StartLogicalOperation(lo1); 

     Guid g1 = Guid.NewGuid();   
     Trace.CorrelationManager.ActivityId = g1; 

     Thread.Sleep(3000);   

     Guid g2 = Trace.CorrelationManager.ActivityId; 
     Debug.Assert(g1.Equals(g2)); 

     //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation 
     //in effect when the Parallel.For operation was started. 
     Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1)); 
     Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1)); 

     Trace.CorrelationManager.StopLogicalOperation(); 
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false) 
    { 
     int totalThreads = 100; 
     TaskCreationOptions taskCreationOpt = TaskCreationOptions.None; 
     Task task = null; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StartLogicalOperation(); 
     } 

     Task[] allTasks = new Task[totalThreads]; 
     for (int i = 0; i < totalThreads; i++) 
     { 
     task = Task.Factory.StartNew(() => 
     { 
      DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); 
     }, taskCreationOpt); 
     allTasks[i] = task; 
     } 
     Task.WaitAll(allTasks); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StopLogicalOperation(); 
     } 

     stopwatch.Stop(); 
     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); 

     Console.ReadKey(); 
    } 

    private static void UseParallelFor(bool encloseInLogicalOperation = false) 
    { 
     int totalThreads = 100; 
     Stopwatch stopwatch = new Stopwatch(); 
     stopwatch.Start(); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StartLogicalOperation(); 
     } 

     Parallel.For(0, totalThreads, i => 
     { 
     DoLongRunningWork(encloseInLogicalOperation ? 1 : 0); 
     }); 

     if (encloseInLogicalOperation) 
     { 
     Trace.CorrelationManager.StopLogicalOperation(); 
     } 

     stopwatch.Stop(); 
     Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds)); 
     Console.WriteLine(String.Format("Used {0} threads", threadIds.Count)); 
     Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate)); 

     Console.ReadKey(); 
    } 

    } 
} 

Todo este asunto de si LogicalOperationStack se puede utilizar con Parallel.For (y/u otro roscado/Tarea construye) o la forma en que se puede utilizar probablemente merece su propia pregunta. Tal vez publique una pregunta. Mientras tanto, me pregunto si tienes alguna idea al respecto (o, me pregunto si habías considerado usar LogicalOperationStack ya que ActivityId parece estar seguro).

[EDIT]

Véase mi respuesta a this question para obtener más información sobre el uso LogicalOperationStack y/o CallContext.LogicalSetData con algunos de los diversos contstructs Tema/ThreadPool/Tareas/paralelo.

Véase también mi pregunta aquí en lo que alrededor de LogicalOperationStack y extensiones paralelas: Is CorrelationManager.LogicalOperationStack compatible with Parallel.For, Tasks, Threads, etc

Por último, véase también mi pregunta aquí en el foro Extensiones paralelo de Microsoft: http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

En mis pruebas, parece que Trace. CorrelationManager.LogicalOperationStack puede dañarse al usar Parallel.For o Parallel.Invoke SI inicia una operación lógica en el hilo principal y luego inicia/detiene operaciones lógicas en el delegado. En mis pruebas (vea cualquiera de los dos enlaces anteriores) LogicalOperationStack siempre debe tener exactamente 2 entradas cuando se ejecuta DoLongRunningWork (si comienzo una operación lógica en el hilo principal antes de patear DoLongRunningWork usando varias técnicas). Entonces, por "corrupto" me refiero a que LogicalOperationStack eventualmente tendrá muchas más de 2 entradas.

Por lo que puedo decir, esto es probablemente porque Parallel.For y Parallel.Invoke utilizan el hilo principal como uno de los hilos "worker" para realizar la acción DoLongRunningWork.

El uso de una pila almacenada en CallContext.LogicalSetData para imitar el comportamiento de la LogicalOperationStack (similar a logicalThreadContext.Stacks de log4net que se almacena mediante CallContext.SetData) produce resultados aún peores. Si estoy usando una pila así para mantener el contexto, se corrompe (es decir, no tiene el número esperado de entradas) en casi todos los escenarios donde tengo una "operación lógica" en el hilo principal y una operación lógica en cada iteración/ejecución del delegado DoLongRunningWork.

+2

¿Voto no aceptado? ¿Sin comentarios? Gracias. – wageoghe

Cuestiones relacionadas