package org.openforis.rmb.jdbc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Map;
import org.openforis.rmb.MessageConsumer;
import org.openforis.rmb.spi.Clock;
import org.openforis.rmb.spi.MessageDetails;
import org.openforis.rmb.spi.MessageProcessing;
import org.openforis.rmb.spi.MessageProcessingStatus;
import org.openforis.rmb.spi.MessageProcessingUpdate;
import org.openforis.rmb.spi.MessageRepository;

/* loaded from: input_file:WEB-INF/lib/repository-message-broker-core-0.1.3.jar:org/openforis/rmb/jdbc/MessageTaker.class */
final class MessageTaker extends Operation {
    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageTaker(Connection connection, String str, Clock clock) {
        super(connection, str, clock);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void take(Map<MessageConsumer<?>, Integer> map, MessageRepository.MessageTakenCallback messageTakenCallback) throws SQLException {
        for (Map.Entry<MessageConsumer<?>, Integer> entry : map.entrySet()) {
            takeMessages(entry.getKey(), entry.getValue().intValue(), messageTakenCallback);
        }
    }

    private void takeMessages(MessageConsumer<?> messageConsumer, int i, MessageRepository.MessageTakenCallback messageTakenCallback) throws SQLException {
        PreparedStatement prepareStatement = this.connection.prepareStatement("SELECT queue_id, message_id, publication_time, version_id, state, message_string, message_bytes,        times_out, retries, error_message \nFROM " + this.tablePrefix + "message_processing mc\nJOIN " + this.tablePrefix + "message m ON mc.message_id = m.id\nWHERE consumer_id = ?\nAND state IN ('PENDING', 'PROCESSING')\nORDER BY sequence_no");
        prepareStatement.setString(1, messageConsumer.getId());
        prepareStatement.setMaxRows(i);
        ResultSet executeQuery = prepareStatement.executeQuery();
        for (int i2 = 0; executeQuery.next() && i2 < i; i2++) {
            if (canTakeMessage(executeQuery)) {
                takeMessage(executeQuery, messageConsumer, messageTakenCallback);
            }
        }
        executeQuery.close();
        prepareStatement.close();
    }

    private boolean canTakeMessage(ResultSet resultSet) throws SQLException {
        return resultSet.getString("state").equals("PENDING") || resultSet.getTimestamp("times_out").before(new Timestamp(this.clock.millis()));
    }

    private void takeMessage(ResultSet resultSet, MessageConsumer<?> messageConsumer, MessageRepository.MessageTakenCallback messageTakenCallback) throws SQLException {
        String string = resultSet.getString("queue_id");
        Timestamp timestamp = resultSet.getTimestamp("publication_time");
        MessageProcessingStatus.State state = resultSet.getString("state").equals("PROCESSING") ? MessageProcessingStatus.State.TIMED_OUT : MessageProcessingStatus.State.PENDING;
        String string2 = resultSet.getString("message_id");
        Object serializedMessage = serializedMessage(resultSet);
        MessageProcessingUpdate take = MessageProcessing.create(new MessageDetails(string, string2, toDate(timestamp)), messageConsumer, new MessageProcessingStatus(state, resultSet.getInt("retries"), resultSet.getString("error_message"), now(), resultSet.getString("version_id"))).take(this.clock);
        if (updateMessageProcessing(take)) {
            messageTakenCallback.taken(take, serializedMessage);
        }
    }
}
