package org.apache.activemq.transport.mqtt;

import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-mqtt-5.11.0.redhat-630310-05.jar:org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.class */
/**
 * Transport filter that fails an MQTT connection when it has been inactive for too long.
 *
 * <p>Two independent periodic checks are supported:
 * <ul>
 *   <li>a <em>connect checker</em> ({@link #startConnectChecker(long)}) that fails the transport
 *       if the checker is still scheduled once {@code connectionTimeout} milliseconds have passed
 *       since this monitor was constructed, and</li>
 *   <li>a <em>read checker</em> ({@link #startReadChecker()}) that fails the transport when the
 *       receive counter of the next transport has not moved for at least
 *       {@code readKeepAliveTime + readGraceTime} milliseconds.</li>
 * </ul>
 *
 * <p>All instances share one {@link Timer} and one executor. That shared state is only mutated
 * while holding the {@code AbstractInactivityMonitor.class} monitor, and the timer is created and
 * cancelled based on a count of currently scheduled checker tasks.
 */
public class MQTTInactivityMonitor extends TransportFilter {
    private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
    private static final long DEFAULT_CHECK_TIME_MILLS = 30000;

    // Shared across all monitors; mutated only under the AbstractInactivityMonitor.class lock.
    private static ThreadPoolExecutor ASYNC_TASKS;
    private static int CHECKER_COUNTER;
    private static Timer READ_CHECK_TIMER;

    private final AtomicBoolean failed = new AtomicBoolean(false);
    private final AtomicBoolean inReceive = new AtomicBoolean(false);
    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
    private final ReentrantLock sendLock = new ReentrantLock();
    private SchedulerTimerTask readCheckerTask;
    private long readGraceTime = DEFAULT_CHECK_TIME_MILLS;
    private long readKeepAliveTime = DEFAULT_CHECK_TIME_MILLS;
    private MQTTProtocolConverter protocolConverter;
    private long connectionTimeout = DEFAULT_CHECK_TIME_MILLS;
    private SchedulerTimerTask connectCheckerTask;

    /**
     * Periodic check that fails the transport once the connection timeout has elapsed while the
     * connect checker is still scheduled.
     */
    private final Runnable connectChecker = new Runnable() {
        // Measured from construction of the monitor, not from startConnectChecker().
        private final long startTime = System.currentTimeMillis();

        @Override
        public void run() {
            long elapsed = System.currentTimeMillis() - startTime;
            if (elapsed < connectionTimeout || connectCheckerTask == null || ASYNC_TASKS.isShutdown()) {
                return;
            }
            LOG.debug("No CONNECT frame received in time for {}! Throwing InactivityIOException.",
                    MQTTInactivityMonitor.this);
            // Report the threshold this check actually enforces.
            reportInactivityAsync(connectionTimeout);
        }
    };

    /**
     * Periodic check that fails the transport when nothing has been received for at least
     * {@code readKeepAliveTime + readGraceTime} milliseconds.
     */
    private final Runnable readChecker = new Runnable() {
        private long lastReceiveTime = System.currentTimeMillis();

        @Override
        public void run() {
            long now = System.currentTimeMillis();
            int currentCounter = next.getReceiveCounter();
            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);

            // Either a command is being dispatched right now or the counter moved since the
            // previous run: the connection is alive, so restart the idle window.
            if (inReceive.get() || currentCounter != previousCounter) {
                LOG.trace("Command received since last read check.");
                lastReceiveTime = now;
                return;
            }

            long allowedIdle = readKeepAliveTime + readGraceTime;
            if (now - lastReceiveTime < allowedIdle || readCheckerTask == null || ASYNC_TASKS.isShutdown()) {
                return;
            }
            LOG.debug("No message received since last read check for {}! Throwing InactivityIOException.",
                    MQTTInactivityMonitor.this);
            // Report the threshold this check actually enforces.
            reportInactivityAsync(allowedIdle);
        }
    };

    /** Creates named daemon threads for the shared async executor. */
    private final ThreadFactory factory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
            thread.setDaemon(true);
            return thread;
        }
    };

    /**
     * Creates a monitor wrapping the given transport.
     *
     * @param transport the next transport in the chain
     * @param wireFormat not used by this class
     */
    public MQTTInactivityMonitor(Transport transport, WireFormat wireFormat) {
        super(transport);
    }

    /**
     * Starts the next transport. Neither checker is started here; see
     * {@link #startConnectChecker(long)} and {@link #startReadChecker()}.
     */
    @Override
    public void start() throws Exception {
        next.start();
    }

    /** Cancels both checkers (if scheduled) and then stops the next transport. */
    @Override
    public void stop() throws Exception {
        stopReadChecker();
        stopConnectChecker();
        next.stop();
    }

    /**
     * Forwards the command to the transport listener, flagging the monitor as "in receive" for the
     * duration of the call so the read checker treats the connection as active.
     */
    @Override
    public void onCommand(Object obj) {
        inReceive.set(true);
        try {
            transportListener.onCommand(obj);
        } finally {
            inReceive.set(false);
        }
    }

    /**
     * Sends the command through the next transport while holding the send lock.
     *
     * @throws InactivityIOException if this monitor has already failed the channel
     * @throws IOException if the next transport fails to send
     */
    @Override
    public void oneway(Object obj) throws IOException {
        sendLock.lock();
        try {
            doOnewaySend(obj);
        } finally {
            sendLock.unlock();
        }
    }

    /** Rejects the send if the channel has already failed, otherwise delegates to the next transport. */
    private void doOnewaySend(Object obj) throws IOException {
        if (failed.get()) {
            throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
        }
        next.oneway(obj);
    }

    /**
     * Marks the channel as failed and propagates the error. Only the first exception is
     * propagated; any later call is ignored because the {@code failed} flag is already set.
     */
    @Override
    public void onException(IOException iOException) {
        if (!failed.compareAndSet(false, true)) {
            return;
        }
        stopConnectChecker();
        stopReadChecker();
        if (protocolConverter != null) {
            protocolConverter.onTransportError();
        }
        transportListener.onException(iOException);
    }

    /** @return the grace time in milliseconds added on top of the keep-alive time */
    public long getReadGraceTime() {
        return readGraceTime;
    }

    /**
     * Sets the grace time in milliseconds. This value is also used as the read checker's timer
     * period, which is fixed at the moment {@link #startReadChecker()} schedules the task.
     */
    public void setReadGraceTime(long j) {
        this.readGraceTime = j;
    }

    /** @return the keep-alive time in milliseconds */
    public long getReadKeepAliveTime() {
        return readKeepAliveTime;
    }

    /**
     * Sets the keep-alive time in milliseconds. A value {@code <= 0} prevents
     * {@link #startReadChecker()} from scheduling anything.
     */
    public void setReadKeepAliveTime(long j) {
        this.readKeepAliveTime = j;
    }

    /** Sets the converter that is notified via {@code onTransportError()} when the channel fails. */
    public void setProtocolConverter(MQTTProtocolConverter mQTTProtocolConverter) {
        this.protocolConverter = mQTTProtocolConverter;
    }

    /** @return the protocol converter, or {@code null} if none was set */
    public MQTTProtocolConverter getProtocolConverter() {
        return protocolConverter;
    }

    /**
     * Records the connection timeout and schedules the connect checker, unless the timeout is not
     * positive or a connect checker is already scheduled. The checker runs every
     * {@code min(j, 1000)} milliseconds.
     *
     * @param j connection timeout in milliseconds; stored even when no checker is scheduled
     */
    public synchronized void startConnectChecker(long j) {
        this.connectionTimeout = j;
        if (j <= 0 || connectCheckerTask != null) {
            return;
        }
        connectCheckerTask = new SchedulerTimerTask(connectChecker);
        long period = Math.min(j, 1000L);
        registerChecker(connectCheckerTask, period, period);
    }

    /**
     * Schedules the read checker, unless the keep-alive time is not positive or a read checker is
     * already scheduled. The first run happens after {@code readKeepAliveTime} milliseconds and
     * subsequent runs every {@code readGraceTime} milliseconds.
     */
    public synchronized void startReadChecker() {
        if (readKeepAliveTime <= 0 || readCheckerTask != null) {
            return;
        }
        readCheckerTask = new SchedulerTimerTask(readChecker);
        registerChecker(readCheckerTask, readKeepAliveTime, readGraceTime);
    }

    /** Cancels the connect checker if one is scheduled; otherwise does nothing. */
    public synchronized void stopConnectChecker() {
        if (connectCheckerTask == null) {
            return;
        }
        connectCheckerTask.cancel();
        connectCheckerTask = null;
        releaseChecker();
    }

    /** Cancels the read checker if one is scheduled; otherwise does nothing. */
    synchronized void stopReadChecker() {
        if (readCheckerTask == null) {
            return;
        }
        readCheckerTask.cancel();
        readCheckerTask = null;
        releaseChecker();
    }

    /**
     * Schedules a checker task on the shared timer, creating the timer (and the executor, if it
     * is missing or shut down) when this is the first scheduled checker.
     */
    private void registerChecker(SchedulerTimerTask task, long delay, long period) {
        synchronized (AbstractInactivityMonitor.class) {
            if (CHECKER_COUNTER == 0) {
                if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
                    ASYNC_TASKS = createExecutor();
                }
                READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
            }
            CHECKER_COUNTER++;
            READ_CHECK_TIMER.schedule(task, delay, period);
        }
    }

    /**
     * Accounts for one cancelled checker task: purges it from the shared timer and cancels the
     * timer once no scheduled checkers remain.
     */
    private static void releaseChecker() {
        synchronized (AbstractInactivityMonitor.class) {
            READ_CHECK_TIMER.purge();
            CHECKER_COUNTER--;
            if (CHECKER_COUNTER == 0) {
                READ_CHECK_TIMER.cancel();
                READ_CHECK_TIMER = null;
            }
        }
    }

    /**
     * Hands an {@link InactivityIOException} to {@link #onException(IOException)} on the shared
     * executor so the failure handling does not run on the checking thread.
     *
     * @param thresholdMillis the inactivity threshold that was exceeded, reported in the message
     * @throws RejectedExecutionException if the task is rejected while the executor is not shut down
     */
    private void reportInactivityAsync(final long thresholdMillis) {
        try {
            ASYNC_TASKS.execute(new Runnable() {
                @Override
                public void run() {
                    onException(new InactivityIOException("Channel was inactive for too (>" + thresholdMillis
                            + ") long: " + next.getRemoteAddress()));
                }
            });
        } catch (RejectedExecutionException e) {
            // A rejection caused by shutdown is expected and silently ignored.
            if (ASYNC_TASKS.isShutdown()) {
                return;
            }
            LOG.error("Async connection timeout task was rejected from the executor: ", e);
            // NOTE(review): this propagates out of the checker's run(); if that runs on the shared
            // Timer thread, an unchecked exception would terminate the Timer - confirm intended.
            throw e;
        }
    }

    /**
     * Builds the executor used for async failure reporting: no core threads, an unbounded maximum,
     * direct hand-off, and idle threads reclaimed after 60 seconds.
     */
    private ThreadPoolExecutor createExecutor() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), factory);
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
