package org.proton.plug.context;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.exceptions.ActiveMQAMQPException;
import org.proton.plug.logger.ActiveMQAMQPProtocolMessageBundle;
import org.proton.plug.util.DeliveryUtil;

/* loaded from: input_file:org/proton/plug/context/ProtonTransactionHandler.class */
public class ProtonTransactionHandler implements ProtonDeliveryHandler {
    final AMQPSessionCallback sessionSPI;

    public ProtonTransactionHandler(AMQPSessionCallback aMQPSessionCallback) {
        this.sessionSPI = aMQPSessionCallback;
    }

    @Override // org.proton.plug.context.ProtonDeliveryHandler
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
        ByteBuf heapBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
        try {
            try {
                Receiver link = delivery.getLink();
                if (!delivery.isReadable()) {
                    heapBuffer.release();
                    return;
                }
                DeliveryUtil.readDelivery(link, heapBuffer);
                link.advance();
                Object value = DeliveryUtil.decodeMessageImpl(heapBuffer).getBody().getValue();
                if (value instanceof Declare) {
                    Binary currentTXID = this.sessionSPI.getCurrentTXID();
                    Declared declared = new Declared();
                    declared.setTxnId(currentTXID);
                    delivery.disposition(declared);
                    delivery.settle();
                } else if (value instanceof Discharge) {
                    if (((Discharge) value).getFail().booleanValue()) {
                        try {
                            this.sessionSPI.rollbackCurrentTX(true);
                        } catch (Exception e) {
                            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
                        }
                    } else {
                        try {
                            this.sessionSPI.commitCurrentTX();
                        } catch (Exception e2) {
                            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e2.getMessage());
                        }
                    }
                    delivery.settle();
                }
                heapBuffer.release();
            } catch (Exception e3) {
                e3.printStackTrace();
                Rejected rejected = new Rejected();
                ErrorCondition errorCondition = new ErrorCondition();
                errorCondition.setCondition(Symbol.valueOf("failed"));
                errorCondition.setDescription(e3.getMessage());
                rejected.setError(errorCondition);
                delivery.disposition(rejected);
                heapBuffer.release();
            }
        } catch (Throwable th) {
            heapBuffer.release();
            throw th;
        }
    }

    @Override // org.proton.plug.context.ProtonDeliveryHandler
    public void onFlow(int i, boolean z) {
    }

    @Override // org.proton.plug.context.ProtonDeliveryHandler
    public void close(boolean z) throws ActiveMQAMQPException {
    }

    @Override // org.proton.plug.context.ProtonDeliveryHandler
    public void close(ErrorCondition errorCondition) throws ActiveMQAMQPException {
    }
}
