package org.apache.activemq.broker.scheduler;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Connector;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.JobSchedulerUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.TypeConversionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-broker-5.11.0.redhat-630389.jar:org/apache/activemq/broker/scheduler/SchedulerBroker.class */
public class SchedulerBroker extends BrokerFilter implements JobListener {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SchedulerBroker.class);
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
    private final LongSequenceGenerator messageIdGenerator;
    private final AtomicBoolean started;
    private final WireFormat wireFormat;
    private final ConnectionContext context;
    private final ProducerId producerId;
    private final SystemUsage systemUsage;
    private final JobSchedulerStore store;
    private JobScheduler scheduler;

    public SchedulerBroker(BrokerService brokerService, Broker broker, JobSchedulerStore jobSchedulerStore) throws Exception {
        super(broker);
        this.messageIdGenerator = new LongSequenceGenerator();
        this.started = new AtomicBoolean();
        this.wireFormat = new OpenWireFormat();
        this.context = new ConnectionContext();
        this.producerId = new ProducerId();
        this.store = jobSchedulerStore;
        this.producerId.setConnectionId(ID_GENERATOR.generateId());
        this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
        this.context.setConnection(new Connection() { // from class: org.apache.activemq.broker.scheduler.SchedulerBroker.1
            @Override // org.apache.activemq.broker.Connection
            public Connector getConnector() {
                return null;
            }

            @Override // org.apache.activemq.broker.Connection
            public void dispatchSync(Command command) {
                if (command instanceof ExceptionResponse) {
                    SchedulerBroker.LOG.warn("Unexpected response: " + command);
                }
            }

            @Override // org.apache.activemq.broker.Connection
            public void dispatchAsync(Command command) {
                if (command instanceof ExceptionResponse) {
                    SchedulerBroker.LOG.warn("Unexpected response: " + command);
                }
            }

            @Override // org.apache.activemq.broker.Connection
            public Response service(Command command) {
                return null;
            }

            @Override // org.apache.activemq.broker.Connection
            public void serviceException(Throwable th) {
                SchedulerBroker.LOG.warn("Unexpected exception: " + th, th);
            }

            @Override // org.apache.activemq.broker.Connection
            public boolean isSlow() {
                return false;
            }

            @Override // org.apache.activemq.broker.Connection
            public boolean isBlocked() {
                return false;
            }

            @Override // org.apache.activemq.broker.Connection
            public boolean isConnected() {
                return false;
            }

            @Override // org.apache.activemq.broker.Connection
            public boolean isActive() {
                return false;
            }

            @Override // org.apache.activemq.broker.Connection
            public int getDispatchQueueSize() {
                return 0;
            }

            @Override // org.apache.activemq.broker.Connection
            public ConnectionStatistics getStatistics() {
                return null;
            }

            @Override // org.apache.activemq.broker.Connection
            public boolean isManageable() {
                return false;
            }

            @Override // org.apache.activemq.broker.Connection
            public String getRemoteAddress() {
                return null;
            }

            @Override // org.apache.activemq.broker.Connection
            public void serviceExceptionAsync(IOException iOException) {
                SchedulerBroker.LOG.warn("Unexpected async ioexception: " + iOException, (Throwable) iOException);
            }

            @Override // org.apache.activemq.broker.Connection
            public String getConnectionId() {
                return null;
            }

            @Override // org.apache.activemq.broker.Connection
            public boolean isNetworkConnection() {
                return false;
            }

            @Override // org.apache.activemq.broker.Connection
            public boolean isFaultTolerantConnection() {
                return false;
            }

            @Override // org.apache.activemq.broker.Connection
            public void updateClient(ConnectionControl connectionControl) {
            }

            @Override // org.apache.activemq.broker.Connection
            public int getActiveTransactionCount() {
                return 0;
            }

            @Override // org.apache.activemq.broker.Connection
            public Long getOldestActiveTransactionDuration() {
                return null;
            }

            @Override // org.apache.activemq.Service
            public void start() throws Exception {
            }

            @Override // org.apache.activemq.Service
            public void stop() throws Exception {
            }
        });
        this.context.setBroker(broker);
        this.systemUsage = brokerService.getSystemUsage();
        this.wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
    }

    public synchronized JobScheduler getJobScheduler() throws Exception {
        return new JobSchedulerFacade(this);
    }

    @Override // org.apache.activemq.broker.BrokerFilter, org.apache.activemq.Service
    public void start() throws Exception {
        this.started.set(true);
        getInternalScheduler();
        super.start();
    }

    @Override // org.apache.activemq.broker.BrokerFilter, org.apache.activemq.Service
    public void stop() throws Exception {
        if (this.started.compareAndSet(true, false)) {
            if (this.store != null) {
                this.store.stop();
            }
            if (this.scheduler != null) {
                this.scheduler.removeListener(this);
                this.scheduler = null;
            }
        }
        super.stop();
    }

    @Override // org.apache.activemq.broker.BrokerFilter, org.apache.activemq.broker.region.Region
    public void send(ProducerBrokerExchange producerBrokerExchange, final Message message) throws Exception {
        ConnectionContext connectionContext = producerBrokerExchange.getConnectionContext();
        String str = (String) message.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
        final Object property = message.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
        final Object property2 = message.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
        final Object property3 = message.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
        if (!message.getDestination().getPhysicalName().regionMatches(true, 0, ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0, ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length())) {
            if ((property == null && property2 == null && property3 == null) || str != null) {
                super.send(producerBrokerExchange, message);
                return;
            }
            if (this.systemUsage.getJobSchedulerUsage() != null) {
                JobSchedulerUsage jobSchedulerUsage = this.systemUsage.getJobSchedulerUsage();
                if (jobSchedulerUsage.isFull()) {
                    String str2 = "Job Scheduler Store is Full (" + jobSchedulerUsage.getPercentUsage() + "% of " + jobSchedulerUsage.getLimit() + "). Stopping producer (" + message.getProducerId() + ") to prevent flooding of the job scheduler store. See http://activemq.apache.org/producer-flow-control.html for more info";
                    long currentTimeMillis = System.currentTimeMillis();
                    long j = currentTimeMillis;
                    while (!jobSchedulerUsage.waitForSpace(1000L)) {
                        if (connectionContext.getStopping().get()) {
                            throw new IOException("Connection closed, send aborted.");
                        }
                        long currentTimeMillis2 = System.currentTimeMillis();
                        if (currentTimeMillis2 >= j) {
                            LOG.info("" + jobSchedulerUsage + ": " + str2 + " (blocking for: " + ((currentTimeMillis2 - currentTimeMillis) / 1000) + "s)");
                            j = currentTimeMillis2 + 30000;
                        }
                    }
                }
            }
            if (connectionContext.isInTransaction()) {
                connectionContext.getTransaction().addSynchronization(new Synchronization() { // from class: org.apache.activemq.broker.scheduler.SchedulerBroker.2
                    @Override // org.apache.activemq.transaction.Synchronization
                    public void afterCommit() throws Exception {
                        SchedulerBroker.this.doSchedule(message, property, property2, property3);
                    }
                });
                return;
            } else {
                doSchedule(message, property, property2, property3);
                return;
            }
        }
        JobScheduler internalScheduler = getInternalScheduler();
        ActiveMQDestination replyTo = message.getReplyTo();
        String str3 = (String) message.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
        if (str3 != null) {
            Object property4 = message.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
            Object property5 = message.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
            if (replyTo != null && str3.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {
                if (property4 == null || property5 == null) {
                    Iterator<Job> it = internalScheduler.getAllJobs().iterator();
                    while (it.hasNext()) {
                        sendScheduledJob(producerBrokerExchange.getConnectionContext(), it.next(), replyTo);
                    }
                } else {
                    Iterator<Job> it2 = internalScheduler.getAllJobs(((Long) TypeConversionSupport.convert(property4, Long.class)).longValue(), ((Long) TypeConversionSupport.convert(property5, Long.class)).longValue()).iterator();
                    while (it2.hasNext()) {
                        sendScheduledJob(producerBrokerExchange.getConnectionContext(), it2.next(), replyTo);
                    }
                }
            }
            if (str != null && str3.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
                internalScheduler.remove(str);
                return;
            }
            if (str3.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {
                if (property4 == null || property5 == null) {
                    internalScheduler.removeAllJobs();
                } else {
                    internalScheduler.removeAllJobs(((Long) TypeConversionSupport.convert(property4, Long.class)).longValue(), ((Long) TypeConversionSupport.convert(property5, Long.class)).longValue());
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doSchedule(Message message, Object obj, Object obj2, Object obj3) throws Exception {
        long j = 0;
        long j2 = 0;
        int i = 0;
        Message copy = message.copy();
        copy.setTransactionId(null);
        ByteSequence marshal = this.wireFormat.marshal(copy);
        String obj4 = obj != null ? obj.toString() : "";
        if (obj2 != null) {
            j2 = ((Long) TypeConversionSupport.convert(obj2, Long.class)).longValue();
        }
        if (obj3 != null) {
            j = ((Long) TypeConversionSupport.convert(obj3, Long.class)).longValue();
        }
        Object property = copy.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
        if (property != null) {
            i = ((Integer) TypeConversionSupport.convert(property, Integer.class)).intValue();
        }
        getInternalScheduler().schedule(copy.getMessageId().toString(), new ByteSequence(marshal.data, marshal.offset, marshal.length), obj4, j, j2, i);
    }

    @Override // org.apache.activemq.broker.scheduler.JobListener
    public void scheduledJob(String str, ByteSequence byteSequence) {
        try {
            Message message = (Message) this.wireFormat.unmarshal(new ByteSequence(byteSequence.getData(), byteSequence.getOffset(), byteSequence.getLength()));
            message.setOriginalTransactionId(null);
            Object property = message.getProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
            Object property2 = message.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
            String obj = property2 != null ? property2.toString() : null;
            int i = 0;
            if (property != null) {
                i = ((Integer) TypeConversionSupport.convert(property, Integer.class)).intValue();
            }
            if (i != 0 || (obj != null && obj.length() > 0)) {
                message.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
            }
            message.setProperty(ScheduledMessage.AMQ_SCHEDULED_ID, str);
            message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
            message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
            message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT);
            message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
            if (message.getTimestamp() > 0 && message.getExpiration() > 0) {
                long expiration = message.getExpiration();
                long currentTimeMillis = System.currentTimeMillis();
                long j = 0;
                long timestamp = message.getTimestamp();
                if (expiration > 0) {
                    j = expiration - timestamp;
                }
                long j2 = j + currentTimeMillis;
                if (j2 > expiration) {
                    if (j > 0 && j2 > 0) {
                        message.setExpiration(j2);
                    }
                    message.setTimestamp(currentTimeMillis);
                    LOG.debug("Set message {} timestamp from {} to {}", message.getMessageId(), Long.valueOf(timestamp), Long.valueOf(currentTimeMillis));
                }
            }
            message.beforeMarshall(this.wireFormat);
            ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
            producerBrokerExchange.setConnectionContext(this.context);
            producerBrokerExchange.setMutable(true);
            producerBrokerExchange.setProducerState(new ProducerState(new ProducerInfo()));
            super.send(producerBrokerExchange, message);
        } catch (Exception e) {
            LOG.error("Failed to send scheduled message {}", str, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized JobScheduler getInternalScheduler() throws Exception {
        if (!this.started.get()) {
            return null;
        }
        if (this.scheduler == null && this.store != null) {
            this.scheduler = this.store.getJobScheduler("JMS");
            this.scheduler.addListener(this);
            this.scheduler.startDispatching();
        }
        return this.scheduler;
    }

    protected void sendScheduledJob(ConnectionContext connectionContext, Job job, ActiveMQDestination activeMQDestination) throws Exception {
        try {
            Message message = (Message) this.wireFormat.unmarshal(new ByteSequence(job.getPayload()));
            message.setOriginalTransactionId(null);
            message.setPersistent(false);
            message.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
            message.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
            message.setDestination(activeMQDestination);
            message.setResponseRequired(false);
            message.setProducerId(this.producerId);
            message.setProperty(ScheduledMessage.AMQ_SCHEDULED_ID, job.getJobId());
            boolean isProducerFlowControl = connectionContext.isProducerFlowControl();
            ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
            producerBrokerExchange.setConnectionContext(connectionContext);
            producerBrokerExchange.setMutable(true);
            producerBrokerExchange.setProducerState(new ProducerState(new ProducerInfo()));
            try {
                connectionContext.setProducerFlowControl(false);
                this.next.send(producerBrokerExchange, message);
                connectionContext.setProducerFlowControl(isProducerFlowControl);
            } catch (Throwable th) {
                connectionContext.setProducerFlowControl(isProducerFlowControl);
                throw th;
            }
        } catch (Exception e) {
            LOG.error("Failed to send scheduled message {}", job.getJobId(), e);
        }
    }
}
