package org.apache.activemq.broker.region.policy;

import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.SubscriptionRecovery;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.util.IdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:activemq-core-5.5.1-fuse-01-11.jar:org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.class */
/**
 * A {@link SubscriptionRecoveryPolicy} that delegates to a configured {@link MessageQuery}.
 *
 * <p>On {@link #recover}, the query is executed against the recovering subscription's destination and
 * every message it produces is converted to an {@link ActiveMQMessage}, stamped with an id generated
 * by this policy, and handed to the subscription as a recovered message.
 *
 * <p>A query must be supplied through {@link #setQuery(MessageQuery)} before the policy is started;
 * {@link #start()} rejects an unconfigured policy.
 */
public class QueryBasedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
    private static final Logger LOG = LoggerFactory.getLogger(QueryBasedSubscriptionRecoveryPolicy.class);

    /** The query that supplies the initial messages; {@code null} until configured. */
    private MessageQuery query;

    /** Source of the sequence numbers used in the ids of dispatched messages. */
    private final AtomicLong messageSequence = new AtomicLong(0);

    // Must be declared before producerId: createProducerId() reads this field while the
    // producerId field initializer runs, and instance fields are initialized in textual order.
    private final IdGenerator idGenerator = new IdGenerator();

    /** Producer id stamped on every message dispatched by this policy instance. */
    private final ProducerId producerId = createProducerId();

    /**
     * Creates a new policy instance that shares this policy's query.
     *
     * @return a fresh policy configured with the same {@link MessageQuery} reference
     */
    @Override
    public SubscriptionRecoveryPolicy copy() {
        QueryBasedSubscriptionRecoveryPolicy duplicate = new QueryBasedSubscriptionRecoveryPolicy();
        duplicate.setQuery(this.query);
        return duplicate;
    }

    /**
     * Passes the referenced message to the query's {@code validateUpdate} and returns its verdict.
     *
     * @param connectionContext not used by this policy
     * @param messageReference reference to the message to validate
     * @return whatever {@link MessageQuery#validateUpdate} returns for the message
     * @throws IllegalStateException if no query has been configured
     * @throws Exception if resolving the message or validating it fails
     */
    @Override
    public boolean add(ConnectionContext connectionContext, MessageReference messageReference) throws Exception {
        final MessageQuery configuredQuery = this.query;
        if (configuredQuery == null) {
            // Fail with the same explanation start() gives instead of an anonymous NullPointerException.
            throw new IllegalStateException("No query property configured");
        }
        return configuredQuery.validateUpdate(messageReference.getMessage());
    }

    /**
     * Executes the query for the subscription's destination and dispatches each resulting message
     * into the subscription. Does nothing when no query is configured.
     *
     * @param connectionContext context passed along with every recovered message
     * @param topic the topic set as the region destination of every recovered message
     * @param subscriptionRecovery the subscription being recovered
     * @throws Exception if executing the query fails
     */
    @Override
    public void recover(final ConnectionContext connectionContext, final Topic topic, final SubscriptionRecovery subscriptionRecovery) throws Exception {
        final MessageQuery configuredQuery = this.query;
        if (configuredQuery == null) {
            return;
        }
        configuredQuery.execute(subscriptionRecovery.getActiveMQDestination(), new MessageListener() {
            @Override
            public void onMessage(Message message) {
                dispatchInitialMessage(message, topic, connectionContext, subscriptionRecovery);
            }
        });
    }

    /**
     * Verifies that a query has been configured.
     *
     * @throws IllegalArgumentException if no query has been set
     */
    @Override
    public void start() throws Exception {
        if (this.query == null) {
            throw new IllegalArgumentException("No query property configured");
        }
    }

    /** Does nothing; this policy holds no resources that need releasing on stop. */
    @Override
    public void stop() throws Exception {
    }

    /**
     * @return the configured query, or {@code null} if none has been set
     */
    public MessageQuery getQuery() {
        return this.query;
    }

    /**
     * Sets the query used to validate updates and to load the initial messages.
     *
     * @param messageQuery the query to use
     */
    public void setQuery(MessageQuery messageQuery) {
        this.query = messageQuery;
    }

    /**
     * Always returns an empty array; this policy does not expose messages for browsing.
     *
     * @param activeMQDestination ignored
     * @return a new zero-length array
     */
    @Override
    public org.apache.activemq.command.Message[] browse(ActiveMQDestination activeMQDestination) throws Exception {
        return new org.apache.activemq.command.Message[0];
    }

    /**
     * Does nothing; this policy does not use the broker.
     *
     * @param broker ignored
     */
    @Override
    public void setBroker(Broker broker) {
    }

    /**
     * Converts one query result into an {@link ActiveMQMessage} and adds it to the subscription as a
     * recovered message.
     *
     * <p>Any failure is logged at WARN level and swallowed, so it does not propagate to the caller
     * that delivered the message.
     *
     * @param message the message produced by the query
     * @param destination set as the region destination of the converted message
     * @param connectionContext context passed to the subscription with the message
     * @param subscriptionRecovery the subscription receiving the message
     */
    protected void dispatchInitialMessage(Message message, Destination destination, ConnectionContext connectionContext, SubscriptionRecovery subscriptionRecovery) {
        try {
            ActiveMQMessage recovered = ActiveMQMessageTransformation.transformMessage(message, null);
            // Fall back to the subscription's destination when the query result carries none.
            if (recovered.getDestination() == null) {
                recovered.setDestination(subscriptionRecovery.getActiveMQDestination());
            }
            recovered.setRegionDestination(destination);
            configure(recovered);
            subscriptionRecovery.addRecoveredMessage(connectionContext, recovered);
        } catch (Throwable th) {
            LOG.warn("Failed to dispatch initial message: " + message + " into subscription. Reason: " + th, th);
        }
    }

    /**
     * Stamps the message with a new id built from this policy's producer id and the next sequence
     * number, invokes the message's {@code onSend()} hook, and sets the producer id on the message.
     *
     * @param activeMQMessage the message to prepare for dispatch
     * @throws JMSException if preparing the message fails
     */
    protected void configure(ActiveMQMessage activeMQMessage) throws JMSException {
        long sequence = this.messageSequence.incrementAndGet();
        activeMQMessage.setMessageId(new MessageId(this.producerId, sequence));
        activeMQMessage.onSend();
        activeMQMessage.setProducerId(this.producerId);
    }

    /**
     * Builds the producer id used for all messages dispatched by this policy, from a freshly
     * generated connection id with session value 1 and producer value 1.
     *
     * <p>This method is invoked from a field initializer, so an overriding subclass is called before
     * its own fields have been initialized.
     *
     * @return a new producer id
     */
    protected ProducerId createProducerId() {
        ConnectionId connectionId = new ConnectionId(this.idGenerator.generateId());
        SessionId sessionId = new SessionId(connectionId, 1L);
        return new ProducerId(sessionId, 1L);
    }
}
