5

The bottom of this article describe cómo usar GetOrAdd puede causar (si lo entiendo correctamente) resultados corruptos o inesperados.Evitar datos obsoletos (lógicamente corruptos) cuando se utiliza "ConcurrentDictionary.GetOrAdd()", código de reproducción incluido

recorte/

ConcurrentDictionary está diseñado para multiproceso escenarios. No tiene que usar bloqueos en su código para agregar o eliminar elementos de la colección. Sin embargo, siempre es posible que un subproceso recupere un valor y otro subproceso para actualizar inmediatamente la colección al dar a la misma clave un nuevo valor.

Además, aunque todos los métodos de ConcurrentDictionary son hilo de seguridad, no todos los métodos son atómicas, específicamente GetOrAdd y AddOrUpdate. El delegado de usuario que se pasa a estos métodos es invocado fuera del bloqueo interno del diccionario. (Esto se hace para evitar que el código desconocido de bloquear todas las discusiones.) Por lo tanto, es posible para esta secuencia de eventos que se produzca:

1) threadA llama GetOrAdd, no encuentra ningún elemento y crea un nuevo elemento para agregar por invocando el delegado valueFactory.

2) threadB llama GetOrAdd al mismo tiempo, su delegado valueFactory es invocado y que llega a la cerradura interna antes threadA, y así se le añade su nuevo par clave-valor al diccionario.

3) Delegado de usuario de threadA completa, y el hilo llega a la cerradura , pero ahora ve que el elemento ya existe

4) threadA realiza una "Get", y devuelve los datos que había previamente añadió por threadB.

Por lo tanto, no se garantiza que los datos devueltos por GetOrAdd sean los mismos datos creados por valueFactory del subproceso. Una secuencia similar de eventos puede ocurrir cuando se llama a AddOrUpdate .

Pregunta

Cuál es la forma correcta para verificar los datos y vuelva a intentar la actualización? Un buen enfoque sería un método de extensión para intentar/reintentar esta operación en función del contenido del valor anterior.

¿Cómo se implementaría esto? ¿Puedo confiar en el resultado (verify) como estado final válido, o debo volver a intentar y recuperar los valores usando un método diferente?

Código

El siguiente código tiene una condición de carrera en la actualización de los valores. El comportamiento deseado es que AddOrUpdateWithoutRetrieving() incrementará varios valores de diferentes maneras (usando ++ o Interlocked.Increment()).

También quiero realizar múltiples operaciones de campo en una sola unidad y reintentar la actualización si la actualización anterior no "tomó" debido a una condición de carrera.

Ejecute el código y verá que cada valor que aparece en la consola comienza incrementándose en uno, pero cada uno de los valores se desplazará y algunos tendrán algunas iteraciones adelante/atrás.

namespace DictionaryHowTo 
{ 
    using System; 
    using System.Collections.Concurrent; 
    using System.Collections.Generic; 
    using System.Linq; 
    using System.Text; 
    using System.Threading; 
    using System.Threading.Tasks; 

    // The type of the Value to store in the dictionary: 
    class FilterConcurrentDuplicate 
    { 
     // Create a new concurrent dictionary. 
     readonly ConcurrentDictionary<int, TestData> eventLogCache = 
      new ConcurrentDictionary<int, TestData>(); 

     static void Main() 
     { 
      FilterConcurrentDuplicate c = new FilterConcurrentDuplicate(); 

      c.DoRace(null); 
     } 

     readonly ConcurrentDictionary<int, TestData> concurrentCache = 
      new ConcurrentDictionary<int, TestData>(); 
     void DoRace(string[] args) 
     { 
      int max = 1000; 

      // Add some key/value pairs from multiple threads. 
      Task[] tasks = new Task[3]; 

      tasks[0] = Task.Factory.StartNew(() => 
      { 

       System.Random RandNum = new System.Random(); 
       int MyRandomNumber = RandNum.Next(1, 500); 

       Thread.Sleep(MyRandomNumber); 
       AddOrUpdateWithoutRetrieving(); 

      }); 

      tasks[1] = Task.Factory.StartNew(() => 
      { 
       System.Random RandNum = new System.Random(); 
       int MyRandomNumber = RandNum.Next(1, 1000); 

       Thread.Sleep(MyRandomNumber); 

       AddOrUpdateWithoutRetrieving(); 

      }); 

      tasks[2] = Task.Factory.StartNew(() => 
      { 
       AddOrUpdateWithoutRetrieving(); 

      }); 
      // Output results so far. 
      Task.WaitAll(tasks); 

      AddOrUpdateWithoutRetrieving(); 

      Console.WriteLine("Press any key."); 
      Console.ReadKey(); 
     } 
     public class TestData : IEqualityComparer<TestData> 
     { 
      public string aStr1 { get; set; } 
      public Guid? aGud1 { get; set; } 
      public string aStr2 { get; set; } 
      public int aInt1 { get; set; } 
      public long? aLong1 { get; set; } 

      public DateTime aDate1 { get; set; } 
      public DateTime? aDate2 { get; set; } 

      //public int QueryCount { get; set; } 
      public int QueryCount = 0;// 

      public string zData { get; set; } 
      public bool Equals(TestData x, TestData y) 
      { 
       return x.aStr1 == y.aStr1 && 
        x.aStr2 == y.aStr2 && 
         x.aGud1 == y.aGud1 && 
         x.aStr2 == y.aStr2 && 
         x.aInt1 == y.aInt1 && 
         x.aLong1 == y.aLong1 && 
         x.aDate1 == y.aDate1 && 
         x.QueryCount == y.QueryCount ; 
      } 

      public int GetHashCode(TestData obj) 
      { 
       TestData ci = (TestData)obj; 
       // http://stackoverflow.com/a/263416/328397 
       return 
        new { 
         A = ci.aStr1, 
         Aa = ci.aStr2, 
         B = ci.aGud1, 
         C = ci.aStr2, 
         D = ci.aInt1, 
         E = ci.aLong1, 
         F = ci.QueryCount , 
         G = ci.aDate1}.GetHashCode(); 
      } 
     } 
     private void AddOrUpdateWithoutRetrieving() 
     { 
      // Sometime later. We receive new data from some source. 
      TestData ci = new TestData() 
      { 
       aStr1 = "Austin", 
       aGud1 = new Guid(), 
       aStr2 = "System", 
       aLong1 = 100, 
       aInt1 = 1000, 
       QueryCount = 0, 
       aDate1 = DateTime.MinValue 
      }; 

      TestData verify = concurrentCache.AddOrUpdate(123, ci, 
       (key, existingVal) => 
       { 
        existingVal.aStr2 = "test1" + existingVal.QueryCount; 
        existingVal.aDate1 = DateTime.MinValue; 
        Console.WriteLine 
        ("Thread:" + Thread.CurrentThread.ManagedThreadId + 
          " Query Count A:" + existingVal.QueryCount); 
        Interlocked.Increment(ref existingVal.QueryCount); 
        System.Random RandNum = new System.Random(); 
        int MyRandomNumber = RandNum.Next(1, 1000); 

        Thread.Sleep(MyRandomNumber); 
        existingVal.aInt1++; 
        existingVal.aDate1 = 
         existingVal.aDate1.AddSeconds 
         (existingVal.aInt1); 
        Console.WriteLine(
          "Thread:" + Thread.CurrentThread.ManagedThreadId + 
          " Query Count B:" + existingVal.QueryCount); 
        return existingVal; 
       }); 


      // After each run, every value here should be ++ the previous value 
      Console.WriteLine(
       "Thread:"+Thread.CurrentThread.ManagedThreadId + 
       ": Query Count returned:" + verify.QueryCount + 
       " eid:" + verify.aInt1 + " date:" + 
       verify.aDate1.Hour + " " + verify.aDate1.Second + 
       " NAME:" + verify.aStr2 
       ); 
     } 

    } 
} 

salida

Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System 

Thread:12 Query Count A:0 
Thread:13 Query Count A:1 
Thread:12 Query Count B:2 
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11 

Thread:12 Query Count A:2 
Thread:13 Query Count B:3 
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12 

Thread:13 Query Count A:3 
Thread:11 Query Count A:4 
Thread:11 Query Count B:5 
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14 

Thread:11 Query Count A:5 
Thread:13 Query Count B:6 
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15 

....

Thread:11 Query Count A:658 
Thread:11 Query Count B:659 
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658 

Thread:11 Query Count A:659 
Thread:11 Query Count B:660 
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659 

Thread:11 Query Count A:660 
Thread:11 Query Count B:661 
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660 

Thread:11 Query Count A:661 
Thread:11 Query Count B:662 
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661 

En este código "Eid" siempre debe ser más de 1.000 recuento de consultas, pero durante las iteraciones de la diferencia varía de 1 a 7 entre los dos. Esa incoherencia puede provocar que algunas aplicaciones fallen o denuncien datos incorrectos.

+0

'ConcurrentDictionary' es solo seguro para subprocesos con respecto a sus propios invariantes. es decir, no dañará sus propios datos. Si tiene otras invariantes, es imposible que las conozca de antemano o se espera que las compense. Debe definir coherentemente qué es su invariante y proteger como una transacción con algún tipo de sincronización de subprocesos. –

Respuesta

4

Esta presentación se basa en una comprensión incorrecta de los comentarios en la parte inferior del artículo “Cómo agregar y quitar elementos de una ConcurrentDictionaryhttp://msdn.microsoft.com/en-us/library/dd997369.aspx y en un error básico de concurrencia - concurrente modificación no atómica de una objeto compartido

Primero, aclaremos qué dice realmente el artículo vinculado. Usaré AddOrUpdate como ejemplo, pero el razonamiento para GetOrAdd es equivalente.

Digamos que llama AddOrUpdate desde varios subprocesos y especifica la misma clave. Supongamos que una entrada con esa clave ya existe. Aparecerá cada thread, observe que ya hay una entrada con la clave especificada y que la parte Update de AddOrUpdate es relevante. Al hacerlo, ningún hilo bloqueará el diccionario. En cambio, usará algunas instrucciones interconectadas para verificar atómicamente si existe una clave de entrada o no.

Por lo tanto, nuestros varios hilos notaron que la clave existe y que es necesario llamar a la función updateValueFactory. Ese delegado se pasa a AddOrUpdate; toma referencias de la clave y el valor existentes y devuelve el valor de actualización. Ahora, todos los hilos involucrados llamarán a la fábrica al mismo tiempo. Todos se completarán en un orden previamente desconocido y cada hilo intentará usar una operación atómica (usando instrucciones interconectadas) para reemplazar el valor existente con el valor que acaba de calcular. No hay forma de saber qué hilo "ganará". El hilo que gana obtendrá almacenar su valor calculado. Otros notarán que el valor en el diccionario ya no es el valor que se pasó a su updateValueFactory como argumento. En respuesta a esa realización, abandonarán la operación y descartarán el valor calculado.Esto es exactamente lo que quieres que suceda.

A continuación, vamos a aclarar por qué se obtiene valores extraños cuando se ejecuta el código de ejemplo que aparece aquí:

Recordemos que el delegado updateValueFactory pasado a AddOrUpdate se hace referencia a la clave y el valor actual y devuelve la actualización de valores. El ejemplo de código en su método AddOrUpdateWithoutRetrieving() comienza a realizar operaciones directamente desde esa referencia. En lugar de crear un nuevo valor de reemplazo y modificar ESO, modifica los valores de miembro de instancia de existingVal, un objeto que ya está en el diccionario, y luego simplemente devuelve esa referencia. Y lo hace de forma no atómica: lee algunos valores, actualiza algunos valores, lee más, actualiza más. Por supuesto, hemos visto anteriormente que esto sucede en varios hilos simultáneamente, todos modifican el MISMO objeto. No es de extrañar que el resultado sea que en cualquier momento (cuando el ejemplo de código llama a WriteLine), el objeto contiene valores de instancia de miembro que se originaron a partir de diferentes subprocesos.

El diccionario no tiene nada que ver con esto: el código simplemente modifica un objeto que se comparte entre subprocesos de forma no atómica. Este es uno de los errores de concurrencia más comunes. Las dos soluciones más comunes dependen del escenario. Utilice un bloqueo compartido para hacer que la modificación completa del objeto sea atómica o, primero, copie atómicamente el objeto completo y luego modifique la copia local.

En este último caso, trate de añadir esto a la clase TestData:

private Object _copyLock = null; 

private Object GetLock() { 

    if (_copyLock != null) 
     return _copyLock; 

    Object newLock = new Object(); 
    Object prevLock = Interlocked.CompareExchange(ref _copyLock, newLock, null); 
    return (prevLock == null) ? newLock : prevLock; 
} 

public TestData Copy() { 

    lock (GetLock()) { 
     TestData copy = new TestData(); 
     copy.aStr1 = this.aStr1; 
     copy.aStr2 = this.aStr2; 
     copy.aLong1 = this.aLong1; 
     copy.aInt1 = this.aInt1; 
     copy.QueryCount = this.QueryCount; 
     copy.aDate1 = this.aDate1; 
     copy.aDate2 = this.aDate2; 
     copy.zData = this.zData; 

     return copy; 
    } 
} 

A continuación, modifique la fábrica de la siguiente manera:

TestData verify = concurrentCache.AddOrUpdate(123, ci, 
    (key, existingVal) => 
    { 
     TestData newVal = existingVal.Copy(); 
     newVal.aStr2 = "test1" + newVal.QueryCount; 
     newVal.aDate1 = DateTime.MinValue; 
     Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count A:" + newVal.QueryCount); 
     Interlocked.Increment(ref newVal.QueryCount); 
     System.Random RandNum = new System.Random(); 
     int MyRandomNumber = RandNum.Next(1, 1000); 

     Thread.Sleep(MyRandomNumber); 
     newVal.aInt1++; 
     newVal.aDate1 = newVal.aDate1.AddSeconds(newVal.aInt1); 
     Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count B:" + newVal.QueryCount); 
     return newVal; 
    }); 

espero que esto ayude.

3

Probablemente la forma correcta es no importar si el valor devuelto no es el creado por el valueFactory. Si esto no es aceptable, debe usar un candado.

+0

Vea el código adjunto ... Creo que se necesita un método de extensión C# para intentar ... vuelva a intentar la actualización hasta que sea consistente. Eso o incorporar un spinlock. No preocuparse no es la respuesta IMO – LamonteCristo

+0

Siempre que valueFactory no tenga efectos secundarios (lo que probablemente sea una buena idea), generalmente no debería importar. – erikkallen

2

No existe una protección general que siempre funcione. Pero una solución común es devolver Lazy<T> en lugar de T. De esta forma, crear perezosos innecesarios no causa ningún daño porque nunca se iniciará. Solo un Lazy hará que sea el valor final correspondiente a la clave. Solo se devolverá una instancia de Lazy particular.

+0

¿Sabes cómo puedo implementar esto en un método de extensión que resuelva el problema? – LamonteCristo

+0

http://blogs.msdn.com/b/pfxteam/archive/2010/04/23/10001621.aspx – usr

1

Puede usar this implementation de GetOrAdd del hombre mismo. Tenga en cuenta que incluso aquí se puede llamar a la fábrica sin que se agregue su resultado al diccionario. Pero obtendrías una indicación de lo que sucedió.

+0

Sí, una función de extensión que ajusta GetOrAdd es lo que estoy buscando. Tengo problemas para crear una sobrecarga que reintenta automáticamente GetOrAdd si se produce una corrupción. – LamonteCristo

Cuestiones relacionadas