/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.Message;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageAccessor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.utils.MessageUtils;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ProcessQueue {
    public static final long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
    public static final long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
    private static final long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
    private final InternalLogger log = ClientLogger.getLog();
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap();
    private final AtomicLong msgCount = new AtomicLong();
    private final AtomicLong msgSize = new AtomicLong();
    private final AtomicLong mergeProgress = new AtomicLong();
    private final AtomicLong consumeProgress = new AtomicLong();
    private final ReadWriteLock lockConsume = new ReentrantReadWriteLock();
    private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap();
    private final PartitionalMessageList partitionalMessageList;
    private final AtomicLong tryUnlockTimes = new AtomicLong(0L);
    private volatile long safeMergeSize = 0L;
    private volatile long nextOffset = 0L;
    private volatile long queueOffsetMax = 0L;
    private volatile boolean dropped = false;
    private volatile long lastPullTimestamp = System.currentTimeMillis();
    private volatile long lastConsumeTimestamp = System.currentTimeMillis();
    private volatile boolean locked = false;
    private volatile long lastLockTimestamp = System.currentTimeMillis();
    private volatile long msgAccCnt = 0L;
    private volatile boolean isNormalMsgClean = true;
    private volatile boolean isReceivedHAMsg = false;
    private MessageQueue messageQueue;

    public ProcessQueue() {
        this.partitionalMessageList = new PartitionalMessageList(1);
    }

    public ProcessQueue(int shardNum) {
        this.partitionalMessageList = new PartitionalMessageList(shardNum);
    }

    public boolean isLockExpired() {
        return System.currentTimeMillis() - this.lastLockTimestamp > REBALANCE_LOCK_MAX_LIVE_TIME;
    }

    public boolean isPullExpired() {
        return System.currentTimeMillis() - this.lastPullTimestamp > PULL_MAX_IDLE_TIME;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
        if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
            return;
        }
        int loop = Math.min(this.msgTreeMap.size(), 16);
        for (int i = 0; i < loop; ++i) {
            Message msg = null;
            try {
                this.lockTreeMap.readLock().lockInterruptibly();
                try {
                    String consumeStartTimestamp;
                    if (this.msgTreeMap.isEmpty() || (consumeStartTimestamp = MessageAccessor.getConsumeStartTimeStamp(this.msgTreeMap.firstEntry().getValue())) == null || System.currentTimeMillis() - Long.parseLong(consumeStartTimestamp) <= pushConsumer.getConsumeTimeout() * 60L * 1000L) break;
                    msg = this.msgTreeMap.firstEntry().getValue();
                }
                finally {
                    this.lockTreeMap.readLock().unlock();
                }
            }
            catch (Exception e) {
                this.log.error("getExpiredMsg exception", e);
            }
            if (null == msg) {
                this.log.warn("Failed to peek first message from tree-map, possibly due to concurrent remove");
                return;
            }
            try {
                pushConsumer.sendMessageBack((MessageExt)msg, 0);
                this.log.info("Expired msg sent back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), ((MessageExt)msg).getMsgId(), ((MessageExt)msg).getStoreHost(), ((MessageExt)msg).getQueueId(), ((MessageExt)msg).getQueueOffset());
                try {
                    this.lockTreeMap.writeLock().lockInterruptibly();
                    try {
                        if (this.msgTreeMap.isEmpty() || ((MessageExt)msg).getQueueOffset() != this.msgTreeMap.firstKey().longValue()) continue;
                        try {
                            long offset = this.removeMessage(Collections.singletonList(msg));
                            if (offset <= 0L || null == this.messageQueue) continue;
                            this.log.info("New offset after removal of expired message: {}", (Object)offset);
                        }
                        catch (Exception e) {
                            this.log.error("Remove messages from tree-map raised an exception", e);
                        }
                    }
                    finally {
                        this.lockTreeMap.writeLock().unlock();
                    }
                }
                catch (InterruptedException e) {
                    this.log.error("getExpiredMsg exception", e);
                }
                continue;
            }
            catch (Exception e) {
                this.log.error("send expired msg exception", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void putMessage(List<MessageExt> msgs) {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                long accTotal;
                MessageExt messageExt;
                String property;
                int validMsgCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt old = this.msgTreeMap.put(msg.getQueueOffset(), msg);
                    if (null == old) {
                        ++validMsgCnt;
                        this.queueOffsetMax = Math.max(this.queueOffsetMax, msg.getQueueOffset());
                        this.msgSize.addAndGet(msg.getBody() != null ? (long)msg.getBody().length : 0L);
                    }
                    this.partitionalMessageList.putMessage(msg);
                }
                this.msgCount.addAndGet(validMsgCnt);
                if (!msgs.isEmpty() && (property = (messageExt = msgs.get(msgs.size() - 1)).getProperty("MAX_OFFSET")) != null && (accTotal = Long.parseLong(property) - messageExt.getQueueOffset()) > 0L) {
                    this.msgAccCnt = accTotal;
                }
            }
            finally {
                this.lockTreeMap.writeLock().unlock();
            }
        }
        catch (InterruptedException e) {
            this.log.error("putMessage exception", e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (this.msgTreeMap.isEmpty()) return 0L;
                long l = this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                return l;
            }
            finally {
                this.lockTreeMap.readLock().unlock();
            }
        }
        catch (InterruptedException e) {
            this.log.error("getMaxSpan exception", e);
        }
        return 0L;
    }

    public long removeMessage(List<MessageExt> msgs) {
        return this.removeMessage(msgs, -1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public long removeMessage(List<MessageExt> msgs, int shardingKeyIndex) {
        if (msgs == null) return -1L;
        if (msgs.isEmpty()) {
            return -1L;
        }
        long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (this.msgTreeMap.isEmpty()) return -1L;
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt prev = this.msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        --removedCnt;
                        this.msgSize.addAndGet(msg.getBody() != null ? (long)(-msg.getBody().length) : 0L);
                    }
                    this.partitionalMessageList.removeMessage(msg, shardingKeyIndex);
                    this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
                }
                this.msgCount.addAndGet(removedCnt);
                this.mergeProgress.addAndGet(removedCnt);
                this.consumeProgress.addAndGet(removedCnt);
                long l = this.msgTreeMap.isEmpty() ? this.queueOffsetMax + 1L : this.msgTreeMap.firstKey();
                return l;
            }
            finally {
                this.lockTreeMap.writeLock().unlock();
            }
        }
        catch (Throwable t) {
            this.log.error("removeMessage exception", t);
        }
        return -1L;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public boolean hasMessage(MessageExt msg) {
        if (msg == null) {
            return false;
        }
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (this.msgTreeMap.isEmpty()) return false;
                boolean bl = this.msgTreeMap.containsKey(msg.getQueueOffset());
                return bl;
            }
            finally {
                this.lockTreeMap.readLock().unlock();
            }
        }
        catch (Throwable t) {
            this.log.error("check has message exception", t);
        }
        return false;
    }

    public TreeMap<Long, MessageExt> getMsgTreeMap() {
        return this.msgTreeMap;
    }

    public AtomicLong getMsgCount() {
        return this.msgCount;
    }

    public AtomicLong getMsgSize() {
        return this.msgSize;
    }

    public AtomicLong getMergeProgress() {
        return this.mergeProgress;
    }

    public AtomicLong getConsumeProgress() {
        return this.consumeProgress;
    }

    public boolean isDropped() {
        return this.dropped;
    }

    public void setDropped(boolean dropped) {
        this.dropped = dropped;
    }

    public boolean isLocked() {
        return this.locked;
    }

    public void setLocked(boolean locked) {
        this.locked = locked;
    }

    public boolean isNormalMsgClean() {
        return this.isNormalMsgClean;
    }

    public void setNormalMsgClean(boolean normalMsgClean) {
        this.isNormalMsgClean = normalMsgClean;
    }

    public boolean isReceivedHAMsg() {
        return this.isReceivedHAMsg;
    }

    public void setReceivedHAMsg(boolean receivedHAMsg) {
        this.isReceivedHAMsg = receivedHAMsg;
    }

    public void rollback(List<MessageExt> msgs) {
        this.makeMessageToConsumeAgain(msgs);
    }

    public void rollback(List<MessageExt> msgs, int shardingKeyIndex) {
        this.makeMessageToConsumeAgain(msgs, shardingKeyIndex);
    }

    public long commit(List<MessageExt> msgs) {
        return this.removeMessage(msgs);
    }

    public long commit(List<MessageExt> msgs, int shardingKeyIndex) {
        return this.removeMessage(msgs, shardingKeyIndex);
    }

    public void makeMessageToConsumeAgain(List<MessageExt> msgs) {
        this.makeMessageToConsumeAgain(msgs, -1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void makeMessageToConsumeAgain(List<MessageExt> msgs, int shardingKeyIndex) {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                for (MessageExt msg : msgs) {
                    this.msgTreeMap.put(msg.getQueueOffset(), msg);
                    this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
                    this.partitionalMessageList.putMessage(msg, shardingKeyIndex);
                }
                if (this.consumeProgress.longValue() <= (long)msgs.size()) {
                    this.consumeProgress.set(0L);
                } else {
                    this.consumeProgress.addAndGet(-msgs.size());
                }
            }
            finally {
                this.lockTreeMap.writeLock().unlock();
            }
        }
        catch (InterruptedException e) {
            this.log.error("makeMessageToConsumeAgain exception", e);
        }
    }

    public List<MessageExt> peekMessagesToMerge() {
        List<MessageExt> msgList;
        this.lockTreeMap.readLock().lock();
        try {
            int mergeProgress = this.getMergeProgress().intValue();
            msgList = new ArrayList<MessageExt>(this.msgTreeMap.values());
            msgList = msgList.subList(mergeProgress, (int)this.safeMergeSize + mergeProgress);
        }
        finally {
            this.lockTreeMap.readLock().unlock();
        }
        return msgList;
    }

    public List<MessageExt> takeMessages(int batchSize) {
        return this.takeMessagesByShardingKeyIndex(0, batchSize);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<MessageExt> takeMessagesByShardingKeyIndex(int shardingKeyIndex, int batchSize) {
        List<MessageExt> result = new ArrayList<MessageExt>();
        long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    result = this.partitionalMessageList.pollMessages(shardingKeyIndex, batchSize);
                    for (MessageExt msg : result) {
                        this.consumingMsgOrderlyTreeMap.put(msg.getQueueOffset(), msg);
                    }
                    this.consumeProgress.addAndGet(result.size());
                }
            }
            finally {
                this.lockTreeMap.writeLock().unlock();
            }
        }
        catch (InterruptedException e) {
            this.log.error("take Messages exception", e);
        }
        return result;
    }

    public boolean hasTempMessage() {
        this.lockTreeMap.readLock().lockInterruptibly();
        try {
            boolean bl = !this.msgTreeMap.isEmpty();
            this.lockTreeMap.readLock().unlock();
            return bl;
        }
        catch (Throwable throwable) {
            try {
                this.lockTreeMap.readLock().unlock();
                throw throwable;
            }
            catch (InterruptedException e) {
                this.log.error("check temp messages exception", e);
                return true;
            }
        }
    }

    public long unmergedMessageSize() {
        this.lockTreeMap.readLock().lockInterruptibly();
        try {
            long l = this.msgCount.get() - this.mergeProgress.get();
            this.lockTreeMap.readLock().unlock();
            return l;
        }
        catch (Throwable throwable) {
            try {
                this.lockTreeMap.readLock().unlock();
                throw throwable;
            }
            catch (InterruptedException e) {
                this.log.error("check unmerged message exception", e);
                return 0L;
            }
        }
    }

    public void clear() {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                this.msgTreeMap.clear();
                this.consumingMsgOrderlyTreeMap.clear();
                this.partitionalMessageList.clear();
                this.msgCount.set(0L);
                this.msgSize.set(0L);
                this.queueOffsetMax = 0L;
            }
            finally {
                this.lockTreeMap.writeLock().unlock();
            }
        }
        catch (InterruptedException e) {
            this.log.error("rollback exception", e);
        }
    }

    public long getNextOffset() {
        return this.nextOffset;
    }

    public void setNextOffset(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    public void setSafeMergeSize(long safeMergeSize) {
        this.safeMergeSize = safeMergeSize;
    }

    public long getQueueOffsetMax() {
        return this.queueOffsetMax;
    }

    public long getLastLockTimestamp() {
        return this.lastLockTimestamp;
    }

    public void setLastLockTimestamp(long lastLockTimestamp) {
        this.lastLockTimestamp = lastLockTimestamp;
    }

    public ReadWriteLock getLockConsume() {
        return this.lockConsume;
    }

    public long getLastPullTimestamp() {
        return this.lastPullTimestamp;
    }

    public void setLastPullTimestamp(long lastPullTimestamp) {
        this.lastPullTimestamp = lastPullTimestamp;
    }

    public long getMsgAccCnt() {
        return this.msgAccCnt;
    }

    public void setMsgAccCnt(long msgAccCnt) {
        this.msgAccCnt = msgAccCnt;
    }

    public long getTryUnlockTimes() {
        return this.tryUnlockTimes.get();
    }

    public void incTryUnlockTimes() {
        this.tryUnlockTimes.incrementAndGet();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void fillProcessQueueInfo(ProcessQueueInfo info) {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            if (!this.msgTreeMap.isEmpty()) {
                info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
                info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
                info.setCachedMsgCount(this.msgTreeMap.size());
                info.setCachedMsgSizeInMiB((int)(this.msgSize.get() / 0x100000L));
                int cnt = 0;
                for (Map.Entry<Long, MessageExt> entry : this.msgTreeMap.entrySet()) {
                    if ((long)cnt++ != this.mergeProgress.get()) continue;
                    info.setMergeOffset(entry.getKey() + 1L);
                    break;
                }
            } else {
                info.setMergeOffset(info.getCommitOffset());
            }
            if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
                info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());
                info.setTransactionMsgMaxOffset(this.consumingMsgOrderlyTreeMap.lastKey());
                info.setTransactionMsgCount(this.consumingMsgOrderlyTreeMap.size());
            }
            info.setLocked(this.locked);
            info.setTryUnlockTimes(this.tryUnlockTimes.get());
            info.setLastLockTimestamp(this.lastLockTimestamp);
            info.setDropped(this.dropped);
            info.setLastPullTimestamp(this.lastPullTimestamp);
            info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
        }
        catch (Exception exception) {
        }
        finally {
            this.lockTreeMap.readLock().unlock();
        }
    }

    public long getLastConsumeTimestamp() {
        return this.lastConsumeTimestamp;
    }

    public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
        this.lastConsumeTimestamp = lastConsumeTimestamp;
    }

    public void setMessageQueue(MessageQueue messageQueue) {
        this.messageQueue = messageQueue;
    }

    public static class PartitionalMessageList {
        private final ArrayList<TreeMap<Long, MessageExt>> partitionalMsgMapList;

        public PartitionalMessageList(int shardNum) {
            this.partitionalMsgMapList = new ArrayList(shardNum);
            for (int i = 0; i < shardNum; ++i) {
                this.partitionalMsgMapList.add(new TreeMap());
            }
        }

        private String getShardingKey(MessageExt msg) {
            String shardingKey = msg.getProperty("__SHARDINGKEY");
            if (shardingKey == null) {
                shardingKey = "";
            }
            return shardingKey;
        }

        private int getShardIndex(MessageExt msg) {
            String shardingKey = this.getShardingKey(msg);
            return MessageUtils.getShardingKeyIndex(shardingKey, this.partitionalMsgMapList.size());
        }

        public void putMessage(MessageExt msg) {
            this.putMessage(msg, -1);
        }

        public void putMessage(MessageExt msg, int shardIndex) {
            TreeMap<Long, MessageExt> msgMap;
            if (shardIndex < 0 || shardIndex >= this.partitionalMsgMapList.size()) {
                shardIndex = this.getShardIndex(msg);
            }
            if ((msgMap = this.partitionalMsgMapList.get(shardIndex)) == null) {
                msgMap = new TreeMap();
                this.partitionalMsgMapList.set(shardIndex, msgMap);
            }
            msgMap.put(msg.getQueueOffset(), msg);
        }

        public void removeMessage(MessageExt msg) {
            this.removeMessage(msg, -1);
        }

        public void removeMessage(MessageExt msg, int shardIndex) {
            TreeMap<Long, MessageExt> msgMap;
            if (shardIndex < 0 || shardIndex >= this.partitionalMsgMapList.size()) {
                shardIndex = this.getShardIndex(msg);
            }
            if ((msgMap = this.partitionalMsgMapList.get(shardIndex)) == null) {
                return;
            }
            msgMap.remove(msg.getQueueOffset());
        }

        public List<MessageExt> pollMessages(int shardIndex, int batchSize) {
            ArrayList<MessageExt> msgs = new ArrayList<MessageExt>(batchSize);
            TreeMap<Long, MessageExt> msgMap = this.partitionalMsgMapList.get(shardIndex);
            if (msgMap != null) {
                Map.Entry<Long, MessageExt> entry;
                for (int i = 0; i < batchSize && (entry = msgMap.pollFirstEntry()) != null; ++i) {
                    msgs.add(entry.getValue());
                }
            }
            return msgs;
        }

        public void clear() {
            for (TreeMap<Long, MessageExt> m : this.partitionalMsgMapList) {
                if (m == null) continue;
                m.clear();
            }
        }
    }
}

