package org.projectodd.stilts.stomp.client;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.util.VirtualExecutorService;
import org.projectodd.stilts.StompException;
import org.projectodd.stilts.StompMessage;
import org.projectodd.stilts.logging.Logger;
import org.projectodd.stilts.logging.LoggerManager;
import org.projectodd.stilts.logging.SimpleLoggerManager;
import org.projectodd.stilts.stomp.client.StompClient;
import org.projectodd.stilts.stomp.protocol.StompControlFrame;
import org.projectodd.stilts.stomp.protocol.StompFrame;
import org.projectodd.stilts.stomp.protocol.StompFrames;

/* loaded from: input_file:org/projectodd/stilts/stomp/client/AbstractStompClient.class */
public class AbstractStompClient implements StompClient {
    private static final Callable<Void> NOOP = new Callable<Void>() { // from class: org.projectodd.stilts.stomp.client.AbstractStompClient.2
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            return null;
        }
    };
    private Logger log;
    private AtomicInteger receiptCounter = new AtomicInteger();
    private ConcurrentHashMap<String, ReceiptFuture> receiptHandlers = new ConcurrentHashMap<>(20);
    private AtomicInteger transactionCounter = new AtomicInteger();
    private Map<String, DefaultClientTransaction> transactions = new HashMap();
    private AtomicInteger subscriptionCounter = new AtomicInteger();
    private final Map<String, DefaultClientSubscription> subscriptions = new HashMap();
    private final Object stateLock = new Object();
    private StompClient.State connectionState;
    private ClientListener clientListener;
    private LoggerManager loggerManager;
    private Executor executor;
    private Channel channel;
    private SocketAddress serverAddress;

    public AbstractStompClient(SocketAddress socketAddress) {
        this.serverAddress = socketAddress;
    }

    public void setExecutor(Executor executor) {
        this.executor = executor;
    }

    public Executor getExecutor() {
        return this.executor;
    }

    public void setLoggerManager(LoggerManager loggerManager) {
        this.loggerManager = loggerManager;
    }

    public LoggerManager getLoggerManager() {
        return this.loggerManager;
    }

    @Override // org.projectodd.stilts.stomp.client.StompClient
    public boolean isConnected() {
        return getConnectionState() == StompClient.State.CONNECTED;
    }

    public boolean isDisconnected() {
        return getConnectionState() == StompClient.State.DISCONNECTED;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setConnectionState(StompClient.State state) {
        synchronized (this.stateLock) {
            this.log.info("Set state: " + state);
            this.connectionState = state;
            this.stateLock.notifyAll();
        }
    }

    void waitForConnected() throws InterruptedException {
        synchronized (this.stateLock) {
            while (this.connectionState == StompClient.State.CONNECTING) {
                this.stateLock.wait();
            }
        }
    }

    void waitForDisconnected() throws InterruptedException {
        synchronized (this.stateLock) {
            while (this.connectionState == StompClient.State.DISCONNECTING) {
                this.stateLock.wait();
            }
        }
    }

    public StompClient.State getConnectionState() {
        StompClient.State state;
        synchronized (this.stateLock) {
            state = this.connectionState;
        }
        return state;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void messageReceived(StompMessage stompMessage) {
        DefaultClientSubscription defaultClientSubscription;
        MessageHandler messageHandler;
        this.log.info("received message: " + stompMessage);
        boolean z = false;
        String str = stompMessage.getHeaders().get(StompFrame.Header.SUBSCRIPTION);
        if (str != null && (defaultClientSubscription = this.subscriptions.get(str)) != null && (messageHandler = defaultClientSubscription.getMessageHandler()) != null) {
            messageHandler.handle(stompMessage);
            z = true;
        }
        if (!z) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void errorReceived(StompMessage stompMessage) {
        this.log.info("received error: " + stompMessage);
        String str = stompMessage.getHeaders().get(StompFrame.Header.RECEIPT_ID);
        if (str != null) {
            receiptReceived(str, stompMessage);
        }
    }

    @Override // org.projectodd.stilts.stomp.client.StompClient
    public void connect() throws InterruptedException {
        if (this.loggerManager == null) {
            this.loggerManager = SimpleLoggerManager.DEFAULT_INSTANCE;
        }
        this.log = this.loggerManager.getLogger("client");
        if (this.executor == null) {
            this.executor = Executors.newFixedThreadPool(2);
        }
        ClientBootstrap clientBootstrap = new ClientBootstrap();
        clientBootstrap.setPipelineFactory(createPipelineFactory());
        clientBootstrap.setFactory(createChannelFactory());
        connectInternal(clientBootstrap);
    }

    void connectInternal(ClientBootstrap clientBootstrap) throws InterruptedException {
        this.log.info("Connecting");
        setConnectionState(StompClient.State.CONNECTING);
        this.channel = clientBootstrap.connect(this.serverAddress).await().getChannel();
        sendFrame(new StompControlFrame(StompFrame.Command.CONNECT));
        waitForConnected();
        if (this.connectionState != StompClient.State.CONNECTED) {
            this.log.info("Failed to connect");
            return;
        }
        this.log.info("Connected");
        if (this.clientListener != null) {
            this.clientListener.connected(this);
        }
    }

    String getNextTransactionId() {
        return "transaction-" + this.transactionCounter.getAndIncrement();
    }

    @Override // org.projectodd.stilts.stomp.client.StompClient
    public void disconnect() throws InterruptedException {
        StompControlFrame stompControlFrame = new StompControlFrame(StompFrame.Command.DISCONNECT);
        setConnectionState(StompClient.State.DISCONNECTING);
        sendFrame(stompControlFrame, new Callable<Void>() { // from class: org.projectodd.stilts.stomp.client.AbstractStompClient.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                AbstractStompClient.this.setConnectionState(StompClient.State.DISCONNECTED);
                return null;
            }
        });
        waitForDisconnected();
    }

    @Override // org.projectodd.stilts.stomp.client.StompClient
    public SubscriptionBuilder subscribe(String str) {
        return new DefaultSubscriptionBuilder(this, str);
    }

    @Override // org.projectodd.stilts.stomp.client.StompClient
    public void send(StompMessage stompMessage) {
        this.log.debug("Sending outbound message: " + stompMessage);
        sendFrame(StompFrames.newSendFrame(stompMessage));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultClientSubscription subscribe(DefaultSubscriptionBuilder defaultSubscriptionBuilder) throws InterruptedException, ExecutionException {
        StompControlFrame stompControlFrame = new StompControlFrame(StompFrame.Command.SUBSCRIBE, defaultSubscriptionBuilder.getHeaders());
        String nextSubscriptionId = getNextSubscriptionId();
        stompControlFrame.setHeader(StompFrame.Header.ID, nextSubscriptionId);
        ReceiptFuture sendFrame = sendFrame(stompControlFrame);
        sendFrame.await();
        if (sendFrame.isError()) {
            return null;
        }
        DefaultClientSubscription defaultClientSubscription = new DefaultClientSubscription(this, nextSubscriptionId, defaultSubscriptionBuilder.getMessageHandler());
        this.subscriptions.put(defaultClientSubscription.getId(), defaultClientSubscription);
        return defaultClientSubscription;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unsubscribe(DefaultClientSubscription defaultClientSubscription) throws InterruptedException, ExecutionException {
        StompControlFrame stompControlFrame = new StompControlFrame(StompFrame.Command.UNSUBSCRIBE);
        stompControlFrame.setHeader(StompFrame.Header.ID, defaultClientSubscription.getId());
        sendFrame(stompControlFrame).await();
        defaultClientSubscription.setActive(false);
    }

    String getNextSubscriptionId() {
        return "subscription-" + this.subscriptionCounter.getAndIncrement();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReceiptFuture sendFrame(StompFrame stompFrame) {
        return sendFrame(stompFrame, NOOP);
    }

    ReceiptFuture sendFrame(StompFrame stompFrame, Callable<Void> callable) {
        String nextReceiptId = getNextReceiptId();
        stompFrame.setHeader(StompFrame.Header.RECEIPT, nextReceiptId);
        ReceiptFuture receiptFuture = new ReceiptFuture(callable);
        this.receiptHandlers.put(nextReceiptId, receiptFuture);
        this.channel.write(stompFrame);
        return receiptFuture;
    }

    String getNextReceiptId() {
        return "receipt-" + this.receiptCounter.getAndIncrement();
    }

    public void receiptReceived(String str) {
        receiptReceived(str, null);
    }

    public void receiptReceived(String str, StompMessage stompMessage) {
        ReceiptFuture remove = this.receiptHandlers.remove(str);
        if (remove != null) {
            try {
                remove.received(stompMessage);
            } catch (Exception e) {
                this.log.error("Error during receipt of '" + str + "'", e);
            }
        }
    }

    @Override // org.projectodd.stilts.stomp.client.StompClient
    public ClientTransaction begin() throws StompException {
        String nextTransactionId = getNextTransactionId();
        StompControlFrame stompControlFrame = new StompControlFrame(StompFrame.Command.BEGIN);
        stompControlFrame.setHeader(StompFrame.Header.TRANSACTION, nextTransactionId);
        ReceiptFuture sendFrame = sendFrame(stompControlFrame);
        try {
            sendFrame.await();
            if (sendFrame.isError()) {
                return null;
            }
            DefaultClientTransaction defaultClientTransaction = new DefaultClientTransaction(this, nextTransactionId);
            this.transactions.put(defaultClientTransaction.getId(), defaultClientTransaction);
            return defaultClientTransaction;
        } catch (InterruptedException e) {
            throw new StompException(e);
        } catch (ExecutionException e2) {
            throw new StompException(e2);
        }
    }

    @Override // org.projectodd.stilts.stomp.client.StompClient
    public void abort(String str) throws StompException {
        StompControlFrame stompControlFrame = new StompControlFrame(StompFrame.Command.ABORT);
        stompControlFrame.setHeader(StompFrame.Header.TRANSACTION, str);
        try {
            sendFrame(stompControlFrame).await();
        } catch (InterruptedException e) {
            throw new StompException(e);
        } catch (ExecutionException e2) {
            throw new StompException(e2);
        }
    }

    @Override // org.projectodd.stilts.stomp.client.StompClient
    public void commit(String str) throws StompException {
        StompControlFrame stompControlFrame = new StompControlFrame(StompFrame.Command.COMMIT);
        stompControlFrame.setHeader(StompFrame.Header.TRANSACTION, str);
        try {
            sendFrame(stompControlFrame).await();
        } catch (InterruptedException e) {
            throw new StompException(e);
        } catch (ExecutionException e2) {
            throw new StompException(e2);
        }
    }

    protected ChannelPipelineFactory createPipelineFactory() {
        return new StompClientPipelineFactory(this, new DefaultClientContext(this));
    }

    protected ClientSocketChannelFactory createChannelFactory() {
        return new NioClientSocketChannelFactory(new VirtualExecutorService(this.executor), new VirtualExecutorService(this.executor));
    }

    public void close() throws InterruptedException {
        this.channel.getCloseFuture().await();
    }
}
