/* eslint-disable camelcase */
import { IClientPublishOptions, IClientSubscribeOptions, MqttClient } from 'mqtt';

type SDPPayload = Omit<RTCSessionDescription, 'toJSON'> & { id: string };
export type ICEPayload = { candidate: string | null; sdpMLineIndex: number };
export type ACKPayload = {
	response: 'OK' | 'NOT_ALLOWED_BUSY';
	iceServers: RTCIceServer;
	/** @deprecated Use `AckPayload.capabilities.super_zoom_1` flag instead */
	GoBeSuperZoom1Enabled?: boolean;
	capabilities?: {
		super_zoom_1?: boolean;
		keepalive_over_mqtt?: boolean;
		mouse_control_with_joystick?: boolean;
		mouse_control_with_slider?: boolean;
	} & Record<string, unknown>;
};

export type SignallingError = 'INVALID_ACK_MESSAGE' | 'INVALID_SDP_MESSAGE' | 'INVALID_ICE_MESSAGE';

export class SignallingErrorACKTimeout extends Error {
	message = 'No ACK received from peer, in response to hello';
}
export class SignalingErrorACKNotAllowedBusy extends Error {
	message = 'Peer is busy, calling is not allowed';
}
export class SignalingErrorInvalidACKMessage extends Error {
	message = 'Got an invalid message as ACK from peer';
}
export class SignalingErrorInvalidICEMessage extends Error {
	message = 'Got an invalid message as ICE candidate from peer';
}

const ACK_RESPONSE_TIMEOUT = 60 * 1000;
class Signaling {
	/** Callback for when sdp is received from peer from peer
	 * @param {string} id - Identifier (from the sender's perspective) for this sdp.
	 * 	When replying back to this SDP, the same identifier must be used
	 */
	public onRemoteSDP: (id: string, sdp: Omit<RTCSessionDescription, 'toJSON'>) => void;
	/** Callback for when ice candidate is received from peer  */
	public onRemoteICE: (payload: ICEPayload) => void;
	/** Callback for when peer notifies of hanging up */
	public onRemoteHangUp: () => void;

	private localId: string;
	private peerId: string;
	private client: MqttClient;
	private uuid: string;
	private localInfo: { name: string; avatar?: string };

	private boundOnMessageFunc: (topic: string, message: string) => void;

	private baseTopics: {
		/** topics for initial outgoing hello message  */
		hello: string;
		/** topics for incoming acknowledgement-related messages */
		ack: string;
		/** topics for incoming session-related messages, ie ice, sdp, hangup */
		sessionIn: string;
		/** topics for outgoing session-related messages, ie ice, sdp, hangup */
		sessionOut: string;
	};

	private stopAutoResubscription: (() => void) | null = null;

	constructor(
		client: MqttClient,
		uuid: string,
		localId: string,
		peerId: string,
		localInfo: { name: string; avatar?: string }
	) {
		this.peerId = peerId;
		this.client = client;

		this.localId = localId.toLowerCase();
		this.uuid = uuid;
		this.localInfo = localInfo;

		this.baseTopics = {
			hello: `hello/${this.peerId}`,
			ack: `ack/${this.peerId}/${this.localId}/${this.uuid}`,
			sessionIn: `session/${this.peerId}/${this.localId}/${this.uuid}`,
			sessionOut: `session/${this.localId}/${this.peerId}/${this.uuid}`,
		};

		this.onRemoteSDP = (id, desc) =>
			console.warn('Unhandled callback on_incoming_sdp', id, desc.type);
		this.onRemoteICE = () => console.warn('Unhandled callback on_incoming_ice');
		this.onRemoteHangUp = () => console.warn('Unhandled callback on_incoming_hangup');

		this.boundOnMessageFunc = this.onMessage.bind(this);
	}

	/**
	 * Send hello to the remote peer - this is how a call is started.
	 * Returns a promise that resolves only if an OK response was received from the peer.
	 * @throws {SignalingErrorACKNotAllowedBusy}
	 * @throws {SignalingErrorInvalidACKMessage}
	 */
	public hello = (initialVolume: number) => {
		console.debug('SESSION_ID', this.uuid);
		this.client.addListener('message', this.boundOnMessageFunc);

		/** Callback for when acknowledgement message is received from peer */
		let onACKMessage: null | ((topic: string, msg: string) => void) = null;

		const self = this;
		const promise = new Promise<
			Required<Omit<ACKPayload, 'response' | 'GoBeSuperZoom1Enabled'>>
		>((resolve, reject) => {
			/** Whether a response was received or timed out */
			let ackCompleted = false;

			onACKMessage = (topic, message) => {
				// ignore messages on other topics besides ack topic
				if (topic !== self.baseTopics.ack) return;

				try {
					const payload = self.messageAsJSON(message) as ACKPayload;
					console.debug('signaling.hello.response ACK', payload);
					if (ackCompleted) {
						console.warn('signaling.hello.response received after timeout');
						return;
					}
					ackCompleted = true; // we now have an acknowledgement from peer

					if (payload.response === 'OK')
						resolve({
							...payload,
							capabilities: {
								...(payload.capabilities || {}),
								super_zoom_1:
									payload.capabilities?.super_zoom_1 ||
									payload.GoBeSuperZoom1Enabled,
							},
						});
					else if (payload.response === 'NOT_ALLOWED_BUSY')
						reject(new SignalingErrorACKNotAllowedBusy());
					else {
						console.error('signaling.hello.response invalid', payload);
						reject(new SignalingErrorInvalidACKMessage());
					}
				} catch (error) {
					console.error('signaling.hello.response error', error);
					reject(error);
				}
			};
			self.client.addListener('message', onACKMessage);

			self
				// subscribe to messages from peer
				// Do this early enough, so that we don't miss any messages from the remote peer
				.subscribeToSignallingTopics()
				.then(() => {
					console.info('signaling.hello Sending hello to remote peer');
					// publish hello to peer
					return self.publish(
						self.baseTopics.hello,
						{
							// add name, avatar in the request
							...self.localInfo,
							initialVolume,
							id: self.localId,
							uuid: self.uuid,
						},
						{ qos: 1 }
					);
				})
				// timeout if peer does not respond after some time
				.then(() => {
					setTimeout(() => {
						if (ackCompleted) return; // we already got a response, ignore
						ackCompleted = true;
						console.warn(`ACK timeout, ${ACK_RESPONSE_TIMEOUT / 1000} seconds`);
						reject(new SignallingErrorACKTimeout());
					}, ACK_RESPONSE_TIMEOUT);
				})
				.catch(error => {
					console.error('signaling.hello error', error);
					reject(error);
				});
		});

		return promise.finally(() => {
			// stop listening for ack message; we expect it to be received only once
			self.client.removeListener('message', onACKMessage!);
			self.unsubscribe(self.baseTopics.ack).catch(err => {
				console.error('Unable to unsubscribe from ack topic ', err);
			});
		});
	};

	public hangUp = () => {
		this.client.removeListener('message', this.boundOnMessageFunc);
		if (this.stopAutoResubscription) this.stopAutoResubscription();

		return Promise.all([
			this.publish(`${this.baseTopics.sessionOut}/hangUp`, '', { qos: 1 }), // ... and then publish hangup to peer
			this.unsubscribe([this.baseTopics.ack, `${this.baseTopics.sessionIn}/#`]), // unsubscribe from topics
		]);
	};

	public sendSDPToPeer = (id: string, desc: Omit<RTCSessionDescription, 'toJSON'>) => {
		const data: SDPPayload = {
			id,
			// NB: spread operator does not work for RTCSessionDescription instances
			type: desc.type, // so have to manually add the fields
			sdp: desc.sdp, // and this one too
		};
		console.info(`signaling.sendSDP Sending SDP '${desc.type}' to remote peer`, data);
		return this.publish(`${this.baseTopics.sessionOut}/sdp`, data, { qos: 1 });
	};

	public sendICE = (data: ICEPayload) => {
		console.info('signaling.sendICE Sending local ICE to remote peer');
		return this.publish(`${this.baseTopics.sessionOut}/ice`, data, { qos: 1 });
	};

	/** Notify the remote peer that we are ready for the initial offer sdp.
	 * This is sent after we have received an OK acknowledgement
	 */
	public sendREADY = () => {
		console.info('signaling.sendREADY Sending ready-for-initial-offer to remote peer');
		return this.publish(`${this.baseTopics.sessionOut}/ready`, '', { qos: 2 });
	};

	/** Prompt the remote peer to trigger an ICE restart sequence.
	 * The remote peer is then expected to send an new offer, in response to this prompt.
	 */
	public promptICERestart = () => {
		console.info('signaling.promptICERestart Sending a ice-restart-prompt to the remote peer');
		const message = { type: 'iceRestart' };
		return this.publish(`${this.baseTopics.sessionOut}/sdp`, message, { qos: 1 });
	};

	private publish(topic: string, data: any, options?: IClientPublishOptions) {
		return new Promise<void>((resolve, reject) => {
			let parsedData = data;
			try {
				parsedData = JSON.stringify(data);
				// eslint-disable-next-line no-empty
			} catch (error) {}
			this.client.publish(topic, parsedData, options || { qos: 0 }, (err?: Error) => {
				if (err) {
					reject(err);
					console.error(`signaling.publish ERROR ${topic}`, err);
				} else {
					console.debug(`signaling.published`, topic);
					resolve();
				}
			});
		});
	}

	/**
	 * Wrapper around mqtt client subscribe function to make it promise-like.
	 * Automatically resubcribes when client reconnects.
	 * @param topic
	 * @param options
	 * @returns A promise that resolves function to stop auto resubscribes
	 */
	private subscribe(topic: string | string[], options?: IClientSubscribeOptions) {
		const client = this.client;

		const logSuccess = () => console.debug(`signaling.subscribe`, topic);

		return new Promise<() => void>((resolve, reject) => {
			client.subscribe(topic, options || { qos: 0 }, error => {
				if (error) {
					console.error('signaling.subscribe ERROR', topic, error);
					reject(error);
				} else {
					logSuccess();

					const reSubscribe = () => {
						logSuccess();
						client.subscribe(topic, options || { qos: 0 });
					};
					// Subscribe again when the client becomes connected again
					client.addListener('connect', reSubscribe);

					const stopAutoReSubcription = () => {
						console.debug('signaling.stopAutoReSubcription');
						client.removeListener('connect', reSubscribe);
					};

					resolve(stopAutoReSubcription);
				}
			});
		});
	}

	private subscribeToSignallingTopics = async () => {
		const stopAutoReSubcription = await this.subscribe(
			[this.baseTopics.ack, `${this.baseTopics.sessionIn}/#`],
			{ qos: 1 }
		);
		if (this.stopAutoResubscription) this.stopAutoResubscription();
		this.stopAutoResubscription = stopAutoReSubcription;
	};

	private unsubscribe(topic: string | string[]) {
		return new Promise<void>((resolve, reject) => {
			this.client.unsubscribe(topic, {}, error => {
				if (error) {
					console.error('signaling.unsubscribe ERROR', topic, error);
					reject(error);
				} else resolve();
			});
		});
	}

	private messageAsJSON(message: string) {
		let parsedMessage: unknown = message;
		try {
			parsedMessage = JSON.parse(message);
		} catch (error) {
			console.warn('Unable to parse message as JSON', error);
		}
		return parsedMessage;
	}

	/** Handler for messages received from mqtt broker */
	private onMessage(topic: string, message: string) {
		let parsedMessage = this.messageAsJSON(message);

		switch (topic) {
			case this.baseTopics.ack:
				// NB: This is handled as part of the HELLO command in this.hello()
				//  Thus we just ignore it here
				break;
			case `${this.baseTopics.sessionIn}/sdp`: {
				console.debug('signaling.onMessage remote SDP');
				const { id, type, sdp }: SDPPayload = parsedMessage as SDPPayload;
				this.onRemoteSDP(id, { type, sdp });
				break;
			}
			case `${this.baseTopics.sessionIn}/ice`:
				console.debug('signaling.onMessage remote ICE');
				this.onRemoteICE(parsedMessage as ICEPayload);
				break;
			case `${this.baseTopics.sessionIn}/hangUp`:
				console.debug('signaling.onMessage remote HANG_UP');
				this.onRemoteHangUp && this.onRemoteHangUp();
				break;
		}
	}
}

export default Signaling;
