package org.apache.activemq.artemis.cli.commands.messages;

import io.airlift.airline.Command;
import io.airlift.airline.Option;
import java.io.FileOutputStream;
import java.io.OutputStream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;

/**
 * CLI command that consumes messages from a destination using one or more
 * {@link ConsumerThread}s and, when {@code --data} is given, serializes every
 * consumed message to a file through a {@link MessageSerializer}.
 */
@Command(name = "consumer", description = "It will consume messages from an instance")
public class Consumer extends DestAbstract {

    @Option(name = {"--durable"}, description = "It will use durable subscription in case of client")
    boolean durable = false;

    @Option(name = {"--break-on-null"}, description = "It will break on null messages")
    boolean breakOnNull = false;

    @Option(name = {"--receive-timeout"}, description = "Time used on receive(timeout)")
    int receiveTimeout = 3000;

    @Option(name = {"--filter"}, description = "filter to be used with the consumer")
    String filter;

    @Option(name = {"--data"}, description = "serialize the messages to the specified file as they are consumed")
    String file;

    /**
     * Listener that forwards every message it is handed to a {@link MessageSerializer}.
     * It does not touch the enclosing command, hence it is a static nested class.
     */
    private static class SerialiserMessageListener implements MessageListener {
        private final MessageSerializer messageSerializer;

        /**
         * @param messageSerializer serializer that will write the messages
         * @param outputStream      stream handed to the serializer via {@code setOutput}
         * @throws Exception if the serializer rejects the output stream
         */
        SerialiserMessageListener(MessageSerializer messageSerializer, OutputStream outputStream) throws Exception {
            this.messageSerializer = messageSerializer;
            this.messageSerializer.setOutput(outputStream);
        }

        @Override
        public void onMessage(Message message) {
            this.messageSerializer.write(message);
        }
    }

    /**
     * Runs the consumer command.
     *
     * @param actionContext context passed through to the parent command
     * @return the total number of messages received by all consumer threads as an
     *         {@code Integer}, or {@code null} when the serializer or the output file
     *         could not be set up (an error is printed to {@code System.err} in that case)
     * @throws Exception if the parent command, the serializer, the connection or a
     *                   consumer thread fails
     */
    @Override
    public Object execute(ActionContext actionContext) throws Exception {
        super.execute(actionContext);
        System.out.println("Consumer:: filter = " + this.filter);
        ConnectionFactory connectionFactory = createConnectionFactory();

        SerialiserMessageListener listener = null;
        MessageSerializer messageSerializer = null;
        OutputStream fileOutput = null;
        if (this.file != null) {
            // Resolved up front so the error message below can name the class that was
            // actually tried, even when --serializer was not supplied.
            String serializerClassName = this.serializer == null ? DestAbstract.DEFAULT_MESSAGE_SERIALIZER : this.serializer;
            try {
                if (serializerClassName.equals(DestAbstract.DEFAULT_MESSAGE_SERIALIZER) && !this.protocol.equalsIgnoreCase("CORE")) {
                    System.err.println("Default Serializer does not support: " + this.protocol + " protocol");
                    return null;
                }
                messageSerializer = (MessageSerializer) Class.forName(serializerClassName).getConstructor().newInstance();
            } catch (Exception e) {
                System.err.println("Error. Unable to instantiate serializer class: " + serializerClassName);
                return null;
            }
            try {
                fileOutput = new FileOutputStream(this.file);
                listener = new SerialiserMessageListener(messageSerializer, fileOutput);
            } catch (Exception e) {
                System.err.println("Error: Unable to open file for writing\n" + e.getMessage());
                // The file may already be open if only the listener set-up failed.
                closeFileQuietly(fileOutput);
                return null;
            }
        }

        // try-with-resources tolerates a null resource, so this also covers the
        // case where no --data file was requested.
        try (OutputStream output = fileOutput) {
            if (messageSerializer != null) {
                messageSerializer.start();
            }
            try (Connection connection = connectionFactory.createConnection()) {
                int totalReceived = consumeWithThreads(connection, listener);
                if (messageSerializer != null) {
                    messageSerializer.stop();
                }
                return Integer.valueOf(totalReceived);
            }
        }
    }

    /**
     * Creates {@code threads} consumer threads (each with its own session), starts them,
     * starts the connection, waits for all of them and sums their received counts.
     *
     * @param connection connection used to create the sessions; started by this method
     * @param listener   listener passed to every consumer thread, may be {@code null}
     * @return the sum of {@code getReceived()} over all consumer threads
     * @throws Exception if session/destination creation fails or a join is interrupted
     */
    private int consumeWithThreads(Connection connection, SerialiserMessageListener listener) throws Exception {
        ConsumerThread[] consumers = new ConsumerThread[this.threads];
        for (int i = 0; i < this.threads; i++) {
            // A positive batch size selects a transacted session; otherwise auto-acknowledge.
            Session session = this.txBatchSize > 0
                ? connection.createSession(true, Session.SESSION_TRANSACTED)
                : connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            consumers[i] = new ConsumerThread(session, isFQQN() ? session.createQueue(getFQQNFromDestination(this.destination)) : lookupDestination(session), i);
            consumers[i].setVerbose(this.verbose).setSleep(this.sleep).setDurable(this.durable).setBatchSize(this.txBatchSize).setBreakOnNull(this.breakOnNull).setMessageCount(this.messageCount).setReceiveTimeOut(this.receiveTimeout).setFilter(this.filter).setBrowse(false).setListener(listener);
        }
        for (ConsumerThread consumer : consumers) {
            consumer.start();
        }
        connection.start();
        int totalReceived = 0;
        for (ConsumerThread consumer : consumers) {
            consumer.join();
            totalReceived += consumer.getReceived();
        }
        return totalReceived;
    }

    /**
     * Closes the given stream, ignoring any failure; used only on an error path where
     * the original problem has already been reported.
     *
     * @param stream stream to close, may be {@code null}
     */
    private static void closeFileQuietly(OutputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (Exception ignored) {
            // Best effort: the command is already returning with an error.
        }
    }
}
