import { MqttClient } from 'mqtt';
import PeerConnectionWithSignalling from './peerConnection';

/**
 * Delay, in milliseconds, between publishing a keepalive message and checking
 * whether a message with the same id has been received back.
 */
const KEEPALIVE_PERIOD_MS = 5000;

/** While reconnecting, `reconnect()` is re-issued on the MQTT client every this many milliseconds. */
const RECONNECT_PERIOD_MS = 1000;

/** Payload exchanged on the keepalive topics (serialised as JSON). */
type KeepaliveMessage = {
	/** Identifier used to match a received keepalive against the last one sent. */
	id: string;
	/** Millisecond timestamp taken with `Date.now()` when the message was created. */
	timestamp: number;
};
/**
 * Watches an MQTT link on behalf of one peer connection and forces a reconnect
 * when keepalives stop coming back.
 *
 * After `start()`, a keepalive carrying a fresh random id is published on
 * `session/{localId}/{peerId}/{uuid}/keepalive`. `KEEPALIVE_PERIOD_MS` later the
 * reconnector checks whether a message with that same id has arrived on the
 * mirrored topic `session/{peerId}/{localId}/{uuid}/keepalive`:
 *  - if so, the next keepalive is sent;
 *  - if not, `mqttClient.reconnect()` is called immediately and then every
 *    `RECONNECT_PERIOD_MS` until the client emits `'connect'`, at which point
 *    keepalives resume and `peerConnection.promptIceRestart()` is invoked.
 */
class MqttReconnector {
	private publishTopic: string = '';
	private subscribeTopic: string = '';
	private stopped = false;

	/** Most recent keepalive published by this instance. */
	private lastSentKeepalive: KeepaliveMessage = { id: '', timestamp: Date.now() };
	/** Most recent received keepalive whose id matched `lastSentKeepalive`. */
	private lastReceivedKeepalive: KeepaliveMessage = { id: '', timestamp: Date.now() };

	private checkKeepaliveTimeoutId: ReturnType<typeof setTimeout> | null = null;
	private reconnectIntervalId: ReturnType<typeof setInterval> | null = null;

	constructor(
		private mqttClient: MqttClient,
		private peerConnection: PeerConnectionWithSignalling
	) {}

	/**
	 * Handles the textual payload of a message received on the keepalive topic.
	 * Only a JSON object whose `id` equals the id of the last sent keepalive is
	 * recorded; anything else is ignored (malformed payloads are logged).
	 */
	private onKeepaliveMessage = (message_str: string) => {
		let parsed: unknown;
		try {
			parsed = JSON.parse(message_str);
		} catch (error) {
			console.error(`Failed to parse keepalive message '${message_str}'`, error);
			return;
		}

		// JSON.parse can legitimately yield null / primitives; those are not keepalives.
		if (typeof parsed !== 'object' || parsed === null) {
			console.error(`Ignoring malformed keepalive message '${message_str}'`);
			return;
		}

		const { id, timestamp } = parsed as { id?: unknown; timestamp?: unknown };
		if (typeof id !== 'string' || id !== this.lastSentKeepalive.id) return;

		this.lastReceivedKeepalive = {
			id,
			// Keep the field a real number even if the sender omitted the timestamp.
			timestamp: typeof timestamp === 'number' ? timestamp : Date.now(),
		};
		// console.debug('KEEPALIVE', this.lastReceivedKeepalive);
	};

	/** Publishes a keepalive with a fresh id and schedules the check for its reply. */
	private sendKeepaliveMessage = () => {
		if (this.checkKeepaliveTimeoutId !== null) clearTimeout(this.checkKeepaliveTimeoutId);

		const id = Math.random().toString(16);
		this.lastSentKeepalive = { id, timestamp: Date.now() };
		// TODO: Dont send if the mqttClient is not connected at the moment?
		this.mqttClient.publish(this.publishTopic, JSON.stringify(this.lastSentKeepalive));
		this.checkKeepaliveTimeoutId = setTimeout(this.checkKeepalive, KEEPALIVE_PERIOD_MS);
	};

	/** Sends the next keepalive if the last one was answered, otherwise starts reconnecting. */
	private checkKeepalive = () => {
		const didReceiveCorrespondingACK =
			this.lastSentKeepalive.id === this.lastReceivedKeepalive.id;
		if (didReceiveCorrespondingACK) {
			this.sendKeepaliveMessage();
		} else {
			this.reconnectMqttClient();
		}
	};

	/** Calls `reconnect()` on the client now and then periodically until `'connect'` fires. */
	private reconnectMqttClient = () => {
		this.cancelReconnection();
		this.mqttClient.addListener('connect', this.onMqttClientConnect);
		this.reconnectIntervalId = setInterval(
			() => this.mqttClient.reconnect(),
			RECONNECT_PERIOD_MS
		);
		this.mqttClient.reconnect(); // leading reconnect() call
	};

	/** Cancel any ongoing reconnection */
	private cancelReconnection = () => {
		if (this.reconnectIntervalId !== null) clearInterval(this.reconnectIntervalId);
		this.mqttClient.removeListener('connect', this.onMqttClientConnect);
		this.reconnectIntervalId = null;
	};

	/** `'connect'` handler used while reconnecting: resumes keepalives and prompts an ICE restart. */
	private onMqttClientConnect = () => {
		if (this.stopped) return;

		this.cancelReconnection();
		this.sendKeepaliveMessage();
		this.peerConnection.promptIceRestart();
	};

	/**
	 * `'message'` handler. The payload is accepted either as a string or as any
	 * object with a `toString()` (MQTT.js delivers a Buffer) and is converted
	 * explicitly instead of relying on implicit coercion inside `JSON.parse`.
	 */
	private onMqttMessage = (topic: string, payload: string | { toString(): string }) => {
		if (this.stopped) return;
		if (topic !== this.subscribeTopic) return;

		this.onKeepaliveMessage(typeof payload === 'string' ? payload : payload.toString());
	};

	/**
	 * Derives the keepalive topics from the peer connection, subscribes, and sends
	 * the first keepalive. Safe to call again while already running: the
	 * `'message'` listener is never registered more than once.
	 */
	public start = () => {
		this.stopped = false;

		const { uuid, localId, peerId } = this.peerConnection;
		this.publishTopic = `session/${localId}/${peerId}/${uuid}/keepalive`;
		this.subscribeTopic = `session/${peerId}/${localId}/${uuid}/keepalive`;

		// removeListener is a no-op when the handler is absent; doing it first keeps a
		// repeated start() from stacking duplicate handlers that stop() would not fully remove.
		this.mqttClient.removeListener('message', this.onMqttMessage);
		this.mqttClient.addListener('message', this.onMqttMessage);
		this.mqttClient.subscribe(this.subscribeTopic, { qos: 0 });
		this.sendKeepaliveMessage();
	};

	/** This function is somewhat idempotent - can be called many times consecutively but only the first will take effect */
	public stop = () => {
		if (this.stopped) return;
		this.stopped = true;

		if (this.checkKeepaliveTimeoutId !== null) {
			clearTimeout(this.checkKeepaliveTimeoutId);
			this.checkKeepaliveTimeoutId = null;
		}
		this.cancelReconnection();
		this.mqttClient.removeListener('message', this.onMqttMessage);
		// The topic is still empty if start() was never called; an empty topic filter
		// is not valid, so there is nothing to unsubscribe from in that case.
		if (this.subscribeTopic !== '') this.mqttClient.unsubscribe(this.subscribeTopic);
	};
}

export default MqttReconnector;
