package org.apache.activemq.transport.mqtt;

import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-all-5.9.0.redhat-610090.jar:org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.class */
/**
 * Transport filter that periodically checks whether anything has been received
 * from the wrapped transport and, when nothing has, reports an
 * {@link InactivityIOException} to the protocol converter and the transport
 * listener.
 *
 * <p>All instances share one read-check {@link Timer} and one executor for
 * asynchronous failure notifications. Both are created when the first monitor
 * starts and torn down when the last running monitor stops.
 */
public class MQTTInactivityMonitor extends TransportFilter {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);

    /** Default for both the read-check period and the initial delay, in milliseconds. */
    private static final long DEFAULT_CHECK_TIME_MILLS = 30000;

    /**
     * Creates the daemon threads of the shared executor. Held statically so the
     * shared executor does not keep a reference to whichever monitor instance
     * happened to create it.
     */
    private static final ThreadFactory ASYNC_TASK_THREAD_FACTORY = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
            thread.setDaemon(true);
            return thread;
        }
    };

    // Shared state. Written only while holding the AbstractInactivityMonitor class lock
    // (see startMonitorThread / stopMonitorThread).
    // NOTE(review): the lock object is AbstractInactivityMonitor.class rather than this
    // class; presumably historical - confirm before changing.
    private static ThreadPoolExecutor ASYNC_TASKS;
    private static int CHECKER_COUNTER;
    private static Timer READ_CHECK_TIMER;

    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
    private final AtomicBoolean failed = new AtomicBoolean(false);
    /** Set by {@link #onCommand(Object)}, cleared at the end of every read check. */
    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
    /** True while a command is being dispatched to the transport listener. */
    private final AtomicBoolean inReceive = new AtomicBoolean(false);
    /** Receive counter of the wrapped transport as seen by the previous read check. */
    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
    private final ReentrantLock sendLock = new ReentrantLock();

    private SchedulerTimerTask readCheckerTask;
    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
    private boolean keepAliveResponseRequired;
    private MQTTProtocolConverter protocolConverter;

    /**
     * Timer callback. Skips the check when less than roughly 90% of the configured
     * period has elapsed since the last executed check, otherwise runs
     * {@link #readCheck()}.
     */
    private final Runnable readChecker = new Runnable() {
        // 0 until the first check has run, which makes the first elapsed value huge
        // and therefore always lets the first check through.
        long lastRunTime;

        @Override
        public void run() {
            long now = System.currentTimeMillis();
            long elapsed = now - lastRunTime;
            if (lastRunTime != 0 && LOG.isDebugEnabled()) {
                LOG.debug("{} ms elapsed since last read check.", elapsed);
            }
            if (!allowReadCheck(elapsed)) {
                LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
                return;
            }
            lastRunTime = now;
            readCheck();
        }
    };

    /**
     * Creates a monitor wrapping the given transport.
     *
     * @param transport  the transport to monitor
     * @param wireFormat not used by this class
     */
    public MQTTInactivityMonitor(Transport transport, WireFormat wireFormat) {
        super(transport);
    }

    /**
     * Tells whether enough time has passed for a read check to run.
     *
     * @param j milliseconds elapsed since the previous executed read check
     * @return {@code true} when {@code j} exceeds 90% of the read-check period
     */
    public boolean allowReadCheck(long j) {
        return j > (this.readCheckTime * 9) / 10;
    }

    /** Starts the wrapped transport, then the monitor (if a protocol converter is set). */
    @Override
    public void start() throws Exception {
        this.next.start();
        startMonitorThread();
    }

    /** Stops the monitor, then the wrapped transport. */
    @Override
    public void stop() throws Exception {
        stopMonitorThread();
        this.next.stop();
    }

    /**
     * Performs one inactivity check.
     *
     * <p>Does nothing if a receive is in progress or the wrapped transport's receive
     * counter moved since the previous check. Otherwise, if no command arrived since
     * the previous check and the monitor is running, submits an asynchronous task that
     * notifies the protocol converter (when set) and raises an
     * {@link InactivityIOException} through {@link #onException(IOException)}.
     * Finally clears the command-received flag.
     */
    final void readCheck() {
        int currentCounter = this.next.getReceiveCounter();
        int previousCounter = this.lastReceiveCounter.getAndSet(currentCounter);
        if (this.inReceive.get() || currentCounter != previousCounter) {
            LOG.trace("A receive is in progress");
            return;
        }

        // Snapshot the shared executor: the last monitor to stop sets the static field
        // to null, possibly while this timer task is still running.
        final ThreadPoolExecutor executor = ASYNC_TASKS;
        if (!this.commandReceived.get() && this.monitorStarted.get()
                && executor != null && !executor.isTerminating()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
            }
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        if (protocolConverter != null) {
                            protocolConverter.onTransportError();
                        }
                        onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
                    }
                });
            } catch (RejectedExecutionException ex) {
                // A rejection after shutdown just means the monitors are being torn
                // down; anything else is unexpected and is propagated.
                if (!executor.isShutdown()) {
                    LOG.error("Async read check was rejected from the executor: ", ex);
                    throw ex;
                }
            }
        } else if (LOG.isTraceEnabled()) {
            LOG.trace("Message received since last read check, resetting flag: ");
        }
        this.commandReceived.set(false);
    }

    /**
     * Records that a command arrived and forwards it to the transport listener,
     * flagging the receive as in progress for the duration of the dispatch.
     */
    @Override
    public void onCommand(Object obj) {
        this.commandReceived.set(true);
        this.inReceive.set(true);
        try {
            this.transportListener.onCommand(obj);
        } finally {
            this.inReceive.set(false);
        }
    }

    /**
     * Sends a command through the wrapped transport; sends are serialized by a lock.
     *
     * @throws IOException if the channel has already failed or the wrapped transport fails
     */
    @Override
    public void oneway(Object obj) throws IOException {
        this.sendLock.lock();
        try {
            doOnewaySend(obj);
        } finally {
            this.sendLock.unlock();
        }
    }

    /** Rejects the send if the channel has failed, otherwise delegates to the wrapped transport. */
    private void doOnewaySend(Object obj) throws IOException {
        if (this.failed.get()) {
            throw new InactivityIOException("Cannot send, channel has already failed: " + this.next.getRemoteAddress());
        }
        this.next.oneway(obj);
    }

    /**
     * Marks the channel as failed, stops the monitor and forwards the exception.
     * Only the first exception is forwarded; later ones are dropped.
     */
    @Override
    public void onException(IOException iOException) {
        if (this.failed.compareAndSet(false, true)) {
            stopMonitorThread();
            this.transportListener.onException(iOException);
        }
    }

    /** @return the read-check period in milliseconds */
    public long getReadCheckTime() {
        return this.readCheckTime;
    }

    /** @param j the read-check period in milliseconds; values {@code <= 0} disable monitoring */
    public void setReadCheckTime(long j) {
        this.readCheckTime = j;
    }

    /** @return the delay before the first read check, in milliseconds */
    public long getInitialDelayTime() {
        return this.initialDelayTime;
    }

    /** @param j the delay before the first read check, in milliseconds */
    public void setInitialDelayTime(long j) {
        this.initialDelayTime = j;
    }

    /** @return the stored flag; it is not consulted anywhere in this class */
    public boolean isKeepAliveResponseRequired() {
        return this.keepAliveResponseRequired;
    }

    public void setKeepAliveResponseRequired(boolean z) {
        this.keepAliveResponseRequired = z;
    }

    /** @return {@code true} while this monitor's read-check task is scheduled */
    public boolean isMonitorStarted() {
        return this.monitorStarted.get();
    }

    /** Sets the converter to notify on inactivity; the monitor will not start without one. */
    public void setProtocolConverter(MQTTProtocolConverter mQTTProtocolConverter) {
        this.protocolConverter = mQTTProtocolConverter;
    }

    public MQTTProtocolConverter getProtocolConverter() {
        return this.protocolConverter;
    }

    /**
     * Schedules the periodic read check on the shared timer, creating the shared timer
     * and executor if this is the first running monitor. No-op when no protocol
     * converter is set, when already started, or when the read-check period is not
     * positive.
     */
    public synchronized void startMonitorThread() {
        if (this.protocolConverter == null || this.monitorStarted.get()) {
            return;
        }
        // Read the period once so the task creation and the scheduling agree on it.
        final long period = this.readCheckTime;
        if (period <= 0) {
            return;
        }
        this.readCheckerTask = new SchedulerTimerTask(this.readChecker);
        this.monitorStarted.set(true);
        synchronized (AbstractInactivityMonitor.class) {
            if (CHECKER_COUNTER == 0) {
                ASYNC_TASKS = createExecutor();
                READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
            }
            CHECKER_COUNTER++;
            READ_CHECK_TIMER.schedule(this.readCheckerTask, this.initialDelayTime, period);
        }
    }

    /**
     * Cancels this monitor's read-check task and, if it was the last running monitor,
     * shuts down the shared timer and executor. No-op when the monitor is not started.
     */
    synchronized void stopMonitorThread() {
        if (!this.monitorStarted.compareAndSet(true, false)) {
            return;
        }
        if (this.readCheckerTask != null) {
            this.readCheckerTask.cancel();
        }
        synchronized (AbstractInactivityMonitor.class) {
            READ_CHECK_TIMER.purge();
            CHECKER_COUNTER--;
            if (CHECKER_COUNTER == 0) {
                READ_CHECK_TIMER.cancel();
                READ_CHECK_TIMER = null;
                ThreadPoolUtils.shutdown(ASYNC_TASKS);
                ASYNC_TASKS = null;
            }
        }
    }

    /**
     * Builds the shared executor: no core threads, unbounded maximum, direct hand-off
     * queue, idle threads reclaimed after 60 seconds.
     */
    private static ThreadPoolExecutor createExecutor() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), ASYNC_TASK_THREAD_FACTORY);
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
