2012-06-07 9 views
10

Tengo una aplicación que en algunos puntos genera 1000 eventos casi al mismo tiempo. Lo que me gustaría hacer es agrupar los eventos en fragmentos de 50 elementos y comenzar a procesarlos cada 10 segundos. No es necesario esperar a que se complete un lote antes de comenzar un nuevo procesamiento por lotes.Extensiones reactivas: Procesar eventos en lotes + agregar retraso entre cada lote

Por ejemplo:

10:00:00: 10000 new events received 
10:00:00: StartProcessing (events.Take(50)) 
10:00:10: StartProcessing (events.Skip(50).Take(50)) 
10:00:15: StartProcessing (events.Skip(100).Take(50)) 

Alguna idea de cómo lograr esto? Supongo que Reactive Extensions es el camino a seguir, pero también son aceptables otras soluciones.

traté de empezar desde aquí:

 var bufferedItems = eventAsObservable 
      .Buffer(15) 
      .Delay(TimeSpan.FromSeconds(5) 

Pero se dio cuenta de que el retraso no funcionó como yo esperaba y en su lugar todos los lotes inicia simultáneamente, aunque retraso de 5 segundos.

También probé el método de ventana, pero no noté ninguna diferencia en el comportamiento. Supongo que el TimeSpan en realidad significa que la ventana "tomar cada evento que se produce en los próximos 10 segundos:.

 var bufferedItems = eventAsObservable 
      .Window(TimeSpan.FromSeconds(10), 5) 
      .SelectMany(x => x) 
      .Subscribe(DoProcessing); 

estoy usando el Rx-Main 2.0.20304-beta

Respuesta

20

Si prefiere no dormir hilos, usted puede hacer esto:

var tick = Observable.Interval(TimeSpan.FromSeconds(5)); 

eventAsObservable 
.Buffer(50) 
.Zip(tick, (res, _) => res) 
.Subscribe(DoProcessing); 
+1

¡Ojalá pudiera haber hablado más de una vez! Solo pasé tres horas tratando de descubrirlo. –

1

Prueba esto:

var bufferedItems = eventAsObservable 
     .Buffer(50) 
     .Do(_ => { Thread.Sleep(10000); }); 
Cuestiones relacionadas