2011-11-09 14 views
5

Estoy tratando de hacer un IObservable<bool> que devuelve true si se ha recibido un mensaje UDP en los últimos 5 segundos y si se produce un tiempo de espera, se devuelve un falso.Extensiones reactivas ¿Tiempo de espera que no detiene la secuencia?

Hasta ahora tengo esto:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    var udp = BaseComms.UDPBaseStringListener(localEP) 
     .Where(msg => msg.Data.Contains("running")) 
     .Select(s => true); 

    return Observable 
     .Timeout(udp, TimeSpan.FromSeconds(5)) 
     .Catch(Observable.Return(false)); 
} 

Los problemas con esto son: -

  • Una vez que se devuelve una falsa, la secuencia se detiene
  • lo único que realmente necesitan true o false en cambios de estado.

que podría utilizar un Subject<T> pero hay que tener cuidado para disponer de la UDPBaseStringListener observable cuando no hay más suscriptores.

actualización

Cada vez que sale un mensaje UDP me gustaría que vuelva un true. Si no he recibido un mensaje UDP en los últimos 5 segundos, me gustaría que devuelva un false.

+0

FYI, 'Timeout' tiene una sobrecarga que lleva un suplente observables para cuando llegue el momento se produce el tiempo de espera en lugar de "lanzar" y necesita 'Catch'. –

+0

Los lectores también pueden estar interesados ​​en [1] (http://stackoverflow.com/q/23394441/1267663), [2] (http://stackoverflow.com/q/12786901/1267663) y [3] (http://stackoverflow.com/q/35873244/1267663). – Whymarrh

Respuesta

3

Como se señala en Bj0, la solución con BufferWithTime no devolverá el punto de datos tan pronto como se reciba y el tiempo de espera del búfer no se restablecerá después de recibir un punto de datos.

con extensiones 2.0 Rx, su puede resolver ambos problemas con un nuevo buffer sobrecarga de aceptar tanto un tiempo de espera y una talla:

static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    return BaseComms 
     .UDPBaseStringListener(localEP) 
     .Where(msg => msg.Data.Contains("running")) 
     .Buffer(TimeSpan.FromSeconds(5), 1) 
     .Select(s => s.Count > 0) 
     .DistinctUntilChanged(); 
} 
1

Sugeriría evitar el uso de Timeout - causa excepciones y la codificación con excepciones es mala.

Además, parece tener sentido que su observable se detenga después de un valor. Es posible que necesite explicar más sobre lo que quiere que sea el comportamiento.

Mi solución actual a su problema es:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    return Observable.Create<bool>(o => 
    { 
     var subject = new AsyncSubject<bool>(); 
     return new CompositeDisposable(
      Observable.Amb(
       BaseComms 
        .UDPBaseStringListener(localEP) 
        .Where(msg => msg.Data.Contains("running")) 
        .Select(s => true), 
       Observable 
        .Timer(TimeSpan.FromMilliseconds(10.0)) 
        .Select(_ => false) 
      ).Take(1).Subscribe(subject), subject.Subscribe(o)); 
    }); 
} 

ayuda eso?

+0

Esto no se compila. Observable.Create espera que se devuelva una Acción ... – Tim

+0

'Timeout' solo causa excepciones por defecto. Tiene una sobrecarga que toma un observable alternativo para ser utilizado si se alcanza el tiempo de espera, que la sobrecarga por defecto llama con 'Observable.Trow 'como el alternativo. –

+0

@Jim - Debe estar usando una versión diferente de Rx - compiló muy bien con v1.1.10621. – Enigmativity

1

Si no desea que la secuencia de parar, simplemente se envuelve en Defer + Repetir:

Observable.Defer(() => GettingUDPMessages(endpoint) 
    .Repeat(); 
2

El problema con el tampón es que el intervalo de "tiempo de espera" no consigue restablecer cuando se obtiene una nuevo valor, las ventanas del búfer son solo porciones de tiempo (5 en este caso) que se suceden. Esto significa que, dependiendo de cuándo reciba su último valor, es posible que deba esperar casi el doble del valor de tiempo de espera. Esto también puede pasar por alto los tiempos de espera:

   should timeout here 
         v 
0s   5s   10s  15s 
|x - x - x | x - - - - | - - - x -| ... 
      true  true  true 

IObservable.Throttle, sin embargo, se reinicia cada vez que un nuevo valor entra y sólo produce un valor después de que haya transcurrido el intervalo de tiempo (el último valor de entrada). Esto se puede utilizar como un tiempo de espera y se fusionó con el IObservable para insertar valores "tiempo de espera" en la corriente:

var obs = BaseComms.UDPBaseStringListener(localEP) 
      .Where(msg => msg.Data.Contains("running")); 

return obs.Merge(obs.Throttle(TimeSpan.FromSeconds(5)) 
         .Select(x => false)) 
      .DistinctUntilChanged(); 

Un ejemplo LINQPad trabajo:

var sub = new Subject<int>(); 

var script = sub.Timestamp() 
    .Merge(sub.Throttle(TimeSpan.FromSeconds(2)).Select(i => -1).Timestamp()) 
    .Subscribe(x => 
{ 
    x.Dump("val"); 
}); 


Thread.Sleep(1000); 

sub.OnNext(1); 
sub.OnNext(2); 

Thread.Sleep(10000); 

sub.OnNext(5); 

A -1 se inserta en la corriente después de un tiempo de espera de 2 segundos.

Cuestiones relacionadas