package org.apache.activemq.transport.amqp.client;

import jakarta.jms.InvalidDestinationException;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/amqp/client/AmqpSender.class */
public class AmqpSender extends AmqpAbstractResource<Sender> {
    public static final long DEFAULT_SEND_TIMEOUT = 15000;
    private final AmqpTransferTagGenerator tagGenerator;
    private final AtomicBoolean closed;
    private final AmqpSession session;
    private final String address;
    private final String senderId;
    private final Target userSpecifiedTarget;
    private final SenderSettleMode userSpecifiedSenderSettlementMode;
    private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
    private final Symbol[] outcomes;
    private boolean presettle;
    private long sendTimeout;
    private final Set<Delivery> pending;
    private byte[] encodeBuffer;
    private Symbol[] desiredCapabilities;
    private Symbol[] offeredCapabilities;
    private Map<Symbol, Object> properties;
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    public static final Symbol[] DEFAULT_OUTCOMES = {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};

    public AmqpSender(AmqpSession amqpSession, String str, String str2) {
        this(amqpSession, str, str2, null, null, DEFAULT_OUTCOMES);
    }

    public AmqpSender(AmqpSession amqpSession, String str, String str2, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, Symbol[] symbolArr) {
        this.tagGenerator = new AmqpTransferTagGenerator(true);
        this.closed = new AtomicBoolean();
        this.sendTimeout = DEFAULT_SEND_TIMEOUT;
        this.pending = new LinkedHashSet();
        this.encodeBuffer = new byte[8192];
        if (str != null && str.isEmpty()) {
            throw new IllegalArgumentException("Address cannot be empty.");
        }
        this.session = amqpSession;
        this.address = str;
        this.senderId = str2;
        this.userSpecifiedTarget = null;
        this.userSpecifiedSenderSettlementMode = senderSettleMode;
        this.userSpecifiedReceiverSettlementMode = receiverSettleMode;
        this.outcomes = symbolArr;
    }

    public AmqpSender(AmqpSession amqpSession, Target target, String str) {
        this.tagGenerator = new AmqpTransferTagGenerator(true);
        this.closed = new AtomicBoolean();
        this.sendTimeout = DEFAULT_SEND_TIMEOUT;
        this.pending = new LinkedHashSet();
        this.encodeBuffer = new byte[8192];
        if (target == null) {
            throw new IllegalArgumentException("User specified Target cannot be null");
        }
        this.session = amqpSession;
        this.address = target.getAddress();
        this.senderId = str;
        this.userSpecifiedTarget = target;
        this.userSpecifiedSenderSettlementMode = null;
        this.userSpecifiedReceiverSettlementMode = null;
        this.outcomes = DEFAULT_OUTCOMES;
    }

    public void send(AmqpMessage amqpMessage) throws IOException {
        checkClosed();
        send(amqpMessage, null);
    }

    public void send(AmqpMessage amqpMessage, AmqpTransactionId amqpTransactionId) throws IOException {
        checkClosed();
        ClientFuture clientFuture = new ClientFuture();
        this.session.getScheduler().execute(() -> {
            try {
                doSend(amqpMessage, clientFuture, amqpTransactionId);
                this.session.pumpToProtonTransport(clientFuture);
            } catch (Exception e) {
                clientFuture.onFailure(e);
                this.session.getConnection().fireClientException(e);
            }
        });
        if (this.sendTimeout <= 0) {
            clientFuture.sync();
        } else {
            clientFuture.sync(this.sendTimeout, TimeUnit.MILLISECONDS);
        }
    }

    public void close() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            ClientFuture clientFuture = new ClientFuture();
            this.session.getScheduler().execute(() -> {
                checkClosed();
                close(clientFuture);
                this.session.pumpToProtonTransport(clientFuture);
            });
            clientFuture.sync();
        }
    }

    public AmqpSession getSession() {
        return this.session;
    }

    public Sender getSender() {
        return UnmodifiableProxy.senderProxy(getEndpoint());
    }

    public String getAddress() {
        return this.address;
    }

    public boolean isPresettle() {
        return this.presettle;
    }

    public void setPresettle(boolean z) {
        this.presettle = z;
    }

    public long getSendTimeout() {
        return this.sendTimeout;
    }

    public void setSendTimeout(long j) {
        this.sendTimeout = j;
    }

    public void setDesiredCapabilities(Symbol[] symbolArr) {
        if (getEndpoint() != null) {
            throw new IllegalStateException("Endpoint already established");
        }
        this.desiredCapabilities = symbolArr;
    }

    public void setOfferedCapabilities(Symbol[] symbolArr) {
        if (getEndpoint() != null) {
            throw new IllegalStateException("Endpoint already established");
        }
        this.offeredCapabilities = symbolArr;
    }

    public void setProperties(Map<Symbol, Object> map) {
        if (getEndpoint() != null) {
            throw new IllegalStateException("Endpoint already established");
        }
        this.properties = map;
    }

    private void checkClosed() {
        if (isClosed()) {
            throw new IllegalStateException("Sender is already closed");
        }
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doOpen() {
        Source source = new Source();
        source.setAddress(this.senderId);
        source.setOutcomes(this.outcomes);
        Target target = this.userSpecifiedTarget;
        if (target == null) {
            target = new Target();
            target.setAddress(this.address);
        }
        Sender sender = this.session.getEndpoint().sender(this.senderId + ":" + this.address);
        sender.setSource(source);
        sender.setTarget(target);
        if (this.userSpecifiedSenderSettlementMode != null) {
            sender.setSenderSettleMode(this.userSpecifiedSenderSettlementMode);
            if (SenderSettleMode.SETTLED.equals(this.userSpecifiedSenderSettlementMode)) {
                this.presettle = true;
            }
        } else if (this.presettle) {
            sender.setSenderSettleMode(SenderSettleMode.SETTLED);
        } else {
            sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        }
        if (this.userSpecifiedReceiverSettlementMode != null) {
            sender.setReceiverSettleMode(this.userSpecifiedReceiverSettlementMode);
        } else {
            sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        }
        sender.setDesiredCapabilities(this.desiredCapabilities);
        sender.setOfferedCapabilities(this.offeredCapabilities);
        sender.setProperties(this.properties);
        setEndpoint(sender);
        super.doOpen();
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doOpenCompletion() {
        if (getEndpoint().getRemoteTarget() != null) {
            super.doOpenCompletion();
        }
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doOpenInspection() {
        try {
            getStateInspector().inspectOpenedResource(getSender());
        } catch (Throwable th) {
            getStateInspector().markAsInvalid(th.getMessage());
        }
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doClosedInspection() {
        try {
            getStateInspector().inspectClosedResource(getSender());
        } catch (Throwable th) {
            getStateInspector().markAsInvalid(th.getMessage());
        }
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doDetachedInspection() {
        try {
            getStateInspector().inspectDetachedResource(getSender());
        } catch (Throwable th) {
            getStateInspector().markAsInvalid(th.getMessage());
        }
    }

    protected void doDeliveryUpdateInspection(Delivery delivery) {
        try {
            getStateInspector().inspectDeliveryUpdate(getSender(), delivery);
        } catch (Throwable th) {
            getStateInspector().markAsInvalid(th.getMessage());
        }
    }

    private void doCreditInspection() {
        try {
            getStateInspector().inspectCredit(getSender());
        } catch (Throwable th) {
            getStateInspector().markAsInvalid(th.getMessage());
        }
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected Exception getOpenAbortException() {
        return getEndpoint().getRemoteTarget() != null ? super.getOpenAbortException() : new InvalidDestinationException("Link creation was refused");
    }

    private void doSend(AmqpMessage amqpMessage, AsyncResult asyncResult, AmqpTransactionId amqpTransactionId) throws Exception {
        Delivery delivery;
        logger.trace("Producer sending message: {}", amqpMessage);
        if (this.presettle) {
            delivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
        } else {
            byte[] nextTag = this.tagGenerator.getNextTag();
            delivery = getEndpoint().delivery(nextTag, 0, nextTag.length);
        }
        delivery.setContext(asyncResult);
        Binary binary = null;
        if (amqpTransactionId != null) {
            binary = amqpTransactionId.getRemoteTxId();
        } else if (this.session.isInTransaction()) {
            binary = this.session.getTransactionId().getRemoteTxId();
        }
        if (binary != null) {
            TransactionalState transactionalState = new TransactionalState();
            transactionalState.setTxnId(binary);
            delivery.disposition(transactionalState);
        }
        encodeAndSend(amqpMessage.getWrappedMessage(), delivery);
        if (this.presettle) {
            delivery.settle();
            asyncResult.onSuccess();
        } else {
            this.pending.add(delivery);
            getEndpoint().advance();
        }
    }

    private void encodeAndSend(Message message, Delivery delivery) throws IOException {
        int encode;
        while (true) {
            try {
                encode = message.encode(this.encodeBuffer, 0, this.encodeBuffer.length);
                break;
            } catch (BufferOverflowException e) {
                this.encodeBuffer = new byte[this.encodeBuffer.length * 2];
            }
        }
        int i = 0;
        while (true) {
            int send = getEndpoint().send(this.encodeBuffer, i, encode - i);
            if (send > 0) {
                i += send;
                if (encode - i == 0) {
                    return;
                }
            } else {
                logger.warn("{} failed to send any data from current Message.", this);
            }
        }
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource, org.apache.activemq.transport.amqp.client.AmqpEventSink
    public void processFlowUpdates(AmqpConnection amqpConnection) throws IOException {
        logger.trace("Sender {} flow update, credit = {}", Integer.valueOf(getEndpoint().getCredit()));
        doCreditInspection();
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource, org.apache.activemq.transport.amqp.client.AmqpEventSink
    public void processDeliveryUpdates(AmqpConnection amqpConnection, Delivery delivery) throws IOException {
        Outcome outcome;
        ArrayList arrayList = new ArrayList();
        for (Delivery delivery2 : this.pending) {
            Outcome remoteState = delivery2.getRemoteState();
            if (remoteState != null) {
                doDeliveryUpdateInspection(delivery2);
                if (remoteState instanceof TransactionalState) {
                    logger.trace("State of delivery is Transactional, retrieving outcome: {}", remoteState);
                    outcome = ((TransactionalState) remoteState).getOutcome();
                } else if (remoteState instanceof Outcome) {
                    outcome = remoteState;
                } else {
                    logger.warn("Message send updated with unsupported state: {}", remoteState);
                    outcome = null;
                }
                AsyncResult asyncResult = (AsyncResult) delivery2.getContext();
                Exception exc = null;
                if (outcome instanceof Accepted) {
                    logger.trace("Outcome of delivery was accepted: {}", delivery2);
                    if (asyncResult != null && !asyncResult.isComplete()) {
                        asyncResult.onSuccess();
                    }
                } else if (outcome instanceof Rejected) {
                    logger.trace("Outcome of delivery was rejected: {}", delivery2);
                    ErrorCondition error = ((Rejected) outcome).getError();
                    if (error == null) {
                        error = getEndpoint().getRemoteCondition();
                    }
                    exc = AmqpSupport.convertToException(error);
                } else if (outcome instanceof Released) {
                    logger.trace("Outcome of delivery was released: {}", delivery2);
                    exc = new IOException("Delivery failed: released by receiver");
                } else if (outcome instanceof Modified) {
                    logger.trace("Outcome of delivery was modified: {}", delivery2);
                    exc = new IOException("Delivery failed: failure at remote");
                }
                if (exc != null) {
                    if (asyncResult == null || asyncResult.isComplete()) {
                        amqpConnection.fireClientException(exc);
                    } else {
                        asyncResult.onFailure(exc);
                    }
                }
                this.tagGenerator.returnTag(delivery2.getTag());
                delivery2.settle();
                arrayList.add(delivery2);
            }
        }
        this.pending.removeAll(arrayList);
    }

    public String toString() {
        return getClass().getSimpleName() + "{ address = " + this.address + "}";
    }
}
