package org.apache.activemq.transport.stomp;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerContextAware;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:activemq-core-5.4.1-fuse-00-00.jar:org/apache/activemq/transport/stomp/ProtocolConverter.class */
public class ProtocolConverter {
    private static final Log LOG = LogFactory.getLog(ProtocolConverter.class);
    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
    private final StompTransport stompTransport;
    private int lastCommandId;
    private final FrameTranslator frameTranslator;
    private final BrokerContext brokerContext;
    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
    private final SessionId sessionId = new SessionId(this.connectionId, -1);
    private final ProducerId producerId = new ProducerId(this.sessionId, 1);
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
    private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<>();
    private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap();
    private final Object commnadIdMutex = new Object();
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
    ConnectionInfo connectionInfo = new ConnectionInfo();

    public ProtocolConverter(StompTransport stompTransport, FrameTranslator frameTranslator, BrokerContext brokerContext) {
        this.stompTransport = stompTransport;
        this.frameTranslator = frameTranslator;
        this.brokerContext = brokerContext;
    }

    protected int generateCommandId() {
        int i;
        synchronized (this.commnadIdMutex) {
            i = this.lastCommandId;
            this.lastCommandId = i + 1;
        }
        return i;
    }

    protected ResponseHandler createResponseHandler(final StompFrame stompFrame) {
        final String str = stompFrame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
        if (str != null) {
            return new ResponseHandler() { // from class: org.apache.activemq.transport.stomp.ProtocolConverter.1
                @Override // org.apache.activemq.transport.stomp.ResponseHandler
                public void onResponse(ProtocolConverter protocolConverter, Response response) throws IOException {
                    if (response.isException()) {
                        ProtocolConverter.this.handleException(((ExceptionResponse) response).getException(), stompFrame);
                        return;
                    }
                    StompFrame stompFrame2 = new StompFrame();
                    stompFrame2.setAction(Stomp.Responses.RECEIPT);
                    stompFrame2.setHeaders(new HashMap(1));
                    stompFrame2.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, str);
                    ProtocolConverter.this.stompTransport.sendToStomp(stompFrame2);
                }
            };
        }
        return null;
    }

    protected void sendToActiveMQ(Command command, ResponseHandler responseHandler) {
        command.setCommandId(generateCommandId());
        if (responseHandler != null) {
            command.setResponseRequired(true);
            this.resposeHandlers.put(Integer.valueOf(command.getCommandId()), responseHandler);
        }
        this.stompTransport.sendToActiveMQ(command);
    }

    protected void sendToStomp(StompFrame stompFrame) throws IOException {
        this.stompTransport.sendToStomp(stompFrame);
    }

    protected FrameTranslator findTranslator(String str) {
        FrameTranslator frameTranslator = this.frameTranslator;
        if (str != null) {
            try {
                frameTranslator = (FrameTranslator) this.FRAME_TRANSLATOR_FINDER.newInstance(str);
                if (frameTranslator instanceof BrokerContextAware) {
                    ((BrokerContextAware) frameTranslator).setBrokerContext(this.brokerContext);
                }
            } catch (Exception e) {
            }
        }
        return frameTranslator;
    }

    public void onStompCommand(StompFrame stompFrame) throws IOException, JMSException {
        try {
            if (stompFrame.getClass() == StompFrameError.class) {
                throw ((StompFrameError) stompFrame).getException();
            }
            String action = stompFrame.getAction();
            if (action.startsWith(Stomp.Commands.SEND)) {
                onStompSend(stompFrame);
            } else if (action.startsWith(Stomp.Commands.ACK)) {
                onStompAck(stompFrame);
            } else if (action.startsWith("BEGIN")) {
                onStompBegin(stompFrame);
            } else if (action.startsWith("COMMIT")) {
                onStompCommit(stompFrame);
            } else if (action.startsWith("ABORT")) {
                onStompAbort(stompFrame);
            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
                onStompSubscribe(stompFrame);
            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
                onStompUnsubscribe(stompFrame);
            } else if (action.startsWith(Stomp.Commands.CONNECT)) {
                onStompConnect(stompFrame);
            } else {
                if (!action.startsWith(Stomp.Commands.DISCONNECT)) {
                    throw new ProtocolException("Unknown STOMP action: " + action);
                }
                onStompDisconnect(stompFrame);
            }
        } catch (ProtocolException e) {
            handleException(e, stompFrame);
            if (e.isFatal()) {
                getStompTransport().onException(e);
            }
        }
    }

    protected void handleException(Throwable th, StompFrame stompFrame) throws IOException {
        String str;
        LOG.warn("Exception occured processing: \n" + stompFrame, th);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(byteArrayOutputStream, "UTF-8"));
        th.printStackTrace(printWriter);
        printWriter.close();
        HashMap hashMap = new HashMap();
        hashMap.put(Stomp.Headers.Error.MESSAGE, th.getMessage());
        if (stompFrame != null && (str = stompFrame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED)) != null) {
            hashMap.put(Stomp.Headers.Response.RECEIPT_ID, str);
        }
        sendToStomp(new StompFrame(Stomp.Responses.ERROR, hashMap, byteArrayOutputStream.toByteArray()));
    }

    protected void onStompSend(StompFrame stompFrame) throws IOException, JMSException {
        checkConnected();
        Map<String, String> headers = stompFrame.getHeaders();
        String str = headers.get(Stomp.Headers.TRANSACTION);
        headers.remove(Stomp.Headers.TRANSACTION);
        ActiveMQMessage convertMessage = convertMessage(stompFrame);
        convertMessage.setProducerId(this.producerId);
        convertMessage.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
        convertMessage.setJMSTimestamp(System.currentTimeMillis());
        if (str != null) {
            LocalTransactionId localTransactionId = this.transactions.get(str);
            if (localTransactionId == null) {
                throw new ProtocolException("Invalid transaction id: " + str);
            }
            convertMessage.setTransactionId(localTransactionId);
        }
        convertMessage.onSend();
        sendToActiveMQ(convertMessage, createResponseHandler(stompFrame));
    }

    protected void onStompAck(StompFrame stompFrame) throws ProtocolException {
        checkConnected();
        Map<String, String> headers = stompFrame.getHeaders();
        String str = headers.get("message-id");
        if (str == null) {
            throw new ProtocolException("ACK received without a message-id to acknowledge!");
        }
        LocalTransactionId localTransactionId = null;
        String str2 = headers.get(Stomp.Headers.TRANSACTION);
        if (str2 != null) {
            localTransactionId = this.transactions.get(str2);
            if (localTransactionId == null) {
                throw new ProtocolException("Invalid transaction id: " + str2);
            }
        }
        boolean z = false;
        Iterator<StompSubscription> it = this.subscriptionsByConsumerId.values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            MessageAck onStompMessageAck = it.next().onStompMessageAck(str, localTransactionId);
            if (onStompMessageAck != null) {
                onStompMessageAck.setTransactionId(localTransactionId);
                sendToActiveMQ(onStompMessageAck, createResponseHandler(stompFrame));
                z = true;
                break;
            }
        }
        if (!z) {
            throw new ProtocolException("Unexpected ACK received for message-id [" + str + PropertyAccessor.PROPERTY_KEY_SUFFIX);
        }
    }

    protected void onStompBegin(StompFrame stompFrame) throws ProtocolException {
        checkConnected();
        Map<String, String> headers = stompFrame.getHeaders();
        String str = headers.get(Stomp.Headers.TRANSACTION);
        if (!headers.containsKey(Stomp.Headers.TRANSACTION)) {
            throw new ProtocolException("Must specify the transaction you are beginning");
        }
        if (this.transactions.get(str) != null) {
            throw new ProtocolException("The transaction was allready started: " + str);
        }
        LocalTransactionId localTransactionId = new LocalTransactionId(this.connectionId, this.transactionIdGenerator.getNextSequenceId());
        this.transactions.put(str, localTransactionId);
        TransactionInfo transactionInfo = new TransactionInfo();
        transactionInfo.setConnectionId(this.connectionId);
        transactionInfo.setTransactionId(localTransactionId);
        transactionInfo.setType((byte) 0);
        sendToActiveMQ(transactionInfo, createResponseHandler(stompFrame));
    }

    protected void onStompCommit(StompFrame stompFrame) throws ProtocolException {
        checkConnected();
        String str = stompFrame.getHeaders().get(Stomp.Headers.TRANSACTION);
        if (str == null) {
            throw new ProtocolException("Must specify the transaction you are committing");
        }
        LocalTransactionId remove = this.transactions.remove(str);
        if (remove == null) {
            throw new ProtocolException("Invalid transaction id: " + str);
        }
        Iterator<StompSubscription> it = this.subscriptionsByConsumerId.values().iterator();
        while (it.hasNext()) {
            it.next().onStompCommit(remove);
        }
        TransactionInfo transactionInfo = new TransactionInfo();
        transactionInfo.setConnectionId(this.connectionId);
        transactionInfo.setTransactionId(remove);
        transactionInfo.setType((byte) 2);
        sendToActiveMQ(transactionInfo, createResponseHandler(stompFrame));
    }

    protected void onStompAbort(StompFrame stompFrame) throws ProtocolException {
        checkConnected();
        String str = stompFrame.getHeaders().get(Stomp.Headers.TRANSACTION);
        if (str == null) {
            throw new ProtocolException("Must specify the transaction you are committing");
        }
        LocalTransactionId remove = this.transactions.remove(str);
        if (remove == null) {
            throw new ProtocolException("Invalid transaction id: " + str);
        }
        Iterator<StompSubscription> it = this.subscriptionsByConsumerId.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().onStompAbort(remove);
            } catch (Exception e) {
                throw new ProtocolException("Transaction abort failed", false, e);
            }
        }
        TransactionInfo transactionInfo = new TransactionInfo();
        transactionInfo.setConnectionId(this.connectionId);
        transactionInfo.setTransactionId(remove);
        transactionInfo.setType((byte) 4);
        sendToActiveMQ(transactionInfo, createResponseHandler(stompFrame));
    }

    protected void onStompSubscribe(StompFrame stompFrame) throws ProtocolException {
        checkConnected();
        FrameTranslator findTranslator = findTranslator(stompFrame.getHeaders().get(Stomp.Headers.TRANSFORMATION));
        Map<String, String> headers = stompFrame.getHeaders();
        String str = headers.get("id");
        String str2 = headers.get("destination");
        ActiveMQDestination convertDestination = findTranslator.convertDestination(this, str2);
        if (convertDestination == null) {
            throw new ProtocolException("Invalid Destination.");
        }
        ConsumerId consumerId = new ConsumerId(this.sessionId, this.consumerIdGenerator.getNextSequenceId());
        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
        consumerInfo.setPrefetchSize(1000);
        consumerInfo.setDispatchAsync(true);
        consumerInfo.setSelector(headers.remove(Stomp.Headers.Subscribe.SELECTOR));
        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
        consumerInfo.setDestination(findTranslator.convertDestination(this, str2));
        StompSubscription stompSubscription = new StompSubscription(this, str, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
        stompSubscription.setDestination(convertDestination);
        String str3 = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
        if ("client".equals(str3)) {
            stompSubscription.setAckMode("client");
        } else if ("client-individual".equals(str3)) {
            stompSubscription.setAckMode("client-individual");
        } else {
            stompSubscription.setAckMode("auto");
        }
        this.subscriptionsByConsumerId.put(consumerId, stompSubscription);
        sendToActiveMQ(consumerInfo, createResponseHandler(stompFrame));
    }

    protected void onStompUnsubscribe(StompFrame stompFrame) throws ProtocolException {
        checkConnected();
        Map<String, String> headers = stompFrame.getHeaders();
        ActiveMQDestination activeMQDestination = null;
        String str = headers.get("destination");
        if (str != null) {
            activeMQDestination = findTranslator(stompFrame.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, str);
        }
        String str2 = headers.get("id");
        if (str2 == null && activeMQDestination == null) {
            throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
        }
        String str3 = stompFrame.getHeaders().get("activemq.subscriptionName");
        if (str3 != null) {
            RemoveSubscriptionInfo removeSubscriptionInfo = new RemoveSubscriptionInfo();
            removeSubscriptionInfo.setClientId(str3);
            removeSubscriptionInfo.setSubscriptionName(str3);
            removeSubscriptionInfo.setConnectionId(this.connectionId);
            sendToActiveMQ(removeSubscriptionInfo, createResponseHandler(stompFrame));
            return;
        }
        Iterator<StompSubscription> it = this.subscriptionsByConsumerId.values().iterator();
        while (it.hasNext()) {
            StompSubscription next = it.next();
            if ((str2 != null && str2.equals(next.getSubscriptionId())) || (activeMQDestination != null && activeMQDestination.equals(next.getDestination()))) {
                sendToActiveMQ(next.getConsumerInfo().createRemoveCommand(), createResponseHandler(stompFrame));
                it.remove();
                return;
            }
        }
        throw new ProtocolException("No subscription matched.");
    }

    protected void onStompConnect(final StompFrame stompFrame) throws ProtocolException {
        if (this.connected.get()) {
            throw new ProtocolException("Allready connected.");
        }
        final Map<String, String> headers = stompFrame.getHeaders();
        String str = headers.get(Stomp.Headers.Connect.LOGIN);
        String str2 = headers.get(Stomp.Headers.Connect.PASSCODE);
        String str3 = headers.get(Stomp.Headers.Connect.CLIENT_ID);
        IntrospectionSupport.setProperties(this.connectionInfo, headers, "activemq.");
        this.connectionInfo.setConnectionId(this.connectionId);
        if (str3 != null) {
            this.connectionInfo.setClientId(str3);
        } else {
            this.connectionInfo.setClientId("" + this.connectionInfo.getConnectionId().toString());
        }
        this.connectionInfo.setResponseRequired(true);
        this.connectionInfo.setUserName(str);
        this.connectionInfo.setPassword(str2);
        this.connectionInfo.setTransportContext(this.stompTransport.getPeerCertificates());
        sendToActiveMQ(this.connectionInfo, new ResponseHandler() { // from class: org.apache.activemq.transport.stomp.ProtocolConverter.2
            @Override // org.apache.activemq.transport.stomp.ResponseHandler
            public void onResponse(ProtocolConverter protocolConverter, Response response) throws IOException {
                if (response.isException()) {
                    Throwable exception = ((ExceptionResponse) response).getException();
                    ProtocolConverter.this.handleException(exception, stompFrame);
                    ProtocolConverter.this.getStompTransport().onException(IOExceptionSupport.create(exception));
                } else {
                    ProtocolConverter.this.sendToActiveMQ(new SessionInfo(ProtocolConverter.this.sessionId), null);
                    ProtocolConverter.this.sendToActiveMQ(new ProducerInfo(ProtocolConverter.this.producerId), new ResponseHandler() { // from class: org.apache.activemq.transport.stomp.ProtocolConverter.2.1
                        @Override // org.apache.activemq.transport.stomp.ResponseHandler
                        public void onResponse(ProtocolConverter protocolConverter2, Response response2) throws IOException {
                            if (response2.isException()) {
                                Throwable exception2 = ((ExceptionResponse) response2).getException();
                                ProtocolConverter.this.handleException(exception2, stompFrame);
                                ProtocolConverter.this.getStompTransport().onException(IOExceptionSupport.create(exception2));
                            }
                            ProtocolConverter.this.connected.set(true);
                            HashMap hashMap = new HashMap();
                            hashMap.put(Stomp.Headers.Connected.SESSION, ProtocolConverter.this.connectionInfo.getClientId());
                            String str4 = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
                            if (str4 == null) {
                                str4 = (String) headers.get(Stomp.Headers.RECEIPT_REQUESTED);
                            }
                            if (str4 != null) {
                                hashMap.put(Stomp.Headers.Connected.RESPONSE_ID, str4);
                                hashMap.put(Stomp.Headers.Response.RECEIPT_ID, str4);
                            }
                            StompFrame stompFrame2 = new StompFrame();
                            stompFrame2.setAction(Stomp.Responses.CONNECTED);
                            stompFrame2.setHeaders(hashMap);
                            ProtocolConverter.this.sendToStomp(stompFrame2);
                        }
                    });
                }
            }
        });
    }

    protected void onStompDisconnect(StompFrame stompFrame) throws ProtocolException {
        checkConnected();
        sendToActiveMQ(this.connectionInfo.createRemoveCommand(), createResponseHandler(stompFrame));
        sendToActiveMQ(new ShutdownInfo(), createResponseHandler(stompFrame));
        this.connected.set(false);
    }

    protected void checkConnected() throws ProtocolException {
        if (!this.connected.get()) {
            throw new ProtocolException("Not connected.");
        }
    }

    public void onActiveMQCommand(Command command) throws IOException, JMSException {
        if (command.isResponse()) {
            Response response = (Response) command;
            ResponseHandler remove = this.resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
            if (remove != null) {
                remove.onResponse(this, response);
                return;
            } else {
                if (response.isException()) {
                    handleException(((ExceptionResponse) response).getException(), null);
                    return;
                }
                return;
            }
        }
        if (!command.isMessageDispatch()) {
            if (command.getDataStructureType() == 16) {
                handleException(((ConnectionError) command).getException(), null);
            }
        } else {
            MessageDispatch messageDispatch = (MessageDispatch) command;
            StompSubscription stompSubscription = this.subscriptionsByConsumerId.get(messageDispatch.getConsumerId());
            if (stompSubscription != null) {
                stompSubscription.onMessageDispatch(messageDispatch);
            }
        }
    }

    public ActiveMQMessage convertMessage(StompFrame stompFrame) throws IOException, JMSException {
        return findTranslator(stompFrame.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertFrame(this, stompFrame);
    }

    public StompFrame convertMessage(ActiveMQMessage activeMQMessage, boolean z) throws IOException, JMSException {
        return z ? this.frameTranslator.convertMessage(this, activeMQMessage) : findTranslator(activeMQMessage.getStringProperty(Stomp.Headers.TRANSFORMATION)).convertMessage(this, activeMQMessage);
    }

    public StompTransport getStompTransport() {
        return this.stompTransport;
    }

    public ActiveMQDestination createTempQueue(String str) {
        ActiveMQDestination activeMQDestination = this.tempDestinations.get(str);
        if (activeMQDestination == null) {
            activeMQDestination = new ActiveMQTempQueue(this.connectionId, this.tempDestinationGenerator.getNextSequenceId());
            sendToActiveMQ(new DestinationInfo(this.connectionId, (byte) 0, activeMQDestination), null);
            this.tempDestinations.put(str, activeMQDestination);
        }
        return activeMQDestination;
    }

    public ActiveMQDestination createTempTopic(String str) {
        ActiveMQDestination activeMQDestination = this.tempDestinations.get(str);
        if (activeMQDestination == null) {
            activeMQDestination = new ActiveMQTempTopic(this.connectionId, this.tempDestinationGenerator.getNextSequenceId());
            sendToActiveMQ(new DestinationInfo(this.connectionId, (byte) 0, activeMQDestination), null);
            this.tempDestinations.put(str, activeMQDestination);
            this.tempDestinationAmqToStompMap.put(activeMQDestination.getQualifiedName(), str);
        }
        return activeMQDestination;
    }

    public String getCreatedTempDestinationName(ActiveMQDestination activeMQDestination) {
        return this.tempDestinationAmqToStompMap.get(activeMQDestination.getQualifiedName());
    }
}
