2011-11-27 15 views
6

¿Cómo puedo hacer que la siguiente repetición observable hasta stream.DataAvailable sea falsa? Actualmente parece que nunca se detiene.Fast Repeat TakeWhile causa un bucle infinito

AsyncReadChunk y Observable.Return dentro de la sección Defer hacer OnNext call then OnCompleted call. Cuando Repeat recibe la llamada OnNext, la pasa a TakeWhile. Cuando TakeWhile no está satisfecho completa el observable, pero creo que el OnCompleted que viene justo después de OnNext es tan rápido que hace que Repeat se vuelva a suscribir a lo observable y cause el ciclo infinito.

¿Cómo puedo corregir este comportamiento?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) 
{ 
    return Observable.Defer(() => 
     { 
      try 
      { 
       return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]); 
      } 
      catch (Exception) 
      { 
       return Observable.Return(new byte[0]); 
      } 
     }) 
     .Repeat() 
     .TakeWhile((dataChunk, index) => dataChunk.Length > 0); 
} 
+4

Bien hecho para descubrir cómo resolver su problema, y ​​gracias por compartir su solución. Sin embargo, ¿podría publicar la solución a su pregunta como una respuesta en lugar de editar su pregunta? –

+0

Samet, he trasladado su respuesta automática de la pregunta a una respuesta separada, marcada como wiki de la comunidad. –

Respuesta

2

respuesta Cuenta: (continuación se muestra una respuesta Publicado por Samet, el autor de la pregunta Sin embargo, él fijó la respuesta como parte de la pregunta que me estoy moviendo en un aparte.. respuesta, marcando como wiki de la comunidad, ya que el autor no se ha movido por sí mismo.)


I descubierto por refactorización que se trata de un problema con los programadores. La devolución utiliza el planificador inmediato mientras que Repeat usa CurrentThread. El código fijo está abajo.

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize) 
    { 
     return Observable.Defer(() => 
            { 
             try 
             { 
              return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread); 
             } 
             catch (Exception) 
             { 
              return Observable.Return(new byte[0], Scheduler.CurrentThread); 
             } 
            }) 
      .Repeat() 
      .TakeWhile((dataChunk, index) => dataChunk.Length > 0); 
    }