/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport;

import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises the {@code transport.soWriteTimeout} connector option.
 *
 * <p>Each test pre-loads a queue, attaches a consumer through a {@link SocketProxy}, pauses the
 * proxy so that broker writes back up, and then expects the broker to have aborted the
 * connection by the time the proxy is resumed.
 */
public class SoWriteTimeoutTest
extends JmsTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutTest.class);

    /** Buffer size handed to the socket proxy and to the STOMP connector's {@code socketBufferSize}. */
    final int receiveBufferSize = 16 * 1024;

    /**
     * Scheme of the broker connector under test. Kept public and non-final because
     * {@link #initCombosForTestWriteTimeout()} registers combination values for it by field name.
     */
    public String brokerTransportScheme = "nio";

    /**
     * Builds a persistent KahaDB-backed broker with a connector for {@link #brokerTransportScheme}
     * (connector index 0) and, for the {@code nio} scheme only, a {@code stomp+nio} connector
     * (connector index 1). Both connectors are configured with {@code transport.soWriteTimeout=1000}.
     *
     * @return the configured, not yet started by this method, broker
     * @throws Exception if the superclass cannot create the broker or a connector cannot be added
     */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = super.createBroker();
        broker.setPersistent(true);
        broker.setDeleteAllMessagesOnStartup(true);

        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setConcurrentStoreAndDispatchQueues(false);
        broker.setPersistenceAdapter(adapter);

        // Connector 0: used by testWriteTimeout.
        broker.addConnector(brokerTransportScheme
                + "://localhost:0?wireFormat.maxInactivityDuration=0&transport.soWriteTimeout=1000&transport.sleep=1000");
        if ("nio".equals(brokerTransportScheme)) {
            // Connector 1: used by testWriteTimeoutStompNio; only exists for the nio scheme.
            broker.addConnector("stomp+" + brokerTransportScheme
                    + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize="
                    + receiveBufferSize + "&trace=true");
        }
        return broker;
    }

    /**
     * Registers {@code tcp} and {@code nio} as combination values for
     * {@link #brokerTransportScheme}; presumably the combination support in the superclass runs
     * {@link #testWriteTimeout()} once per value.
     */
    public void initCombosForTestWriteTimeout() {
        addCombinationValues("brokerTransportScheme", new Object[]{"tcp", "nio"});
    }

    /**
     * Consumes through a paused proxy with a transacted JMS session and expects the commit to
     * fail because the broker has aborted the connection after the write timeout.
     *
     * @throws Exception on any unexpected broker, proxy or JMS failure
     */
    public void testWriteTimeout() throws Exception {
        Destination dest = new ActiveMQQueue("testWriteTimeout");
        messageTextPrefix = initMessagePrefix(8 * 1024);
        sendMessages(dest, 500);

        URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
        LOG.info("consuming using uri: {}", tcpBrokerUri);

        SocketProxy proxy = openProxy(tcpBrokerUri);
        Connection connection = null;
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(proxy.getUrl());
            connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer consumer = session.createConsumer(dest);

            // While the proxy is paused the broker's writes should back up; the test relies on
            // soWriteTimeout aborting the connection within this window.
            proxy.pause();
            TimeUnit.SECONDS.sleep(10L);
            proxy.goOn();

            assertNotNull("can receive buffered messages", consumer.receive(500L));
            try {
                session.commit();
                fail("expect commit to fail as server has aborted writeTimeout connection");
            } catch (JMSException expected) {
                LOG.info("got expected exception on commit after timeout: {}", expected.toString());
            }
        } finally {
            // The connection was created directly from the factory, so nothing else closes it.
            closeQuietly(connection);
            proxy.close();
        }
    }

    /**
     * Subscribes over STOMP through a paused proxy and expects subsequent sends to fail with a
     * {@link SocketException} because the broker has aborted the connection after the write
     * timeout. Requires the {@code stomp+nio} connector, which {@link #createBroker()} only adds
     * for the {@code nio} scheme.
     *
     * @throws Exception on any unexpected broker, proxy or socket failure
     */
    public void testWriteTimeoutStompNio() throws Exception {
        // Fail with a readable message rather than IndexOutOfBoundsException on get(1).
        assertTrue("stomp connector is only registered for the nio scheme, scheme was: " + brokerTransportScheme,
                broker.getTransportConnectors().size() > 1);

        ActiveMQQueue dest = new ActiveMQQueue("testWriteTimeout");
        messageTextPrefix = initMessagePrefix(8 * 1024);
        sendMessages(dest, 500);

        URI stompBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
        LOG.info("consuming using uri: {}", stompBrokerUri);

        // The proxy target is given a plain tcp scheme; only the port of the stomp connector is reused.
        SocketProxy proxy = openProxy(new URI("tcp://localhost:" + stompBrokerUri.getPort()));
        StompConnection stompConnection = new StompConnection();
        try {
            stompConnection.open(new Socket("localhost", proxy.getUrl().getPort()));
            stompConnection.getStompSocket().setTcpNoDelay(true);

            String frame = "CONNECT\nlogin:system\npasscode:manager\n\n\u0000";
            stompConnection.sendFrame(frame);
            frame = stompConnection.receiveFrame();
            assertTrue(frame.startsWith("CONNECTED"));

            frame = "SUBSCRIBE\ndestination:/queue/" + dest.getQueueName() + "\n" + "ack:client\n\n" + "\u0000";
            stompConnection.sendFrame(frame);
            frame = stompConnection.receiveFrame();
            assertTrue(frame.startsWith("MESSAGE"));

            proxy.pause();
            // Let writes back up behind the paused proxy ...
            TimeUnit.SECONDS.sleep(1L);
            // ... then wait long enough for the broker-side abort to be expected to have happened.
            TimeUnit.SECONDS.sleep(10L);
            proxy.goOn();

            // A message that was already buffered can still be read.
            frame = stompConnection.receiveFrame();
            assertTrue(frame.startsWith("MESSAGE"));

            try {
                for (int i = 0; i < 200; ++i) {
                    stompConnection.send("/queue/" + dest.getPhysicalName(), "ShouldBeDeadConnectionText" + i);
                }
                fail("expected send to fail with timeout out connection");
            } catch (SocketException expected) {
                LOG.info("got exception on send after timeout: {}", expected.toString());
            }
        } finally {
            try {
                stompConnection.close();
            } finally {
                proxy.close();
            }
        }
    }

    /**
     * Creates and opens a socket proxy in front of {@code target} using {@link #receiveBufferSize}.
     *
     * @param target address the proxy forwards to
     * @return the opened proxy; the caller is responsible for closing it
     * @throws Exception if the proxy cannot be opened
     */
    private SocketProxy openProxy(URI target) throws Exception {
        SocketProxy proxy = new SocketProxy();
        proxy.setTarget(target);
        proxy.setReceiveBufferSize(receiveBufferSize);
        proxy.open();
        return proxy;
    }

    /**
     * Closes {@code connection} if it is non-null, logging rather than propagating a
     * {@link JMSException} so that cleanup of an already-aborted connection cannot mask the
     * test's own outcome.
     *
     * @param connection the connection to close, may be {@code null}
     */
    private void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ignored) {
            LOG.debug("ignoring exception while closing test connection: {}", ignored.toString());
        }
    }

    /**
     * Builds a message text prefix of {@code i} NUL characters, used to pad each test message.
     * Built from a {@code char[]} so the result does not depend on the platform charset.
     *
     * @param i number of characters in the prefix
     * @return a string of {@code i} {@code '\u0000'} characters
     */
    private String initMessagePrefix(int i) {
        return new String(new char[i]);
    }

    /**
     * Enables auto-fail on the test support before delegating to the superclass set-up.
     *
     * @throws Exception if the superclass set-up fails
     */
    @Override
    protected void setUp() throws Exception {
        setAutoFail(true);
        super.setUp();
    }

    /**
     * @return the suite built by the inherited {@code suite(Class)} factory for this class
     */
    public static Test suite() {
        return suite(SoWriteTimeoutTest.class);
    }
}

