package org.openforis.rmb;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.openforis.rmb.RepositoryMessageBroker;
import org.openforis.rmb.monitor.MessagePublishedEvent;
import org.openforis.rmb.monitor.MessageQueueCreatedEvent;
import org.openforis.rmb.spi.MessageRepository;
import org.openforis.rmb.spi.MessageSerializer;
import org.openforis.rmb.spi.TransactionSynchronizer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/repository-message-broker-core-0.1.3.jar:org/openforis/rmb/MessageQueueManager.class */
public final class MessageQueueManager {
    private final MessageRepository repository;
    private final TransactionSynchronizer transactionSynchronizer;
    private final MessagePoller messagePoller;
    private final MessageSerializer messageSerializer;
    private final Monitors monitors;
    private final MessageRepositoryWatcher repositoryWatcher;
    private final Map<String, List<MessageConsumer<?>>> consumersByQueueId = new ConcurrentHashMap();
    private final Set<String> queueIds = new HashSet();
    private final Set<String> consumerIds = new HashSet();
    private final AtomicBoolean started = new AtomicBoolean();

    public MessageQueueManager(RepositoryMessageBroker.Config config) {
        this.repository = config.messageRepository;
        this.transactionSynchronizer = config.transactionSynchronizer;
        this.messagePoller = new MessagePoller(this.repository, config.messageSerializer, config.monitors);
        this.messageSerializer = config.messageSerializer;
        this.monitors = config.monitors;
        this.repositoryWatcher = new MessageRepositoryWatcher(this.messagePoller, config);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <M> void publish(String str, M m) {
        if (!this.started.get()) {
            throw new IllegalStateException("MessageBroker has not been started");
        }
        assertInTransaction(str, m);
        this.repository.add(str, this.consumersByQueueId.get(str), this.messageSerializer.serialize(m));
        this.monitors.onEvent(new MessagePublishedEvent(str, m));
        pollForMessagesOnCommit();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerQueue(String str, List<MessageConsumer<?>> list) {
        assertQueueIdUniqueness(str);
        assertConsumerUniqueness(list);
        this.repositoryWatcher.includeQueue(str, list);
        this.consumersByQueueId.put(str, list);
        this.messagePoller.registerConsumers(list);
        this.monitors.onEvent(new MessageQueueCreatedEvent(str, list));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.started.set(true);
        this.repositoryWatcher.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        this.repositoryWatcher.stop();
        this.messagePoller.stop();
    }

    private void pollForMessagesOnCommit() {
        this.transactionSynchronizer.notifyOnCommit(new TransactionSynchronizer.CommitListener() { // from class: org.openforis.rmb.MessageQueueManager.1
            @Override // org.openforis.rmb.spi.TransactionSynchronizer.CommitListener
            public void committed() {
                MessageQueueManager.this.messagePoller.poll();
            }
        });
    }

    private void assertInTransaction(String str, Object obj) {
        if (!this.transactionSynchronizer.isInTransaction()) {
            throw new IllegalStateException("Trying to publish a message outside of a transaction. Queue: " + str + ", message: " + obj);
        }
    }

    private void assertQueueIdUniqueness(String str) {
        if (!this.queueIds.add(str)) {
            throw new IllegalArgumentException("Duplicate queue id: " + str);
        }
    }

    private void assertConsumerUniqueness(List<MessageConsumer<?>> list) {
        for (MessageConsumer<?> messageConsumer : list) {
            if (!this.consumerIds.add(messageConsumer.id)) {
                throw new IllegalArgumentException("Duplicate consumer id: " + messageConsumer.id);
            }
        }
    }
}
