webentwicklung-frage-antwort-db.com.de

RxJs Mit WebSocket beobachtbar

Meine angular Anwendung verwendet einen Websocket, um mit dem Backend zu kommunizieren.

In meinem Testfall habe ich 2 Clientkomponenten. Der Observable-Timer druckt erwartungsgemäß zwei verschiedene Client-IDs.

Jedes ngOnInit () gibt auch die ID seines Clients aus.

JETZT wird aus irgendeinem Grund das Abonnement von websocketService.observeClient () für jede Nachricht zweimal aufgerufen, aber this.client.id gibt immer den Wert des zweiten Mandanten aus.

Hier ist meine Client-Komponente

@Component({
...
})
export class ClientComponent implements OnInit {

  @Input() client: Client;

  constructor(public websocketService: WebsocketService) {
    Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id));
  }

  ngOnInit() {

    console.log(this.client.id);
    this.websocketService.observeClient().subscribe(data => {
      console.log('message', this.client.id);
    });

  }

}

Und mein Websocket-Service

@Injectable()
export class WebsocketService {

  private observable: Observable<MessageEvent>;
  private observer: Subject<Message>;

  constructor() {

    const socket = new WebSocket('ws://localhost:9091');

    this.observable = Observable.create(
      (observer: Observer<MessageEvent>) => {
        socket.onmessage = observer.next.bind(observer);
        socket.onerror = observer.error.bind(observer);
        socket.onclose = observer.complete.bind(observer);
        return socket.close.bind(socket);
      }
    );

    this.observer = Subject.create({
      next: (data: Message) => {
        if (socket.readyState === WebSocket.OPEN) {
          socket.send(JSON.stringify(data));
        }
      }
    });

  }

  observeClient(): Observable<MessageEvent> {
    return this.observable;
  }

}

Bearbeiten

Ok, soweit ich das gelesen habe, hat es damit zu tun, dass Observables Unicast-Objekte sind und ich dafür einen Betreff verwenden muss, aber ich weiß nicht, wie ich den Betreff erstellen soll.

11
Pascal

Ab rxjs 5 können Sie die integrierte Websocket-Funktion verwenden, mit der das Thema für Sie erstellt wird. Die Verbindung wird auch wiederhergestellt, wenn Sie den Stream nach einem Fehler erneut abonnieren. Bitte beziehen Sie sich auf diese Antwort:

https://stackoverflow.com/a/44067972/5522

TLDR:

 let subject = Observable.webSocket('ws://localhost:8081');
 subject
   .retry()
   .subscribe(
      (msg) => console.log('message received: ' + msg),
      (err) => console.log(err),
      () => console.log('complete')
    );
 subject.next(JSON.stringify({ op: 'hello' }));
20
Herman

Nun, ich habe lange mit den Websockets mit angular und python gelitten. Ich hatte einige Websockets für jede Komponente und sie wurden nicht geschlossen, also alle Als ich durch einen Tab zum nächsten ging (zwischen den Komponenten wechselte), wurde der Websocket im Hintergrund neu erstellt und ich erhielt zweimal oder mehrmals den gleichen.

Es gibt viele Tutorials, die erklären, wie man Websockets macht, aber nicht viele, wie man schließt oder wie man mehrere davon hat.

Hier mein Stück Brot: Bestes Tuto, das ich bisher gefunden habe: https://medium.com/@lwojciechowski/websockets-with-angular2-and-rxjs-8b6c5be02fac Aber noch nicht vollständig. Es schließt den WS nicht richtig

Folgendes habe ich getan: (Meine Klasse WebSocketTestService)

@Injectable()
export class WebSocketTestService {
    public messages: Subject<any>  = new Subject<any>();

    constructor(private wsService: WebSocketService) {
        console.log('constructyor ws synop service')
    }

    public connect() {
        this.messages = <Subject<any>>this.wsService
            .connect('ws://localhost:8000/mytestwsid')
            .map((response: any): any => {
                return JSON.parse(response.data);
            })
    }

    public close() {
        this.wsService.close()
    }
} // end class 

Und hier (in einer anderen Datei für mich) die Websocket-Klasse

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';
import {Subject} from 'rxjs/Subject';
import {Observer} from "rxjs/Observer";

@Injectable()
export class WebSocketService {
    private subject: Subject<MessageEvent>;
    private subjectData: Subject<number>;
  private ws: any;
    // For chat box
    public connect(url: string): Subject<MessageEvent> {
        if (!this.subject) {
            this.subject = this.create(url);
        }
        return this.subject;
    }

    private create(url: string): Subject<MessageEvent> {
        this.ws = new WebSocket(url);

        let observable = Observable.create(
            (obs: Observer<MessageEvent>) => {
                this.ws.onmessage = obs.next.bind(obs);
                this.ws.onerror   = obs.error.bind(obs);
                this.ws.onclose   = obs.complete.bind(obs);

                return this.ws.close.bind(this.ws);
            });

        let observer = {
            next: (data: Object) => {
                if (this.ws.readyState === WebSocket.OPEN) {
                    this.ws.send(JSON.stringify(data));
                }
            }
        };

        return Subject.create(observer, observable);
  }

  public close() {
    console.log('on closing WS');
    this.ws.close()
    this.subject = null
  }

} // end class WebSocketService

Und zum Schluss mein Aufruf an den WSTestService in der Komponente

this.webSocketTestService.connect();
this.webSocketTestService.messages.subscribe(message => {
      console.log(message);

})

und im wichtigsten Teil das ngOnDestroy ()

ngOnDestroy() {
    this.webSocketTestService.messages.unsubscribe();
    this.webSocketTestService.close()

Natürlich fügen Sie beide Dienste zum Abschnitt "Anbieter" im app.module hinzu

Es ist die einzige Möglichkeit, das Websocket beim Wechseln zwischen Ansichten ordnungsgemäß zu schließen und Komponenten zu zerstören. Ich bin mir nicht sicher, ob es Ihr Fall ist, aber es fällt mir schwer, ein Beispiel zu finden, das mit mehreren Ansichten und Websockets richtig funktioniert. Ich hatte ähnliche Probleme wie die, die Sie gefragt haben, also hoffe ich, dass es für Sie funktioniert. Lassen Sie mich wissen, ob mein Ansatz für Sie funktioniert.

Was ich wirklich nicht verstehe, ist, warum Angular ruft jedes ngOnInit jeder Komponente auf, auch wenn sie beim Start der App nicht angezeigt wird. Ich möchte die WS nur erstellen, wenn ich in die Ansicht eintrete , nicht wenn ich die App starte. Wenn Sie eine Lösung dafür finden, lassen Sie es mich wissen.

viel Glück!

3
Javier

Sie müssen den Operator share verwenden, um ihn für die Abonnenten freizugeben.

this.observable = Observable.create(
    (observer: Observer<MessageEvent>) => {
       socket.onmessage = observer.next.bind(observer);
       socket.onerror = observer.error.bind(observer);
       socket.onclose = observer.complete.bind(observer);
       return socket.close.bind(socket);
    }
).share();

Stellen Sie außerdem sicher, dass dieser Dienst ein Singleton ist.

Doc: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/share.md

1
eko