/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cxf.ws.rm.soap;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.io.AbstractCachedOutputStream;
import org.apache.cxf.io.CachedOutputStreamCallback;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.ws.rm.Identifier;
import org.apache.cxf.ws.rm.RMContextUtils;
import org.apache.cxf.ws.rm.RMManager;
import org.apache.cxf.ws.rm.RMProperties;
import org.apache.cxf.ws.rm.RetransmissionCallback;
import org.apache.cxf.ws.rm.RetransmissionQueue;
import org.apache.cxf.ws.rm.SequenceType;
import org.apache.cxf.ws.rm.SourceSequence;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.policy.RMAssertion;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class RetransmissionQueueImpl
implements RetransmissionQueue {
    private static final Logger LOG = LogUtils.getL7dLogger(RetransmissionQueueImpl.class);
    private Map<String, List<ResendCandidate>> candidates = new HashMap<String, List<ResendCandidate>>();
    private Resender resender;
    private Runnable resendInitiator;
    private Timer timer;
    private RMManager manager;

    public RetransmissionQueueImpl(RMManager m) {
        this.manager = m;
    }

    public RMManager getManager() {
        return this.manager;
    }

    public void setManager(RMManager m) {
        this.manager = m;
    }

    public long getBaseRetransmissionInterval() {
        RMAssertion rma;
        RMAssertion rMAssertion = rma = null == this.manager ? null : this.manager.getRMAssertion();
        if (null != rma && null != rma.getBaseRetransmissionInterval() && null != rma.getBaseRetransmissionInterval().getMilliseconds()) {
            return rma.getBaseRetransmissionInterval().getMilliseconds().longValue();
        }
        return new BigInteger("3000").longValue();
    }

    @Override
    public void addUnacknowledged(Message message) {
        this.cacheUnacknowledged(message);
    }

    @Override
    public synchronized int countUnacknowledged(SourceSequence seq) {
        List<ResendCandidate> sequenceCandidates = this.getSequenceCandidates(seq);
        return sequenceCandidates == null ? 0 : sequenceCandidates.size();
    }

    @Override
    public boolean isEmpty() {
        return 0 == this.getUnacknowledged().size();
    }

    @Override
    public void populate(Collection<SourceSequence> sss) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void purgeAcknowledged(SourceSequence seq) {
        RMStore store;
        ArrayList<BigInteger> purged = new ArrayList<BigInteger>();
        RetransmissionQueueImpl retransmissionQueueImpl = this;
        synchronized (retransmissionQueueImpl) {
            LOG.fine("Start purging resend candidates.");
            List<ResendCandidate> sequenceCandidates = this.getSequenceCandidates(seq);
            if (null != sequenceCandidates) {
                for (int i = sequenceCandidates.size() - 1; i >= 0; --i) {
                    ResendCandidate candidate = sequenceCandidates.get(i);
                    RMProperties properties = RMContextUtils.retrieveRMProperties(candidate.getMessage(), true);
                    SequenceType st = properties.getSequence();
                    BigInteger m = st.getMessageNumber();
                    if (!seq.isAcknowledged(m)) continue;
                    sequenceCandidates.remove(i);
                    candidate.resolved();
                    purged.add(m);
                }
            }
            LOG.fine("Completed purging resend candidates.");
        }
        if (purged.size() > 0 && null != (store = this.manager.getStore())) {
            store.removeMessages(seq.getIdentifier(), purged, true);
        }
    }

    @Override
    public void start() {
        if (null != this.timer) {
            return;
        }
        LOG.fine("Starting retransmission queue");
        if (null == this.resender) {
            this.resender = this.getDefaultResender();
        }
        TimerTask task = new TimerTask(){

            public void run() {
                RetransmissionQueueImpl.this.getResendInitiator().run();
            }
        };
        this.timer = new Timer();
        this.timer.schedule(task, this.getBaseRetransmissionInterval() / 2L, this.getBaseRetransmissionInterval());
    }

    @Override
    public void stop() {
        if (null != this.timer) {
            LOG.fine("Stopping retransmission queue");
            this.timer.cancel();
            this.timer = null;
        }
    }

    protected int getExponentialBackoff() {
        return 2;
    }

    protected Runnable getResendInitiator() {
        if (this.resendInitiator == null) {
            this.resendInitiator = new ResendInitiator();
        }
        return this.resendInitiator;
    }

    protected ResendCandidate createResendCandidate(Message message) {
        return new ResendCandidate(message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected ResendCandidate cacheUnacknowledged(Message message) {
        ResendCandidate candidate = null;
        RMProperties rmps = RMContextUtils.retrieveRMProperties(message, true);
        SequenceType st = rmps.getSequence();
        Identifier sid = st.getIdentifier();
        RetransmissionQueueImpl retransmissionQueueImpl = this;
        synchronized (retransmissionQueueImpl) {
            String key = sid.getValue();
            List<ResendCandidate> sequenceCandidates = this.getSequenceCandidates(key);
            if (null == sequenceCandidates) {
                sequenceCandidates = new ArrayList<ResendCandidate>();
                this.candidates.put(key, sequenceCandidates);
            }
            candidate = new ResendCandidate(message);
            sequenceCandidates.add(candidate);
        }
        LOG.fine("Cached unacknowledged message.");
        return candidate;
    }

    protected Map<String, List<ResendCandidate>> getUnacknowledged() {
        return this.candidates;
    }

    protected List<ResendCandidate> getSequenceCandidates(SourceSequence seq) {
        return this.getSequenceCandidates(seq.getIdentifier().getValue());
    }

    protected List<ResendCandidate> getSequenceCandidates(String key) {
        return this.candidates.get(key);
    }

    private void clientResend(Message message) {
        Conduit c = message.getExchange().getConduit();
        try {
            OutputStream os = (OutputStream)message.getContent(OutputStream.class);
            List callbacks = null;
            if (os instanceof AbstractCachedOutputStream) {
                callbacks = ((AbstractCachedOutputStream)os).getCallbacks();
            }
            c.send(message);
            os = (OutputStream)message.getContent(OutputStream.class);
            if (os instanceof AbstractCachedOutputStream && callbacks.size() > 1) {
                for (CachedOutputStreamCallback cb : callbacks) {
                    if (cb instanceof RetransmissionCallback) continue;
                    ((AbstractCachedOutputStream)os).registerCallback(cb);
                }
            }
            ByteArrayOutputStream savedOutputStream = (ByteArrayOutputStream)message.get((Object)"org.apache.cxf.ws.rm.outputstream");
            ByteArrayInputStream bis = new ByteArrayInputStream(savedOutputStream.toByteArray());
            AbstractCachedOutputStream.copyStream((InputStream)bis, (OutputStream)os, (int)1024);
            os.flush();
            os.close();
        }
        catch (IOException ex) {
            LOG.log(Level.SEVERE, "RESEND_FAILED_MSG", ex);
        }
    }

    private void serverResend(Message message) {
    }

    protected final Resender getDefaultResender() {
        return new Resender(){

            public void resend(Message message, boolean requestAcknowledge) {
                RMProperties properties = RMContextUtils.retrieveRMProperties(message, true);
                SequenceType st = properties.getSequence();
                if (st != null) {
                    LOG.log(Level.INFO, "RESEND_MSG", st.getMessageNumber());
                }
                try {
                    if (RMContextUtils.isRequestor(message)) {
                        RetransmissionQueueImpl.this.clientResend(message);
                    } else {
                        RetransmissionQueueImpl.this.serverResend(message);
                    }
                }
                catch (Exception e) {
                    LOG.log(Level.WARNING, "RESEND_FAILED_MSG", e);
                }
            }
        };
    }

    protected void replaceResender(Resender replacement) {
        this.resender = replacement;
    }

    public static interface Resender {
        public void resend(Message var1, boolean var2);
    }

    protected class ResendCandidate
    implements Runnable {
        private Message message;
        private int skips;
        private int skipped;
        private boolean pending;
        private boolean includeAckRequested;

        protected ResendCandidate(Message m) {
            this.message = m;
            this.skipped = -1;
            this.skips = 1;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            try {
                if (this.isPending()) {
                    RetransmissionQueueImpl.this.resender.resend(this.message, this.includeAckRequested);
                    this.includeAckRequested = false;
                }
            }
            finally {
                this.attempted();
            }
        }

        protected synchronized boolean isDue() {
            boolean due = false;
            if (!this.pending && ++this.skipped == this.skips) {
                this.skips *= RetransmissionQueueImpl.this.getExponentialBackoff();
                this.skipped = 0;
                due = true;
            }
            return due;
        }

        protected synchronized boolean isPending() {
            return this.pending;
        }

        protected synchronized void initiate(boolean requestAcknowledge) {
            this.includeAckRequested = requestAcknowledge;
            this.pending = true;
            Endpoint ep = (Endpoint)this.message.getExchange().get(Endpoint.class);
            Executor executor = ep.getExecutor();
            if (null == executor) {
                executor = ep.getService().getExecutor();
            }
            try {
                executor.execute(this);
            }
            catch (RejectedExecutionException ex) {
                LOG.log(Level.SEVERE, "RESEND_INITIATION_FAILED_MSG", ex);
            }
        }

        protected synchronized void resolved() {
            this.pending = false;
            this.skips = Integer.MAX_VALUE;
        }

        protected Message getMessage() {
            return this.message;
        }

        private synchronized void attempted() {
            this.pending = false;
        }
    }

    protected class ResendInitiator
    implements Runnable {
        protected ResendInitiator() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            RetransmissionQueueImpl retransmissionQueueImpl = RetransmissionQueueImpl.this;
            synchronized (retransmissionQueueImpl) {
                LOG.fine("Starting ResendInitiator on thread " + Thread.currentThread());
                Iterator sequences = RetransmissionQueueImpl.this.candidates.entrySet().iterator();
                while (sequences.hasNext()) {
                    Iterator sequenceCandidates = ((List)sequences.next().getValue()).iterator();
                    boolean requestAck = true;
                    try {
                        while (sequenceCandidates.hasNext()) {
                            ResendCandidate candidate = (ResendCandidate)sequenceCandidates.next();
                            if (!candidate.isDue()) continue;
                            candidate.initiate(requestAck);
                            requestAck = false;
                        }
                    }
                    catch (ConcurrentModificationException ex) {
                        LOG.log(Level.WARNING, "RESEND_CANDIDATES_CONCURRENT_MODIFICATION_MSG");
                    }
                }
                LOG.fine("Completed ResendInitiator");
            }
        }
    }
}

