package org.jgroups.mux;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelException;
import org.jgroups.ChannelListenerAdapter;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.MergeView;
import org.jgroups.Message;
import org.jgroups.TimeoutException;
import org.jgroups.UpHandler;
import org.jgroups.View;
import org.jgroups.ViewId;
import org.jgroups.annotations.Experimental;
import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.stack.GossipRouter;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.util.AckCollector;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.FIFOMessageQueue;
import org.jgroups.util.ShutdownRejectedExecutionHandler;
import org.jgroups.util.Util;

@Experimental(comment = "because of impedance mismatches between a MuxChannel and JChannel, this might get deprecated in the future. The replacement would be a shared transport (see the documentation for details)")
@Deprecated
/* loaded from: input_file:org/jgroups/mux/Multiplexer.class */
public class Multiplexer implements UpHandler {
    private static final Log log = LogFactory.getLog(Multiplexer.class);
    private static final String SEPARATOR = "::";
    private static final short SEPARATOR_LEN = (short) SEPARATOR.length();
    private static final String NAME = "MUX";
    private final JChannel channel;
    private final ExecutorService thread_pool;
    private final ConcurrentMap<String, MuxChannel> services = new ConcurrentHashMap();
    private final FIFOMessageQueue<String, Runnable> fifo_queue = new FIFOMessageQueue<>();
    private final AckCollector service_ack_collector = new AckCollector();
    protected long service_ack_timeout = 2000;
    private volatile View view = null;
    private final Map<String, Boolean> state_transfer_listeners = new HashMap();
    private final Map<String, List<Address>> service_state = new HashMap();
    private final Map<Address, Set<String>> service_responses = new HashMap();
    private final List<Address> services_merged_collector = new ArrayList();
    private AtomicBoolean services_merged = new AtomicBoolean(false);
    private long service_response_timeout = Global.THREADPOOL_SHUTDOWN_WAIT_TIME;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jgroups/mux/Multiplexer$ExecuteTask.class */
    public static class ExecuteTask implements Runnable {
        FIFOMessageQueue<String, Runnable> queue;

        public ExecuteTask(FIFOMessageQueue<String, Runnable> fIFOMessageQueue) {
            this.queue = fIFOMessageQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.queue.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /* loaded from: input_file:org/jgroups/mux/Multiplexer$MultiplexerChannelListener.class */
    private class MultiplexerChannelListener extends ChannelListenerAdapter {
        private MultiplexerChannelListener() {
        }

        @Override // org.jgroups.ChannelListenerAdapter, org.jgroups.ChannelListener
        public void channelReconnected(Address address) {
            if (Multiplexer.log.isDebugEnabled()) {
                Multiplexer.log.debug("Reconnecting services " + Multiplexer.this.services.keySet());
            }
            for (MuxChannel muxChannel : Multiplexer.this.services.values()) {
                try {
                    if (Multiplexer.log.isDebugEnabled()) {
                        Multiplexer.log.debug("Reconnecting service " + muxChannel.getId());
                    }
                    muxChannel.open();
                    boolean booleanValue = ((Boolean) muxChannel.getOpt(5)).booleanValue();
                    boolean booleanValue2 = ((Boolean) muxChannel.getOpt(6)).booleanValue();
                    if (booleanValue && booleanValue2) {
                        muxChannel.connect(muxChannel.getClusterName(), null, null, 10000L);
                        muxChannel.fireChannelReconnected(muxChannel.getLocalAddress());
                    } else {
                        if (booleanValue) {
                            muxChannel.connect(muxChannel.getClusterName());
                            muxChannel.fireChannelReconnected(muxChannel.getLocalAddress());
                        }
                        if (booleanValue2) {
                            muxChannel.getState(null, 5000L);
                        }
                    }
                } catch (ChannelException e) {
                    if (Multiplexer.log.isErrorEnabled()) {
                        Multiplexer.log.error("MuxChannel reconnect failed " + e);
                    }
                }
            }
        }

        @Override // org.jgroups.ChannelListenerAdapter, org.jgroups.ChannelListener
        public void channelShunned() {
            Iterator it = Multiplexer.this.services.values().iterator();
            while (it.hasNext()) {
                ((MuxChannel) it.next()).fireChannelShunned();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jgroups/mux/Multiplexer$Task.class */
    public static class Task implements Runnable {
        Exchanger<Object> exchanger;
        MuxChannel channel;
        Event evt;
        FIFOMessageQueue<String, Runnable> queue;
        Address sender;
        String dest;

        Task(MuxChannel muxChannel, Event event, FIFOMessageQueue<String, Runnable> fIFOMessageQueue, Address address, String str, boolean z) {
            this.channel = muxChannel;
            this.evt = event;
            this.queue = fIFOMessageQueue;
            this.sender = address;
            this.dest = str;
            if (z) {
                this.exchanger = new Exchanger<>();
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    Object up = this.channel.up(this.evt);
                    if (this.exchanger != null) {
                        this.exchanger.exchange(up);
                    }
                    this.queue.done(this.sender, this.dest);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.queue.done(this.sender, this.dest);
                }
            } catch (Throwable th) {
                this.queue.done(this.sender, this.dest);
                throw th;
            }
        }
    }

    public Multiplexer(JChannel jChannel) {
        if (jChannel == null || !jChannel.isOpen()) {
            throw new IllegalArgumentException("Channel " + jChannel + " cannot be used for Multiplexer");
        }
        this.channel = jChannel;
        this.channel.addChannelListener(new MultiplexerChannelListener());
        this.channel.setUpHandler(this);
        this.channel.setOpt(0, Boolean.TRUE);
        if (Global.getPropertyAsBoolean(Global.MUX_ENABLED, true)) {
            this.thread_pool = createThreadPool();
        } else {
            this.thread_pool = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public JChannel getChannel() {
        return this.channel;
    }

    public Set getApplicationIds() {
        return getServiceIds();
    }

    public Set<String> getServiceIds() {
        return Collections.unmodifiableSet(this.services.keySet());
    }

    public long getServicesResponseTimeout() {
        return this.service_response_timeout;
    }

    public void setServicesResponseTimeout(long j) {
        this.service_response_timeout = j;
    }

    public long getServiceAckTimeout() {
        return this.service_ack_timeout;
    }

    public void setServiceAckTimeout(long j) {
        this.service_ack_timeout = j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public View getServiceView(String str) {
        List<Address> list = this.service_state.get(str);
        if (list == null) {
            return null;
        }
        return generateServiceView(list);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean stateTransferListenersPresent() {
        return !this.state_transfer_listeners.isEmpty();
    }

    public synchronized void registerForStateTransfer(String str, String str2) {
        String str3 = str;
        if (str2 != null && str2.length() > 0) {
            str3 = str3 + SEPARATOR + str2;
        }
        this.state_transfer_listeners.put(str3, Boolean.FALSE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean getState(Address address, String str, long j) throws ChannelNotConnectedException, ChannelClosedException {
        if (this.state_transfer_listeners.isEmpty()) {
            return false;
        }
        Iterator<Map.Entry<String, Boolean>> it = this.state_transfer_listeners.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<String, Boolean> next = it.next();
            String key = next.getKey();
            int indexOf = key.indexOf(SEPARATOR);
            if (indexOf > -1 ? str.equals(key.substring(0, indexOf)) : str.equals(key)) {
                next.setValue(Boolean.TRUE);
                break;
            }
        }
        if (!Util.all(this.state_transfer_listeners.values(), Boolean.TRUE)) {
            return true;
        }
        boolean fetchServiceStates = fetchServiceStates(address, new HashSet(this.state_transfer_listeners.keySet()), j);
        this.state_transfer_listeners.clear();
        return fetchServiceStates;
    }

    protected ThreadPoolExecutor createThreadPool() {
        this.channel.getInfo();
        return new ThreadPoolExecutor(Global.getPropertyAsInteger(Global.MUX_MIN_THREADS, 1), Global.getPropertyAsInteger(Global.MUX_MAX_THREADS, 4), Global.getPropertyAsLong(Global.MUX_KEEPALIVE, GossipRouter.EXPIRY_TIME), TimeUnit.MILLISECONDS, new SynchronousQueue(), new DefaultThreadFactory(Util.getGlobalThreadGroup(), "Multiplexer", false, true), new ShutdownRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()));
    }

    protected void shutdownThreadPool() {
        if (this.thread_pool == null || this.thread_pool.isShutdown()) {
            return;
        }
        this.thread_pool.shutdownNow();
        try {
            this.thread_pool.awaitTermination(Global.THREADPOOL_SHUTDOWN_WAIT_TIME, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private boolean fetchServiceStates(Address address, Set<String> set, long j) throws ChannelClosedException, ChannelNotConnectedException {
        boolean z = false;
        boolean startFlush = this.channel.startFlush(false);
        try {
            if (startFlush) {
                try {
                    for (String str : set) {
                        if (!this.channel.getState(address, str, j, false)) {
                            throw new Exception("Failed transfer for state id " + str + ", state provider was " + address);
                        }
                    }
                    z = true;
                    this.channel.stopFlush();
                } catch (Exception e) {
                    log.warn("Failed multiple state transfer under one flush phase ", e);
                    this.channel.stopFlush();
                }
            }
            return startFlush && z;
        } catch (Throwable th) {
            this.channel.stopFlush();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendServiceUpMessage(String str) throws Exception {
        sendServiceMessage(true, (byte) 3, str, null, false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendServiceDownMessage(String str) throws Exception {
        sendServiceMessage(true, (byte) 4, str, null, false);
    }

    @Override // org.jgroups.UpHandler
    public Object up(Event event) {
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                MuxHeader muxHeader = (MuxHeader) message.getHeader(NAME);
                if (muxHeader == null) {
                    log.error("MuxHeader not present - discarding message " + message);
                    return null;
                }
                Address src = message.getSrc();
                if (!(muxHeader.info != null)) {
                    MuxChannel muxChannel = this.services.get(muxHeader.id);
                    if (muxChannel == null) {
                        return null;
                    }
                    return passToMuxChannel(muxChannel, event, this.fifo_queue, src, muxHeader.id, false, message.isFlagSet((byte) 1));
                }
                try {
                    handleServiceMessage(muxHeader.info, src);
                    return null;
                } catch (Exception e) {
                    if (!log.isErrorEnabled()) {
                        return null;
                    }
                    log.error("failure in handling service message " + muxHeader.info + " from sender " + src, e);
                    return null;
                }
            case 6:
                Vector<Address> members = this.view != null ? this.view.getMembers() : null;
                this.view = (View) event.getArg();
                Vector<Address> determineLeftMembers = Util.determineLeftMembers(members, this.view != null ? this.view.getMembers() : null);
                if (this.view instanceof MergeView) {
                    MergeView mergeView = (MergeView) this.view.clone();
                    if (log.isTraceEnabled()) {
                        log.trace("received a MergeView: " + mergeView + ", adjusting the service view");
                    }
                    try {
                        try {
                            handleMergeView(mergeView);
                            synchronized (this.service_responses) {
                                this.service_responses.clear();
                            }
                            synchronized (this.services_merged_collector) {
                                this.services_merged_collector.clear();
                            }
                            this.services_merged.set(false);
                        } catch (Throwable th) {
                            synchronized (this.service_responses) {
                                this.service_responses.clear();
                                synchronized (this.services_merged_collector) {
                                    this.services_merged_collector.clear();
                                    this.services_merged.set(false);
                                    throw th;
                                }
                            }
                        }
                    } catch (Exception e2) {
                        if (log.isErrorEnabled()) {
                            log.error("failed handling merge view", e2);
                        }
                        synchronized (this.service_responses) {
                            this.service_responses.clear();
                            synchronized (this.services_merged_collector) {
                                this.services_merged_collector.clear();
                                this.services_merged.set(false);
                            }
                        }
                    }
                } else {
                    HashMap hashMap = (HashMap) this.view.getPayload("service_state");
                    if (hashMap != null) {
                        synchronized (this.service_state) {
                            this.service_state.putAll(hashMap);
                        }
                    }
                }
                this.service_ack_collector.handleView(this.view);
                Iterator<Address> it = determineLeftMembers.iterator();
                while (it.hasNext()) {
                    try {
                        adjustServiceView(it.next());
                    } catch (Throwable th2) {
                        if (log.isErrorEnabled()) {
                            log.error("failed adjusting service views", th2);
                        }
                    }
                }
                return null;
            case 8:
                passToAllMuxChannels(event);
                return null;
            case 9:
                this.service_ack_collector.suspect((Address) event.getArg());
                passToAllMuxChannels(event);
                return null;
            case 10:
                passToAllMuxChannels(event, true, true);
                return null;
            case Event.GET_APPLSTATE /* 17 */:
                return handleStateRequest(event, true);
            case Event.GET_STATE_OK /* 20 */:
            case Event.STATE_TRANSFER_INPUTSTREAM /* 71 */:
                handleStateResponse(event, true);
                return null;
            case Event.EXIT /* 46 */:
                closeAll();
                return null;
            case Event.STATE_TRANSFER_OUTPUTSTREAM /* 72 */:
                handleStateRequest(event, true);
                return null;
            case Event.UNBLOCK /* 75 */:
                passToAllMuxChannels(event);
                return null;
            case Event.PREPARE_VIEW /* 86 */:
                View view = (View) event.getArg();
                Vector<Address> members2 = this.view != null ? this.view.getMembers() : new Vector<>();
                Vector vector = new Vector(view.getMembers());
                vector.removeAll(members2);
                if (vector.isEmpty()) {
                    return null;
                }
                synchronized (this.service_state) {
                    view.addPayload("service_state", this.service_state);
                }
                return null;
            default:
                passToAllMuxChannels(event);
                return null;
        }
    }

    public Channel createMuxChannel(String str, String str2) throws Exception {
        if (this.services.containsKey(str)) {
            throw new Exception("service ID \"" + str + "\" is already registered at channel" + getLocalAddress() + ", cannot register service with duplicate ID at the same channel");
        }
        MuxChannel muxChannel = new MuxChannel(str, str2, this);
        this.services.put(str, muxChannel);
        return muxChannel;
    }

    private void passToAllMuxChannels(Event event) {
        passToAllMuxChannels(event, false, true);
    }

    private void passToAllMuxChannels(Event event, boolean z, boolean z2) {
        for (Map.Entry<String, MuxChannel> entry : this.services.entrySet()) {
            passToMuxChannel(entry.getValue(), event, this.fifo_queue, null, entry.getKey(), z, z2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addServiceIfNotPresent(String str, MuxChannel muxChannel) {
        this.services.putIfAbsent(str, muxChannel);
    }

    protected MuxChannel removeService(String str) {
        MuxChannel remove = this.services.remove(str);
        if (remove != null) {
            remove.up(new Event(75));
        }
        return remove;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void disconnect() {
        boolean z = true;
        Iterator<MuxChannel> it = this.services.values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            } else if (it.next().isConnected()) {
                z = false;
                break;
            }
        }
        if (z) {
            if (log.isTraceEnabled()) {
                log.trace("disconnecting underlying JChannel as all MuxChannels are disconnected");
            }
            this.channel.disconnect();
        }
    }

    public boolean close() {
        boolean z = true;
        Iterator<MuxChannel> it = this.services.values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (it.next().isOpen()) {
                z = false;
                break;
            }
        }
        if (z) {
            if (log.isTraceEnabled()) {
                log.trace("closing underlying JChannel as all MuxChannels are closed");
            }
            this.channel.close();
            this.services.clear();
            shutdownThreadPool();
        }
        return z;
    }

    public void closeAll() {
        for (MuxChannel muxChannel : this.services.values()) {
            muxChannel.setConnected(false);
            muxChannel.setClosed(true);
            muxChannel.closeMessageQueue(true);
        }
    }

    boolean shutdown() {
        boolean z = true;
        Iterator<MuxChannel> it = this.services.values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (it.next().isOpen()) {
                z = false;
                break;
            }
        }
        if (z) {
            if (log.isTraceEnabled()) {
                log.trace("shutting down underlying JChannel as all MuxChannels are closed");
            }
            this.channel.shutdown();
            this.services.clear();
            shutdownThreadPool();
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Address getLocalAddress() {
        return this.channel.getLocalAddress();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean flushSupported() {
        return this.channel.flushSupported();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean startFlush(boolean z) {
        return this.channel.startFlush(z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopFlush() {
        this.channel.stopFlush();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isConnected() {
        return this.channel.isConnected();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void connect(String str) throws ChannelException {
        this.channel.connect(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isOpen() {
        return this.channel.isOpen();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void open() throws ChannelException {
        this.channel.open();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Address getStateProvider(Address address, String str) {
        Address address2 = null;
        List<Address> list = this.service_state.get(str);
        if (list != null && !list.isEmpty()) {
            address2 = list.contains(address) ? address : list.get(0);
        }
        return address2;
    }

    private void sendServiceMessage(boolean z, byte b, String str, byte[] bArr, boolean z2) throws Exception {
        Address localAddress = getLocalAddress();
        if (localAddress == null) {
            if (log.isWarnEnabled()) {
                log.warn("local_addr is null, cannot send ServiceInfo." + ServiceInfo.typeToString(b) + " message");
                return;
            }
            return;
        }
        if (!this.channel.isOpen() || !this.channel.isConnected()) {
            if (log.isWarnEnabled()) {
                log.warn("Underlying multiplexer channel " + this.channel.getLocalAddress() + " is not connected, cannot send ServiceInfo." + ServiceInfo.typeToString(b) + " message");
                return;
            }
            return;
        }
        Message message = new Message();
        message.putHeader(NAME, new MuxHeader(new ServiceInfo(b, str, localAddress, bArr)));
        if (z2) {
            message.setFlag((byte) 1);
        }
        if (this.channel.flushSupported()) {
            message.putHeader(FLUSH.NAME, new FLUSH.FlushHeader((byte) 6));
        }
        if (!z) {
            this.channel.send(message);
            return;
        }
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        copyOnWriteArrayList.add(localAddress);
        List<Address> list = this.service_state.get(str);
        if (list != null && !list.isEmpty()) {
            copyOnWriteArrayList.addAllAbsent(list);
        }
        this.service_ack_collector.reset(null, copyOnWriteArrayList);
        int size = this.service_ack_collector.size();
        this.channel.send(message);
        long currentTimeMillis = System.currentTimeMillis();
        try {
            this.service_ack_collector.waitForAllAcks(this.service_ack_timeout);
            if (log.isTraceEnabled()) {
                log.trace("received all service ACKs (" + size + ")  in " + (System.currentTimeMillis() - currentTimeMillis) + "ms");
            }
        } catch (TimeoutException e) {
            log.warn("failed to collect all service ACKs (" + size + ") for " + message + " after " + this.service_ack_timeout + "ms, missing ACKs from " + this.service_ack_collector.printMissing() + " (received=" + this.service_ack_collector.printReceived() + "), local_addr=" + getLocalAddress());
        }
    }

    private Object handleStateRequest(Event event, boolean z) {
        StateTransferInfo stateTransferInfo = (StateTransferInfo) event.getArg();
        String str = stateTransferInfo.state_id;
        Address address = stateTransferInfo.target;
        if (str == null) {
            if (log.isWarnEnabled()) {
                log.warn("Invalid state request " + stateTransferInfo + " arrived at Multiplexer, dropping it");
            }
            return new StateTransferInfo(null, str, 0L, null);
        }
        try {
            int indexOf = str.indexOf(SEPARATOR);
            if (indexOf > -1) {
                stateTransferInfo.state_id = str.substring(indexOf + SEPARATOR_LEN);
                str = str.substring(0, indexOf);
            } else {
                stateTransferInfo.state_id = null;
            }
            MuxChannel muxChannel = this.services.get(str);
            if (muxChannel == null) {
                if (log.isWarnEnabled()) {
                    log.warn("State provider " + this.channel.getLocalAddress() + " does not have service with id " + str + ", returning null state");
                }
                return new StateTransferInfo(null, str, 0L, null);
            }
            StateTransferInfo stateTransferInfo2 = (StateTransferInfo) passToMuxChannel(muxChannel, event, this.fifo_queue, address, str, z);
            if (stateTransferInfo2 == null) {
                return new StateTransferInfo(null, str, 0L, null);
            }
            stateTransferInfo2.state_id = str;
            return stateTransferInfo2;
        } catch (Throwable th) {
            if (log.isErrorEnabled()) {
                log.error("failed returning the application state, will return null", th);
            }
            return new StateTransferInfo(null, str, 0L, null);
        }
    }

    private void handleStateResponse(Event event, boolean z) {
        String str;
        String str2;
        StateTransferInfo stateTransferInfo = (StateTransferInfo) event.getArg();
        Address address = stateTransferInfo.target;
        String str3 = stateTransferInfo.state_id;
        if (str3 == null) {
            if (log.isTraceEnabled()) {
                log.trace("state is null, not passing up: " + stateTransferInfo);
                return;
            }
            return;
        }
        int indexOf = str3.indexOf(SEPARATOR);
        if (indexOf > -1) {
            str = str3.substring(0, indexOf);
            str2 = str3.substring(indexOf + SEPARATOR_LEN);
        } else {
            str = str3;
            str2 = null;
        }
        MuxChannel muxChannel = this.services.get(str);
        if (muxChannel == null) {
            log.error("State receiver " + this.channel.getLocalAddress() + " does not have service with id " + str);
            return;
        }
        StateTransferInfo copy = stateTransferInfo.copy();
        copy.state_id = str2;
        passToMuxChannel(muxChannel, new Event(event.getType(), copy), this.fifo_queue, address, str, z);
    }

    private void handleServiceMessage(ServiceInfo serviceInfo, Address address) throws Exception {
        switch (serviceInfo.type) {
            case 3:
                handleServiceUp(serviceInfo.service, serviceInfo.host);
                ackServiceMessage(serviceInfo, address);
                return;
            case 4:
                handleServiceDown(serviceInfo.service, serviceInfo.host);
                ackServiceMessage(serviceInfo, address);
                return;
            case 5:
                handleServicesRsp(address, serviceInfo.state);
                return;
            case 6:
                this.service_ack_collector.ack(address);
                return;
            case 7:
                synchronized (this.services_merged_collector) {
                    if (!this.services_merged_collector.contains(address)) {
                        this.services_merged_collector.add(address);
                    }
                    this.services_merged.set(this.view != null && this.services_merged_collector.containsAll(this.view.getMembers()));
                    if (log.isDebugEnabled()) {
                        log.debug(getLocalAddress() + " got service merged from " + address + " merged so far " + this.services_merged_collector + " view is " + this.view.size());
                    }
                }
                return;
            default:
                if (log.isErrorEnabled()) {
                    log.error("service request type " + ((int) serviceInfo.type) + " not known");
                    return;
                }
                return;
        }
    }

    private void ackServiceMessage(ServiceInfo serviceInfo, Address address) throws ChannelNotConnectedException, ChannelClosedException {
        Message message = new Message(address, (Address) null, (byte[]) null);
        message.setFlag((byte) 1);
        message.putHeader(NAME, new MuxHeader(new ServiceInfo((byte) 6, serviceInfo.service, serviceInfo.host, null)));
        if (this.channel.isConnected()) {
            this.channel.send(message);
        }
    }

    private void handleServicesRsp(Address address, byte[] bArr) throws Exception {
        boolean z;
        Set set = (Set) Util.objectFromByteBuffer(bArr);
        synchronized (this.service_responses) {
            Set<String> set2 = this.service_responses.get(address);
            if (set2 == null) {
                set2 = new HashSet();
            }
            set2.addAll(set);
            this.service_responses.put(address, set2);
            if (log.isDebugEnabled()) {
                log.debug(getLocalAddress() + " received service response: " + address + "(" + set.toString() + ")");
            }
            z = this.view != null && this.service_responses.keySet().containsAll(this.view.getMembers());
        }
        if (z) {
            if (log.isDebugEnabled()) {
                log.debug(getLocalAddress() + " sent service merged " + this.service_responses.keySet() + " view is " + this.view.getMembers());
            }
            sendServiceMessage(false, (byte) 7, null, null, true);
        }
    }

    private void handleServiceDown(String str, Address address) {
        synchronized (this.service_state) {
            List<Address> list = this.service_state.get(str);
            if (list == null) {
                return;
            }
            boolean remove = list.remove(address);
            ArrayList arrayList = new ArrayList(list);
            if (remove) {
                View generateServiceView = generateServiceView(arrayList);
                MuxChannel muxChannel = this.services.get(str);
                if (muxChannel != null && muxChannel.isConnected()) {
                    passToMuxChannel(muxChannel, new Event(6, generateServiceView), this.fifo_queue, null, str, false, true);
                } else if (log.isTraceEnabled()) {
                    log.trace("service " + str + " not found, cannot dispatch service view " + generateServiceView);
                }
            }
            Address localAddress = getLocalAddress();
            if (localAddress != null && localAddress.equals(address)) {
                removeService(str);
            }
        }
    }

    private void handleServiceUp(String str, Address address) {
        ArrayList arrayList;
        boolean z = false;
        synchronized (this.service_state) {
            List<Address> list = this.service_state.get(str);
            if (list == null) {
                list = new CopyOnWriteArrayList();
                this.service_state.put(str, list);
            }
            if (!list.contains(address)) {
                list.add(address);
                z = true;
            }
            arrayList = new ArrayList(list);
        }
        if (z) {
            View generateServiceView = generateServiceView(arrayList);
            MuxChannel muxChannel = this.services.get(str);
            if (muxChannel != null) {
                passToMuxChannel(muxChannel, new Event(6, generateServiceView), this.fifo_queue, null, str, false, true);
            } else if (log.isTraceEnabled()) {
                log.trace("service " + str + " not found, cannot dispatch service view " + generateServiceView);
            }
        }
    }

    private void handleMergeView(MergeView mergeView) throws Exception {
        HashMap hashMap;
        long j = this.service_response_timeout;
        long currentTimeMillis = System.currentTimeMillis();
        byte[] objectToByteBuffer = Util.objectToByteBuffer(new HashSet(this.services.keySet()));
        while (j > 0 && !this.services_merged.get()) {
            sendServiceMessage(false, (byte) 5, null, objectToByteBuffer, true);
            Util.sleep(500L);
            j = this.service_response_timeout - (System.currentTimeMillis() - currentTimeMillis);
        }
        if (j <= 0 && !this.services_merged.get()) {
            log.warn("Services not merged at " + getLocalAddress() + " received merge from " + this.services_merged_collector);
        }
        synchronized (this.service_responses) {
            hashMap = new HashMap(this.service_responses);
        }
        if (log.isDebugEnabled()) {
            log.debug("At " + getLocalAddress() + " emitting views to MuxChannels " + hashMap);
        }
        mergeServiceState(mergeView, hashMap);
    }

    private void mergeServiceState(MergeView mergeView, Map<Address, Set<String>> map) {
        HashSet<String> hashSet = new HashSet();
        synchronized (this.service_state) {
            for (Map.Entry<Address, Set<String>> entry : map.entrySet()) {
                Address key = entry.getKey();
                Set<String> value = entry.getValue();
                if (value != null) {
                    for (String str : value) {
                        List<Address> list = this.service_state.get(str);
                        if (list == null) {
                            list = new CopyOnWriteArrayList();
                            this.service_state.put(str, list);
                        }
                        if (list.add(key)) {
                            hashSet.add(str);
                        }
                    }
                }
            }
        }
        for (String str2 : hashSet) {
            MuxChannel muxChannel = this.services.get(str2);
            if (muxChannel != null) {
                List<Address> list2 = this.service_state.get(str2);
                Vector vector = new Vector(mergeView.getMembers());
                vector.retainAll(list2);
                passToMuxChannel(muxChannel, new Event(6, new MergeView(mergeView.getVid(), vector, mergeView.getSubgroups())), this.fifo_queue, null, str2, false);
            }
        }
    }

    private void adjustServiceView(Address address) {
        Address localAddress = getLocalAddress();
        synchronized (this.service_state) {
            for (Map.Entry<String, List<Address>> entry : this.service_state.entrySet()) {
                String key = entry.getKey();
                List<Address> value = entry.getValue();
                if (value != null) {
                    if (value.remove(address)) {
                        View generateServiceView = generateServiceView(new ArrayList(value));
                        MuxChannel muxChannel = this.services.get(key);
                        if (muxChannel != null && muxChannel.isConnected()) {
                            passToMuxChannel(muxChannel, new Event(6, generateServiceView), this.fifo_queue, null, key, false, true);
                        } else if (log.isTraceEnabled()) {
                            log.trace("service " + key + " not found, cannot dispatch service view " + generateServiceView);
                        }
                    }
                    if (localAddress != null && localAddress.equals(address)) {
                        removeService(key);
                    }
                }
            }
        }
    }

    private View generateServiceView(List<Address> list) {
        if (this.view == null) {
            Vector vector = new Vector();
            vector.add(getLocalAddress());
            this.view = new View(new ViewId(getLocalAddress()), vector);
        }
        Vector vector2 = new Vector(this.view.getMembers());
        vector2.retainAll(list);
        return new View(this.view.getVid(), vector2);
    }

    private Object passToMuxChannel(MuxChannel muxChannel, Event event, FIFOMessageQueue<String, Runnable> fIFOMessageQueue, Address address, String str, boolean z) {
        return passToMuxChannel(muxChannel, event, fIFOMessageQueue, address, str, z, false);
    }

    private Object passToMuxChannel(MuxChannel muxChannel, Event event, FIFOMessageQueue<String, Runnable> fIFOMessageQueue, Address address, String str, boolean z, boolean z2) {
        if (this.thread_pool == null || z2) {
            return muxChannel.up(event);
        }
        Task task = new Task(muxChannel, event, fIFOMessageQueue, address, str, z);
        ExecuteTask executeTask = new ExecuteTask(this.fifo_queue);
        try {
            this.fifo_queue.put(address, str, task);
            this.thread_pool.execute(executeTask);
            if (z) {
                try {
                    return task.exchanger.exchange(null);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return null;
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
