package org.apache.activemq.transport.amqp.client;

import jakarta.jms.InvalidDestinationException;
import jakarta.jms.JMSException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/transport/amqp/client/AmqpReceiver.class */
public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final AtomicBoolean closed;
    private final BlockingQueue<AmqpMessage> prefetch;
    private final AmqpSession session;
    private final String address;
    private final String receiverId;
    private final Source userSpecifiedSource;
    private final SenderSettleMode userSpecifiedSenderSettlementMode;
    private final ReceiverSettleMode userSpecifiedReceiverSettlementMode;
    private String subscriptionName;
    private String selector;
    private boolean presettle;
    private boolean noLocal;
    private Map<Symbol, Object> properties;
    private AsyncResult pullRequest;
    private AsyncResult stopRequest;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/activemq/transport/amqp/client/AmqpReceiver$ScheduledRequest.class */
    public static final class ScheduledRequest implements AsyncResult {
        private final ScheduledFuture<?> sheduledTask;
        private final AsyncResult origRequest;

        public ScheduledRequest(ScheduledFuture<?> scheduledFuture, AsyncResult asyncResult) {
            this.sheduledTask = scheduledFuture;
            this.origRequest = asyncResult;
        }

        @Override // org.apache.activemq.transport.amqp.client.util.AsyncResult
        public void onFailure(Throwable th) {
            this.sheduledTask.cancel(false);
            this.origRequest.onFailure(th);
        }

        @Override // org.apache.activemq.transport.amqp.client.util.AsyncResult
        public void onSuccess() {
            if (this.sheduledTask.cancel(false)) {
                this.origRequest.onSuccess();
            }
        }

        @Override // org.apache.activemq.transport.amqp.client.util.AsyncResult
        public boolean isComplete() {
            return this.origRequest.isComplete();
        }
    }

    public int getPrefetchSize() {
        return this.prefetch.size();
    }

    public AmqpReceiver(AmqpSession amqpSession, String str, String str2) {
        this(amqpSession, str, str2, null, null);
    }

    public AmqpReceiver(AmqpSession amqpSession, String str, String str2, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) {
        this.closed = new AtomicBoolean();
        this.prefetch = new LinkedBlockingDeque();
        if (str != null && str.isEmpty()) {
            throw new IllegalArgumentException("Address cannot be empty.");
        }
        this.userSpecifiedSource = null;
        this.session = amqpSession;
        this.address = str;
        this.receiverId = str2;
        this.userSpecifiedSenderSettlementMode = senderSettleMode;
        this.userSpecifiedReceiverSettlementMode = receiverSettleMode;
    }

    public AmqpReceiver(AmqpSession amqpSession, Source source, String str) {
        this.closed = new AtomicBoolean();
        this.prefetch = new LinkedBlockingDeque();
        if (source == null) {
            throw new IllegalArgumentException("User specified Source cannot be null");
        }
        this.session = amqpSession;
        this.address = source.getAddress();
        this.receiverId = str;
        this.userSpecifiedSource = source;
        this.userSpecifiedSenderSettlementMode = null;
        this.userSpecifiedReceiverSettlementMode = null;
    }

    public void close() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            ClientFuture clientFuture = new ClientFuture();
            this.session.getScheduler().execute(() -> {
                checkClosed();
                close(clientFuture);
                this.session.pumpToProtonTransport(clientFuture);
            });
            clientFuture.sync();
        }
    }

    public void setProperties(Map<Symbol, Object> map) {
        if (getEndpoint() != null) {
            throw new IllegalStateException("Endpoint already established");
        }
        this.properties = map;
    }

    public void detach() throws IOException {
        if (this.closed.compareAndSet(false, true)) {
            ClientFuture clientFuture = new ClientFuture();
            this.session.getScheduler().execute(() -> {
                checkClosed();
                detach(clientFuture);
                this.session.pumpToProtonTransport(clientFuture);
            });
            clientFuture.sync();
        }
    }

    public AmqpSession getSession() {
        return this.session;
    }

    public String getAddress() {
        return this.address;
    }

    public AmqpMessage receive() throws Exception {
        checkClosed();
        return this.prefetch.take();
    }

    public AmqpMessage receive(long j, TimeUnit timeUnit) throws Exception {
        checkClosed();
        return this.prefetch.poll(j, timeUnit);
    }

    public AmqpMessage receiveNoWait() throws Exception {
        checkClosed();
        return this.prefetch.poll();
    }

    public AmqpMessage pull() throws IOException {
        return pull(-1L, TimeUnit.MILLISECONDS);
    }

    public AmqpMessage pullImmediate() throws IOException {
        return pull(0L, TimeUnit.MILLISECONDS);
    }

    public AmqpMessage pull(long j, TimeUnit timeUnit) throws IOException {
        checkClosed();
        ClientFuture clientFuture = new ClientFuture();
        this.session.getScheduler().execute(() -> {
            checkClosed();
            long millis = timeUnit.toMillis(j);
            try {
                logger.trace("Pull on Receiver {} with timeout = {}", getSubscriptionName(), Long.valueOf(millis));
                if (millis < 0) {
                    if (getEndpoint().getCredit() == 0) {
                        logger.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
                        getEndpoint().flow(1);
                    }
                    this.pullRequest = clientFuture;
                } else if (millis == 0) {
                    if (getEndpoint().getCredit() == 0) {
                        logger.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
                        getEndpoint().flow(1);
                    }
                    stop(clientFuture);
                } else if (millis > 0) {
                    if (getEndpoint().getCredit() == 0) {
                        logger.trace("Receiver {} granting 1 additional credit for pull.", getSubscriptionName());
                        getEndpoint().flow(1);
                    }
                    stopOnSchedule(millis, clientFuture);
                }
                this.session.pumpToProtonTransport(clientFuture);
            } catch (Exception e) {
                clientFuture.onFailure(e);
            }
        });
        clientFuture.sync();
        return this.prefetch.poll();
    }

    public void flow(int i) throws IOException {
        flow(i, false);
    }

    public void flow(int i, boolean z) throws IOException {
        checkClosed();
        ClientFuture clientFuture = new ClientFuture();
        this.session.getScheduler().execute(() -> {
            checkClosed();
            try {
                getEndpoint().flow(i);
                if (!z) {
                    this.session.pumpToProtonTransport(clientFuture);
                }
                clientFuture.onSuccess();
            } catch (Exception e) {
                clientFuture.onFailure(e);
            }
        });
        clientFuture.sync();
    }

    public void drain(int i) throws IOException {
        checkClosed();
        ClientFuture clientFuture = new ClientFuture();
        this.session.getScheduler().execute(() -> {
            checkClosed();
            try {
                getEndpoint().drain(i);
                this.session.pumpToProtonTransport(clientFuture);
                clientFuture.onSuccess();
            } catch (Exception e) {
                clientFuture.onFailure(e);
            }
        });
        clientFuture.sync();
    }

    public void stop() throws IOException {
        checkClosed();
        ClientFuture clientFuture = new ClientFuture();
        this.session.getScheduler().execute(() -> {
            checkClosed();
            try {
                stop(clientFuture);
                this.session.pumpToProtonTransport(clientFuture);
            } catch (Exception e) {
                clientFuture.onFailure(e);
            }
        });
        clientFuture.sync();
    }

    public void accept(Delivery delivery) throws IOException {
        accept(delivery, this.session, true);
    }

    public void accept(Delivery delivery, boolean z) throws IOException {
        accept(delivery, this.session, z);
    }

    public void accept(Delivery delivery, AmqpSession amqpSession) throws IOException {
        accept(delivery, amqpSession, true);
    }

    public void accept(Delivery delivery, AmqpSession amqpSession, boolean z) throws IOException {
        checkClosed();
        if (delivery == null) {
            throw new IllegalArgumentException("Delivery to accept cannot be null");
        }
        if (amqpSession == null) {
            throw new IllegalArgumentException("Session given cannot be null");
        }
        if (amqpSession.getConnection() != this.session.getConnection()) {
            throw new IllegalArgumentException("The session used for accept must originate from the connection that created this receiver.");
        }
        ClientFuture clientFuture = new ClientFuture();
        amqpSession.getScheduler().execute(() -> {
            checkClosed();
            try {
                if (!delivery.isSettled()) {
                    if (amqpSession.isInTransaction()) {
                        Binary remoteTxId = amqpSession.getTransactionId().getRemoteTxId();
                        if (remoteTxId != null) {
                            TransactionalState transactionalState = new TransactionalState();
                            transactionalState.setOutcome(Accepted.getInstance());
                            transactionalState.setTxnId(remoteTxId);
                            delivery.disposition(transactionalState);
                            amqpSession.getTransactionContext().registerTxConsumer(this);
                        }
                    } else {
                        delivery.disposition(Accepted.getInstance());
                    }
                    if (z) {
                        delivery.settle();
                    }
                }
                amqpSession.pumpToProtonTransport(clientFuture);
                clientFuture.onSuccess();
            } catch (Exception e) {
                clientFuture.onFailure(e);
            }
        });
        clientFuture.sync();
    }

    public void modified(Delivery delivery, Boolean bool, Boolean bool2) throws IOException {
        checkClosed();
        if (delivery == null) {
            throw new IllegalArgumentException("Delivery to reject cannot be null");
        }
        ClientFuture clientFuture = new ClientFuture();
        this.session.getScheduler().execute(() -> {
            checkClosed();
            try {
                if (!delivery.isSettled()) {
                    Modified modified = new Modified();
                    modified.setUndeliverableHere(bool2);
                    modified.setDeliveryFailed(bool);
                    delivery.disposition(modified);
                    delivery.settle();
                    this.session.pumpToProtonTransport(clientFuture);
                }
                clientFuture.onSuccess();
            } catch (Exception e) {
                clientFuture.onFailure(e);
            }
        });
        clientFuture.sync();
    }

    public void release(Delivery delivery) throws IOException {
        checkClosed();
        if (delivery == null) {
            throw new IllegalArgumentException("Delivery to release cannot be null");
        }
        ClientFuture clientFuture = new ClientFuture();
        this.session.getScheduler().execute(() -> {
            checkClosed();
            try {
                if (!delivery.isSettled()) {
                    delivery.disposition(Released.getInstance());
                    delivery.settle();
                    this.session.pumpToProtonTransport(clientFuture);
                }
                clientFuture.onSuccess();
            } catch (Exception e) {
                clientFuture.onFailure(e);
            }
        });
        clientFuture.sync();
    }

    public void reject(Delivery delivery) throws IOException {
        checkClosed();
        if (delivery == null) {
            throw new IllegalArgumentException("Delivery to release cannot be null");
        }
        ClientFuture clientFuture = new ClientFuture();
        this.session.getScheduler().execute(() -> {
            checkClosed();
            try {
                if (!delivery.isSettled()) {
                    delivery.disposition(new Rejected());
                    delivery.settle();
                    this.session.pumpToProtonTransport(clientFuture);
                }
                clientFuture.onSuccess();
            } catch (Exception e) {
                clientFuture.onFailure(e);
            }
        });
        clientFuture.sync();
    }

    public Receiver getReceiver() {
        return UnmodifiableProxy.receiverProxy(getEndpoint());
    }

    public boolean isPresettle() {
        return this.presettle;
    }

    public void setPresettle(boolean z) {
        this.presettle = z;
    }

    public boolean isDurable() {
        return this.subscriptionName != null;
    }

    public String getSubscriptionName() {
        return this.subscriptionName;
    }

    public void setSubscriptionName(String str) {
        this.subscriptionName = str;
    }

    public String getSelector() {
        return this.selector;
    }

    public void setSelector(String str) {
        this.selector = str;
    }

    public boolean isNoLocal() {
        return this.noLocal;
    }

    public void setNoLocal(boolean z) {
        this.noLocal = z;
    }

    public long getDrainTimeout() {
        return this.session.getConnection().getDrainTimeout();
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doOpen() {
        Source source = this.userSpecifiedSource;
        Target target = new Target();
        if (source == null && this.address != null) {
            source = new Source();
            source.setAddress(this.address);
            configureSource(source);
        }
        String str = this.receiverId + ":" + this.address;
        if (getSubscriptionName() != null && !getSubscriptionName().isEmpty()) {
            str = getSubscriptionName();
        }
        Receiver receiver = this.session.getEndpoint().receiver(str);
        receiver.setSource(source);
        receiver.setTarget(target);
        if (this.userSpecifiedSenderSettlementMode != null) {
            receiver.setSenderSettleMode(this.userSpecifiedSenderSettlementMode);
            if (SenderSettleMode.SETTLED.equals(this.userSpecifiedSenderSettlementMode)) {
                setPresettle(true);
            }
        } else if (isPresettle()) {
            receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
        } else {
            receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        }
        if (this.userSpecifiedReceiverSettlementMode != null) {
            receiver.setReceiverSettleMode(this.userSpecifiedReceiverSettlementMode);
        } else {
            receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        }
        if (this.properties != null) {
            receiver.setProperties(this.properties);
        }
        setEndpoint(receiver);
        super.doOpen();
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doOpenCompletion() {
        if (getEndpoint().getRemoteSource() != null) {
            super.doOpenCompletion();
        }
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doClose() {
        getEndpoint().close();
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doDetach() {
        getEndpoint().detach();
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected Exception getOpenAbortException() {
        return getEndpoint().getRemoteSource() != null ? super.getOpenAbortException() : new InvalidDestinationException("Link creation was refused");
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doOpenInspection() {
        try {
            getStateInspector().inspectOpenedResource(getReceiver());
        } catch (Throwable th) {
            getStateInspector().markAsInvalid(th.getMessage());
        }
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doClosedInspection() {
        try {
            getStateInspector().inspectClosedResource(getReceiver());
        } catch (Throwable th) {
            getStateInspector().markAsInvalid(th.getMessage());
        }
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource
    protected void doDetachedInspection() {
        try {
            getStateInspector().inspectDetachedResource(getReceiver());
        } catch (Throwable th) {
            getStateInspector().markAsInvalid(th.getMessage());
        }
    }

    protected void configureSource(Source source) {
        HashMap hashMap = new HashMap();
        Symbol[] symbolArr = {Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
        if (getSubscriptionName() == null || getSubscriptionName().isEmpty()) {
            source.setDurable(TerminusDurability.NONE);
            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
        } else {
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
            source.setDistributionMode(org.apache.activemq.transport.amqp.AmqpSupport.COPY);
        }
        source.setOutcomes(symbolArr);
        Modified modified = new Modified();
        modified.setDeliveryFailed(true);
        modified.setUndeliverableHere(false);
        source.setDefaultOutcome(modified);
        if (isNoLocal()) {
            hashMap.put(org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
        }
        if (getSelector() != null && !getSelector().trim().equals("")) {
            hashMap.put(org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(getSelector()));
        }
        if (hashMap.isEmpty()) {
            return;
        }
        source.setFilter(hashMap);
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource, org.apache.activemq.transport.amqp.client.AmqpEventSink
    public void processDeliveryUpdates(AmqpConnection amqpConnection, Delivery delivery) throws IOException {
        Delivery current;
        do {
            current = getEndpoint().current();
            if (current != null) {
                if (!current.isReadable() || current.isPartial()) {
                    logger.trace("{} has a partial incoming Message(s), deferring.", this);
                    current = null;
                } else {
                    logger.trace("{} has incoming Message(s).", this);
                    try {
                        processDelivery(current);
                        getEndpoint().advance();
                    } catch (Exception e) {
                        throw IOExceptionSupport.create(e);
                    }
                }
            } else if (getEndpoint().getRemoteCredit() <= 0 && this.stopRequest != null) {
                this.stopRequest.onSuccess();
                this.stopRequest = null;
            }
        } while (current != null);
        super.processDeliveryUpdates(amqpConnection, delivery);
    }

    private void processDelivery(Delivery delivery) throws Exception {
        doDeliveryInspection(delivery);
        try {
            AmqpMessage amqpMessage = new AmqpMessage(this, decodeIncomingMessage(delivery), delivery);
            delivery.setContext(amqpMessage);
            this.prefetch.add(amqpMessage);
            if (this.pullRequest != null) {
                this.pullRequest.onSuccess();
                this.pullRequest = null;
            }
        } catch (Exception e) {
            logger.warn("Error on transform: {}", e.getMessage());
            deliveryFailed(delivery, true);
        }
    }

    private void doDeliveryInspection(Delivery delivery) {
        try {
            getStateInspector().inspectDelivery(getReceiver(), delivery);
        } catch (Throwable th) {
            getStateInspector().markAsInvalid(th.getMessage());
        }
    }

    @Override // org.apache.activemq.transport.amqp.client.AmqpAbstractResource, org.apache.activemq.transport.amqp.client.AmqpEventSink
    public void processFlowUpdates(AmqpConnection amqpConnection) throws IOException {
        if (this.pullRequest != null || this.stopRequest != null) {
            Receiver endpoint = getEndpoint();
            if (endpoint.getRemoteCredit() <= 0 && endpoint.getQueued() == 0) {
                if (this.pullRequest != null) {
                    this.pullRequest.onSuccess();
                    this.pullRequest = null;
                }
                if (this.stopRequest != null) {
                    this.stopRequest.onSuccess();
                    this.stopRequest = null;
                }
            }
        }
        logger.trace("Consumer {} flow updated, remote credit = {}", getSubscriptionName(), Integer.valueOf(getEndpoint().getRemoteCredit()));
        super.processFlowUpdates(amqpConnection);
    }

    protected Message decodeIncomingMessage(Delivery delivery) {
        byte[] bArr = new byte[2048];
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        while (true) {
            int recv = getEndpoint().recv(bArr, 0, bArr.length);
            if (recv <= 0) {
                break;
            }
            byteArrayOutputStream.write(bArr, 0, recv);
        }
        byte[] byteArray = byteArrayOutputStream.toByteArray();
        try {
            Message create = Message.Factory.create();
            create.decode(byteArray, 0, byteArray.length);
            return create;
        } finally {
            try {
                byteArrayOutputStream.close();
            } catch (IOException e) {
            }
        }
    }

    protected void deliveryFailed(Delivery delivery, boolean z) {
        Modified modified = new Modified();
        modified.setUndeliverableHere(true);
        modified.setDeliveryFailed(true);
        delivery.disposition(modified);
        delivery.settle();
        if (z) {
            getEndpoint().flow(1);
        }
    }

    private void stop(AsyncResult asyncResult) {
        Receiver endpoint = getEndpoint();
        if (endpoint.getRemoteCredit() <= 0) {
            if (endpoint.getQueued() == 0) {
                asyncResult.onSuccess();
                return;
            } else {
                this.stopRequest = asyncResult;
                return;
            }
        }
        this.stopRequest = asyncResult;
        endpoint.drain(0);
        if (getDrainTimeout() > 0) {
            this.stopRequest = new ScheduledRequest(getSession().getScheduler().schedule(() -> {
                JMSException amqpOperationTimedOutException = new AmqpOperationTimedOutException("Remote did not respond to a drain request in time");
                locallyClosed(this.session.getConnection(), amqpOperationTimedOutException);
                this.stopRequest.onFailure(amqpOperationTimedOutException);
                this.session.pumpToProtonTransport(this.stopRequest);
            }, getDrainTimeout(), TimeUnit.MILLISECONDS), this.stopRequest);
        }
    }

    private void stopOnSchedule(long j, AsyncResult asyncResult) {
        logger.trace("Receiver {} scheduling stop", this);
        this.stopRequest = new ScheduledRequest(getSession().getScheduler().schedule(() -> {
            if (getEndpoint().getRemoteCredit() != 0) {
                stop(asyncResult);
                this.session.pumpToProtonTransport(asyncResult);
            }
        }, j, TimeUnit.MILLISECONDS), asyncResult);
    }

    public String toString() {
        return getClass().getSimpleName() + "{ address = " + this.address + "}";
    }

    private void checkClosed() {
        if (isClosed()) {
            throw new IllegalStateException("Receiver is already closed");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void preCommit() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void preRollback() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void postCommit() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void postRollback() {
    }
}
