package org.openforis.rmb;

import org.openforis.rmb.monitor.ConsumingNewMessageEvent;
import org.openforis.rmb.monitor.ConsumingTimedOutMessageEvent;
import org.openforis.rmb.monitor.MessageConsumedEvent;
import org.openforis.rmb.monitor.MessageConsumptionFailedEvent;
import org.openforis.rmb.monitor.MessageKeptAliveEvent;
import org.openforis.rmb.monitor.RetryingMessageConsumptionEvent;
import org.openforis.rmb.monitor.ThrottlingMessageRetryEvent;
import org.openforis.rmb.spi.Clock;
import org.openforis.rmb.spi.MessageProcessingStatus;
import org.openforis.rmb.spi.MessageProcessingUpdate;
import org.openforis.rmb.spi.MessageRepository;

/* loaded from: input_file:WEB-INF/lib/repository-message-broker-core-0.1.3.jar:org/openforis/rmb/Worker.class */
/**
 * Drives the consumption of one message by one consumer.
 *
 * <p>The worker invokes the consumer, writes every state transition (processing, retry, completed,
 * failed) to the {@link MessageRepository}, and publishes a matching event to the {@link Monitors}.
 * When the repository rejects an update, a {@link MessageUpdateConflict} is thrown and propagates
 * out of {@link #consume()}.
 *
 * <p>All methods that read or replace the current {@link MessageProcessingUpdate} are declared
 * {@code synchronized} on the worker instance.
 * NOTE(review): presumably this is because keep-alives can be sent from a thread other than the
 * one running {@link #consume()} - confirm against the {@code Throttler} and consumer contracts.
 *
 * @param <M> type of the message being consumed
 */
final class Worker<M> {
    private final MessageRepository repository;
    private final Throttler throttler;
    private final Monitors monitors;
    private final M message;
    private final MessageConsumer<M> consumer;
    /** The latest update accepted by the repository; replaced only by {@link #updateRepo}. */
    private MessageProcessingUpdate<M> update;
    private final Clock clock = new Clock.SystemClock();
    /** Callback handed to the consumer and the throttler; delegates to {@link #keepAlive()}. */
    private final KeepAlive keepAlive = createKeepAlive();

    /**
     * Thrown when the repository refuses a processing update (its {@code update} call returned
     * {@code false}). Carries the update that was rejected.
     */
    public static class MessageUpdateConflict extends RuntimeException {
        final MessageProcessingUpdate<?> update;

        MessageUpdateConflict(MessageProcessingUpdate<?> messageProcessingUpdate) {
            this.update = messageProcessingUpdate;
        }
    }

    /**
     * Creates a worker for a single message.
     *
     * @param messageRepository repository that persists processing updates
     * @param throttler used to delay between retries
     * @param monitors receiver of the events emitted while consuming
     * @param messageProcessingUpdate the initial processing state; also supplies the consumer
     * @param m the message to hand to the consumer
     */
    public Worker(MessageRepository messageRepository, Throttler throttler, Monitors monitors, MessageProcessingUpdate<M> messageProcessingUpdate, M m) {
        this.repository = messageRepository;
        this.throttler = throttler;
        this.monitors = monitors;
        this.update = messageProcessingUpdate;
        this.message = m;
        this.consumer = messageProcessingUpdate.getConsumer();
    }

    /**
     * Consumes the message, retrying after each consumer failure until the consumer succeeds or
     * the retry limit is reached, in which case the message is marked as failed.
     *
     * @throws InterruptedException if interrupted while throttling before a retry
     * @throws MessageUpdateConflict if the repository rejects one of the state updates
     */
    public void consume() throws InterruptedException, MessageUpdateConflict {
        notifyOnWorkerStart();
        Exception failure = tryToConsume();
        while (failure != null && notReachedMaxRetries()) {
            retry(failure);
            failure = tryToConsume();
        }
        if (failure != null) {
            failed(failure);
        }
    }

    /**
     * Tells whether another retry is allowed. A negative {@code maxRetries} on the consumer means
     * there is no limit.
     */
    private synchronized boolean notReachedMaxRetries() {
        return this.consumer.maxRetries < 0 || this.update.getRetries() < this.consumer.maxRetries;
    }

    /** Emits the start event, distinguishing a message taken over after a time-out from a new one. */
    private synchronized void notifyOnWorkerStart() {
        if (this.update.getFromState() == MessageProcessingStatus.State.TIMED_OUT) {
            this.monitors.onEvent(new ConsumingTimedOutMessageEvent(this.update, this.message));
        } else {
            this.monitors.onEvent(new ConsumingNewMessageEvent(this.update, this.message));
        }
    }

    /**
     * Runs the consumer once and marks the message as completed on success.
     *
     * @return {@code null} on success, otherwise the runtime exception raised by the attempt
     * @throws MessageUpdateConflict if the repository rejected an update during the attempt
     */
    private Exception tryToConsume() {
        try {
            this.consumer.consume(this.message, this.keepAlive);
            completed();
            return null;
        } catch (MessageUpdateConflict conflict) {
            // A rejected repository update is not a consumer failure: propagate it right away
            // instead of recording it as an error and attempting yet another update.
            throw conflict;
        } catch (RuntimeException e) {
            return e;
        }
    }

    /**
     * Records that the message is still being processed and emits a keep-alive event.
     * Invoked through the {@link KeepAlive} callback created by {@link #createKeepAlive()}.
     */
    public synchronized void keepAlive() {
        updateRepo(this.update.processing(this.clock));
        this.monitors.onEvent(new MessageKeptAliveEvent(this.update, this.message));
    }

    /**
     * Records a retry caused by {@code exc}, then throttles before the next attempt.
     *
     * @throws InterruptedException if interrupted while throttling
     */
    private synchronized void retry(Exception exc) throws InterruptedException {
        // Same message resolution as failed(): fall back to the cause chain when exc has no message.
        updateRepo(this.update.retry(this.clock, exceptionMessage(exc)));
        this.monitors.onEvent(new ThrottlingMessageRetryEvent(this.update, this.message, exc));
        this.throttler.throttle(this.update.getRetries(), this.consumer, this.keepAlive);
        this.monitors.onEvent(new RetryingMessageConsumptionEvent(this.update, this.message, exc));
    }

    /** Records the message as failed because of {@code exc} and emits the failure event. */
    private synchronized void failed(Exception exc) {
        updateRepo(this.update.failed(this.clock, exceptionMessage(exc)));
        this.monitors.onEvent(new MessageConsumptionFailedEvent(this.update, this.message, exc));
    }

    /**
     * Returns the first non-null message found on {@code th} or along its cause chain,
     * or {@code null} when there is none (including when {@code th} itself is {@code null}).
     */
    private String exceptionMessage(Throwable th) {
        for (Throwable current = th; current != null; current = current.getCause()) {
            String text = current.getMessage();
            if (text != null) {
                return text;
            }
        }
        return null;
    }

    /** Records the message as completed and emits the consumed event. */
    private synchronized void completed() {
        updateRepo(this.update.completed(this.clock));
        this.monitors.onEvent(new MessageConsumedEvent(this.update, this.message));
    }

    /**
     * Persists {@code messageProcessingUpdate} and, once accepted, makes it the current update.
     *
     * @throws MessageUpdateConflict if the repository's {@code update} call returns {@code false};
     *         the current update is left unchanged in that case
     */
    private void updateRepo(MessageProcessingUpdate<M> messageProcessingUpdate) {
        if (!this.repository.update(messageProcessingUpdate)) {
            throw new MessageUpdateConflict(messageProcessingUpdate);
        }
        this.update = messageProcessingUpdate;
    }

    /** Creates the callback whose {@code send()} forwards to {@link #keepAlive()}. */
    private KeepAlive createKeepAlive() {
        return new KeepAlive() {
            @Override
            public void send() {
                Worker.this.keepAlive();
            }
        };
    }
}
