The bottom of this article describe cómo usar GetOrAdd puede causar (si lo entiendo correctamente) resultados corruptos o inesperados.Evitar datos obsoletos (lógicamente corruptos) cuando se utiliza "ConcurrentDictionary.GetOrAdd()", código de reproducción incluido
recorte/
ConcurrentDictionary está diseñado para multiproceso escenarios. No tiene que usar bloqueos en su código para agregar o eliminar elementos de la colección. Sin embargo, siempre es posible que un subproceso recupere un valor y otro subproceso para actualizar inmediatamente la colección al dar a la misma clave un nuevo valor.
Además, aunque todos los métodos de ConcurrentDictionary son hilo de seguridad, no todos los métodos son atómicas, específicamente GetOrAdd y AddOrUpdate. El delegado de usuario que se pasa a estos métodos es invocado fuera del bloqueo interno del diccionario. (Esto se hace para evitar que el código desconocido de bloquear todas las discusiones.) Por lo tanto, es posible para esta secuencia de eventos que se produzca:
1) threadA llama GetOrAdd, no encuentra ningún elemento y crea un nuevo elemento para agregar por invocando el delegado valueFactory.
2) threadB llama GetOrAdd al mismo tiempo, su delegado valueFactory es invocado y que llega a la cerradura interna antes threadA, y así se le añade su nuevo par clave-valor al diccionario.
3) Delegado de usuario de threadA completa, y el hilo llega a la cerradura , pero ahora ve que el elemento ya existe
4) threadA realiza una "Get", y devuelve los datos que había previamente añadió por threadB.
Por lo tanto, no se garantiza que los datos devueltos por GetOrAdd sean los mismos datos creados por valueFactory del subproceso. Una secuencia similar de eventos puede ocurrir cuando se llama a AddOrUpdate .
Pregunta
Cuál es la forma correcta para verificar los datos y vuelva a intentar la actualización? Un buen enfoque sería un método de extensión para intentar/reintentar esta operación en función del contenido del valor anterior.
¿Cómo se implementaría esto? ¿Puedo confiar en el resultado (verify
) como estado final válido, o debo volver a intentar y recuperar los valores usando un método diferente?
Código
El siguiente código tiene una condición de carrera en la actualización de los valores. El comportamiento deseado es que AddOrUpdateWithoutRetrieving() incrementará varios valores de diferentes maneras (usando ++
o Interlocked.Increment()
).
También quiero realizar múltiples operaciones de campo en una sola unidad y reintentar la actualización si la actualización anterior no "tomó" debido a una condición de carrera.
Ejecute el código y verá que cada valor que aparece en la consola comienza incrementándose en uno, pero cada uno de los valores se desplazará y algunos tendrán algunas iteraciones adelante/atrás.
namespace DictionaryHowTo
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
// The type of the Value to store in the dictionary:
class FilterConcurrentDuplicate
{
// Create a new concurrent dictionary.
readonly ConcurrentDictionary<int, TestData> eventLogCache =
new ConcurrentDictionary<int, TestData>();
static void Main()
{
FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();
c.DoRace(null);
}
readonly ConcurrentDictionary<int, TestData> concurrentCache =
new ConcurrentDictionary<int, TestData>();
void DoRace(string[] args)
{
int max = 1000;
// Add some key/value pairs from multiple threads.
Task[] tasks = new Task[3];
tasks[0] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 500);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[1] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[2] = Task.Factory.StartNew(() =>
{
AddOrUpdateWithoutRetrieving();
});
// Output results so far.
Task.WaitAll(tasks);
AddOrUpdateWithoutRetrieving();
Console.WriteLine("Press any key.");
Console.ReadKey();
}
public class TestData : IEqualityComparer<TestData>
{
public string aStr1 { get; set; }
public Guid? aGud1 { get; set; }
public string aStr2 { get; set; }
public int aInt1 { get; set; }
public long? aLong1 { get; set; }
public DateTime aDate1 { get; set; }
public DateTime? aDate2 { get; set; }
//public int QueryCount { get; set; }
public int QueryCount = 0;//
public string zData { get; set; }
public bool Equals(TestData x, TestData y)
{
return x.aStr1 == y.aStr1 &&
x.aStr2 == y.aStr2 &&
x.aGud1 == y.aGud1 &&
x.aStr2 == y.aStr2 &&
x.aInt1 == y.aInt1 &&
x.aLong1 == y.aLong1 &&
x.aDate1 == y.aDate1 &&
x.QueryCount == y.QueryCount ;
}
public int GetHashCode(TestData obj)
{
TestData ci = (TestData)obj;
// http://stackoverflow.com/a/263416/328397
return
new {
A = ci.aStr1,
Aa = ci.aStr2,
B = ci.aGud1,
C = ci.aStr2,
D = ci.aInt1,
E = ci.aLong1,
F = ci.QueryCount ,
G = ci.aDate1}.GetHashCode();
}
}
private void AddOrUpdateWithoutRetrieving()
{
// Sometime later. We receive new data from some source.
TestData ci = new TestData()
{
aStr1 = "Austin",
aGud1 = new Guid(),
aStr2 = "System",
aLong1 = 100,
aInt1 = 1000,
QueryCount = 0,
aDate1 = DateTime.MinValue
};
TestData verify = concurrentCache.AddOrUpdate(123, ci,
(key, existingVal) =>
{
existingVal.aStr2 = "test1" + existingVal.QueryCount;
existingVal.aDate1 = DateTime.MinValue;
Console.WriteLine
("Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count A:" + existingVal.QueryCount);
Interlocked.Increment(ref existingVal.QueryCount);
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
existingVal.aInt1++;
existingVal.aDate1 =
existingVal.aDate1.AddSeconds
(existingVal.aInt1);
Console.WriteLine(
"Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count B:" + existingVal.QueryCount);
return existingVal;
});
// After each run, every value here should be ++ the previous value
Console.WriteLine(
"Thread:"+Thread.CurrentThread.ManagedThreadId +
": Query Count returned:" + verify.QueryCount +
" eid:" + verify.aInt1 + " date:" +
verify.aDate1.Hour + " " + verify.aDate1.Second +
" NAME:" + verify.aStr2
);
}
}
}
salida
Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System
Thread:12 Query Count A:0
Thread:13 Query Count A:1
Thread:12 Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11
Thread:12 Query Count A:2
Thread:13 Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12
Thread:13 Query Count A:3
Thread:11 Query Count A:4
Thread:11 Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14
Thread:11 Query Count A:5
Thread:13 Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15
....
Thread:11 Query Count A:658
Thread:11 Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658
Thread:11 Query Count A:659
Thread:11 Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659
Thread:11 Query Count A:660
Thread:11 Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660
Thread:11 Query Count A:661
Thread:11 Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661
En este código "Eid" siempre debe ser más de 1.000 recuento de consultas, pero durante las iteraciones de la diferencia varía de 1 a 7 entre los dos. Esa incoherencia puede provocar que algunas aplicaciones fallen o denuncien datos incorrectos.
'ConcurrentDictionary' es solo seguro para subprocesos con respecto a sus propios invariantes. es decir, no dañará sus propios datos. Si tiene otras invariantes, es imposible que las conozca de antemano o se espera que las compense. Debe definir coherentemente qué es su invariante y proteger como una transacción con algún tipo de sincronización de subprocesos. –