webentwicklung-frage-antwort-db.com.de

RxJava2 Observable Take wirft UndeliverableException

Wie ich verstehe, erstellt RxJava2 values.take(1) ein weiteres Observable, das nur ein Element aus dem ursprünglichen Observable enthält. Wobei MUSS NICHT eine Ausnahme auslösen, da diese durch den Effekt von take(1) herausgefiltert wird, als es als zweites passiert ist.

wie im folgenden Code-Snippet

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

Ausgabe

1
Completed
io.reactivex.exceptions.UndeliverableException: Java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.Java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.Java:83)
    at ch02.lambda$main$0(ch02.Java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.Java:40)
    at io.reactivex.Observable.subscribe(Observable.Java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.Java:30)
    at io.reactivex.Observable.subscribe(Observable.Java:10841)
    at io.reactivex.Observable.subscribe(Observable.Java:10827)
    at io.reactivex.Observable.subscribe(Observable.Java:10787)
    at ch02.main(ch02.Java:32)
Caused by: Java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: Java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.Java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.Java:83)
    at ch02.lambda$main$0(ch02.Java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.Java:40)
    at io.reactivex.Observable.subscribe(Observable.Java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.Java:30)
    at io.reactivex.Observable.subscribe(Observable.Java:10841)
    at io.reactivex.Observable.subscribe(Observable.Java:10827)
    at io.reactivex.Observable.subscribe(Observable.Java:10787)
    at ch02.main(ch02.Java:32)
Caused by: Java.lang.Exception: Oops
    ... 8 more

Meine Fragen :

  1. Verstehe ich es richtig?
  2. Was ist wirklich passiert, um die Ausnahme zu verursachen.
  3. Wie löst man das vom Verbraucher?
35
  1. Ja, aber da das beobachtbare 'ends' nicht bedeutet, dass der Code, der in create(...) ausgeführt wird, gestoppt ist. Um in diesem Fall völlig sicher zu sein, müssen Sie o.isDisposed() verwenden, um zu sehen, ob die Observable stromabwärts geendet hat.
  2. Die Ausnahme ist da, weil RxJava 2 die Richtlinie hat, NIEMALS zuzulassen, dass ein onError -Aufruf verloren geht. Es wird entweder nachgelagert geliefert oder als globales UndeliverableException geworfen, wenn die Observable bereits beendet wurde. Es ist Sache des Erstellers der Observable, den Fall, in dem die Observable beendet wurde und eine Exception auftritt, 'richtig' zu behandeln.
  3. Das Problem ist, dass der Produzent (Observable) und der Konsument (Subscriber) sich nicht einig sind, wann der Stream endet. Da der Produzent in diesem Fall den Konsumenten überlebt, kann das Problem nur im Produzenten behoben werden.
51
Kiskae

@ Kiskae im vorherigen Kommentar richtig beantwortet über den Grund, warum eine solche Ausnahme auftreten kann.

Hier der Link zum offiziellen Dokument zu diesem Thema: RxJava2-wiki .

Manchmal können Sie dieses Verhalten nicht ändern, sodass es eine Möglichkeit gibt, mit diesen UndeliverableException umzugehen. Hier ist ein Codeausschnitt, wie Sie Abstürze und Fehlverhalten vermeiden können:

RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that's likely a bug in the application
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

Dieser Code stammt aus dem obigen Link.

Wichtige Notiz. Dieser Ansatz setzt die globale Fehlerbehandlungsroutine auf RxJava. Wenn Sie also diese Ausnahmen beseitigen können, ist dies die bessere Option.

11
Ilia Kurtov