import { Injectable } from "@angular/core";
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { catchError, tap,map } from 'rxjs/operators';
import { EMPTY, Subject } from 'rxjs';
import { AppConfigService } from "./app-config.service";
import { RxStompConfig } from '@stomp/rx-stomp';
import { RxStomp } from '@stomp/rx-stomp';

@Injectable({
    providedIn: "root",
})
export class WebSocketApiService {
    private apiUrl = `${AppConfigService.env.api_endpoint}api/public/inframob/web-socket/ws/notifications`;
    private topic = '/topic/notifications';
    private messageMapping = 'notificationsWS';
    private messagesSubject$ = new Subject();
    private socket$: WebSocketSubject<any>;
    private myRxStompConfig: RxStompConfig;
    private rxStomp: RxStomp;
    constructor() {
        this.myRxStompConfig = {
            // Which server?
            brokerURL: `${AppConfigService.env.ws_endpoint}api/public/ws/notifications`,
            // brokerURL: 'ws://localhost:8080/api/public/ws/notifications',

            // Headers
            // Typical keys: login, passcode, host
           /* connectHeaders: {
                login: 'guest',
                passcode: 'guest',
            },*/

            // How often to heartbeat?
            // Interval in milliseconds, set to 0 to disable
            heartbeatIncoming: 0, // Typical value 0 - disabled
            heartbeatOutgoing: 20000, // Typical value 20000 - every 20 seconds

            // Wait in milliseconds before attempting auto reconnect
            // Set to 0 to disable
            // Typical value 500 (500 milli seconds)
            reconnectDelay: 2000,

            // Will log diagnostics on console
            // It can be quite verbose, not recommended in production
            // Skip this key to stop logging to console
            debug: (msg: string): void => {
                console.log(new Date(), msg);
            },
        };

    }

    public connect() {
        if (!this.rxStomp || !this.rxStomp.active){
            this.rxStomp = new RxStomp();
            
            this.rxStomp.configure(this.myRxStompConfig);
            this.rxStomp.activate();
        }
        return this.rxStomp;
    }

    public dataUpdates$(){
        return this.connect().watch('/topic/notifications').pipe(map(function (message) {
            console.log(message.body);
            return typeof message.body =='string'?message.body: JSON.parse(message.body);
          }));
    }
    public closeConnection(){
        this.connect().deactivate();
    }

}
