/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.jpa;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.jpa.JPAPersistenceAdapter;
import org.apache.activemq.store.jpa.JPAReferenceStore;
import org.apache.activemq.store.jpa.model.StoredMessageReference;
import org.apache.activemq.store.jpa.model.StoredSubscription;
import org.apache.activemq.util.IOExceptionSupport;

public class JPATopicReferenceStore
extends JPAReferenceStore
implements TopicReferenceStore {
    private Map<StoredSubscription.SubscriptionId, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<StoredSubscription.SubscriptionId, AtomicLong>();

    public JPATopicReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
        super(adapter, destination);
    }

    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
        EntityManager manager = this.adapter.beginEntityManager(context);
        try {
            StoredSubscription ss = this.findStoredSubscription(manager, clientId, subscriptionName);
            ss.setLastAckedId(messageId.getBrokerSequenceId());
        }
        catch (Throwable e) {
            this.adapter.rollbackEntityManager(context, manager);
            throw IOExceptionSupport.create((Throwable)e);
        }
        this.adapter.commitEntityManager(context, manager);
    }

    public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
        EntityManager manager = this.adapter.beginEntityManager(null);
        try {
            Query query;
            Long rc;
            StoredSubscription ss = new StoredSubscription();
            ss.setClientId(info.getClientId());
            ss.setSubscriptionName(info.getSubcriptionName());
            ss.setDestination(this.destinationName);
            ss.setSelector(info.getSelector());
            ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
            ss.setLastAckedId(-1L);
            if (!retroactive && (rc = (Long)(query = manager.createQuery("select max(m.id) from StoredMessageReference m")).getSingleResult()) != null) {
                ss.setLastAckedId(rc);
            }
            manager.persist((Object)ss);
        }
        catch (Throwable e) {
            this.adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create((Throwable)e);
        }
        this.adapter.commitEntityManager(null, manager);
    }

    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
        EntityManager manager = this.adapter.beginEntityManager(null);
        try {
            StoredSubscription ss = this.findStoredSubscription(manager, clientId, subscriptionName);
            manager.remove((Object)ss);
        }
        catch (Throwable e) {
            this.adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create((Throwable)e);
        }
        this.adapter.commitEntityManager(null, manager);
    }

    private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
        Query query = manager.createQuery("select ss from StoredSubscription ss where ss.clientId=?1 and ss.subscriptionName=?2 and ss.destination=?3");
        query.setParameter(1, (Object)clientId);
        query.setParameter(2, (Object)subscriptionName);
        query.setParameter(3, (Object)this.destinationName);
        List resultList = query.getResultList();
        if (resultList.isEmpty()) {
            return null;
        }
        return (StoredSubscription)resultList.get(0);
    }

    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        SubscriptionInfo[] rc;
        EntityManager manager = this.adapter.beginEntityManager(null);
        try {
            ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
            Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
            query.setParameter(1, (Object)this.destinationName);
            for (StoredSubscription ss : query.getResultList()) {
                SubscriptionInfo info = new SubscriptionInfo();
                info.setClientId(ss.getClientId());
                info.setDestination(this.destination);
                info.setSelector(ss.getSelector());
                info.setSubscriptionName(ss.getSubscriptionName());
                info.setSubscribedDestination(this.toSubscribedDestination(ss));
                l.add(info);
            }
            rc = new SubscriptionInfo[l.size()];
            l.toArray(rc);
        }
        catch (Throwable e) {
            this.adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create((Throwable)e);
        }
        this.adapter.commitEntityManager(null, manager);
        return rc;
    }

    private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
        if (ss.getSubscribedDestination() == null) {
            return null;
        }
        return ActiveMQDestination.createDestination((String)ss.getSubscribedDestination(), (byte)1);
    }

    public int getMessageCount(String clientId, String subscriptionName) throws IOException {
        Long rc;
        EntityManager manager = this.adapter.beginEntityManager(null);
        try {
            Query query = manager.createQuery("select count(m) FROM StoredMessageReference m, StoredSubscription ss where ss.clientId=?1 and   ss.subscriptionName=?2 and   ss.destination=?3 and   m.destination=ss.destination and m.id > ss.lastAckedId");
            query.setParameter(1, (Object)clientId);
            query.setParameter(2, (Object)subscriptionName);
            query.setParameter(3, (Object)this.destinationName);
            rc = (Long)query.getSingleResult();
        }
        catch (Throwable e) {
            this.adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create((Throwable)e);
        }
        this.adapter.commitEntityManager(null, manager);
        return rc.intValue();
    }

    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
        SubscriptionInfo rc = null;
        EntityManager manager = this.adapter.beginEntityManager(null);
        try {
            StoredSubscription ss = this.findStoredSubscription(manager, clientId, subscriptionName);
            if (ss != null) {
                rc = new SubscriptionInfo();
                rc.setClientId(ss.getClientId());
                rc.setDestination(this.destination);
                rc.setSelector(ss.getSelector());
                rc.setSubscriptionName(ss.getSubscriptionName());
                rc.setSubscribedDestination(this.toSubscribedDestination(ss));
            }
        }
        catch (Throwable e) {
            this.adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create((Throwable)e);
        }
        this.adapter.commitEntityManager(null, manager);
        return rc;
    }

    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
        EntityManager manager = this.adapter.beginEntityManager(null);
        try {
            StoredSubscription.SubscriptionId id = new StoredSubscription.SubscriptionId();
            id.setClientId(clientId);
            id.setSubscriptionName(subscriptionName);
            id.setDestination(this.destinationName);
            AtomicLong last = this.subscriberLastMessageMap.get(id);
            if (last == null) {
                StoredSubscription ss = this.findStoredSubscription(manager, clientId, subscriptionName);
                last = new AtomicLong(ss.getLastAckedId());
                this.subscriberLastMessageMap.put(id, last);
            }
            AtomicLong lastMessageId = last;
            Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
            query.setParameter(1, (Object)this.destinationName);
            query.setParameter(2, (Object)lastMessageId.get());
            query.setMaxResults(maxReturned);
            int count = 0;
            for (StoredMessageReference m : query.getResultList()) {
                MessageId mid = new MessageId(m.getMessageId());
                mid.setBrokerSequenceId(m.getId());
                listener.recoverMessageReference(mid);
                lastMessageId.set(m.getId());
                if (++count < maxReturned) continue;
                return;
            }
        }
        catch (Throwable e) {
            this.adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create((Throwable)e);
        }
        this.adapter.commitEntityManager(null, manager);
    }

    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
        EntityManager manager = this.adapter.beginEntityManager(null);
        try {
            StoredSubscription ss = this.findStoredSubscription(manager, clientId, subscriptionName);
            Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
            query.setParameter(1, (Object)this.destinationName);
            query.setParameter(2, (Object)ss.getLastAckedId());
            for (StoredMessageReference m : query.getResultList()) {
                MessageId mid = new MessageId(m.getMessageId());
                mid.setBrokerSequenceId(m.getId());
                listener.recoverMessageReference(mid);
            }
        }
        catch (Throwable e) {
            this.adapter.rollbackEntityManager(null, manager);
            throw IOExceptionSupport.create((Throwable)e);
        }
        this.adapter.commitEntityManager(null, manager);
    }

    public void resetBatching(String clientId, String subscriptionName) {
        StoredSubscription.SubscriptionId id = new StoredSubscription.SubscriptionId();
        id.setClientId(clientId);
        id.setSubscriptionName(subscriptionName);
        id.setDestination(this.destinationName);
        this.subscriberLastMessageMap.remove(id);
    }

    public boolean acknowledgeReference(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
        this.acknowledge(context, clientId, subscriptionName, messageId);
        return true;
    }
}

