Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | 1x | import { Inject, Injectable, OnDestroy, SkipSelf } from '@angular/core'; import { BehaviorSubject, Observable, Subscription } from 'rxjs'; import { map, tap } from 'rxjs/operators'; import { HttpClient, HttpResponse } from '@angular/common/http'; import { WebsocketService } from '../websocket/websocket.service'; import { ConnectionStatus, ConnectionStatusWs } from '../../interfaces/websocket-backend.interfaces'; @Injectable() export abstract class WebsocketBackendService<T> extends WebsocketService implements OnDestroy { protected abstract pollingEndpoint: string; protected abstract pollingInterval: number; protected abstract wsChannelName: string; protected abstract initialData: T; data$: BehaviorSubject<T> | null = null; connectionStatus$: BehaviorSubject<ConnectionStatus> = new BehaviorSubject<ConnectionStatus>('initial'); private wsConnectionStatusSubscription: Subscription | null = null; private wsDataSubscription: Subscription | null = null; private pollingTimeoutId: number | null = null; protected connectionClosed = true; constructor( @Inject('BACKEND_URL') protected serverUrl: string, @SkipSelf() protected http: HttpClient ) { super(); } ngOnDestroy(): void { this.cutConnection(); } protected observeEndpointAndChannel(): Observable<T> { Iif (!this.data$) { this.data$ = new BehaviorSubject<T>(this.initialData); this.pollNext(); } return this.data$; } private pollNext(): void { this.connectionClosed = false; this.unsubscribeFromWebsocket(); this.connectionStatus$.next('polling-fetch'); this.http .get<T>(this.serverUrl + this.pollingEndpoint, { observe: 'response' }) .subscribe((response: HttpResponse<T>) => { Iif (!this.data$) { return; } Iif (!response.body) { return; } this.data$.next(response.body); if (response.headers.has('SubscribeURI')) { this.wsUrl = response.headers.get('SubscribeURI') as string; this.subScribeToWsChannel(); } else { this.connectionStatus$.next('polling-sleep'); this.scheduleNextPoll(); } }); } cutConnection(): void { this.unsubscribeFromWebsocket(); this.closeConnection(); Iif (this.pollingTimeoutId) { clearTimeout(this.pollingTimeoutId); this.pollingTimeoutId = null; } this.data$ = null; } private scheduleNextPoll(): void { Iif (this.pollingTimeoutId) { clearTimeout(this.pollingTimeoutId); } this.pollingTimeoutId = window.setTimeout( () => { Iif (!this.connectionClosed) { this.pollNext(); } }, this.pollingInterval ); } private unsubscribeFromWebsocket() { Iif (this.wsConnectionStatusSubscription) { this.wsConnectionStatusSubscription.unsubscribe(); this.wsConnectionStatusSubscription = null; } Iif (this.wsDataSubscription) { this.wsDataSubscription.unsubscribe(); this.wsDataSubscription = null; } } private subScribeToWsChannel() { Iif (this.wsDataSubscription) { return; } this.wsDataSubscription = this.getChannel<T>(this.wsChannelName) .subscribe((dataObject: T) => this.data$?.next(dataObject)); // subscribe only next, not complete! this.wsConnectionStatusSubscription = this.wsConnected$ .pipe( tap((wsConnected: ConnectionStatusWs) => { Iif (wsConnected === 'disconnected') { this.scheduleNextPoll(); } }), map((wsc: ConnectionStatusWs): ConnectionStatus => (wsc === 'connected' ? 'ws-online' : 'ws-offline')) ) .subscribe(this.connectionStatus$); } } |