All files / src/app/shared/services/websocket websocket.service.ts

12.5% Statements 4/32
0% Branches 0/6
6.25% Functions 1/16
12.5% Lines 4/32

Press n or j to go to the next uncovered block, b, p or k for the previous block.

Source lines 1–92. Four lines were executed once (1x); no hits were recorded for the remaining lines.
/* eslint-disable @typescript-eslint/no-explicit-any */
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import {
  BehaviorSubject, Observable, of, Subscription
} from 'rxjs';
import { catchError, map, share } from 'rxjs/operators';
import { WebSocketMessage } from 'rxjs/internal/observable/dom/WebSocketSubject';
import { ConnectionStatusWs } from '../../interfaces/websocket-backend.interfaces';
 
/**
 * Envelope for a message exchanged over the socket: an event name plus an
 * arbitrary payload.
 */
interface WsMessage {
  /** Event name; `getChannel` compares it against the requested channel name. */
  event: string;
  /** Payload carried by the message; its shape is not constrained here. */
  data: any;
}
 
/**
 * Owns a single, lazily created WebSocket connection (an RxJS
 * `WebSocketSubject`) and exposes it as named channels.
 *
 * Messages are JSON-encoded on the way out and JSON-decoded on the way in.
 * The current connection state is published through `wsConnected$`.
 */
export class WebsocketService {
  /** URL passed to `webSocket()`; empty here — presumably set by a subclass (TODO confirm). */
  protected wsUrl = '';
  /** The live socket subject, or `null` while no connection has been set up. */
  private wsSubject$: WebSocketSubject<any> | null = null;
  /** Emits the connection status; starts as `'connecting'`. */
  wsConnected$ = new BehaviorSubject<ConnectionStatusWs>('connecting');
  /** The service's own subscription to the socket, used to track errors and completion. */
  private wsSubscription: Subscription | null = null;

  /**
   * Creates the socket subject and subscribes to it, unless one already exists.
   *
   * NOTE(review): the `error` handler below only publishes `'disconnected'`;
   * it does not clear `wsSubject$`, so calling `connect()` again after an
   * error is a no-op. Confirm whether that is intended.
   */
  connect(): void {
    if (this.wsSubject$) {
      return;
    }

    this.wsConnected$.next('connecting');
    this.wsSubject$ = webSocket({
      deserializer(event: MessageEvent): any {
        return JSON.parse(event.data);
      },
      serializer(value: any): WebSocketMessage {
        return JSON.stringify(value);
      },
      openObserver: {
        next: () => {
          this.wsConnected$.next('connected');
        }
      },
      url: this.wsUrl
    });

    // Incoming messages are ignored here (consumers read them through
    // getChannel); this subscription only tracks errors and completion.
    this.wsSubscription = this.wsSubject$
      .subscribe({
        next: () => {
        },
        error: () => {
          this.wsConnected$.next('disconnected');
        },
        complete: () => {
          this.closeConnection();
        }
      });
  }

  /**
   * Publishes `'disconnected'`, then releases the service's subscription and
   * the socket subject. Safe to call when nothing is connected, and safe to
   * call repeatedly.
   */
  protected closeConnection(): void {
    this.wsConnected$.next('disconnected');
    if (this.wsSubscription) {
      this.wsSubscription.unsubscribe();
      // Drop the reference so a closed subscription is never unsubscribed again.
      this.wsSubscription = null;
    }
    if (this.wsSubject$) {
      this.wsSubject$.complete();
      this.wsSubject$ = null;
    }
  }

  /**
   * Sends `{ event, data }` over the socket, connecting first if needed.
   *
   * @param event Event name placed in the message envelope.
   * @param data Payload placed in the message envelope.
   */
  send(event: string, data: any): void {
    if (!this.wsSubject$) {
      this.connect();
    }

    this.wsSubject$?.next({ event, data });
  }

  /**
   * Returns a shared stream of the `data` payloads of all messages whose
   * `event` equals `channelName`, connecting first if needed.
   *
   * A `subscribe:<channelName>` message is supplied to `multiplex` as the
   * subscribe message and `unsubscribe:<channelName>` as the unsubscribe
   * message. Errors on the multiplexed stream are swallowed: the returned
   * observable completes instead of erroring.
   *
   * @param channelName Event name to filter incoming messages by.
   * @returns Observable of the payloads, shared between subscribers.
   * @throws Error if no socket subject exists even after `connect()`.
   */
  getChannel<T>(channelName: string): Observable<T> {
    if (!this.wsSubject$) {
      this.connect();
    }

    const subject = this.wsSubject$;
    if (!subject) {
      // should never happen, but typescript wants it
      throw new Error('Websocket connection failed and fallback as well.');
    }

    return subject.multiplex(
      () => ({ event: `subscribe:${channelName}` }),
      () => ({ event: `unsubscribe:${channelName}` }),
      message => (message.event === channelName)
    )
      .pipe(
        catchError(() => of()),
        map((event: WsMessage): T => event.data),
        share()
      );
  }
}