/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streams a large payload through a JMS queue using the ActiveMQ
 * connection-level input/output streams and verifies that the number of
 * bytes read equals the number of bytes written.
 *
 * <p>One thread writes {@link #MESSAGE_COUNT} buffers of {@link #BUFFER_SIZE}
 * bytes to the queue while a second thread reads from the same queue. The
 * test thread monitors the reader's progress and compares the byte counters
 * once the reader has finished or stalled.
 */
public final class LargeStreamletTest
extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(LargeStreamletTest.class);

    /** Broker URL used by the test; the query string disables broker persistence. */
    private static final String BROKER_URL = "vm://localhost?broker.persistent=false";

    /** Size in bytes of the buffer used for each read and each write. */
    private static final int BUFFER_SIZE = 1024;

    /** Number of {@link #BUFFER_SIZE}-byte buffers the writer thread sends. */
    private static final int MESSAGE_COUNT = 10240;

    /*
     * Both exception fields are assigned by a worker thread and read by the
     * test thread, so they are volatile to guarantee the test thread observes
     * the assignment.
     */
    protected volatile Exception writerException;
    protected volatile Exception readerException;

    /** Running count of bytes consumed by the reader thread. */
    private final AtomicInteger totalRead = new AtomicInteger();

    /** Running count of bytes produced by the writer thread. */
    private final AtomicInteger totalWritten = new AtomicInteger();

    /** Set by the test thread to ask both worker loops to exit. */
    private final AtomicBoolean stopThreads = new AtomicBoolean(false);

    /**
     * Runs a reader and a writer thread against the queue {@code "wibble"} and
     * asserts that neither thread recorded an exception and that every byte
     * written was read.
     *
     * @throws Exception if the connection or session cannot be created, or if
     *         the test thread is interrupted while waiting for the reader
     */
    public void testStreamlets() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
        final ActiveMQConnection connection = (ActiveMQConnection)factory.createConnection();
        connection.start();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                // Declared final so the anonymous Runnables below can capture it.
                final Destination destination = session.createQueue("wibble");

                Thread readerThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        totalRead.set(0);
                        try {
                            InputStream inputStream = connection.createInputStream(destination);
                            try {
                                int read;
                                byte[] buf = new byte[BUFFER_SIZE];
                                // Stop on request or when the stream reports end-of-stream (-1).
                                while (!stopThreads.get() && (read = inputStream.read(buf)) != -1) {
                                    totalRead.addAndGet(read);
                                }
                            }
                            finally {
                                inputStream.close();
                            }
                        }
                        catch (Exception e) {
                            // Recorded so the test thread can fail the test with it.
                            readerException = e;
                            LOG.error("Reader thread failed", e);
                        }
                        finally {
                            LOG.info("{} total bytes read.", totalRead.get());
                        }
                    }
                });

                Thread writerThread = new Thread(new Runnable() {
                    private final Random random = new Random();

                    @Override
                    public void run() {
                        totalWritten.set(0);
                        try {
                            OutputStream outputStream = connection.createOutputStream(destination);
                            try {
                                // The same random buffer is written on every iteration.
                                byte[] buf = new byte[BUFFER_SIZE];
                                random.nextBytes(buf);
                                for (int count = MESSAGE_COUNT; count > 0 && !stopThreads.get(); --count) {
                                    outputStream.write(buf);
                                    totalWritten.addAndGet(buf.length);
                                }
                            }
                            finally {
                                outputStream.close();
                            }
                        }
                        catch (Exception e) {
                            // Recorded so the test thread can fail the test with it.
                            writerException = e;
                            LOG.error("Writer thread failed", e);
                        }
                        finally {
                            LOG.info("{} total bytes written.", totalWritten.get());
                        }
                    }
                });

                readerThread.start();
                writerThread.start();

                // Wait until the reader has finished, or until it has made no
                // progress during a one-second join window.
                Thread.sleep(1000L);
                int lastRead = totalRead.get();
                while (readerThread.isAlive()) {
                    readerThread.join(1000L);
                    if (lastRead == totalRead.get()) {
                        break;
                    }
                    lastRead = totalRead.get();
                }
                stopThreads.set(true);

                assertTrue("Should not have received a reader exception", readerException == null);
                assertTrue("Should not have received a writer exception", writerException == null);
                assertEquals("Not all messages accounted for", totalWritten.get(), totalRead.get());
            }
            finally {
                session.close();
            }
        }
        finally {
            connection.close();
        }
    }
}

