/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.remoting.transport.jgroups;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.infinispan.CacheException;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.config.ConfigurationException;
import org.infinispan.config.parsing.XmlConfigHelper;
import org.infinispan.marshall.StreamingMarshaller;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.remoting.InboundInvocationHandler;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.ResponseFilter;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.transport.AbstractTransport;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.DistributedSync;
import org.infinispan.remoting.transport.jgroups.CommandAwareRpcDispatcher;
import org.infinispan.remoting.transport.jgroups.JGroupsAddress;
import org.infinispan.remoting.transport.jgroups.JGroupsChannelLookup;
import org.infinispan.remoting.transport.jgroups.JGroupsDistSync;
import org.infinispan.remoting.transport.jgroups.JGroupsResponseFilterAdapter;
import org.infinispan.remoting.transport.jgroups.MarshallerAdapter;
import org.infinispan.remoting.transport.jgroups.StateTransferMonitor;
import org.infinispan.statetransfer.StateTransferException;
import org.infinispan.util.FileLookup;
import org.infinispan.util.TypedProperties;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
import org.jgroups.Event;
import org.jgroups.ExtendedMembershipListener;
import org.jgroups.ExtendedMessageListener;
import org.jgroups.JChannel;
import org.jgroups.MergeView;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.RspFilter;
import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;

public class JGroupsTransport
extends AbstractTransport
implements ExtendedMembershipListener,
ExtendedMessageListener {
    public static final String CONFIGURATION_STRING = "configurationString";
    public static final String CONFIGURATION_XML = "configurationXml";
    public static final String CONFIGURATION_FILE = "configurationFile";
    public static final String CHANNEL_LOOKUP = "channelLookup";
    protected static final String DEFAULT_JGROUPS_CONFIGURATION_FILE = "jgroups-udp.xml";
    protected boolean startChannel = true;
    protected boolean stopChannel = true;
    protected Channel channel;
    protected boolean createdChannel = false;
    protected Address address;
    protected Address physicalAddress;
    protected volatile List<Address> members = Collections.emptyList();
    protected volatile boolean coordinator = false;
    protected final Object membersListLock = new Object();
    CommandAwareRpcDispatcher dispatcher;
    static final Log log = LogFactory.getLog(JGroupsTransport.class);
    static final boolean trace = log.isTraceEnabled();
    protected TypedProperties props;
    protected InboundInvocationHandler inboundInvocationHandler;
    protected StreamingMarshaller marshaller;
    protected ExecutorService asyncExecutor;
    protected CacheManagerNotifier notifier;
    final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new ConcurrentHashMap<String, StateTransferMonitor>();
    private final JGroupsDistSync flushTracker = new JGroupsDistSync();
    long distributedSyncTimeout;
    private boolean usingFlush = true;

    public JGroupsTransport(Channel channel) {
        this.channel = channel;
        if (channel == null) {
            throw new IllegalArgumentException("Cannot deal with a null channel!");
        }
        if (channel.isConnected()) {
            throw new IllegalArgumentException("Channel passed in cannot already be connected!");
        }
    }

    public JGroupsTransport() {
    }

    @Override
    public Log getLog() {
        return log;
    }

    @Override
    public void initialize(StreamingMarshaller marshaller, ExecutorService asyncExecutor, InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier) {
        this.marshaller = marshaller;
        this.asyncExecutor = asyncExecutor;
        this.inboundInvocationHandler = inboundInvocationHandler;
        this.notifier = notifier;
    }

    @Override
    public void start() {
        this.props = TypedProperties.toTypedProperties(this.configuration.getTransportProperties());
        this.distributedSyncTimeout = this.configuration.getDistributedSyncTimeout();
        if (log.isInfoEnabled()) {
            log.info("Starting JGroups Channel");
        }
        this.initChannelAndRPCDispatcher();
        this.startJGroupsChannelIfNeeded();
        boolean skipFlushCheck = Boolean.getBoolean("infinispan.experimental.transport.skip_flush_check");
        if (this.channel.getProtocolStack() != null && this.channel.getProtocolStack().findProtocol((Class<?>)FLUSH.class) == null) {
            if (skipFlushCheck) {
                this.usingFlush = false;
            } else {
                throw new ConfigurationException("Flush should be enabled. This is related to https://jira.jboss.org/jira/browse/ISPN-83");
            }
        }
    }

    protected void startJGroupsChannelIfNeeded() {
        if (this.startChannel) {
            try {
                this.channel.connect(this.configuration.getClusterName());
            }
            catch (ChannelException e) {
                throw new CacheException("Unable to start JGroups Channel", e);
            }
        }
        this.address = new JGroupsAddress(this.channel.getAddress());
        if (log.isInfoEnabled()) {
            log.info((Object)"Cache local address is %s, physical addresses are %s", this.getAddress(), this.getPhysicalAddresses());
        }
    }

    @Override
    public int getViewId() {
        return (int)this.channel.getView().getVid().getId();
    }

    @Override
    public void stop() {
        try {
            if (this.stopChannel && this.channel != null && this.channel.isOpen()) {
                log.info("Disconnecting and closing JGroups Channel");
                this.channel.disconnect();
                this.channel.close();
            }
        }
        catch (Exception toLog) {
            log.error((Object)"Problem closing channel; setting it to null", toLog);
        }
        this.channel = null;
        if (this.dispatcher != null) {
            log.info("Stopping the RpcDispatcher");
            this.dispatcher.stop();
        }
        this.members = Collections.emptyList();
        this.coordinator = false;
        this.dispatcher = null;
    }

    protected void initChannel() {
        if (this.channel == null) {
            this.createdChannel = true;
            this.buildChannel();
            String transportNodeName = this.configuration.getTransportNodeName();
            if (transportNodeName != null && transportNodeName.length() > 0) {
                long range = 65534L;
                long randomInRange = (long)(Math.random() * (double)range % (double)range) + 1L;
                transportNodeName = transportNodeName + "-" + randomInRange;
                this.channel.setName(transportNodeName);
            }
        }
        this.channel.setOpt(3, false);
    }

    private void initChannelAndRPCDispatcher() throws CacheException {
        this.initChannel();
        this.dispatcher = new CommandAwareRpcDispatcher(this.channel, this, this.asyncExecutor, this.inboundInvocationHandler, this.flushTracker, this.distributedSyncTimeout);
        MarshallerAdapter adapter = new MarshallerAdapter(this.marshaller);
        this.dispatcher.setRequestMarshaller(adapter);
        this.dispatcher.setResponseMarshaller(adapter);
    }

    private void buildChannel() {
        if (this.props != null) {
            String cfg;
            if (this.props.containsKey(CHANNEL_LOOKUP)) {
                String channelLookupClassName = this.props.getProperty(CHANNEL_LOOKUP);
                try {
                    JGroupsChannelLookup lookup = (JGroupsChannelLookup)Util.getInstance(channelLookupClassName);
                    this.channel = lookup.getJGroupsChannel(this.props);
                    this.startChannel = lookup.shouldStartAndConnect();
                    this.stopChannel = lookup.shouldStopAndDisconnect();
                }
                catch (ClassCastException e) {
                    log.error("Class [" + channelLookupClassName + "] cannot be cast to JGroupsChannelLookup!  Not using a channel lookup.");
                }
                catch (Exception e) {
                    log.error("Errors instantiating [" + channelLookupClassName + "]!  Not using a channel lookup.");
                }
            }
            if (this.channel == null && this.props.containsKey(CONFIGURATION_FILE)) {
                cfg = this.props.getProperty(CONFIGURATION_FILE);
                try {
                    this.channel = new JChannel(new FileLookup().lookupFileLocation(cfg));
                }
                catch (Exception e) {
                    log.error("Error while trying to create a channel using config files: " + cfg);
                    throw new CacheException(e);
                }
            }
            if (this.channel == null && this.props.containsKey(CONFIGURATION_XML)) {
                cfg = this.props.getProperty(CONFIGURATION_XML);
                try {
                    this.channel = new JChannel(XmlConfigHelper.stringToElement(cfg));
                }
                catch (Exception e) {
                    log.error("Error while trying to create a channel using config XML: " + cfg);
                    throw new CacheException(e);
                }
            }
            if (this.channel == null && this.props.containsKey(CONFIGURATION_STRING)) {
                cfg = this.props.getProperty(CONFIGURATION_STRING);
                try {
                    this.channel = new JChannel(cfg);
                }
                catch (Exception e) {
                    log.error("Error while trying to create a channel using config string: " + cfg);
                    throw new CacheException(e);
                }
            }
        }
        if (this.channel == null) {
            log.info((Object)"Unable to use any JGroups configuration mechanisms provided in properties %s.  Using default JGroups configuration!", this.props);
            try {
                this.channel = new JChannel(new FileLookup().lookupFileLocation(DEFAULT_JGROUPS_CONFIGURATION_FILE));
            }
            catch (ChannelException e) {
                throw new CacheException("Unable to start JGroups channel", e);
            }
        }
    }

    @Override
    public boolean isCoordinator() {
        return this.coordinator;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Address getCoordinator() {
        if (this.channel == null) {
            return null;
        }
        Object object = this.membersListLock;
        synchronized (object) {
            while (this.members.isEmpty()) {
                log.debug("Waiting on view being accepted");
                try {
                    this.membersListLock.wait();
                }
                catch (InterruptedException e) {
                    log.error((Object)"getCoordinator(): Interrupted while waiting for members to be set", e);
                    break;
                }
            }
            return this.members.isEmpty() ? null : this.members.get(0);
        }
    }

    @Override
    public List<Address> getMembers() {
        return this.members != null ? this.members : Collections.emptyList();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean retrieveState(String cacheName, Address address, long timeout) throws StateTransferException {
        boolean cleanup = false;
        try {
            StateTransferMonitor mon = new StateTransferMonitor();
            if (this.stateTransfersInProgress.putIfAbsent(cacheName, mon) != null) {
                throw new StateTransferException("There already appears to be a state transfer in progress for the cache named " + cacheName);
            }
            cleanup = true;
            ((JChannel)this.channel).getState(this.toJGroupsAddress(address), cacheName, timeout, false);
            mon.waitForState();
            boolean bl = mon.getSetStateException() == null;
            return bl;
        }
        catch (StateTransferException ste) {
            throw ste;
        }
        catch (Exception e) {
            if (log.isInfoEnabled()) {
                log.info((Object)("Unable to retrieve state from member " + address), e);
            }
            boolean bl = false;
            return bl;
        }
        finally {
            if (cleanup) {
                this.stateTransfersInProgress.remove(cacheName);
            }
        }
    }

    @Override
    public DistributedSync getDistributedSync() {
        return this.flushTracker;
    }

    @Override
    public boolean isSupportStateTransfer() {
        ProtocolStack stack;
        if (this.channel != null && (stack = this.channel.getProtocolStack()) != null) {
            if (stack.findProtocol((Class<?>)STREAMING_STATE_TRANSFER.class) == null) {
                log.error("Channel does not contain STREAMING_STATE_TRANSFER.  Cannot support state transfers!");
                return false;
            }
        } else {
            log.warn("Channel not set up properly!");
            return false;
        }
        return true;
    }

    @Override
    public Address getAddress() {
        if (this.address == null && this.channel != null) {
            this.address = new JGroupsAddress(this.channel.getAddress());
        }
        return this.address;
    }

    @Override
    public List<Address> getPhysicalAddresses() {
        if (this.physicalAddress == null && this.channel != null) {
            org.jgroups.Address addr = (org.jgroups.Address)this.channel.downcall(new Event(87, this.channel.getAddress()));
            this.physicalAddress = new JGroupsAddress(addr);
        }
        return Collections.singletonList(this.physicalAddress);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<Address, Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean supportReplay) throws Exception {
        boolean asyncMarshalling;
        if (recipients != null && recipients.isEmpty()) {
            log.trace("Destination list is empty: no need to send message");
            return Collections.emptyMap();
        }
        if (trace) {
            log.trace((Object)"dests=%s, command=%s, mode=%s, timeout=%s", new Object[]{recipients, rpcCommand, mode, timeout});
        }
        this.flushTracker.acquireProcessingLock(false, this.distributedSyncTimeout, TimeUnit.MILLISECONDS);
        boolean unlock = true;
        if (this.usingFlush) {
            this.flushTracker.blockUntilReleased(this.distributedSyncTimeout, TimeUnit.MILLISECONDS);
        }
        boolean bl = asyncMarshalling = mode == ResponseMode.ASYNCHRONOUS;
        if (!usePriorityQueue && ResponseMode.SYNCHRONOUS == mode) {
            usePriorityQueue = true;
        }
        try {
            RspList rsps = this.dispatcher.invokeRemoteCommands(this.toJGroupsAddressVector(recipients), rpcCommand, this.toJGroupsMode(mode), timeout, recipients != null, usePriorityQueue, this.toJGroupsFilter(responseFilter), supportReplay, asyncMarshalling, recipients == null || recipients.size() == this.members.size());
            if (mode.isAsynchronous()) {
                Map<Address, Response> map = Collections.emptyMap();
                return map;
            }
            if (rsps == null) {
                Map<Address, Response> map = Collections.emptyMap();
                return map;
            }
            HashMap<Address, Response> retval = new HashMap<Address, Response>(rsps.size());
            boolean noValidResponses = true;
            for (Rsp rsp : rsps.values()) {
                noValidResponses = this.parseResponseAndAddToResponseList(rsp.getValue(), retval, rsp.wasSuspected(), rsp.wasReceived(), new JGroupsAddress(rsp.getSender()), responseFilter != null) && noValidResponses;
            }
            if (noValidResponses) {
                throw new TimeoutException("Timed out waiting for valid responses!");
            }
            HashMap<Address, Response> hashMap = retval;
            return hashMap;
        }
        finally {
            if (unlock) {
                this.flushTracker.releaseProcessingLock(false);
            }
        }
    }

    private int toJGroupsMode(ResponseMode mode) {
        switch (mode) {
            case ASYNCHRONOUS: 
            case ASYNCHRONOUS_WITH_SYNC_MARSHALLING: {
                return 6;
            }
            case SYNCHRONOUS: {
                return 2;
            }
            case WAIT_FOR_VALID_RESPONSE: {
                return 3;
            }
        }
        throw new CacheException("Unknown response mode " + (Object)((Object)mode));
    }

    private RspFilter toJGroupsFilter(ResponseFilter responseFilter) {
        return responseFilter == null ? null : new JGroupsResponseFilterAdapter(responseFilter);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void viewAccepted(View newView) {
        Vector<org.jgroups.Address> newMembers = newView.getMembers();
        List<Address> oldMembers = null;
        boolean hasNotifier = this.notifier != null;
        Notify n = null;
        if (hasNotifier) {
            if (newView instanceof MergeView) {
                if (log.isInfoEnabled()) {
                    log.info((Object)"Received new, MERGED cluster view: %s", newView);
                }
                n = new NotifyMerge();
            } else {
                if (log.isInfoEnabled()) {
                    log.info((Object)"Received new cluster view: %s", newView);
                }
                n = new NotifyViewChange();
            }
        }
        Object object = this.membersListLock;
        synchronized (object) {
            boolean needNotification = false;
            if (newMembers != null) {
                oldMembers = this.members;
                this.members = this.fromJGroupsAddressList(newMembers);
                needNotification = true;
            }
            boolean bl = this.coordinator = this.members != null && !this.members.isEmpty() && this.members.get(0).equals(this.getAddress());
            if (needNotification && hasNotifier) {
                n.emitNotification(oldMembers, newView);
            }
            this.membersListLock.notifyAll();
        }
    }

    private boolean needsToRejoin(View v) {
        if (v instanceof MergeView) {
            MergeView mv = (MergeView)v;
            org.jgroups.Address coord = v.getMembers().get(0);
            View winningPartition = null;
            for (View p : mv.getSubgroups()) {
                if (!p.getMembers().get(0).equals(coord)) continue;
                winningPartition = p;
                break;
            }
            return winningPartition == null || !winningPartition.containsMember(this.channel.getAddress());
        }
        return false;
    }

    @Override
    public void suspect(org.jgroups.Address suspected_mbr) {
    }

    @Override
    public void block() {
        if (this.usingFlush) {
            this.flushTracker.signalJoinInProgress();
        }
    }

    @Override
    public void unblock() {
        if (this.usingFlush) {
            this.flushTracker.signalJoinCompleted();
        }
    }

    @Override
    public void receive(Message msg) {
    }

    @Override
    public byte[] getState() {
        throw new UnsupportedOperationException("Retrieving state for the entire cache system is not supported!");
    }

    @Override
    public void setState(byte[] state) {
        throw new UnsupportedOperationException("Setting state for the entire cache system is not supported!");
    }

    @Override
    public byte[] getState(String state_id) {
        throw new UnsupportedOperationException("Non-stream-based state retrieval is not supported!  Make sure you use the JGroups STREAMING_STATE_TRANSFER protocol!");
    }

    @Override
    public void setState(String state_id, byte[] state) {
        throw new UnsupportedOperationException("Non-stream-based state retrieval is not supported!  Make sure you use the JGroups STREAMING_STATE_TRANSFER protocol!");
    }

    @Override
    public void getState(OutputStream ostream) {
        throw new UnsupportedOperationException("Retrieving state for the entire cache system is not supported!");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void getState(String cacheName, OutputStream ostream) {
        if (trace) {
            log.trace((Object)"Received request to generate state for cache named '%s'.  Attempting to generate state.", cacheName);
        }
        try {
            this.inboundInvocationHandler.generateState(cacheName, ostream);
        }
        catch (StateTransferException e) {
            log.error((Object)"Caught while responding to state transfer request", e);
        }
        finally {
            Util.flushAndCloseStream(ostream);
        }
    }

    @Override
    public void setState(InputStream istream) {
        throw new UnsupportedOperationException("Setting state for the entire cache system is not supported!");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void setState(String cacheName, InputStream istream) {
        StateTransferMonitor mon = null;
        try {
            if (trace) {
                log.trace((Object)"Received state for cache named '%s'.  Attempting to apply state.", cacheName);
            }
            mon = (StateTransferMonitor)this.stateTransfersInProgress.get(cacheName);
            this.inboundInvocationHandler.applyState(cacheName, istream);
            mon.notifyStateReceiptSucceeded();
        }
        catch (Exception e) {
            mon.notifyStateReceiptFailed(e instanceof StateTransferException ? (StateTransferException)e : new StateTransferException(e));
            log.error((Object)"Caught while requesting or applying state", e);
        }
        finally {
            Util.close(istream);
        }
    }

    private Vector<org.jgroups.Address> toJGroupsAddressVector(Collection<Address> list) {
        if (list == null) {
            return null;
        }
        if (list.isEmpty()) {
            return new Vector<org.jgroups.Address>();
        }
        Vector<org.jgroups.Address> retval = new Vector<org.jgroups.Address>(list.size());
        for (Address a : list) {
            retval.add(this.toJGroupsAddress(a));
        }
        return retval;
    }

    private org.jgroups.Address toJGroupsAddress(Address a) {
        return ((JGroupsAddress)a).address;
    }

    private List<Address> fromJGroupsAddressList(List<org.jgroups.Address> list) {
        if (list == null || list.isEmpty()) {
            return Collections.emptyList();
        }
        int sz = list.size();
        ArrayList<Address> retval = new ArrayList<Address>(sz);
        if (sz == 1) {
            retval.add(new JGroupsAddress(list.get(0)));
        } else {
            for (org.jgroups.Address a : list) {
                retval.add(new JGroupsAddress(a));
            }
        }
        return retval;
    }

    public CommandAwareRpcDispatcher getCommandAwareRpcDispatcher() {
        return this.dispatcher;
    }

    public Channel getChannel() {
        return this.channel;
    }

    private class NotifyMerge
    implements Notify {
        private NotifyMerge() {
        }

        @Override
        public void emitNotification(List<Address> oldMembers, View newView) {
            MergeView mv = (MergeView)newView;
            Address address = JGroupsTransport.this.getAddress();
            int viewId = (int)newView.getVid().getId();
            boolean needsRejoin = JGroupsTransport.this.needsToRejoin(newView);
            JGroupsTransport.this.notifier.notifyMerge(JGroupsTransport.this.members, oldMembers, address, viewId, needsRejoin, this.getSubgroups(mv.getSubgroups()));
            JGroupsTransport.this.notifier.notifyViewChange(JGroupsTransport.this.members, oldMembers, address, viewId, needsRejoin, true);
        }

        private List<List<Address>> getSubgroups(Vector<View> subviews) {
            ArrayList<List<Address>> l = new ArrayList<List<Address>>(subviews.size());
            for (View v : subviews) {
                l.add(JGroupsTransport.this.fromJGroupsAddressList(v.getMembers()));
            }
            return l;
        }
    }

    private class NotifyViewChange
    implements Notify {
        private NotifyViewChange() {
        }

        @Override
        public void emitNotification(List<Address> oldMembers, View newView) {
            JGroupsTransport.this.notifier.notifyViewChange(JGroupsTransport.this.members, oldMembers, JGroupsTransport.this.getAddress(), (int)newView.getVid().getId(), JGroupsTransport.this.needsToRejoin(newView), false);
        }
    }

    private static interface Notify {
        public void emitNotification(List<Address> var1, View var2);
    }
}

