package org.openforis.rmb;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.openforis.rmb.Throttler;
import org.openforis.rmb.Worker;
import org.openforis.rmb.monitor.MessageUpdateConflictEvent;
import org.openforis.rmb.monitor.PollingForMessagesEvent;
import org.openforis.rmb.monitor.TakingMessagesFailedEvent;
import org.openforis.rmb.spi.Clock;
import org.openforis.rmb.spi.MessageProcessingUpdate;
import org.openforis.rmb.spi.MessageRepository;
import org.openforis.rmb.spi.MessageSerializer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/repository-message-broker-core-0.1.3.jar:org/openforis/rmb/MessagePoller.class */
/**
 * Takes messages from a {@link MessageRepository} and dispatches each one to a {@link Worker}.
 *
 * <p>Taking runs on a single-threaded executor ({@code rmb.MessageTaker}); every taken message is
 * processed on a cached thread pool ({@code rmb.WorkerExecutor}). For each registered consumer the
 * poller tracks how many messages are currently being processed, and only asks the repository for
 * as many messages as the consumer's {@code messagesHandledInParallel} still leaves room for.
 */
public final class MessagePoller {
    private final MessageRepository repository;
    private final MessageSerializer messageSerializer;
    private final Monitors monitors;
    private final Throttler throttler = new Throttler.DefaultThrottler(new Clock.SystemClock());
    /** Number of messages currently being processed, per registered consumer. */
    private final ConcurrentHashMap<MessageConsumer<?>, AtomicInteger> currentlyProcessingMessageCountByConsumer = new ConcurrentHashMap<>();
    private final ExecutorService messageTaker = Executors.newSingleThreadExecutor(NamedThreadFactory.singleThreadFactory("rmb.MessageTaker"));
    private final ExecutorService workerExecutor = Executors.newCachedThreadPool(NamedThreadFactory.multipleThreadFactory("rmb.WorkerExecutor"));

    /**
     * Creates a poller.
     *
     * @param messageRepository repository messages are taken from
     * @param messageSerializer used to deserialize the payload handed over by the repository
     * @param monitors          receives the events emitted while polling and consuming
     */
    public MessagePoller(MessageRepository messageRepository, MessageSerializer messageSerializer, Monitors monitors) {
        this.repository = messageRepository;
        this.messageSerializer = messageSerializer;
        this.monitors = monitors;
    }

    /**
     * Registers the consumers this poller takes messages for, each starting with an in-flight
     * count of zero.
     *
     * <p>A consumer that is already registered keeps its existing counter: replacing it while
     * workers are still running would let their decrements drive the new counter below zero and
     * allow more parallel messages than the consumer permits.
     *
     * @param collection consumers to register
     */
    public void registerConsumers(Collection<MessageConsumer<?>> collection) {
        for (MessageConsumer<?> consumer : collection) {
            this.currentlyProcessingMessageCountByConsumer.putIfAbsent(consumer, new AtomicInteger());
        }
    }

    /** Schedules one {@link #takeMessages()} run on the message taker thread. */
    public void poll() {
        this.messageTaker.execute(new Runnable() {
            @Override
            public void run() {
                MessagePoller.this.takeMessages();
            }
        });
    }

    /**
     * Asks the repository for as many messages as the registered consumers currently have room
     * for and passes every taken message on to {@link #consume}.
     *
     * <p>Does nothing when no consumer has free capacity. Any exception raised while taking is
     * reported as a {@link TakingMessagesFailedEvent} instead of being propagated.
     */
    public void takeMessages() {
        Map<MessageConsumer<?>, Integer> maxCountByConsumer = determineMaxCountByConsumer();
        if (maxCountByConsumer.isEmpty()) {
            return;
        }
        try {
            this.monitors.onEvent(new PollingForMessagesEvent(maxCountByConsumer));
            this.repository.take(maxCountByConsumer, new MessageRepository.MessageTakenCallback() {
                @Override
                public void taken(MessageProcessingUpdate messageProcessingUpdate, Object obj) {
                    MessagePoller.this.consume(messageProcessingUpdate, obj);
                }
            });
        } catch (Exception e) {
            this.monitors.onEvent(new TakingMessagesFailedEvent(maxCountByConsumer, e));
        }
    }

    /**
     * Hands a taken message to a {@link Worker} on the worker executor.
     *
     * <p>The consumer's in-flight count is incremented before the task is submitted and is
     * decremented again on every path: when the task cannot be submitted, when deserialization
     * fails, and when the worker finishes normally or exceptionally. After a task has run, a new
     * poll is scheduled so the freed slot can be used.
     *
     * @param messageProcessingUpdate the processing update describing the taken message
     * @param obj                     the serialized message as delivered by the repository
     * @param <M>                     message type
     */
    public <M> void consume(final MessageProcessingUpdate<M> messageProcessingUpdate, final Object obj) {
        incrementCurrentlyProcessingMessageCount(messageProcessingUpdate.getConsumer());
        try {
            this.workerExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Deserialize inside the finally-guarded region: a failure here must
                        // still release the consumer's slot and trigger the next poll.
                        Object message = MessagePoller.this.messageSerializer.deserialize(obj);
                        try {
                            new Worker(MessagePoller.this.repository, MessagePoller.this.throttler, MessagePoller.this.monitors, messageProcessingUpdate, message).consume();
                        } catch (InterruptedException ignored) {
                            // Restore the interrupt flag for whoever owns this thread.
                            Thread.currentThread().interrupt();
                        } catch (Worker.MessageUpdateConflict conflict) {
                            MessagePoller.this.monitors.onEvent(new MessageUpdateConflictEvent(messageProcessingUpdate, message));
                        }
                    } finally {
                        MessagePoller.this.decrementCurrentlyProcessingMessageCount(messageProcessingUpdate.getConsumer());
                        MessagePoller.this.poll();
                    }
                }
            });
        } catch (RuntimeException e) {
            // The task never ran (e.g. it was rejected), so its finally block will not release
            // the slot reserved above; release it here and let the caller see the failure.
            decrementCurrentlyProcessingMessageCount(messageProcessingUpdate.getConsumer());
            throw e;
        }
    }

    /**
     * Marks one message of the given consumer as no longer being processed.
     *
     * @param messageConsumer a registered consumer
     */
    public void decrementCurrentlyProcessingMessageCount(MessageConsumer<?> messageConsumer) {
        currentlyProcessingMessageCount(messageConsumer).decrementAndGet();
    }

    /** Marks one more message of the given consumer as being processed. */
    private void incrementCurrentlyProcessingMessageCount(MessageConsumer<?> messageConsumer) {
        currentlyProcessingMessageCount(messageConsumer).incrementAndGet();
    }

    /**
     * Looks up the in-flight counter of a consumer.
     *
     * @return the counter, or {@code null} when the consumer has not been registered
     */
    private AtomicInteger currentlyProcessingMessageCount(MessageConsumer<?> messageConsumer) {
        return this.currentlyProcessingMessageCountByConsumer.get(messageConsumer);
    }

    /**
     * Computes, for every registered consumer, how many more messages it may process right now
     * ({@code messagesHandledInParallel} minus its in-flight count).
     *
     * @return remaining capacity per consumer; consumers without free capacity are omitted
     */
    private Map<MessageConsumer<?>, Integer> determineMaxCountByConsumer() {
        Map<MessageConsumer<?>, Integer> maxCountByConsumer = new HashMap<>();
        for (Map.Entry<MessageConsumer<?>, AtomicInteger> entry : this.currentlyProcessingMessageCountByConsumer.entrySet()) {
            MessageConsumer<?> consumer = entry.getKey();
            int remaining = consumer.messagesHandledInParallel - entry.getValue().get();
            if (remaining > 0) {
                maxCountByConsumer.put(consumer, Integer.valueOf(remaining));
            }
        }
        return maxCountByConsumer;
    }

    /** Shuts down the message taker and the worker executor via {@link ExecutorTerminator}. */
    public void stop() {
        ExecutorTerminator.shutdownAndAwaitTermination(this.messageTaker, this.workerExecutor);
    }
}
