package org.codehaus.stomp.jms;

import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.stomp.ProtocolException;
import org.codehaus.stomp.Stomp;
import org.codehaus.stomp.StompFrame;
import org.codehaus.stomp.StompFrameError;
import org.codehaus.stomp.StompHandler;
import org.codehaus.stomp.util.IntrospectionSupport;

/* loaded from: input_file:org/codehaus/stomp/jms/ProtocolConverter.class */
public class ProtocolConverter implements StompHandler {
    private static final transient Log log = LogFactory.getLog(ProtocolConverter.class);
    private ConnectionFactory connectionFactory;
    private final StompHandler outputHandler;
    private Connection connection;
    private StompSession defaultSession;
    private StompSession clientAckSession;
    private final Map<String, StompSession> transactedSessions = new ConcurrentHashMap();
    private final Map subscriptions = new ConcurrentHashMap();
    private final Map messages = new ConcurrentHashMap();
    private volatile boolean closing = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/codehaus/stomp/jms/ProtocolConverter$MSC.class */
    public class MSC {
        public Message message;
        public Session session;
        public MessageConsumer consumer;

        private MSC() {
        }
    }

    /* loaded from: input_file:org/codehaus/stomp/jms/ProtocolConverter$MessageSession.class */
    private class MessageSession {
        public Message message;
        public Session session;

        private MessageSession() {
        }
    }

    public ProtocolConverter(ConnectionFactory connectionFactory, StompHandler stompHandler) {
        this.connectionFactory = connectionFactory;
        this.outputHandler = stompHandler;
    }

    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    public StompHandler getOutputHandler() {
        return this.outputHandler;
    }

    @Override // org.codehaus.stomp.StompHandler
    public synchronized void close() throws JMSException {
        this.closing = true;
        try {
            for (MSC msc : this.messages.values()) {
                synchronized (msc.consumer) {
                    msc.consumer.setMessageListener((MessageListener) null);
                    msc.consumer.notify();
                }
            }
            JMSException jMSException = null;
            ArrayList<StompSession> arrayList = new ArrayList(this.transactedSessions.values());
            if (this.defaultSession != null) {
                arrayList.add(this.defaultSession);
            }
            if (this.clientAckSession != null) {
                arrayList.add(this.clientAckSession);
            }
            for (StompSession stompSession : arrayList) {
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Closing session: " + stompSession + " with ack mode: " + stompSession.getSession().getAcknowledgeMode());
                    }
                    stompSession.close();
                } catch (JMSException e) {
                    if (jMSException == null) {
                        jMSException = e;
                    }
                }
            }
            if (this.connection != null) {
                this.connection.close();
            }
            if (jMSException != null) {
                throw jMSException;
            }
        } finally {
            this.connection = null;
            this.defaultSession = null;
            this.clientAckSession = null;
            this.transactedSessions.clear();
            this.subscriptions.clear();
            this.messages.clear();
        }
    }

    @Override // org.codehaus.stomp.StompHandler
    public void onStompFrame(StompFrame stompFrame) throws Exception {
        try {
            if (log.isDebugEnabled()) {
                log.debug(">>>> " + stompFrame.getAction() + " headers: " + stompFrame.getHeaders());
            }
            if (stompFrame.getClass() == StompFrameError.class) {
                throw ((StompFrameError) stompFrame).getException();
            }
            String action = stompFrame.getAction();
            if (action.startsWith(Stomp.Commands.SEND)) {
                onStompSend(stompFrame);
            } else if (action.startsWith(Stomp.Commands.ACK)) {
                onStompAck(stompFrame);
            } else if (action.startsWith("BEGIN")) {
                onStompBegin(stompFrame);
            } else if (action.startsWith("COMMIT")) {
                onStompCommit(stompFrame);
            } else if (action.startsWith("ABORT")) {
                onStompAbort(stompFrame);
            } else if (action.startsWith(Stomp.Commands.SUBSCRIBE)) {
                onStompSubscribe(stompFrame);
            } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
                onStompUnsubscribe(stompFrame);
            } else if (action.startsWith(Stomp.Commands.CONNECT)) {
                onStompConnect(stompFrame);
            } else {
                if (!action.startsWith(Stomp.Commands.DISCONNECT)) {
                    throw new ProtocolException("Unknown STOMP action: " + action);
                }
                onStompDisconnect(stompFrame);
            }
        } catch (Exception e) {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(byteArrayOutputStream, "UTF-8"));
            e.printStackTrace(printWriter);
            printWriter.close();
            HashMap hashMap = new HashMap();
            hashMap.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
            String str = (String) stompFrame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
            if (str != null) {
                hashMap.put(Stomp.Headers.Response.RECEIPT_ID, str);
            }
            sendToStomp(new StompFrame(Stomp.Responses.ERROR, hashMap, byteArrayOutputStream.toByteArray()));
        }
    }

    @Override // org.codehaus.stomp.StompHandler
    public void onException(Exception exc) {
        log.error("Caught: " + exc, exc);
    }

    public boolean addMessageToAck(Message message, MessageConsumer messageConsumer) throws JMSException {
        if (!this.closing) {
            MSC msc = new MSC();
            msc.message = message;
            msc.consumer = messageConsumer;
            this.messages.put(message.getJMSMessageID(), msc);
        }
        return this.closing;
    }

    protected void onStompConnect(StompFrame stompFrame) throws Exception {
        if (this.connection != null) {
            throw new ProtocolException("Allready connected.");
        }
        Map<String, Object> headers = stompFrame.getHeaders();
        String str = (String) headers.get(Stomp.Headers.Connect.LOGIN);
        String str2 = (String) headers.get(Stomp.Headers.Connect.PASSCODE);
        String str3 = (String) headers.get(Stomp.Headers.Connect.CLIENT_ID);
        ConnectionFactory connectionFactory = getConnectionFactory();
        IntrospectionSupport.setProperties(connectionFactory, headers, "factory.");
        if (str != null) {
            this.connection = connectionFactory.createConnection(str, str2);
        } else {
            this.connection = connectionFactory.createConnection();
        }
        if (str3 != null) {
            this.connection.setClientID(str3);
        }
        IntrospectionSupport.setProperties(this.connection, headers, "connection.");
        this.connection.start();
        HashMap hashMap = new HashMap();
        hashMap.put(Stomp.Headers.Connected.SESSION, this.connection.getClientID());
        String str4 = (String) headers.get(Stomp.Headers.Connect.REQUEST_ID);
        if (str4 == null) {
            str4 = (String) headers.get(Stomp.Headers.RECEIPT_REQUESTED);
        }
        if (str4 != null) {
            hashMap.put(Stomp.Headers.Connected.RESPONSE_ID, str4);
            hashMap.put(Stomp.Headers.Response.RECEIPT_ID, str4);
        }
        StompFrame stompFrame2 = new StompFrame();
        stompFrame2.setAction(Stomp.Responses.CONNECTED);
        stompFrame2.setHeaders(hashMap);
        sendToStomp(stompFrame2);
    }

    protected void onStompDisconnect(StompFrame stompFrame) throws Exception {
        checkConnected();
        close();
    }

    protected void onStompSend(StompFrame stompFrame) throws Exception {
        checkConnected();
        String str = (String) stompFrame.getHeaders().get(Stomp.Headers.TRANSACTION);
        (str != null ? getExistingTransactedSession(str) : getDefaultSession()).sendToJms(stompFrame);
        sendResponse(stompFrame);
    }

    protected void onStompBegin(StompFrame stompFrame) throws Exception {
        checkConnected();
        String str = (String) stompFrame.getHeaders().get(Stomp.Headers.TRANSACTION);
        if (str == null) {
            throw new ProtocolException("Must specify the transaction you are beginning");
        }
        if (getTransactedSession(str) != null) {
            throw new ProtocolException("The transaction was already started: " + str);
        }
        setTransactedSession(str, createTransactedSession(str));
        sendResponse(stompFrame);
    }

    protected void onStompCommit(StompFrame stompFrame) throws Exception {
        checkConnected();
        String str = (String) stompFrame.getHeaders().get(Stomp.Headers.TRANSACTION);
        if (str == null) {
            throw new ProtocolException("Must specify the transaction you are committing");
        }
        StompSession existingTransactedSession = getExistingTransactedSession(str);
        existingTransactedSession.getSession().commit();
        considerClosingTransactedSession(existingTransactedSession, str);
        sendResponse(stompFrame);
    }

    protected void onStompAbort(StompFrame stompFrame) throws Exception {
        checkConnected();
        String str = (String) stompFrame.getHeaders().get(Stomp.Headers.TRANSACTION);
        if (str == null) {
            throw new ProtocolException("Must specify the transaction you are committing");
        }
        StompSession existingTransactedSession = getExistingTransactedSession(str);
        existingTransactedSession.getSession().rollback();
        considerClosingTransactedSession(existingTransactedSession, str);
        sendResponse(stompFrame);
    }

    protected void onStompSubscribe(StompFrame stompFrame) throws Exception {
        StompSession defaultSession;
        checkConnected();
        Map<String, Object> headers = stompFrame.getHeaders();
        String str = (String) headers.get(Stomp.Headers.TRANSACTION);
        if (str != null) {
            defaultSession = getExistingTransactedSession(str);
        } else {
            String str2 = (String) headers.get(Stomp.Headers.Subscribe.ACK_MODE);
            defaultSession = (str2 == null || !"client".equals(str2)) ? getDefaultSession() : getClientAckSession();
        }
        String str3 = (String) headers.get("id");
        if (str3 == null) {
            str3 = createSubscriptionId(headers);
        }
        if (((StompSubscription) this.subscriptions.get(str3)) != null) {
            throw new ProtocolException("There already is a subscription for: " + str3 + ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
        }
        this.subscriptions.put(str3, new StompSubscription(defaultSession, str3, stompFrame));
        sendResponse(stompFrame);
    }

    protected void onStompUnsubscribe(StompFrame stompFrame) throws Exception {
        checkConnected();
        Map<String, Object> headers = stompFrame.getHeaders();
        String str = (String) headers.get("destination");
        String str2 = (String) headers.get("id");
        if (str2 == null) {
            if (str == null) {
                throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
            }
            str2 = createSubscriptionId(headers);
        }
        StompSubscription stompSubscription = (StompSubscription) this.subscriptions.remove(str2);
        if (stompSubscription == null) {
            throw new ProtocolException("Cannot unsubscribe as mo subscription exists for id: " + str2);
        }
        stompSubscription.close();
        sendResponse(stompFrame);
    }

    protected void onStompAck(StompFrame stompFrame) throws Exception {
        checkConnected();
        String str = (String) stompFrame.getHeaders().get("message-id");
        if (str == null) {
            throw new ProtocolException("ACK received without a message-id to acknowledge!");
        }
        MSC msc = (MSC) this.messages.remove(str);
        if (msc == null) {
            throw new ProtocolException("No such message for message-id: " + str);
        }
        synchronized (msc.consumer) {
            msc.message.acknowledge();
            msc.consumer.notify();
        }
        sendResponse(stompFrame);
    }

    protected void checkConnected() throws ProtocolException {
        if (this.connection == null) {
            throw new ProtocolException("Not connected.");
        }
    }

    protected String createSubscriptionId(Map map) {
        return "/subscription-to/" + map.get("destination");
    }

    protected StompSession getDefaultSession() throws JMSException {
        if (this.defaultSession == null) {
            this.defaultSession = createSession(1);
        }
        return this.defaultSession;
    }

    protected StompSession getClientAckSession() throws JMSException {
        if (this.clientAckSession == null) {
            this.clientAckSession = createSession(2);
        }
        return this.clientAckSession;
    }

    protected StompSession getExistingTransactedSession(String str) throws ProtocolException, JMSException {
        StompSession transactedSession = getTransactedSession(str);
        if (transactedSession == null) {
            throw new ProtocolException("Invalid transaction id: " + str);
        }
        return transactedSession;
    }

    protected StompSession getTransactedSession(String str) throws ProtocolException, JMSException {
        return this.transactedSessions.get(str);
    }

    protected void setTransactedSession(String str, StompSession stompSession) {
        this.transactedSessions.put(str, stompSession);
    }

    protected StompSession createSession(int i) throws JMSException {
        Session createSession = this.connection.createSession(false, i);
        if (log.isDebugEnabled()) {
            log.debug("Created session with ack mode: " + createSession.getAcknowledgeMode());
        }
        return new StompSession(this, createSession);
    }

    protected StompSession createTransactedSession(String str) throws JMSException {
        return new StompSession(this, this.connection.createSession(true, 0));
    }

    protected void sendResponse(StompFrame stompFrame) throws Exception {
        String str = (String) stompFrame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
        if (str != null) {
            StompFrame stompFrame2 = new StompFrame();
            stompFrame2.setAction(Stomp.Responses.RECEIPT);
            stompFrame2.setHeaders(new HashMap(1));
            stompFrame2.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, str);
            sendToStomp(stompFrame2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendToStomp(StompFrame stompFrame) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("<<<< " + stompFrame.getAction() + " headers: " + stompFrame.getHeaders());
        }
        this.outputHandler.onStompFrame(stompFrame);
    }

    protected void considerClosingTransactedSession(StompSession stompSession, String str) {
    }
}
