package org.apache.activemq.artemis.protocol.amqp.proton;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.class */
public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler {
    private static final Logger log = Logger.getLogger(ProtonServerSenderContext.class);
    private static final Symbol COPY = Symbol.valueOf("copy");
    private static final Symbol TOPIC = Symbol.valueOf("topic");
    private Object brokerConsumer;
    protected final AMQPSessionContext protonSession;
    protected final Sender sender;
    protected final AMQPConnectionContext connection;
    protected final AMQPSessionCallback sessionSPI;
    protected boolean closed = false;
    protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);

    public ProtonServerSenderContext(AMQPConnectionContext aMQPConnectionContext, Sender sender, AMQPSessionContext aMQPSessionContext, AMQPSessionCallback aMQPSessionCallback) {
        this.connection = aMQPConnectionContext;
        this.sender = sender;
        this.protonSession = aMQPSessionContext;
        this.sessionSPI = aMQPSessionCallback;
    }

    public Object getBrokerConsumer() {
        return this.brokerConsumer;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void onFlow(int i, boolean z) {
        this.creditsSemaphore.setCredits(i);
        this.sessionSPI.onFlowConsumer(this.brokerConsumer, i, z);
    }

    public Sender getSender() {
        return this.sender;
    }

    public void start() throws ActiveMQAMQPException {
        this.sessionSPI.start();
        try {
            if (this.brokerConsumer != null) {
                this.sessionSPI.startSender(this.brokerConsumer);
            }
        } catch (Exception e) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage());
        }
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable
    public void initialise() throws Exception {
        String address;
        Map.Entry<Symbol, DescribedType> findFilter;
        Map.Entry<Symbol, DescribedType> findFilter2;
        super.initialise();
        Source source = (Source) this.sender.getRemoteSource();
        String str = null;
        HashMap hashMap = new HashMap();
        if (source != null && (findFilter2 = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS)) != null) {
            str = findFilter2.getValue().getDescribed().toString();
            try {
                SelectorParser.parse(str);
                hashMap.put(findFilter2.getKey(), findFilter2.getValue());
            } catch (FilterException e) {
                throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
            }
        }
        boolean z = hasCapabilities(TOPIC, source) || isPubSub(source);
        if (z && (findFilter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS)) != null) {
            String str2 = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.sender.getSession().getConnection().getRemoteContainer() + "'";
            str = str != null ? str + " AND " + str2 : str2;
            hashMap.put(findFilter.getKey(), findFilter.getValue());
        }
        if (source == null) {
            address = createQueueName(getClientId(), this.sender.getName());
            QueueQueryResult queueQuery = this.sessionSPI.queueQuery(address, false);
            if (!queueQuery.isExists()) {
                throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + this.sender.getName());
            }
            source = new Source();
            source.setAddress(address);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDistributionMode(COPY);
            source.setCapabilities(new Symbol[]{TOPIC});
            SimpleString filterString = queueQuery.getFilterString();
            if (filterString != null) {
                str = filterString.toString();
                boolean z2 = false;
                String str3 = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + this.sender.getSession().getConnection().getRemoteContainer() + "'";
                if (str.endsWith(str3)) {
                    str = str.length() > str3.length() ? str.substring(0, str.length() - (" AND " + str3).length()) : null;
                    z2 = true;
                }
                if (z2) {
                    hashMap.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
                }
                if (str != null && !str.trim().isEmpty()) {
                    hashMap.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(str));
                }
            }
            this.sender.setSource(source);
        } else if (source.getDynamic()) {
            address = UUID.randomUUID().toString();
            try {
                this.sessionSPI.createTemporaryQueue(address);
                source.setAddress(address);
            } catch (Exception e2) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e2.getMessage());
            }
        } else {
            if (!z) {
                address = source.getAddress();
            } else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {
                address = createQueueName(getClientId(), this.sender.getName());
                QueueQueryResult queueQuery2 = this.sessionSPI.queueQuery(address, false);
                if (!queueQuery2.isExists()) {
                    this.sessionSPI.createDurableQueue(source.getAddress(), address, str);
                } else if (!Objects.equals(queueQuery2.getFilterString(), SimpleString.toSimpleString(str)) || (this.sender.getSource() != null && !this.sender.getSource().getAddress().equals(queueQuery2.getAddress().toString()))) {
                    if (queueQuery2.getConsumerCount() != 0) {
                        throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                    }
                    this.sessionSPI.deleteQueue(address);
                    this.sessionSPI.createDurableQueue(source.getAddress(), address, str);
                }
            } else {
                address = UUID.randomUUID().toString();
                try {
                    this.sessionSPI.createTemporaryQueue(source.getAddress(), address, str);
                } catch (Exception e3) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e3.getMessage());
                }
            }
            if (address == null) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
            }
            try {
                if (!this.sessionSPI.queueQuery(address, !z).isExists()) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                }
            } catch (Exception e4) {
                throw new ActiveMQAMQPInternalErrorException(e4.getMessage(), e4);
            } catch (ActiveMQAMQPNotFoundException e5) {
                throw e5;
            }
        }
        source.setFilter(hashMap.isEmpty() ? null : hashMap);
        try {
            this.brokerConsumer = this.sessionSPI.createSender(this, address, z ? null : str, (z || source.getDistributionMode() == null || !source.getDistributionMode().equals(COPY)) ? false : true);
        } catch (Exception e6) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e6.getMessage());
        }
    }

    protected String getClientId() {
        return this.connection.getRemoteContainer();
    }

    private boolean isPubSub(Source source) {
        String pubSubPrefix = this.sessionSPI.getPubSubPrefix();
        return (source == null || pubSubPrefix == null || source.getAddress() == null || !source.getAddress().startsWith(pubSubPrefix)) ? false : true;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void close(ErrorCondition errorCondition) throws ActiveMQAMQPException {
        this.closed = true;
        if (errorCondition != null) {
            this.sender.setCondition(errorCondition);
        }
        this.protonSession.removeSender(this.sender);
        synchronized (this.connection.getLock()) {
            this.sender.close();
        }
        this.connection.flush();
        try {
            this.sessionSPI.closeSender(this.brokerConsumer);
        } catch (Exception e) {
            log.warn(e.getMessage(), e);
            throw new ActiveMQAMQPInternalErrorException(e.getMessage());
        }
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void close(boolean z) throws ActiveMQAMQPException {
        Source source;
        try {
            this.sessionSPI.closeSender(this.brokerConsumer);
            if (z && (source = (Source) this.sender.getSource()) != null && source.getAddress() != null && (hasCapabilities(TOPIC, source) || isPubSub(source))) {
                String address = source.getAddress();
                if (this.sessionSPI.queueQuery(address, false).isExists() && source.getDynamic()) {
                    this.sessionSPI.deleteQueue(address);
                } else {
                    String createQueueName = createQueueName(getClientId(), this.sender.getName());
                    QueueQueryResult queueQuery = this.sessionSPI.queueQuery(createQueueName, false);
                    if (queueQuery.isExists()) {
                        if (queueQuery.getConsumerCount() > 0) {
                            System.out.println("error");
                        }
                        this.sessionSPI.deleteQueue(createQueueName);
                    }
                }
            }
        } catch (Exception e) {
            log.warn(e.getMessage(), e);
            throw new ActiveMQAMQPInternalErrorException(e.getMessage());
        }
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
        Object context = delivery.getContext();
        boolean z = this.sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
        TransactionalState remoteState = delivery.getRemoteState();
        if (remoteState != null) {
            if (remoteState instanceof TransactionalState) {
                TransactionalState transactionalState = remoteState;
                Transaction transaction = this.sessionSPI.getTransaction(transactionalState.getTxnId());
                if (transactionalState.getOutcome() != null && (transactionalState.getOutcome() instanceof Accepted)) {
                    if (!delivery.remotelySettled()) {
                        TransactionalState transactionalState2 = new TransactionalState();
                        transactionalState2.setOutcome(Accepted.getInstance());
                        transactionalState2.setTxnId(transactionalState.getTxnId());
                        delivery.disposition(transactionalState2);
                    }
                    try {
                        this.sessionSPI.ack(transaction, this.brokerConsumer, context);
                    } catch (Exception e) {
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(context.toString(), e.getMessage());
                    }
                }
            } else if (remoteState instanceof Accepted) {
                try {
                    this.sessionSPI.ack(null, this.brokerConsumer, context);
                } catch (Exception e2) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(context.toString(), e2.getMessage());
                }
            } else if (remoteState instanceof Released) {
                try {
                    this.sessionSPI.cancel(this.brokerConsumer, context, false);
                } catch (Exception e3) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(context.toString(), e3.getMessage());
                }
            } else if ((remoteState instanceof Rejected) || (remoteState instanceof Modified)) {
                try {
                    this.sessionSPI.cancel(this.brokerConsumer, context, true);
                } catch (Exception e4) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(context.toString(), e4.getMessage());
                }
            }
            if (!z) {
                this.protonSession.replaceTag(delivery.getTag());
            }
            synchronized (this.connection.getLock()) {
                delivery.settle();
                this.sender.offer(1);
            }
        }
    }

    public synchronized void checkState() {
        this.sessionSPI.resumeDelivery(this.brokerConsumer);
    }

    public int deliverMessage(Object obj, int i) throws Exception {
        if (this.closed) {
            System.err.println("Message can't be delivered as it's closed");
            return 0;
        }
        if (!this.creditsSemaphore.tryAcquire()) {
            try {
                this.creditsSemaphore.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        boolean z = this.sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
        byte[] tag = z ? new byte[0] : this.protonSession.getTag();
        ByteBuf heapBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
        try {
            try {
                long encodeMessage = this.sessionSPI.encodeMessage(obj, i, new NettyWritable(heapBuffer));
                int writerIndex = heapBuffer.writerIndex();
                synchronized (this.connection.getLock()) {
                    Delivery delivery = this.sender.delivery(tag, 0, tag.length);
                    delivery.setMessageFormat((int) encodeMessage);
                    delivery.setContext(obj);
                    this.sender.send(heapBuffer.array(), heapBuffer.arrayOffset() + heapBuffer.readerIndex(), heapBuffer.readableBytes());
                    if (z) {
                        this.sessionSPI.ack(null, this.brokerConsumer, obj);
                        delivery.settle();
                    } else {
                        this.sender.advance();
                    }
                }
                this.connection.flush();
                heapBuffer.release();
                return writerIndex;
            } catch (Throwable th) {
                log.warn(th.getMessage(), th);
                throw new ActiveMQAMQPInternalErrorException(th.getMessage(), th);
            }
        } catch (Throwable th2) {
            heapBuffer.release();
            throw th2;
        }
    }

    private static boolean hasCapabilities(Symbol symbol, Source source) {
        if (source == null || source.getCapabilities() == null) {
            return false;
        }
        for (Symbol symbol2 : source.getCapabilities()) {
            if (symbol.equals(symbol2)) {
                return true;
            }
        }
        return false;
    }

    private static String createQueueName(String str, String str2) {
        return str + "." + str2;
    }
}
