ActiveMQ StompJs 연결및 사용 예시 (6) - OhMinsSup/tip-review GitHub Wiki

const Stomp, { Client } = require('@stomp/stompjs')

class ActiveMQClient {
  constructor() {
    this.url = '';
    this.idx = '';
    this.socketRoomId = '';

    this.debug = false;
    this.reconnect = 0;
    this.connecting = false;

    this.received = false;
    this.callback = undefined;

    /** @type {Client} */
    this.activeMQClient = null;
    this.activeMQType = 'CONNECTED';
  }

  open(url, idx, debug, socketRoomId, activeMQType, callback) {
    if (![url, idx].every(Boolean)) {
      throw new Error('"ActiveMQClient Creational Fail, check to arguments.')
    }

    const stompOptions = {
      brokerURL: url,
      connectHeaders: {
        login: '',
        password: '',   
        messageType: '',       
      },
      reconnectDelay: 5 * 1000,
      // heartbeat 전송/수신의 interval 을 설정할 수 있는데 기본값은 10 초이다. 
      // heartbeats 이 활성화 되어 있다면, client 는 일반적으로 10초 안에 broker가 
      // 응답하지 않음을 알게 될 것이다. Clients 는 스스로 reconnect logic 을 구현해야할 필요가 있다.
      heartbeatIncoming: 15000,
      heartbeatOutgoing: 15000,
    }
    
    if (debug) {
      stompOptions.debug = (msg) => {
        console.log(`ActiveMQClient DEBUG => ${message}`);
      };
    }

    this.url = url;
    this.idx = idx;
    this.socketRoomId = socketRoomId;
    this.debug = debug || false;
    if (activeMQType) {
      stompOptions.connectHeaders.messageType = activeMQType;
      this.activeMQClient = activeMQType;
    }
    if (callback) this.callback = callback;

    this.activeMQClient = new Stomp.Client(stompOptions);
    this.connect();
   }

  async connect() {
    // return Promise.resolve => then의 result 값으로 반환
    if (this.activeMQClient.connected) return Promise.resolve();
    // 연결이 된 후 2초후 값을 반환
    if (this.connecting) return new Promise(resolve => setTimeout(resolve, 200));
    this.connecting = true;
    return new Promise((resolve, reject) => {
      // activeMQ에 연결전에 호출되는 함수로 여기서
      // Disconnect를 호출해야 안정적으로 연결해제
      this.activeMQClient.beforeConnect = () => {
        if (this.reconnect++ <= 3) {
          this.onMessageLogger(`BEFORE_CONNECT => reconnect ${this.reconnect}`);
        } else {
          this.onDisconnect();
        }
      }

      // reconnect를 포함해서 연결할때 마다 onConnect를 호출
      // 모든 연결을 다시 구독해야함
      this.activeMQClient.onConnect = (frame) => {
        this.onConnect(frame);
        resolve();
      }

      this.activeMQClient.onStompError = (frame) => {
        this.onError(frame);
        reject();
      }
    
      // reconnectDelay에 따라 연결이 끊어지면 계속 다시 연결을 시도
      this.activeMQClient.activate();
    });
  }

  onConnect(frame) {
    this.reconnect = 0;
    this.connecting = false;
    this.onSubscribe();
  }

  onError(frame) {
    // 에러를 발생하면 브로커(서버)와의 연결을 종료 시킴
    console.error('ActiveMQ ERROR REPORTED =>' + frame.headers['message']);
    console.error('ActiveMQ ERROR DETAILS => ' + frame.body);
    this.connecting = false;
  }

  onDisconnect() {
    // reconnect를 해제하는 함수
    this.activeMQClient.deactivate();
    // 실제로 연결된 경우에만 disconnect
    if (this.activeMQClient.connected) {
      this.activeMQClient.onDisconnect();
    }
  }

  onMessageLogger(status, message) {
    if (this.debug) {
      console.info(`${status} => ${message}`);
    }
  }

  onSubscribe() {
    // subscribe 구독 
    // publish 메세지 전달
    switch (this.activeMQType) {
      case "A":
        this.activeMQClient.subscribe(``, (message) => {
          this.received = true;
          this.onRender(message.body);
        });

        this.activeMQClient.publish({
          destination: '',
          body: JSON.stringify({
            type: this.activeMQType,
            serialNo: this.idx,
          }),
        });
        break;
      case "B":
        this.activeMQClient.subscribe(``, (message) => {
          this.received = true;
          this.onRender(message.body);
        });

        this.activeMQClient.publish({
          destination: '',
          body: JSON.stringify({
            type: this.activeMQType,
            serialNo: this.idx,
          }),
        });
        break;
      case "C":
        this.activeMQClient.subscribe(``, (message) => {
          this.received = true;
          this.onRender(message.body);
        });  

        this.activeMQClient.publish({
          destination: '',
          body: JSON.stringify({
            type: this.activeMQType,
            serialNo: this.idx,
          }),
        });
        break;
      default:
        this.activeMQClient.subscribe(``, (message) => {
          this.received = true;
          this.onRender(message.body);
        });  

        this.activeMQClient.publish({
          destination: '',
          body: JSON.stringify({
            type: this.activeMQType,
            socketRoomId: this.socketRoomId,
            concertGateIdx: this.idx,
          }),
        });
        break;
    }
  }

  onRender(message) {
    const msg = JSON.parse(message);
    switch (msg.type) {
      case 'A':
        this.onMessageLogger(...msg);
        this.onARender(message);
        break;
      case 'B':
        this.onMessageLogger(...msg);
        this.onBRender(message);
        break;
      case 'C':
        this.onMessageLogger(...msg);
        this.onCRender(message);
        break;
      case 'D':
        this.onMessageLogger(...msg);
        this.onDRender(message);
        break;
      case 'F':
        this.onMessageLogger(...msg);
        this.onERender(message);
        break;
      default:
        break;  
    }
  }

  onARender(message) {}

  onBRender(message) {}

  onCRender(message) {}

  onDRender(message) {}

  onERender(message) {}
}

export default ActiveMQClient;