package org.apache.activemq.transport.amqp.protocol;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.AmqpSupport;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-amqp-5.11.0.redhat-630415.jar:org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.class */
/**
 * Receiver link that handles AMQP transaction coordinator messages.
 *
 * <p>Each incoming delivery is decoded into a Proton {@link Message} whose
 * body value is either a {@link Declare} (start a transaction) or a
 * {@link Discharge} (commit or roll back a transaction). The matching
 * {@link TransactionInfo} command is forwarded via {@code sendToActiveMQ}.
 *
 * <p>Sessions registered through {@link #enlist(AmqpSession)} are committed
 * or rolled back when a discharge is processed, after which the set of
 * enlisted sessions is cleared.
 */
public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {

    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);

    /** Sessions enlisted since the last discharge. */
    private final Set<AmqpSession> txSessions = new HashSet<AmqpSession>();

    /**
     * Creates a coordinator bound to the given session and receiver link.
     *
     * @param amqpSession the session that owns this link
     * @param receiver the Proton receiver endpoint for the coordinator link
     */
    public AmqpTransactionCoordinator(AmqpSession amqpSession, Receiver receiver) {
        super(amqpSession, receiver);
    }

    /**
     * Decodes one coordinator delivery and dispatches it as a declare or a
     * discharge, then tops up link credit if it has run low.
     *
     * @param delivery the delivery to update and settle
     * @param buffer the encoded message bytes for the delivery
     * @throws Exception if the body is not an {@link AmqpValue}, if the value
     *         is neither a {@link Declare} nor a {@link Discharge}, if a
     *         declare carries a global id, or if a downstream call fails
     */
    @Override
    protected void processDelivery(final Delivery delivery, Buffer buffer) throws Exception {
        Message message = decodeMessage(buffer);

        final AmqpSession txSession = (AmqpSession) getEndpoint().getSession().getContext();
        ConnectionId connectionId = txSession.getConnection().getConnectionId();

        // Validate the body up front so a malformed message yields a descriptive
        // error instead of a bare NullPointerException / ClassCastException.
        Object body = message.getBody();
        if (!(body instanceof AmqpValue)) {
            throw new Exception("Expected an AmqpValue body in coordinator message but got: " + body);
        }
        final Object action = ((AmqpValue) body).getValue();

        LOG.debug("COORDINATOR received: {}, [{}]", action, buffer);

        if (action instanceof Declare) {
            handleDeclare(delivery, txSession, connectionId, (Declare) action);
        } else if (action instanceof Discharge) {
            handleDischarge(delivery, txSession, connectionId, (Discharge) action);
        } else {
            // action may be null here, so do not call getClass() unguarded.
            throw new Exception("Unexpected coordinator message type: "
                + (action == null ? "null" : action.getClass()));
        }

        replenishCredit();
    }

    /** Decodes the whole buffer into a new Proton message. */
    private static Message decodeMessage(Buffer buffer) {
        Message message = Proton.message();
        int offset = buffer.offset;
        int remaining = buffer.length;
        while (remaining > 0) {
            int decoded = message.decode(buffer.data, offset, remaining);
            assert decoded > 0 : "Make progress decoding the message";
            offset += decoded;
            remaining -= decoded;
        }
        return message;
    }

    /** Starts a new local transaction and settles the delivery with its id. */
    private void handleDeclare(Delivery delivery, AmqpSession txSession, ConnectionId connectionId,
                               Declare declare) throws Exception {
        if (declare.getGlobalId() != null) {
            throw new Exception("don't know how to handle a declare /w a set GlobalId");
        }

        LocalTransactionId txId = txSession.getConnection().getNextTransactionId();
        TransactionInfo begin = new TransactionInfo(connectionId, txId, TransactionInfo.BEGIN);
        txSession.getConnection().registerTransaction(txId, this);
        sendToActiveMQ(begin, null);
        LOG.trace("started transaction {}", txId.getValue());

        Declared declared = new Declared();
        declared.setTxnId(new Binary(AmqpSupport.toBytes(txId.getValue())));
        delivery.disposition(declared);
        delivery.settle();
    }

    /**
     * Commits or rolls back the discharged transaction. The delivery is
     * settled from the response handler passed to {@code sendToActiveMQ}.
     */
    private void handleDischarge(final Delivery delivery, final AmqpSession txSession,
                                 ConnectionId connectionId, final Discharge discharge) throws Exception {
        LocalTransactionId txId =
            new LocalTransactionId(connectionId, AmqpSupport.toLong(discharge.getTxnId()));

        // The fail flag is optional on the wire; an absent flag is treated as
        // "not failed" rather than triggering an unboxing NullPointerException.
        final boolean rollback = Boolean.TRUE.equals(discharge.getFail());
        final byte operation;
        if (rollback) {
            LOG.trace("rollback transaction {}", txId.getValue());
            operation = TransactionInfo.ROLLBACK;
        } else {
            LOG.trace("commit transaction {}", txId.getValue());
            operation = TransactionInfo.COMMIT_ONE_PHASE;
        }

        for (AmqpSession enlisted : txSessions) {
            if (rollback) {
                enlisted.rollback();
            } else {
                enlisted.commit();
            }
        }
        txSessions.clear();

        txSession.getConnection().unregisterTransaction(txId);

        sendToActiveMQ(new TransactionInfo(connectionId, txId, operation), new ResponseHandler() {
            @Override
            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                if (response.isException()) {
                    Throwable cause = ((ExceptionResponse) response).getException();
                    Rejected rejected = new Rejected();
                    rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), cause.getMessage()));
                    delivery.disposition(rejected);
                } else {
                    delivery.disposition(Accepted.getInstance());
                }
                LOG.debug("TX: {} settling {}", operation, discharge);
                delivery.settle();
                txSession.pumpProtonToSocket();
            }
        });

        if (rollback) {
            txSession.flushPendingMessages();
        }
    }

    /**
     * Grants the link enough credit to return to the configured level once
     * the remaining credit is at or below 20% of that level.
     */
    private void replenishCredit() {
        int configured = getConfiguredReceiverCredit();
        int current = getEndpoint().getCredit();
        if (current <= configured * 0.2d) {
            int topUp = configured - current;
            LOG.debug("Sending more credit ({}) to transaction coordinator on session {}",
                topUp, session.getSessionId());
            getEndpoint().flow(topUp);
            session.pumpProtonToSocket();
        }
    }

    /**
     * @return always {@code null}; this link does not expose a destination
     */
    @Override
    public ActiveMQDestination getDestination() {
        return null;
    }

    /**
     * Ignores the supplied destination.
     *
     * @param activeMQDestination unused
     */
    @Override
    public void setDestination(ActiveMQDestination activeMQDestination) {
        // Intentionally a no-op.
    }

    /**
     * Adds a session to the set that is committed or rolled back on the next
     * discharge.
     *
     * @param amqpSession the session to enlist
     */
    public void enlist(AmqpSession amqpSession) {
        txSessions.add(amqpSession);
    }
}
