8

Soy un poco nuevo en Rx.NET. ¿Es posible atrapar una excepción que puede ser lanzada por cualquiera de los suscriptores? Tome lo siguiente ...Captura de excepciones que pueden arrojarse desde una suscripción OnNext Acción

handler.FooStream.Subscribe(
      _ => throw new Exception("Bar"), 
      _ => { }); 

Actualmente estoy contactando por suscripción con una instancia de lo siguiente. La implementación de la cual solo usa un ManualResetEvent para activar un hilo en espera.

public interface IExceptionCatcher 
{ 
    Action<T> Exec<T>(Action<T> action); 
} 

y usarlo como tal ...

handler.FooStream.Subscribe(
      _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred 
      _ => { }); 

siento que tiene que haber alguna manera mejor. ¿Están todas las capacidades de manejo de errores en Rx.NET específicamente para tratar con errores observables?

EDITAR: Por solicitud, mi implementación es https://gist.github.com/1409829 (interfaz e implementación separadas en diferentes ensamblajes en código prod). Comentarios son bienvenidos. Esto puede parecer tonto, pero estoy usando Castle Windsor para administrar muchos suscriptores Rx diferentes. Este receptor excepción se ha registrado en el contenedor como este

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher)); 

A continuación, se utilizaría como este donde observable es instancia de IObservable:

var exceptionCatcher = 
    new ExceptionCatcher(e => 
           { 
            Logger.FatalException(
             "Exception caught, shutting down.", e); 
            // Deal with unmanaged resources here 
           }, false); 


/* 
* Normally the code below exists in some class managed by an IoC container. 
* 'catcher' would be provided by the container. 
*/ 
observable /* do some filtering, selecting, grouping etc */ 
    .SubscribeWithExceptionCatching(processItems, catcher); 

Respuesta

8

El built-in operadores observables no hace lo que está preguntando por defecto (al igual que los eventos), pero podría hacer un método de extensión que haría esto.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
           this IObservable<T> source 
           ) where TException : Exception 
{ 
    return Observable.CreateWithDisposable<T>(
     o => source.Subscribe(
      v => { try { o.OnNext(v); } 
        catch (TException) { } 
      }, 
      ex => o.OnError(ex), 
      () => o.OnCompleted() 
      )); 
} 

Entonces cualquier observable podría ser envuelto por este método para obtener el comportamiento que describió.

+0

Gracias, usted respondió mi pregunta, pero ¿está seguro de que su intento/captura OnNext detectará excepciones? Uno podría hacer algo fácilmente con el IObservable devuelto que causaría que el código suscrito se ejecutara en otro hilo. Originalmente traté de probar/atrapar mi Subject.OnNext llamada pero las excepciones no fueron detectadas. Sin embargo, podría crear un método SubscribeWithExceptionHandling o algo así. – drstevens

+1

@drstevens Capturará excepciones del mismo hilo. Si su observador está lanzando operaciones asincrónicas propias que generan excepciones, esto no las detectará. –

+1

Bueno, considerando cuántas de las operaciones de Rx resultan en un nuevo hilo (o tarea en el grupo), diría que es muy probable que así sea. Entre el 'handler.FooStream' y' Subscribe' en cuestión hay un 'GroupByUntil (...). SelectMany (...). Buffer (con un tiempo)'. De hecho, terminé creando un 'SubscribeWithCatch' siguiendo su ejemplo que capta Exception y luego utiliza la misma acción que se pasa al manejador OnError. – drstevens

Cuestiones relacionadas