2011-06-30 19 views
7

Hay un par de artículos sobre esto, y tengo esto funcionando ... pero quiero saber cómo establecer un número máximo de subprocesos de tareas para mis suscripciones observables a la vez.Procesamiento de colas asíncronas con extensiones reactivas

Tengo el siguiente para paralelizar ahorro asíncrono de entradas de registro:

private BlockingCollection<ILogEntry> logEntryQueue; 

y

logEntryQueue = new BlockingCollection<ILogEntry>(); 
logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry); 

Para programar mi ahorro ... pero ¿cómo puedo especificar los hilos máximos para que el programador usar a la vez?

+0

Si está creando programadores únicamente para los límites en el máximo de subprocesos simultáneos, considere echarle un vistazo a TPL Dataflow. Fue construido específicamente para crear tuberías donde cada bloque en la tubería tiene diferentes límites a la concurrencia. Al menos era más comprensible y mantenible para mí cuando prototipaba ambos métodos. – yzorg

Respuesta

8

Esto no es una función del Observable, sino una función del Programador. El Observable define qué y el planificador define donde.

Tendría que pasar un programador personalizado. Una forma simple de hacer esto sería sub-clase TaskScheduler y anular la propiedad "MaximumConcurrencyLevel".

http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

realidad me encontré con una muestra de este en MSDN:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

Editar: Usted preguntó acerca de cómo pasar de TaskScheduler a iScheduler. Otro desarrollador sólo me dio un poquito de información:

var ischedulerForRx = new TaskPoolScheduler 
(
    new TaskFactory 
    (
     //This is your custom scheduler 
     new LimitedConcurrencyLevelTaskScheduler(1) 
    ) 
); 
+1

Impresionante, gracias ... pero ¿cómo obtengo un IScheduler reactivo para mi nuevo TaskScheduler? No quiero volver a implementar toda la funcionalidad. – Jeff

2

Si crea su "trabajo" como IObservable<T> con ejecución diferida (. Es decir, que quieren hacer nada hasta suscrito a), se puede utilizar la sobrecarga de Merge que acepte una cantidad de suscripciones simultáneas máximas:

ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize(); 

queue 
    .Select(item => StartWork(item)) 
    .Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes 
    .Subscribe(); 

// To enqueue: 
synchronizedQueue.OnNext(new QueueItem()); 
+0

¿Cómo maneja esto la adición de otros subprocesos a la cola mientras se procesa la cola? ¿No sufriría la colección que se modifica durante la enumeración? – Jeff

+0

El operador 'Merge' es seguro para subprocesos, pero he actualizado mi respuesta para utilizar un tema seguro para subprocesos, por si acaso. –

+0

Supongo que es solo mi confusión aquí ... no es la seguridad del hilo lo que me preocupa, sino más bien que la cola está siendo modificada (otros elementos están en cola) mientras se enumera ... ¿Esto no es un problema? ? – Jeff

Cuestiones relacionadas