Press n or j to go to the next uncovered block, b, p or k for the previous block.
/* eslint-disable @typescript-eslint/no-explicit-any */
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { BehaviorSubject, Observable, of, Subscription } from 'rxjs';
import { catchError, map, share } from 'rxjs/operators';
import { WebSocketMessage } from 'rxjs/internal/observable/dom/WebSocketSubject';
import { ConnectionStatusWs } from '../../interfaces/websocket-backend.interfaces';

/**
 * Envelope used for every message exchanged over the socket: an event name
 * plus an arbitrary payload.
 */
interface WsMessage {
  event: string;
  data: any;
}

/**
 * Thin wrapper around an RxJS `WebSocketSubject` that lazily opens a single
 * connection, publishes its status through {@link wsConnected$}, and exposes
 * event-named channels on top of it.
 */
export class WebsocketService {
  /** Endpoint passed to `webSocket()`; empty here — presumably set by subclasses (verify against callers). */
  protected wsUrl = '';

  /** The live socket subject, or `null` while no connection has been set up. */
  private wsSubject$: WebSocketSubject<any> | null = null;

  /** Connection status; this class emits 'connecting', 'connected' and 'disconnected'. */
  wsConnected$ = new BehaviorSubject<ConnectionStatusWs>('connecting');

  /** Subscription that keeps the socket open and observes its error/completion. */
  private wsSubscription: Subscription | null = null;

  /**
   * Creates the socket subject and subscribes to it, unless one already exists.
   * Calling this while a subject is present is a no-op.
   */
  connect(): void {
    if (!this.wsSubject$) {
      this.wsConnected$.next('connecting');
      this.wsSubject$ = webSocket({
        deserializer(event: MessageEvent): any {
          return JSON.parse(event.data);
        },
        serializer(value: any): WebSocketMessage {
          return JSON.stringify(value);
        },
        openObserver: {
          next: () => {
            this.wsConnected$.next('connected');
          }
        },
        url: this.wsUrl
      });

      // Incoming messages are ignored here; consumers receive them through
      // getChannel(). This subscription only tracks the connection lifecycle.
      this.wsSubscription = this.wsSubject$
        .subscribe({
          next: () => { },
          error: () => {
            // NOTE(review): the subject is kept after an error, so a later
            // connect() is a no-op and this handler will not fire again —
            // confirm that this is the intended reconnect behaviour.
            this.wsConnected$.next('disconnected');
          },
          complete: () => {
            this.closeConnection();
          }
        });
    }
  }

  /**
   * Marks the connection as disconnected and releases the subscription and
   * the socket subject so that a later `connect()` starts from scratch.
   */
  protected closeConnection(): void {
    this.wsConnected$.next('disconnected');
    if (this.wsSubscription) {
      this.wsSubscription.unsubscribe();
      // Drop the reference to the closed subscription, mirroring wsSubject$.
      this.wsSubscription = null;
    }
    if (this.wsSubject$) {
      this.wsSubject$.complete();
      this.wsSubject$ = null;
    }
  }

  /**
   * Sends `{ event, data }` over the socket, connecting first if necessary.
   *
   * @param event - Event name placed in the message envelope.
   * @param data - Payload placed in the message envelope.
   */
  send(event: string, data: any): void {
    if (!this.wsSubject$) {
      this.connect();
    }
    this.wsSubject$?.next({ event, data });
  }

  /**
   * Returns a shared observable of the `data` payloads of messages whose
   * `event` equals `channelName`, connecting first if necessary.
   *
   * A `subscribe:<channelName>` event is sent when the channel is subscribed
   * and an `unsubscribe:<channelName>` event when it is unsubscribed. Socket
   * errors are swallowed: the returned observable completes instead of erroring.
   *
   * @param channelName - Event name identifying the channel.
   * @returns Observable emitting each matching message's `data` as `T`.
   * @throws Error if no socket subject exists even after `connect()`.
   */
  getChannel<T>(channelName: string): Observable<T> {
    if (!this.wsSubject$) {
      this.connect();
    }
    if (!this.wsSubject$) {
      // should never happen, but typescript wants it
      throw new Error('Websocket connection failed and fallback as well.');
    }
    return this.wsSubject$.multiplex(
      () => ({ event: `subscribe:${channelName}` }),
      () => ({ event: `unsubscribe:${channelName}` }),
      message => (message.event === channelName)
    )
      .pipe(
        catchError(() => of()),
        map((event: WsMessage): T => event.data),
        share()
      );
  }
}