import { NgZone } from '@angular/core';
import { AutoIncrementingIdentifierDirective } from '../baseClasses/AutoIncrementingIdentifier';
import { Queue } from '../baseClasses/queue';
import { IMessageDistributor } from '../interfaces/IMessageDistributor';
import { ITopicRecipient } from '../interfaces/ITopicRecipient';
import { MessageDistributionPackage } from '../types/messageDistributionPackage';
import { Message } from '../types/Messages/Message';

/** ***********************************************************************************
 *  This class is an implementation of the IMessageDistributor interface. This
 *  implementation does not attempt to thread out the messages, performing all message
 *  distribution on the same core thread.  This implementation queues all messages and
 *  processes them in the order they are received.
 *  ***********************************************************************************/
export class MessageDistributionService extends AutoIncrementingIdentifierDirective implements IMessageDistributor {
    /** Delay, in milliseconds, between enqueueing a message and the deferred drain that delivers it. */
    private static readonly drainDelayMs = 10;

    /** Packages awaiting delivery. Static, so the queue is shared by every instance of this class. */
    private static topicsQueue = new Queue<MessageDistributionPackage<Message, ITopicRecipient>>();

    /** True while a drain loop is running; prevents a second drain from starting concurrently. */
    private static processingTopicMessages = false;

    /** True while a deferred drain timer is pending, so at most one timer is armed at a time. */
    private static drainScheduled = false;

    /** Zone through which the drain loop is executed; overwritten by each constructed instance. */
    private static zone: NgZone;

    /**
     * Drains the shared queue in FIFO order, delivering each message to every
     * recipient of its package except the message's sender.
     *
     * Returns immediately when the queue is empty or a drain is already running.
     * A recipient that throws is logged and skipped; the remaining recipients and
     * the remaining queued messages are still delivered.
     */
    static distributeMessagesHandler() {
        if (MessageDistributionService.topicsQueue.length === 0 || MessageDistributionService.processingTopicMessages === true) {
            return;
        }

        MessageDistributionService.zone.runTask(() => {
            MessageDistributionService.processingTopicMessages = true;
            try {
                // Re-check the length after every package so messages enqueued by
                // recipients during delivery are picked up by this same drain.
                do {
                    const messagePkg: MessageDistributionPackage<
                        Message,
                        ITopicRecipient
                    > = MessageDistributionService.topicsQueue.dequeue();
                    MessageDistributionService.deliver(messagePkg);
                } while (MessageDistributionService.topicsQueue.length > 0);
            } catch (e) {
                // Only queue/package failures reach this point (recipient errors are
                // handled in deliver). Report instead of silently discarding; any
                // packages left behind are drained after the next distributeTopics call.
                console.error('MessageDistributionService: message distribution aborted', e);
            } finally {
                MessageDistributionService.processingTopicMessages = false;
            }
        });
    }

    /**
     * Delivers one package's message to each of its recipients, skipping the
     * recipient whose id matches the message's senderId.
     *
     * @param messagePkg the dequeued package holding the message and its recipients
     */
    private static deliver(messagePkg: MessageDistributionPackage<Message, ITopicRecipient>) {
        messagePkg.recipients.forEach((recipient) => {
            if (recipient.id === messagePkg.message.senderId) {
                return;
            }
            try {
                recipient.topicMessageReceived(messagePkg.message);
            } catch (e) {
                // Isolate the failure so one faulty recipient cannot block the others.
                console.error('MessageDistributionService: recipient failed to handle a topic message', e);
            }
        });
    }

    /**
     * Queues a message for asynchronous delivery to the given recipients.
     *
     * A deferred drain is armed unless one is already pending or running. This is
     * not tied to the queue length, so packages left over from an aborted drain
     * are still delivered once the next message is enqueued.
     *
     * @param recipients the recipients the message should be delivered to
     * @param message the message to deliver
     */
    public distributeTopics(recipients: ITopicRecipient[], message: Message) {
        MessageDistributionService.topicsQueue.enqueue(new MessageDistributionPackage(recipients, message));
        if (MessageDistributionService.processingTopicMessages === true || MessageDistributionService.drainScheduled === true) {
            // A running drain re-checks the queue after each package, and a pending
            // timer will drain everything queued so far; nothing more to arm.
            return;
        }

        MessageDistributionService.drainScheduled = true;
        setTimeout(() => {
            MessageDistributionService.drainScheduled = false;
            MessageDistributionService.distributeMessagesHandler();
        }, MessageDistributionService.drainDelayMs);
    }

    /**
     * Stores the zone for use by the static drain loop and immediately drains
     * anything that is already queued.
     *
     * @param zone the NgZone through which message delivery is executed
     */
    constructor(zone: NgZone) {
        super();
        MessageDistributionService.zone = zone;
        MessageDistributionService.distributeMessagesHandler();
    }
}
