package org.apache.activemq.store.journal;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.journal.JournalTransactionStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-core-5.0.0.19-fuse.jar:org/apache/activemq/store/journal/JournalPersistenceAdapter.class */
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
    /** Logger for this class. */
    private static final Log LOG = LogFactory.getLog(JournalPersistenceAdapter.class);
    /** Journal that commands are written to and replayed from; assigned once in the constructor. */
    private final Journal journal;
    /** Backing adapter that transactions, destinations and store creation are delegated to. */
    private final PersistenceAdapter longTermPersistence;
    /** Set through setUsageManager(); start() and stop() dereference it without a null check. */
    private SystemUsage usageManager;
    /** Pool created in start(); doCheckpoint() submits one task per message store to it. */
    private ThreadPoolExecutor checkpointExecutor;
    /** Task runner whose iterate() calls doCheckpoint(); woken by checkpoint(boolean, boolean). */
    private TaskRunner checkpointTask;
    /** Set by checkpoint(..., true) and consumed by doCheckpoint(); accessed under synchronized(this). */
    private boolean fullCheckPoint;
    /** Wire format used to marshal commands into, and unmarshal them out of, journal records. */
    private final WireFormat wireFormat = new OpenWireFormat();
    /** Journal-backed queue stores created so far, keyed by destination. */
    private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<>();
    /** Journal-backed topic stores created so far, keyed by destination. */
    private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<>();
    /** Milliseconds (compared against System.currentTimeMillis()) between periodic checkpoints and JDBC cleanups. */
    private long checkpointInterval = 300000;
    /** Time of the most recent checkpoint request; written under synchronized(this) in checkpoint(boolean, boolean). */
    private long lastCheckpointRequest = System.currentTimeMillis();
    /** Time of the last JDBC cleanup triggered from doCheckpoint(). */
    private long lastCleanup = System.currentTimeMillis();
    /** Used as both the core and the maximum size of the checkpoint thread pool built in start(). */
    private int maxCheckpointWorkers = 10;
    /** Exposed through a getter/setter only; not otherwise read in this class. */
    private int maxCheckpointMessageAddSize = 1048576;
    /** Transaction store handed out by createTransactionStore() and used while replaying the journal. */
    private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
    /** Latch for the upcoming checkpoint pass; doCheckpoint() swaps in a fresh one and counts the old one down when done. */
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    /** Flipped by start()/stop(); writeCommand() refuses to write while it is false. */
    private AtomicBoolean started = new AtomicBoolean(false);
    /** Runnable scheduled in start() and cancelled in stop(). */
    private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();

    /**
     * Wires the adapter to its journal, its long-term store and a checkpoint task runner.
     *
     * @param journal journal to write to; this adapter registers itself as its event listener
     * @param persistenceAdapter long-term adapter that calls are delegated to
     * @param taskRunnerFactory factory used to create the runner that drives {@link #doCheckpoint()}
     * @throws IOException declared by the signature; nothing in this body visibly throws it
     */
    public JournalPersistenceAdapter(Journal journal, PersistenceAdapter persistenceAdapter, TaskRunnerFactory taskRunnerFactory) throws IOException {
        this.journal = journal;
        // NOTE(review): "this" is published to the journal before longTermPersistence is assigned below.
        journal.setJournalEventListener(this);
        this.checkpointTask = taskRunnerFactory.createTaskRunner(new Task() { // from class: org.apache.activemq.store.journal.JournalPersistenceAdapter.1
            @Override // org.apache.activemq.thread.Task
            public boolean iterate() {
                // Each iteration of the task runner performs one checkpoint pass.
                return JournalPersistenceAdapter.this.doCheckpoint();
            }
        }, "ActiveMQ Journal Checkpoint Worker");
        this.longTermPersistence = persistenceAdapter;
    }

    /**
     * Builds the runnable that {@link #start()} schedules periodically. When more than
     * {@code checkpointInterval} milliseconds have passed since the last checkpoint request,
     * it requests an asynchronous full checkpoint.
     *
     * @return the periodic checkpoint runnable
     */
    final Runnable createPeriodicCheckpointTask() {
        return new Runnable() {
            @Override
            public void run() {
                long lastRequest;
                // Lock the adapter, not this Runnable: lastCheckpointRequest is written under the
                // adapter's monitor in checkpoint(boolean, boolean), so a plain "this" here would
                // synchronize on an unrelated object and give no visibility guarantee.
                synchronized (JournalPersistenceAdapter.this) {
                    lastRequest = lastCheckpointRequest;
                }
                if (System.currentTimeMillis() > lastRequest + checkpointInterval) {
                    JournalPersistenceAdapter.this.checkpoint(false, true);
                }
            }
        };
    }

    /**
     * Remembers the usage manager and passes the same instance on to the long-term adapter.
     *
     * @param systemUsage usage manager to use from now on
     */
    @Override
    public void setUsageManager(SystemUsage systemUsage) {
        usageManager = systemUsage;
        longTermPersistence.setUsageManager(systemUsage);
    }

    /**
     * Collects every known destination: those reported by the long-term adapter plus the
     * queues and topics for which a journal store has been created here.
     *
     * @return a new, independent set of destinations
     */
    @Override
    public Set<ActiveMQDestination> getDestinations() {
        Set<ActiveMQDestination> all = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
        all.addAll(queues.keySet());
        all.addAll(topics.keySet());
        return all;
    }

    /**
     * Dispatches to the queue or topic store factory depending on the destination kind.
     *
     * @param activeMQDestination destination to obtain a store for
     * @return the journal-backed store for that destination
     * @throws IOException if the underlying factory fails
     */
    private MessageStore createMessageStore(ActiveMQDestination activeMQDestination) throws IOException {
        if (activeMQDestination.isQueue()) {
            return createQueueMessageStore((ActiveMQQueue) activeMQDestination);
        }
        return createTopicMessageStore((ActiveMQTopic) activeMQDestination);
    }

    /**
     * Returns the journal-backed store for a queue, creating and caching it on first request.
     * A new store wraps the queue store obtained from the long-term adapter.
     *
     * @param activeMQQueue queue to obtain a store for
     * @return the cached or newly created store
     * @throws IOException if the long-term adapter cannot create its store
     */
    @Override
    public MessageStore createQueueMessageStore(ActiveMQQueue activeMQQueue) throws IOException {
        JournalMessageStore cached = queues.get(activeMQQueue);
        if (cached != null) {
            return cached;
        }
        JournalMessageStore created = new JournalMessageStore(this, longTermPersistence.createQueueMessageStore(activeMQQueue), activeMQQueue);
        queues.put(activeMQQueue, created);
        return created;
    }

    /**
     * Returns the journal-backed store for a topic, creating and caching it on first request.
     * A new store wraps the topic store obtained from the long-term adapter.
     *
     * @param activeMQTopic topic to obtain a store for
     * @return the cached or newly created store
     * @throws IOException if the long-term adapter cannot create its store
     */
    @Override
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic activeMQTopic) throws IOException {
        JournalTopicMessageStore cached = topics.get(activeMQTopic);
        if (cached != null) {
            return cached;
        }
        JournalTopicMessageStore created = new JournalTopicMessageStore(this, longTermPersistence.createTopicMessageStore(activeMQTopic), activeMQTopic);
        topics.put(activeMQTopic, created);
        return created;
    }

    /**
     * Hands out the single transaction store owned by this adapter; nothing is created here.
     *
     * @return the adapter's journal transaction store
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public TransactionStore createTransactionStore() throws IOException {
        return transactionStore;
    }

    /**
     * Asks the long-term adapter for the last message broker sequence id.
     *
     * @return whatever the long-term adapter reports
     * @throws IOException if the long-term adapter fails
     */
    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        final long lastSequenceId = longTermPersistence.getLastMessageBrokerSequenceId();
        return lastSequenceId;
    }

    /**
     * Forwards the begin-transaction request to the long-term adapter.
     *
     * @param connectionContext context the transaction belongs to
     * @throws IOException if the long-term adapter fails
     */
    @Override
    public void beginTransaction(ConnectionContext connectionContext) throws IOException {
        longTermPersistence.beginTransaction(connectionContext);
    }

    /**
     * Forwards the commit request to the long-term adapter.
     *
     * @param connectionContext context the transaction belongs to
     * @throws IOException if the long-term adapter fails
     */
    @Override
    public void commitTransaction(ConnectionContext connectionContext) throws IOException {
        longTermPersistence.commitTransaction(connectionContext);
    }

    /**
     * Forwards the rollback request to the long-term adapter.
     *
     * @param connectionContext context the transaction belongs to
     * @throws IOException if the long-term adapter fails
     */
    @Override
    public void rollbackTransaction(ConnectionContext connectionContext) throws IOException {
        longTermPersistence.rollbackTransaction(connectionContext);
    }

    /**
     * Starts the adapter once: builds the checkpoint worker pool, registers for memory-usage
     * events, starts the long-term adapter, replays the journal and schedules the periodic
     * checkpoint task. Calls made while already started do nothing.
     *
     * @throws Exception if the long-term adapter cannot start or journal recovery fails
     */
    @Override
    public synchronized void start() throws Exception {
        if (!started.compareAndSet(false, true)) {
            return;
        }

        ThreadFactory workerFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                Thread worker = new Thread(runnable, "Journal checkpoint worker");
                worker.setPriority(7);
                return worker;
            }
        };
        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), workerFactory);

        usageManager.getMemoryUsage().addUsageListener(this);

        if (longTermPersistence instanceof JDBCPersistenceAdapter) {
            // NOTE(review): presumably turns off the JDBC adapter's own periodic cleanup;
            // doCheckpoint() calls cleanup() itself on full checkpoints - confirm.
            JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) longTermPersistence;
            jdbcAdapter.setCleanupPeriod(0);
        }

        longTermPersistence.start();
        createTransactionStore();
        recover();

        // The task runs ten times per checkpoint interval and decides itself whether to act.
        Scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
    }

    /**
     * Stops the adapter. The memory-usage listener is always removed; the rest only runs if
     * the adapter was started: cancel the periodic task, take a final synchronous full
     * checkpoint, shut down the checkpoint runner and pool, forget cached stores, close the
     * journal and stop the long-term adapter.
     *
     * @throws Exception the journal close failure (wrapped as IOException) after the
     *         long-term adapter has been stopped, or whatever the long-term adapter throws
     */
    @Override
    public void stop() throws Exception {
        usageManager.getMemoryUsage().removeUsageListener(this);
        if (!started.compareAndSet(true, false)) {
            return;
        }

        Scheduler.cancel(periodicCheckpointTask);

        // One last blocking, full checkpoint before the checkpoint machinery is torn down.
        checkpoint(true, true);
        checkpointTask.shutdown();
        checkpointExecutor.shutdown();

        queues.clear();
        topics.clear();

        // A failure to close the journal is held back so the long-term adapter still gets stopped.
        IOException closeFailure = null;
        try {
            journal.close();
        } catch (Exception e) {
            closeFailure = IOExceptionSupport.create("Failed to close journals: " + e, e);
        }

        longTermPersistence.stop();

        if (closeFailure != null) {
            throw closeFailure;
        }
    }

    /**
     * @return the long-term adapter this journal adapter delegates to
     */
    public PersistenceAdapter getLongTermPersistence() {
        return longTermPersistence;
    }

    /**
     * @return the wire format used to marshal and unmarshal journal records
     */
    public WireFormat getWireFormat() {
        return wireFormat;
    }

    /**
     * Journal callback: reacts by requesting a full checkpoint without waiting for it.
     *
     * @param recordLocation location supplied by the journal; not used here
     */
    @Override
    public void overflowNotification(RecordLocation recordLocation) {
        final boolean waitForCompletion = false;
        final boolean fullCheckpoint = true;
        checkpoint(waitForCompletion, fullCheckpoint);
    }

    /**
     * Requests a checkpoint by waking the checkpoint task runner.
     *
     * <p>If the calling thread is interrupted, the interrupt flag is restored, a warning is
     * logged and the method returns without waiting any further.
     *
     * @param z when {@code true}, block until the latch captured for this request is released
     *        by {@link #doCheckpoint()}
     * @param z2 when {@code true}, mark the upcoming pass as a full checkpoint
     * @throws IllegalStateException if the journal reference is {@code null}
     */
    public void checkpoint(boolean z, boolean z2) {
        try {
            // Defensive only: journal is final and dereferenced in the constructor.
            if (journal == null) {
                throw new IllegalStateException("Journal is closed.");
            }
            long now = System.currentTimeMillis();
            CountDownLatch latch;
            synchronized (this) {
                // Capture the latch before waking the task so we wait on the pass that
                // has not started yet, not on one already in progress.
                latch = nextCheckpointCountDownLatch;
                lastCheckpointRequest = now;
                if (z2) {
                    fullCheckPoint = true;
                }
            }
            checkpointTask.wakeup();
            if (z) {
                // Message fixed: the caller blocks here, it does not "wake".
                LOG.debug("Waiting for checkpoint to complete.");
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Request to start checkpoint failed: " + e, e);
        }
    }

    /**
     * Single-flag variant: the same value decides both whether to wait for completion and
     * whether the checkpoint is a full one.
     *
     * @param z {@code true} for a blocking full checkpoint, {@code false} for a
     *        non-blocking partial one
     */
    @Override
    public void checkpoint(boolean z) {
        final boolean waitForCompletion = z;
        final boolean fullCheckpoint = z;
        checkpoint(waitForCompletion, fullCheckpoint);
    }

    /**
     * Performs one checkpoint pass; invoked by the checkpoint task runner.
     *
     * <p>Topic stores are checkpointed on every pass, queue stores only on a full pass. On a
     * full pass the journal is marked at the greatest location returned by the stores, and
     * the JDBC long-term adapter (if that is what is configured) is cleaned up at most once
     * per {@code checkpointInterval}. Failures of individual stores are logged, not thrown.
     * The latch captured at the start is always released, so callers blocked in
     * {@link #checkpoint(boolean, boolean)} are woken even if this pass fails.
     *
     * @return {@code true} if another full checkpoint was requested while this pass ran
     */
    public boolean doCheckpoint() {
        final CountDownLatch completionLatch;
        final boolean full;
        synchronized (this) {
            // Swap in a fresh latch so requests arriving from now on wait for the next pass.
            completionLatch = nextCheckpointCountDownLatch;
            nextCheckpointCountDownLatch = new CountDownLatch(1);
            full = fullCheckPoint;
            fullCheckPoint = false;
        }
        try {
            LOG.debug("Checkpoint started.");
            ArrayList<FutureTask<RecordLocation>> pending = new ArrayList<>(queues.size() + topics.size());

            if (full) {
                for (final JournalMessageStore queueStore : queues.values()) {
                    submitCheckpoint(pending, new Callable<RecordLocation>() {
                        @Override
                        public RecordLocation call() throws Exception {
                            return queueStore.checkpoint();
                        }
                    });
                }
            }
            for (final JournalTopicMessageStore topicStore : topics.values()) {
                submitCheckpoint(pending, new Callable<RecordLocation>() {
                    @Override
                    public RecordLocation call() throws Exception {
                        return topicStore.checkpoint();
                    }
                });
            }

            RecordLocation newMark = null;
            try {
                for (FutureTask<RecordLocation> task : pending) {
                    RecordLocation mark = task.get();
                    // A new journal mark is only computed on full passes.
                    if (full && mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
                        newMark = mark;
                    }
                }
            } catch (Throwable t) {
                // The first failure stops the wait loop; remaining tasks are not awaited.
                LOG.error("Failed to checkpoint a message store: " + t, t);
            }

            if (full) {
                if (newMark != null) {
                    try {
                        LOG.debug("Marking journal at: " + newMark);
                        journal.setMark(newMark, true);
                    } catch (Exception e) {
                        LOG.error("Failed to mark the Journal: " + e, e);
                    }
                }
                if (longTermPersistence instanceof JDBCPersistenceAdapter) {
                    // Full passes can happen more often than checkpointInterval; throttle cleanup.
                    long now = System.currentTimeMillis();
                    if (now > lastCleanup + checkpointInterval) {
                        lastCleanup = now;
                        ((JDBCPersistenceAdapter) longTermPersistence).cleanup();
                    }
                }
            }
            LOG.debug("Checkpoint done.");
        } finally {
            completionLatch.countDown();
        }
        synchronized (this) {
            return fullCheckPoint;
        }
    }

    /**
     * Hands one store checkpoint to the worker pool and remembers it for the wait loop.
     * The task is recorded only after the executor accepted it: a rejected task would never
     * run, and waiting on it with {@code get()} would block the checkpoint pass forever.
     */
    private void submitCheckpoint(ArrayList<FutureTask<RecordLocation>> pending, Callable<RecordLocation> work) {
        try {
            FutureTask<RecordLocation> task = new FutureTask<>(work);
            checkpointExecutor.execute(task);
            pending.add(task);
        } catch (Exception e) {
            LOG.error("Failed to checkpoint a message store: " + e, e);
        }
    }

    /**
     * Reads the journal record at the given location and unmarshals it into a command.
     *
     * @param recordLocation journal location to read
     * @return the unmarshalled command
     * @throws IOException if the read or unmarshal fails or the location is invalid; the
     *         original failure is kept as the cause
     */
    public DataStructure readCommand(RecordLocation recordLocation) throws IOException {
        try {
            Packet raw = journal.read(recordLocation);
            return (DataStructure) wireFormat.unmarshal(toByteSequence(raw));
        } catch (IOException | InvalidRecordLocationException e) {
            throw createReadException(recordLocation, e);
        }
    }

    /**
     * Replays every record in the journal, then writes a "RECOVERED" trace record and marks
     * the journal there.
     *
     * <p>Messages and acknowledgements outside a transaction are replayed straight into
     * their store; transactional ones are handed to the transaction store and replayed when
     * the matching commit record is seen. An IOException while replaying a transaction
     * record is logged and recovery continues; any other failure aborts recovery.
     *
     * @throws InvalidRecordLocationException if the journal rejects a record location
     * @throws IOException if a record cannot be read, unmarshalled or replayed
     */
    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException {
        int recoveredCount = 0;
        LOG.info("Journal Recovery Started from: " + journal);
        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());

        RecordLocation pos = null;
        while ((pos = journal.getNextRecordLocation(pos)) != null) {
            DataStructure record = (DataStructure) wireFormat.unmarshal(toByteSequence(journal.read(pos)));

            if (record instanceof Message) {
                Message message = (Message) record;
                JournalMessageStore store = (JournalMessageStore) createMessageStore(message.getDestination());
                if (message.isInTransaction()) {
                    transactionStore.addMessage(store, message, pos);
                } else {
                    store.replayAddMessage(context, message);
                    recoveredCount++;
                }
                continue;
            }

            switch (record.getDataStructureType()) {
                case 50: { // handled as JournalTopicAck
                    JournalTopicAck topicAck = (JournalTopicAck) record;
                    JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(topicAck.getDestination());
                    if (topicAck.getTransactionId() == null) {
                        store.replayAcknowledge(context, topicAck.getClientId(), topicAck.getSubscritionName(), topicAck.getMessageId());
                        recoveredCount++;
                    } else {
                        transactionStore.acknowledge(store, topicAck, pos);
                    }
                    break;
                }
                case 52: { // handled as JournalQueueAck
                    JournalQueueAck queueAck = (JournalQueueAck) record;
                    JournalMessageStore store = (JournalMessageStore) createMessageStore(queueAck.getDestination());
                    if (queueAck.getMessageAck().isInTransaction()) {
                        transactionStore.removeMessage(store, queueAck.getMessageAck(), pos);
                    } else {
                        store.replayRemoveMessage(context, queueAck.getMessageAck());
                        recoveredCount++;
                    }
                    break;
                }
                case 53: // handled as JournalTrace
                    LOG.debug("TRACE Entry: " + ((JournalTrace) record).getMessage());
                    break;
                case 54: // handled as JournalTransaction
                    try {
                        recoveredCount += replayTransaction(context, (JournalTransaction) record);
                    } catch (IOException e) {
                        LOG.error("Recovery Failure: Could not replay: " + record + ", reason: " + e, e);
                    }
                    break;
                default:
                    LOG.error("Unknown type of record in transaction log which will be discarded: " + record);
                    break;
            }
        }

        journal.setMark(writeTraceMessage("RECOVERED", true), true);
        LOG.info("Journal Recovered: " + recoveredCount + " message(s) in transactions recovered.");
    }

    /**
     * Replays one transaction record: type 1 prepares, types 2 and 4 commit (replaying the
     * operations collected for that transaction), types 3 and 5 roll back.
     *
     * @return 1 if a commit was replayed, otherwise 0
     */
    private int replayTransaction(ConnectionContext context, JournalTransaction command) throws IOException, InvalidRecordLocationException {
        switch (command.getType()) {
            case 1:
                transactionStore.replayPrepare(command.getTransactionId());
                return 0;
            case 2:
            case 4: {
                JournalTransactionStore.Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
                if (tx == null) {
                    // Nothing is known about this transaction, so there is nothing to replay.
                    return 0;
                }
                Iterator<JournalTransactionStore.TxOperation> operations = tx.getOperations().iterator();
                while (operations.hasNext()) {
                    JournalTransactionStore.TxOperation op = operations.next();
                    if (op.operationType == 0) {
                        op.store.replayAddMessage(context, (Message) op.data);
                    }
                    if (op.operationType == 1) {
                        op.store.replayRemoveMessage(context, (MessageAck) op.data);
                    }
                    if (op.operationType == 3) {
                        JournalTopicAck ack = (JournalTopicAck) op.data;
                        ((JournalTopicMessageStore) op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
                    }
                }
                return 1;
            }
            case 3:
            case 5:
                transactionStore.replayRollback(command.getTransactionId());
                return 0;
            default:
                throw new IOException("Invalid journal command type: " + ((int) command.getType()));
        }
    }

    /**
     * Wraps a journal read failure in an IOException that names the location and keeps the cause.
     *
     * @param recordLocation location that could not be read
     * @param exc original failure
     * @return the exception to throw
     */
    private IOException createReadException(RecordLocation recordLocation, Exception exc) {
        String message = "Failed to read to journal for: " + recordLocation + ". Reason: " + exc;
        return IOExceptionSupport.create(message, exc);
    }

    /**
     * Wraps a journal write failure in an IOException that names the command and keeps the cause.
     *
     * @param dataStructure command that could not be written
     * @param exc original failure
     * @return the exception to throw
     */
    protected IOException createWriteException(DataStructure dataStructure, Exception exc) {
        String message = "Failed to write to journal for: " + dataStructure + ". Reason: " + exc;
        return IOExceptionSupport.create(message, exc);
    }

    /**
     * Wraps a journal write failure in an IOException that names the command and keeps the cause.
     *
     * @param str textual description of the command that could not be written
     * @param exc original failure
     * @return the exception to throw
     */
    protected IOException createWriteException(String str, Exception exc) {
        String message = "Failed to write to journal for command: " + str + ". Reason: " + exc;
        return IOExceptionSupport.create(message, exc);
    }

    /**
     * Wraps a recovery failure in an IOException that keeps the cause.
     *
     * @param exc original failure
     * @return the exception to throw
     */
    protected IOException createRecoveryFailedException(Exception exc) {
        String message = "Failed to recover from journal. Reason: " + exc;
        return IOExceptionSupport.create(message, exc);
    }

    /**
     * Marshals a command with the adapter's wire format and appends it to the journal.
     *
     * @param dataStructure command to write
     * @param z passed through unchanged as the second argument of the journal write
     * @return the location the journal assigned to the record
     * @throws IOException with message "closed" if the adapter is not started, or if
     *         marshalling or writing fails
     */
    public RecordLocation writeCommand(DataStructure dataStructure, boolean z) throws IOException {
        if (!started.get()) {
            throw new IOException("closed");
        }
        Packet payload = toPacket(wireFormat.marshal(dataStructure));
        return journal.write(payload, z);
    }

    /**
     * Writes a trace record carrying the given text to the journal.
     *
     * @param str text of the trace record
     * @param z passed through to {@link #writeCommand(DataStructure, boolean)}
     * @return the location of the written record
     * @throws IOException if the write fails or the adapter is not started
     */
    private RecordLocation writeTraceMessage(String str, boolean z) throws IOException {
        JournalTrace trace = new JournalTrace();
        trace.setMessage(str);
        RecordLocation location = writeCommand(trace, z);
        return location;
    }

    /**
     * Usage callback. Both percentages are rounded down to a multiple of ten; a full
     * checkpoint is requested when the rounded {@code i2} is at least 70 and greater than
     * the rounded {@code i}. From 90 upwards the request blocks until the checkpoint is done.
     *
     * @param usage usage object that changed; not used here
     * @param i presumably the previous usage percentage - confirm against UsageListener
     * @param i2 presumably the new usage percentage - confirm against UsageListener
     */
    @Override
    public void onUsageChanged(Usage usage, int i, int i2) {
        int roundedSecond = (i2 / 10) * 10;
        int roundedFirst = (i / 10) * 10;
        if (roundedSecond >= 70 && roundedFirst < roundedSecond) {
            boolean waitForCompletion = roundedSecond >= 90;
            checkpoint(waitForCompletion, true);
        }
    }

    /**
     * @return the journal transaction store owned by this adapter
     */
    public JournalTransactionStore getTransactionStore() {
        return transactionStore;
    }

    /**
     * Writes a "DELETED" trace record, marks the journal at that record, and then asks the
     * long-term adapter to delete all of its messages.
     *
     * @throws IOException directly, or wrapping any other Throwable raised along the way
     */
    @Override
    public void deleteAllMessages() throws IOException {
        try {
            JournalTrace marker = new JournalTrace();
            marker.setMessage("DELETED");
            Packet payload = toPacket(wireFormat.marshal(marker));
            RecordLocation location = journal.write(payload, false);
            journal.setMark(location, true);
            LOG.info("Journal deleted: ");
            longTermPersistence.deleteAllMessages();
        } catch (IOException e) {
            throw e;
        } catch (Throwable t) {
            throw IOExceptionSupport.create(t);
        }
    }

    /**
     * @return the usage manager last passed to {@link #setUsageManager(SystemUsage)}, or
     *         {@code null} if none was set
     */
    public SystemUsage getUsageManager() {
        return usageManager;
    }

    /**
     * @return the configured maximum checkpoint message-add size (default 1048576)
     */
    public int getMaxCheckpointMessageAddSize() {
        return maxCheckpointMessageAddSize;
    }

    /**
     * @param i new maximum checkpoint message-add size; stored without validation
     */
    public void setMaxCheckpointMessageAddSize(int i) {
        maxCheckpointMessageAddSize = i;
    }

    /**
     * @return the size used for the checkpoint worker pool when the adapter is started
     */
    public int getMaxCheckpointWorkers() {
        return maxCheckpointWorkers;
    }

    /**
     * Sets the checkpoint worker pool size. The value is read when {@link #start()} builds
     * the pool, so it has no effect on a pool that already exists.
     *
     * @param i new pool size; stored without validation
     */
    public void setMaxCheckpointWorkers(int i) {
        maxCheckpointWorkers = i;
    }

    /**
     * @return always {@code false}; see {@link #setUseExternalMessageReferences(boolean)}
     */
    public boolean isUseExternalMessageReferences() {
        final boolean supported = false;
        return supported;
    }

    /**
     * Accepts only {@code false}, which is a no-op.
     *
     * @param z requested setting
     * @throws IllegalArgumentException if {@code z} is {@code true}
     */
    public void setUseExternalMessageReferences(boolean z) {
        if (!z) {
            return;
        }
        throw new IllegalArgumentException("The journal does not support message references.");
    }

    /**
     * Converts an ActiveMQ byte sequence into an ActiveIO packet using the same backing
     * array, offset and length.
     *
     * @param byteSequence sequence to wrap
     * @return a packet over the same bytes
     */
    public Packet toPacket(ByteSequence byteSequence) {
        org.apache.activeio.packet.ByteSequence ioSequence =
                new org.apache.activeio.packet.ByteSequence(byteSequence.data, byteSequence.offset, byteSequence.length);
        return new ByteArrayPacket(ioSequence);
    }

    /**
     * Converts an ActiveIO packet into an ActiveMQ byte sequence built from the data,
     * offset and length of the packet's own byte sequence.
     *
     * @param packet packet to convert
     * @return the equivalent ActiveMQ byte sequence
     */
    public ByteSequence toByteSequence(Packet packet) {
        org.apache.activeio.packet.ByteSequence ioSequence = packet.asByteSequence();
        byte[] bytes = ioSequence.getData();
        int start = ioSequence.getOffset();
        int count = ioSequence.getLength();
        return new ByteSequence(bytes, start, count);
    }

    /**
     * Passes the broker name on to the long-term adapter; nothing is kept locally.
     *
     * @param str broker name
     */
    @Override
    public void setBrokerName(String str) {
        longTermPersistence.setBrokerName(str);
    }

    /**
     * @return a short description that embeds the long-term adapter's own string form
     */
    public String toString() {
        StringBuilder text = new StringBuilder("JournalPersistenceAdapator(");
        text.append(longTermPersistence);
        text.append(")");
        return text.toString();
    }

    /**
     * Ignored by this adapter: the supplied directory is neither stored nor forwarded.
     *
     * @param file directory offered by the caller
     */
    @Override
    public void setDirectory(File file) {
        // Intentionally empty.
    }

    /**
     * @return always 0; this adapter does not compute a size
     */
    @Override
    public long size() {
        final long notTracked = 0L;
        return notTracked;
    }

    /**
     * Forwards the broker service to the long-term adapter when that adapter is itself
     * {@link BrokerServiceAware}; otherwise does nothing.
     *
     * @param brokerService broker service to hand over
     */
    @Override
    public void setBrokerService(BrokerService brokerService) {
        PersistenceAdapter delegate = getLongTermPersistence();
        if (!(delegate instanceof BrokerServiceAware)) {
            return;
        }
        ((BrokerServiceAware) delegate).setBrokerService(brokerService);
    }
}
