package org.jboss.pnc.rest.notifications.websockets;

import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.ws.rs.core.Response;
import org.jboss.pnc.bpm.BpmEventType;
import org.jboss.pnc.bpm.BpmManager;
import org.jboss.pnc.bpm.BpmTask;
import org.jboss.pnc.bpm.task.BpmBuildTask;
import org.jboss.pnc.common.Configuration;
import org.jboss.pnc.common.json.ConfigurationParseException;
import org.jboss.pnc.common.json.moduleconfig.SystemConfig;
import org.jboss.pnc.common.json.moduleprovider.PncConfigProvider;
import org.jboss.pnc.rest.restmodel.bpm.BpmNotificationRest;
import org.jboss.pnc.rest.restmodel.bpm.ProcessProgressUpdate;
import org.jboss.pnc.rest.restmodel.response.error.ErrorResponseRest;
import org.jboss.pnc.rest.utils.JsonOutputConverterMapper;
import org.jboss.pnc.spi.events.BuildCoordinationStatusChangedEvent;
import org.jboss.pnc.spi.events.BuildSetStatusChangedEvent;
import org.jboss.pnc.spi.notifications.AttachedClient;
import org.jboss.pnc.spi.notifications.MessageCallback;
import org.jboss.pnc.spi.notifications.Notifier;
import org.jboss.pnc.spi.notifications.model.NotificationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Application-scoped {@link Notifier} that keeps a set of attached clients and pushes
 * messages to them.
 *
 * <p>The client set is a {@link Collections#synchronizedSet(Set)}; every traversal in this
 * class either holds the set's monitor or works on a snapshot taken under that monitor, so
 * clients may be attached or detached (including from {@link MessageCallback#failed}) while
 * a broadcast is in progress.
 */
@ApplicationScoped
public class DefaultNotifier implements Notifier {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Currently attached clients; guarded by its own monitor for compound operations. */
    private final Set<AttachedClient> attachedClients = Collections.synchronizedSet(new HashSet<>());

    /** Single-threaded scheduler running the periodic {@link #cleanUp()}. */
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    /** Callback handed to clients on every send; a failed client is logged and detached. */
    private final MessageCallback messageCallback = new MessageCallback() {
        public void successful(AttachedClient attachedClient) {
            // nothing to do on success
        }

        public void failed(AttachedClient attachedClient, Throwable th) {
            logger.error("Notification client threw an error, removing it", th);
            detachClient(attachedClient);
        }
    };

    @Inject
    private NotificationFactory notificationFactory;

    // Defaults to empty so onClientSubscribe is safe even if init() has not run yet.
    Optional<BpmManager> bpmManager = Optional.empty();

    @Inject
    Instance<BpmManager> bpmManagerInstance;

    @Inject
    Configuration configuration;

    /**
     * Schedules the hourly clean-up and, when the configured build scheduler id is
     * {@code "bpm-build-scheduler"} and exactly one {@link BpmManager} is resolvable,
     * stores that manager and subscribes to its new-task notifications.
     */
    @PostConstruct
    public void init() {
        // scheduleAtFixedRate suppresses all further runs once a run throws, so the
        // periodic task must never let an exception escape.
        this.scheduler.scheduleAtFixedRate(() -> {
            try {
                cleanUp();
            } catch (RuntimeException e) {
                logger.warn("Periodic clean-up of attached clients failed.", e);
            }
        }, 1L, 1L, TimeUnit.HOURS);

        String schedulerId;
        try {
            SystemConfig systemConfig = this.configuration.getModuleConfig(new PncConfigProvider<>(SystemConfig.class));
            schedulerId = systemConfig.getBuildSchedulerId();
        } catch (ConfigurationParseException e) {
            logger.warn("Cannot read system config buildSchedulerId", e);
            schedulerId = "does-not-match";
        }

        boolean bpmSchedulerConfigured = "bpm-build-scheduler".equals(schedulerId);
        if (!bpmSchedulerConfigured || this.bpmManagerInstance.isUnsatisfied() || this.bpmManagerInstance.isAmbiguous()) {
            this.bpmManager = Optional.empty();
            return;
        }

        // Resolve once so the stored manager and the subscribed manager are the same object.
        BpmManager manager = this.bpmManagerInstance.get();
        this.bpmManager = Optional.of(manager);
        logger.debug("Subscribing listener for new tasks.");
        manager.subscribeToNewTasks(this::onNewTaskCreated);
    }

    /** Stops the clean-up scheduler so its worker thread does not outlive this bean. */
    @PreDestroy
    public void destroy() {
        this.scheduler.shutdownNow();
    }

    /**
     * Adds a client to the set of attached clients.
     *
     * @param attachedClient client to attach
     */
    public void attachClient(AttachedClient attachedClient) {
        this.attachedClients.add(attachedClient);
    }

    /**
     * Removes a client from the set of attached clients; a no-op if it is not attached.
     *
     * @param attachedClient client to detach
     */
    public void detachClient(AttachedClient attachedClient) {
        this.attachedClients.remove(attachedClient);
    }

    /**
     * @return number of currently attached clients
     */
    public int getAttachedClientsCount() {
        return this.attachedClients.size();
    }

    /**
     * Finds an attached client by session id.
     *
     * @param str session id to look for
     * @return any attached client whose session id equals {@code str}, or empty
     */
    public Optional<AttachedClient> getAttachedClient(String str) {
        // Streaming a synchronized collection requires holding its monitor.
        synchronized (this.attachedClients) {
            return this.attachedClients.stream()
                    .filter(client -> client.getSessionId().equals(str))
                    .findAny();
        }
    }

    /**
     * @return the callback this notifier passes to clients when sending
     */
    public MessageCallback getCallback() {
        return this.messageCallback;
    }

    /**
     * Sends a message to every enabled attached client. A client whose send throws is
     * logged and detached; the remaining clients still receive the message.
     *
     * @param obj message to send
     */
    public void sendMessage(Object obj) {
        for (AttachedClient client : snapshotClients()) {
            if (client.isEnabled()) {
                deliver(client, obj);
            }
        }
    }

    /**
     * Sends a message to every enabled attached client subscribed to the given topic.
     * A client whose send throws is logged and detached; the remaining clients still
     * receive the message.
     *
     * @param obj message to send
     * @param str first argument passed to {@code AttachedClient.isSubscribed}
     * @param str2 second argument passed to {@code AttachedClient.isSubscribed}
     */
    public void sendToSubscribers(Object obj, String str, String str2) {
        for (AttachedClient client : snapshotClients()) {
            if (client.isEnabled() && client.isSubscribed(str, str2)) {
                deliver(client, obj);
            }
        }
    }

    /**
     * Called when a client subscribes; if a BPM manager is active, replies with the last
     * event of the BPM task for the given build task id, or with an error response when
     * there is no such task or it has no events. Does nothing without a BPM manager.
     *
     * @param attachedClient client that subscribed
     * @param str build task id, parsed as an integer
     * @throws NumberFormatException if {@code str} is not a valid integer
     */
    public void onClientSubscribe(AttachedClient attachedClient, String str) {
        if (!this.bpmManager.isPresent()) {
            return;
        }
        Optional<BpmTask> task = BpmBuildTask.getBpmTaskByBuildTaskId(this.bpmManager.get(), Integer.valueOf(str));
        if (!task.isPresent()) {
            sendNoContentError(attachedClient, "No process for id: " + str);
            return;
        }
        // Reduce keeping the right operand yields the last event in encounter order.
        Optional<BpmNotificationRest> lastEvent = task.get().getEvents().stream().reduce((first, second) -> second);
        if (lastEvent.isPresent()) {
            attachedClient.sendMessage(lastEvent.get(), this.messageCallback);
        } else {
            sendNoContentError(attachedClient, "No events for id: " + str);
        }
    }

    /** Removes every attached client that is no longer enabled. */
    public void cleanUp() {
        synchronized (this.attachedClients) {
            this.attachedClients.removeIf(client -> !client.isEnabled());
        }
    }

    /** Copies the attached clients under the set's monitor so callers can iterate lock-free. */
    private Set<AttachedClient> snapshotClients() {
        synchronized (this.attachedClients) {
            return new HashSet<>(this.attachedClients);
        }
    }

    /** Sends one message to one client, detaching the client if the send throws. */
    private void deliver(AttachedClient client, Object message) {
        try {
            client.sendMessage(message, this.messageCallback);
        } catch (Exception e) {
            logger.error("Unable to send message, detaching client.", e);
            detachClient(client);
        }
    }

    /** Sends an error response carrying the NO_CONTENT status code and the given message. */
    private void sendNoContentError(AttachedClient client, String errorMessage) {
        String statusCode = Integer.toString(Response.Status.NO_CONTENT.getStatusCode());
        // NOTE(review): the payload goes through JsonOutputConverterMapper.apply twice, as in
        // the original code; confirm the double conversion is intended.
        client.sendMessage(
                JsonOutputConverterMapper.apply(JsonOutputConverterMapper.apply(new ErrorResponseRest(statusCode, errorMessage))),
                this.messageCallback);
    }

    /** For build tasks, forwards PROCESS_PROGRESS_UPDATE events to subscribers of that build task. */
    private void onNewTaskCreated(BpmTask bpmTask) {
        if (bpmTask instanceof BpmBuildTask) {
            logger.debug("Adding listener for PROCESS_PROGRESS_UPDATEs to bpmTask {}.", bpmTask.getTaskId());
            BpmBuildTask bpmBuildTask = (BpmBuildTask) bpmTask;
            bpmTask.addListener(BpmEventType.PROCESS_PROGRESS_UPDATE, processProgressUpdate -> {
                notifySubscribers(Integer.toString(bpmBuildTask.getBuildTask().getId()), processProgressUpdate);
            });
        }
    }

    /** Sends a progress update to clients subscribed to "component-build" for the given build task id. */
    private void notifySubscribers(String str, ProcessProgressUpdate processProgressUpdate) {
        // Pass the object itself so it is only stringified when debug logging is enabled.
        logger.debug("Sending update for buildTaskId: {}. processProgressUpdate: {}.", str, processProgressUpdate);
        sendToSubscribers(processProgressUpdate, "component-build", str);
    }

    /**
     * CDI observer: broadcasts a notification for a build coordination status change.
     *
     * @param buildCoordinationStatusChangedEvent observed event
     */
    public void collectBuildStatusChangedEvent(@Observes BuildCoordinationStatusChangedEvent buildCoordinationStatusChangedEvent) {
        logger.debug("Observed new status changed event {}.", buildCoordinationStatusChangedEvent);
        sendMessage(this.notificationFactory.createNotification(buildCoordinationStatusChangedEvent));
        logger.debug("Status changed event processed {}.", buildCoordinationStatusChangedEvent);
    }

    /**
     * CDI observer: broadcasts a notification for a build set status change.
     *
     * @param buildSetStatusChangedEvent observed event
     */
    public void collectBuildSetStatusChangedEvent(@Observes BuildSetStatusChangedEvent buildSetStatusChangedEvent) {
        sendMessage(this.notificationFactory.createNotification(buildSetStatusChangedEvent));
    }
}
