package org.apache.activemq.transport.amqp.protocol;

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.AmqpSupport;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
import org.apache.activemq.transport.amqp.message.EncodedMessage;
import org.apache.activemq.transport.amqp.message.OutboundTransformer;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-amqp-5.11.0.redhat-6-2-0-SNAPSHOT.jar:org/apache/activemq/transport/amqp/protocol/AmqpSender.class */
public class AmqpSender extends AmqpAbstractLink<Sender> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AmqpSender.class);
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    private final OutboundTransformer outboundTransformer;
    private final AmqpTransferTagGenerator tagCache;
    private final LinkedList<MessageDispatch> outbound;
    private final LinkedList<MessageDispatch> dispatchedInTx;
    private final String MESSAGE_FORMAT_KEY;
    private final ConsumerInfo consumerInfo;
    private final boolean presettle;
    private int currentCredit;
    private boolean draining;
    private long lastDeliveredSequenceId;
    private Buffer currentBuffer;
    private Delivery currentDelivery;

    public AmqpSender(AmqpSession amqpSession, Sender sender, ConsumerInfo consumerInfo) {
        super(amqpSession, sender);
        this.outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
        this.tagCache = new AmqpTransferTagGenerator();
        this.outbound = new LinkedList<>();
        this.dispatchedInTx = new LinkedList<>();
        this.MESSAGE_FORMAT_KEY = this.outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
        this.currentCredit = sender.getRemoteCredit();
        this.consumerInfo = consumerInfo;
        this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
    }

    @Override // org.apache.activemq.transport.amqp.protocol.AmqpAbstractLink, org.apache.activemq.transport.amqp.protocol.AmqpResource
    public void open() {
        if (!isClosed()) {
            this.session.registerSender(getConsumerId(), this);
        }
        super.open();
    }

    @Override // org.apache.activemq.transport.amqp.protocol.AmqpAbstractLink, org.apache.activemq.transport.amqp.protocol.AmqpLink
    public void detach() {
        if (!isClosed() && isOpened()) {
            RemoveInfo removeInfo = new RemoveInfo(getConsumerId());
            removeInfo.setLastDeliveredSequenceId(this.lastDeliveredSequenceId);
            sendToActiveMQ(removeInfo, null);
            this.session.unregisterSender(getConsumerId());
        }
        super.detach();
    }

    @Override // org.apache.activemq.transport.amqp.protocol.AmqpAbstractLink, org.apache.activemq.transport.amqp.protocol.AmqpResource
    public void close() {
        if (!isClosed() && isOpened()) {
            RemoveInfo removeInfo = new RemoveInfo(getConsumerId());
            removeInfo.setLastDeliveredSequenceId(this.lastDeliveredSequenceId);
            sendToActiveMQ(removeInfo, null);
            if (this.consumerInfo.isDurable()) {
                RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo();
                removeSubscriptionInfo.setConnectionId(this.session.getConnection().getConnectionId());
                removeSubscriptionInfo.setSubscriptionName(((Sender) getEndpoint()).getName());
                removeSubscriptionInfo.setClientId(this.session.getConnection().getClientId());
                sendToActiveMQ(removeSubscriptionInfo, null);
            }
            this.session.unregisterSender(getConsumerId());
        }
        super.close();
    }

    @Override // org.apache.activemq.transport.amqp.protocol.AmqpLink
    public void flow() throws Exception {
        int credit = ((Sender) getEndpoint()).getCredit();
        LOG.trace("Flow: drain={} credit={}, remoteCredit={}", Boolean.valueOf(((Sender) getEndpoint()).getDrain()), Integer.valueOf(((Sender) getEndpoint()).getCredit()), Integer.valueOf(((Sender) getEndpoint()).getRemoteCredit()));
        if (!((Sender) getEndpoint()).getDrain() || (credit == this.currentCredit && this.draining)) {
            if (credit != this.currentCredit) {
                this.currentCredit = credit >= 0 ? credit : 0;
                ConsumerControl consumerControl = new ConsumerControl();
                consumerControl.setConsumerId(getConsumerId());
                consumerControl.setDestination(getDestination());
                consumerControl.setPrefetch(this.currentCredit);
                sendToActiveMQ(consumerControl, null);
                return;
            }
            return;
        }
        this.currentCredit = credit >= 0 ? credit : 0;
        this.draining = true;
        ConsumerControl consumerControl2 = new ConsumerControl();
        consumerControl2.setConsumerId(getConsumerId());
        consumerControl2.setDestination(getDestination());
        consumerControl2.setPrefetch(0);
        sendToActiveMQ(consumerControl2, null);
        MessagePull messagePull = new MessagePull();
        messagePull.setConsumerId(getConsumerId());
        messagePull.setDestination(getDestination());
        messagePull.setTimeout(-1L);
        messagePull.setAlwaysSignalDone(true);
        messagePull.setQuantity(this.currentCredit);
        sendToActiveMQ(messagePull, null);
    }

    @Override // org.apache.activemq.transport.amqp.protocol.AmqpLink
    public void delivery(Delivery delivery) throws Exception {
        MessageDispatch messageDispatch = (MessageDispatch) delivery.getContext();
        DeliveryState remoteState = delivery.getRemoteState();
        if (remoteState instanceof TransactionalState) {
            TransactionalState transactionalState = (TransactionalState) remoteState;
            LOG.trace("onDelivery: TX delivery state = {}", remoteState);
            if (transactionalState.getOutcome() != null && (transactionalState.getOutcome() instanceof Accepted)) {
                if (!delivery.remotelySettled()) {
                    TransactionalState transactionalState2 = new TransactionalState();
                    transactionalState2.setOutcome(Accepted.getInstance());
                    transactionalState2.setTxnId(((TransactionalState) remoteState).getTxnId());
                    delivery.disposition(transactionalState2);
                }
                settle(delivery, 0);
            }
        } else if (remoteState instanceof Accepted) {
            LOG.trace("onDelivery: accepted state = {}", remoteState);
            if (!delivery.remotelySettled()) {
                delivery.disposition(new Accepted());
            }
            settle(delivery, 4);
        } else if (remoteState instanceof Rejected) {
            messageDispatch.setRedeliveryCounter(messageDispatch.getRedeliveryCounter() + 1);
            LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", remoteState, Integer.valueOf(messageDispatch.getRedeliveryCounter()));
            settle(delivery, -1);
        } else if (remoteState instanceof Released) {
            LOG.trace("onDelivery: Released state = {}", remoteState);
            settle(delivery, -1);
        } else if (remoteState instanceof Modified) {
            Modified modified = (Modified) remoteState;
            if (modified.getDeliveryFailed().booleanValue()) {
                messageDispatch.setRedeliveryCounter(messageDispatch.getRedeliveryCounter() + 1);
            }
            LOG.trace("onDelivery: Modified state = {}, delivery count now {}", remoteState, Integer.valueOf(messageDispatch.getRedeliveryCounter()));
            int i = -1;
            Boolean undeliverableHere = modified.getUndeliverableHere();
            if (undeliverableHere != null && undeliverableHere.booleanValue()) {
                i = 1;
            }
            settle(delivery, i);
        }
        pumpOutbound();
    }

    @Override // org.apache.activemq.transport.amqp.protocol.AmqpLink
    public void commit() throws Exception {
        if (this.dispatchedInTx.isEmpty()) {
            return;
        }
        Iterator<MessageDispatch> it = this.dispatchedInTx.iterator();
        while (it.hasNext()) {
            MessageDispatch next = it.next();
            MessageAck messageAck = new MessageAck(next, (byte) 4, 1);
            messageAck.setFirstMessageId(next.getMessage().getMessageId());
            messageAck.setTransactionId(next.getMessage().getTransactionId());
            LOG.trace("Sending commit Ack to ActiveMQ: {}", messageAck);
            sendToActiveMQ(messageAck, new ResponseHandler() { // from class: org.apache.activemq.transport.amqp.protocol.AmqpSender.1
                @Override // org.apache.activemq.transport.amqp.ResponseHandler
                public void onResponse(AmqpProtocolConverter amqpProtocolConverter, Response response) throws IOException {
                    if (response.isException() && response.isException()) {
                        ((ExceptionResponse) response).getException().printStackTrace();
                        AmqpSender.this.getEndpoint().close();
                    }
                    AmqpSender.this.session.pumpProtonToSocket();
                }
            });
        }
        this.dispatchedInTx.clear();
    }

    @Override // org.apache.activemq.transport.amqp.protocol.AmqpLink
    public void rollback() throws Exception {
        synchronized (this.outbound) {
            LOG.trace("Rolling back {} messages for redelivery. ", Integer.valueOf(this.dispatchedInTx.size()));
            Iterator<MessageDispatch> it = this.dispatchedInTx.iterator();
            while (it.hasNext()) {
                MessageDispatch next = it.next();
                next.setRedeliveryCounter(next.getRedeliveryCounter() + 1);
                next.getMessage().setTransactionId(null);
                this.outbound.addFirst(next);
            }
            this.dispatchedInTx.clear();
        }
    }

    public void onMessageDispatch(MessageDispatch messageDispatch) throws Exception {
        if (isClosed()) {
            return;
        }
        synchronized (this.outbound) {
            this.outbound.addLast(messageDispatch);
        }
        pumpOutbound();
        this.session.pumpProtonToSocket();
    }

    public void onConsumerControl(ConsumerControl consumerControl) {
        if (consumerControl.isClose()) {
            close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed"));
            this.session.pumpProtonToSocket();
        }
    }

    public String toString() {
        return "AmqpSender {" + getConsumerId() + "}";
    }

    public ConsumerId getConsumerId() {
        return this.consumerInfo.getConsumerId();
    }

    @Override // org.apache.activemq.transport.amqp.protocol.AmqpLink
    public ActiveMQDestination getDestination() {
        return this.consumerInfo.getDestination();
    }

    @Override // org.apache.activemq.transport.amqp.protocol.AmqpLink
    public void setDestination(ActiveMQDestination activeMQDestination) {
        this.consumerInfo.setDestination(activeMQDestination);
    }

    public void pumpOutbound() throws Exception {
        while (!isClosed()) {
            while (this.currentBuffer != null) {
                int send = getEndpoint().send(this.currentBuffer.data, this.currentBuffer.offset, this.currentBuffer.length);
                if (send <= 0) {
                    return;
                }
                this.currentBuffer.moveHead(send);
                if (this.currentBuffer.length == 0) {
                    if (this.presettle) {
                        settle(this.currentDelivery, 4);
                    } else {
                        getEndpoint().advance();
                    }
                    this.currentBuffer = null;
                    this.currentDelivery = null;
                }
            }
            if (this.outbound.isEmpty()) {
                return;
            }
            MessageDispatch removeFirst = this.outbound.removeFirst();
            try {
                ActiveMQMessage activeMQMessage = null;
                if (removeFirst.getMessage() != null) {
                    if (removeFirst.getDestination().isTopic()) {
                        synchronized (removeFirst.getMessage()) {
                            activeMQMessage = (ActiveMQMessage) removeFirst.getMessage().copy();
                        }
                    } else {
                        activeMQMessage = (ActiveMQMessage) removeFirst.getMessage();
                    }
                    if (!activeMQMessage.getProperties().containsKey(this.MESSAGE_FORMAT_KEY)) {
                        activeMQMessage.setProperty(this.MESSAGE_FORMAT_KEY, 0);
                    }
                }
                ActiveMQMessage activeMQMessage2 = activeMQMessage;
                if (activeMQMessage2 == null) {
                    LOG.trace("Sender:[{}] browse done.", getEndpoint().getName());
                    getEndpoint().drained();
                    this.draining = false;
                    this.currentCredit = 0;
                } else {
                    activeMQMessage2.setRedeliveryCounter(removeFirst.getRedeliveryCounter());
                    activeMQMessage2.setReadOnlyBody(true);
                    EncodedMessage transform = this.outboundTransformer.transform(activeMQMessage2);
                    if (transform != null && transform.getLength() > 0) {
                        this.currentBuffer = new Buffer(transform.getArray(), transform.getArrayOffset(), transform.getLength());
                        if (this.presettle) {
                            this.currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
                        } else {
                            byte[] nextTag = this.tagCache.getNextTag();
                            this.currentDelivery = getEndpoint().delivery(nextTag, 0, nextTag.length);
                        }
                        this.currentDelivery.setContext(removeFirst);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Error detected while flushing outbound messages: {}", e.getMessage());
            }
        }
    }

    private void settle(final Delivery delivery, int i) throws Exception {
        byte[] tag = delivery.getTag();
        if (tag != null && tag.length > 0 && delivery.remotelySettled()) {
            this.tagCache.returnTag(tag);
        }
        if (i == -1) {
            delivery.settle();
            onMessageDispatch((MessageDispatch) delivery.getContext());
            return;
        }
        MessageDispatch messageDispatch = (MessageDispatch) delivery.getContext();
        this.lastDeliveredSequenceId = messageDispatch.getMessage().getMessageId().getBrokerSequenceId();
        MessageAck messageAck = new MessageAck();
        messageAck.setConsumerId(getConsumerId());
        messageAck.setFirstMessageId(messageDispatch.getMessage().getMessageId());
        messageAck.setLastMessageId(messageDispatch.getMessage().getMessageId());
        messageAck.setMessageCount(1);
        messageAck.setAckType((byte) i);
        messageAck.setDestination(messageDispatch.getDestination());
        DeliveryState remoteState = delivery.getRemoteState();
        if (remoteState != null && (remoteState instanceof TransactionalState)) {
            LocalTransactionId localTransactionId = new LocalTransactionId(this.session.getConnection().getConnectionId(), AmqpSupport.toLong(((TransactionalState) remoteState).getTxnId()));
            messageAck.setTransactionId(localTransactionId);
            messageDispatch.getMessage().setTransactionId(localTransactionId);
            this.dispatchedInTx.addFirst(messageDispatch);
        }
        LOG.trace("Sending Ack to ActiveMQ: {}", messageAck);
        sendToActiveMQ(messageAck, new ResponseHandler() { // from class: org.apache.activemq.transport.amqp.protocol.AmqpSender.2
            @Override // org.apache.activemq.transport.amqp.ResponseHandler
            public void onResponse(AmqpProtocolConverter amqpProtocolConverter, Response response) throws IOException {
                if (!response.isException()) {
                    delivery.settle();
                } else if (response.isException()) {
                    ((ExceptionResponse) response).getException().printStackTrace();
                    AmqpSender.this.getEndpoint().close();
                }
                AmqpSender.this.session.pumpProtonToSocket();
            }
        });
    }
}
