package org.jgroups.protocols.pbcast;

import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.TimeoutException;
import org.jgroups.View;
import org.jgroups.stack.GossipRouter;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.util.Promise;
import org.jgroups.util.Streamable;
import org.jgroups.util.Util;

/* loaded from: input_file:org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER.class */
public class STREAMING_STATE_TRANSFER extends Protocol {
    // Timestamp taken with System.currentTimeMillis() just before a state request is sent in down().
    long start;
    // Not referenced in the visible part of the file; presumably the matching end timestamp — TODO confirm.
    long stop;
    // Protocol name returned by getName(); also the key under which StateHeader is attached to messages.
    static final String NAME = "STREAMING_STATE_TRANSFER";
    // Parsed from the "bind_addr" property, or taken from a CONFIG event when still null.
    private InetAddress bind_addr;
    // Accept loop for incoming state-transfer connections; stopped in stop().
    private StateProviderThreadSpawner spawner;
    // Parsed from the "pool_thread_keep_alive" property (default GossipRouter.EXPIRY_TIME).
    private long pool_thread_keep_alive;
    // Parsed from the "use_reading_thread" property (default false).
    private boolean use_reading_thread;
    // Created only when use_flush is true; completed with Boolean.TRUE when SUSPEND_OK travels down.
    private Promise flush_promise;
    // Parsed from the "use_flush" property (default false).
    private volatile boolean use_flush;
    // Parsed from the "flush_timeout" property (default 10000); passed to startFlush().
    private long flush_timeout;
    // Incremented under poolLock by the thread factory and embedded in pool thread names.
    private int threadCounter;
    // Decompiler artefacts: caches for class literals used by StateHeader.readFrom().
    static Class class$org$jgroups$protocols$pbcast$Digest;
    static Class class$org$jgroups$stack$IpAddress;
    // Set when event type 8 is received in up().
    Address local_addr = null;
    // Usage is outside the visible part of the file; presumably the current view members — TODO confirm.
    final Vector members = new Vector();
    // Usage is outside the visible part of the file.
    long state_id = 1;
    // Also serves as the monitor guarding digest and the statistics counters.
    final Map state_requesters = new HashMap();
    // Set to true when a state request is sent, reset to false in stop().
    boolean waiting_for_state_response = false;
    // Last digest received via GET_DIGEST_STATE_OK.
    Digest digest = null;
    // Configuration map filled in init() and passed up in a CONFIG event from start().
    final HashMap map = new HashMap();
    // Statistics exposed through the getNumberOf...()/getAverageStateSize() accessors.
    int num_state_reqs = 0;
    long num_bytes_sent = 0;
    double avg_state_size = 0.0d;
    // Parsed from the "start_port" property (default 0).
    private int bind_port = 0;
    // Parsed from the "max_pool" property (default 5).
    private int max_pool = 5;
    // Parsed from the "socket_buffer_size" property (default 8192); applied as the send buffer size of accepted sockets.
    private int socket_buffer_size = 8192;
    // Guards the increment of threadCounter.
    private final Object poolLock = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER$1, reason: invalid class name */
    /* loaded from: input_file:org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER$1.class */
    /**
     * Thread factory (decompiled anonymous class) that creates named threads in the
     * global thread group. Each created thread logs at debug level when it starts
     * and when its command has finished.
     */
    public class AnonymousClass1 implements ThreadFactory {
        // Explicit reference to the enclosing protocol instance (decompiler artefact).
        private final STREAMING_STATE_TRANSFER this$0;

        AnonymousClass1(STREAMING_STATE_TRANSFER streaming_state_transfer) {
            this.this$0 = streaming_state_transfer;
        }

        /**
         * Creates a thread named "STREAMING_STATE_TRANSFER.poolid=" followed by the
         * current thread counter, wrapping the given command.
         *
         * @param runnable the command the new thread runs
         * @return the new, not yet started thread
         */
        public Thread newThread(Runnable runnable) {
            synchronized (this.this$0.poolLock) {
                // access$108 is presumably the synthetic accessor that increments threadCounter;
                // its body is not visible here — TODO confirm.
                STREAMING_STATE_TRANSFER.access$108(this.this$0);
            }
            // NOTE(review): threadCounter is read below outside poolLock, so two concurrent
            // calls could produce the same thread name.
            return new Thread(this, Util.getGlobalThreadGroup(), new StringBuffer().append("STREAMING_STATE_TRANSFER.poolid=").append(this.this$0.threadCounter).toString(), runnable) { // from class: org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER.1.1
                // Captured command and enclosing factory (decompiler artefacts).
                private final Runnable val$command;
                private final AnonymousClass1 this$1;

                {
                    this.this$1 = this;
                    this.val$command = runnable;
                }

                // Runs the captured command, bracketed by debug log entries.
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    if (this.this$1.this$0.log.isDebugEnabled()) {
                        this.this$1.this$0.log.debug(new StringBuffer().append(Thread.currentThread()).append(" started.").toString());
                    }
                    this.val$command.run();
                    // Not reached if the command throws: there is no try/finally here.
                    if (this.this$1.this$0.log.isDebugEnabled()) {
                        this.this$1.this$0.log.debug(new StringBuffer().append(Thread.currentThread()).append(" stopped.").toString());
                    }
                }
            };
        }
    }

    /* loaded from: input_file:org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER$StateHeader.class */
    /**
     * Header carried by state-transfer control messages. It identifies the message
     * type (request, response, or removal of a requester), the sender, and optionally
     * a digest, the provider's socket address and the id of the requested state.
     *
     * <p>Two headers are equal when their {@code sender} and {@code id} match.
     */
    public static class StateHeader extends Header implements Streamable {
        public static final byte STATE_REQ = 1;
        public static final byte STATE_RSP = 2;
        public static final byte STATE_REMOVE_REQUESTER = 3;

        long id = 0L;
        byte type = (byte) 0;
        Address sender;
        Digest my_digest = null;
        IpAddress bind_addr = null;
        String state_id = null;

        /** Creates an empty header; used when the header is read back from a stream. */
        public StateHeader() {
        }

        /**
         * @param b       one of the {@code STATE_*} type constants
         * @param address the sender of the message
         * @param str     the id of the state concerned, may be {@code null}
         */
        public StateHeader(byte b, Address address, String str) {
            this.type = b;
            this.sender = address;
            this.state_id = str;
        }

        /**
         * @param b       one of the {@code STATE_*} type constants
         * @param address the sender of the message
         * @param j       numeric id of the header
         * @param digest  digest to ship with the header, may be {@code null}
         */
        public StateHeader(byte b, Address address, long j, Digest digest) {
            this.type = b;
            this.sender = address;
            this.id = j;
            this.my_digest = digest;
        }

        /**
         * @param b         one of the {@code STATE_*} type constants
         * @param address   the sender of the message
         * @param ipAddress socket address to ship with the header, may be {@code null}
         * @param digest    digest to ship with the header, may be {@code null}
         * @param str       the id of the state concerned, may be {@code null}
         */
        public StateHeader(byte b, Address address, IpAddress ipAddress, Digest digest, String str) {
            this.type = b;
            this.sender = address;
            this.my_digest = digest;
            this.bind_addr = ipAddress;
            this.state_id = str;
        }

        /** @return the type byte, widened to {@code int} */
        public int getType() {
            return this.type;
        }

        /** @return the digest carried by this header, or {@code null} */
        public Digest getDigest() {
            return this.my_digest;
        }

        /** @return the state id carried by this header, or {@code null} */
        public String getStateId() {
            return this.state_id;
        }

        /**
         * Compares {@code sender} and {@code id}. Unlike the previous version this is
         * reflexive and null-safe: a header without a sender equals itself and any
         * other sender-less header with the same id, which matches {@link #hashCode()}.
         */
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof StateHeader)) {
                return false;
            }
            StateHeader other = (StateHeader) obj;
            if (this.id != other.id) {
                return false;
            }
            return this.sender == null ? other.sender == null : this.sender.equals(other.sender);
        }

        /** Hash over the same fields {@link #equals(Object)} compares. */
        public int hashCode() {
            int idHash = (int) this.id;
            return this.sender == null ? idHash : this.sender.hashCode() + idHash;
        }

        @Override // org.jgroups.Header
        public String toString() {
            StringBuffer text = new StringBuffer();
            text.append("type=").append(type2Str(this.type));
            if (this.sender != null) {
                text.append(", sender=").append(this.sender).append(" id=").append(this.id);
            }
            if (this.my_digest != null) {
                text.append(", digest=").append(this.my_digest);
            }
            return text.toString();
        }

        /** Maps a type constant to its symbolic name, or {@code "<unknown>"}. */
        static String type2Str(int i) {
            switch (i) {
                case STATE_REQ:
                    return "STATE_REQ";
                case STATE_RSP:
                    return "STATE_RSP";
                case STATE_REMOVE_REQUESTER:
                    return "STATE_REMOVE_REQUESTER";
                default:
                    return "<unknown>";
            }
        }

        /**
         * Writes sender, id, type, digest, bind address and state id, in that order.
         * NOTE(review): {@code writeUTF} rejects {@code null}, so a header without a
         * state id cannot be externalized; left unchanged to keep the wire format.
         */
        @Override // java.io.Externalizable
        public void writeExternal(ObjectOutput objectOutput) throws IOException {
            objectOutput.writeObject(this.sender);
            objectOutput.writeLong(this.id);
            objectOutput.writeByte(this.type);
            objectOutput.writeObject(this.my_digest);
            objectOutput.writeObject(this.bind_addr);
            objectOutput.writeUTF(this.state_id);
        }

        /** Reads the fields in the order {@link #writeExternal(ObjectOutput)} wrote them. */
        @Override // java.io.Externalizable
        public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
            this.sender = (Address) objectInput.readObject();
            this.id = objectInput.readLong();
            this.type = objectInput.readByte();
            this.my_digest = (Digest) objectInput.readObject();
            this.bind_addr = (IpAddress) objectInput.readObject();
            this.state_id = objectInput.readUTF();
        }

        /** Writes type, id, sender, digest, bind address and state id, in that order. */
        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutputStream dataOutputStream) throws IOException {
            dataOutputStream.writeByte(this.type);
            dataOutputStream.writeLong(this.id);
            Util.writeAddress(this.sender, dataOutputStream);
            Util.writeStreamable(this.my_digest, dataOutputStream);
            Util.writeStreamable(this.bind_addr, dataOutputStream);
            Util.writeString(this.state_id, dataOutputStream);
        }

        /** Reads the fields in the order {@link #writeTo(DataOutputStream)} wrote them. */
        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInputStream dataInputStream) throws IOException, IllegalAccessException, InstantiationException {
            this.type = dataInputStream.readByte();
            this.id = dataInputStream.readLong();
            this.sender = Util.readAddress(dataInputStream);
            // Plain class literals replace the decompiled class$ lookup-and-cache sequence.
            this.my_digest = (Digest) Util.readStreamable(Digest.class, dataInputStream);
            this.bind_addr = (IpAddress) Util.readStreamable(IpAddress.class, dataInputStream);
            this.state_id = Util.readString(dataInputStream);
        }

        /**
         * Size estimate of the streamed form.
         * NOTE(review): bind_addr is written by writeTo() but contributes nothing
         * here — confirm whether that is intended.
         */
        @Override // org.jgroups.Header
        public long size() {
            // 9 is presumably the long id plus the type byte; the extra 1s look like
            // presence markers for digest and state id — TODO confirm against Util.
            long total = 9 + Util.size(this.sender) + 1;
            if (this.my_digest != null) {
                total += this.my_digest.serializedSize();
            }
            total += 1;
            if (this.state_id != null) {
                total += this.state_id.length() + 2;
            }
            return total;
        }
    }

    /* loaded from: input_file:org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER$StateProviderHandler.class */
    /**
     * Serves one accepted state-transfer connection: reads the requested state id
     * and the requester's address from the socket, removes the requester from the
     * pending set and passes an output stream on that socket up the stack.
     */
    private class StateProviderHandler {
        // Explicit reference to the enclosing protocol instance (decompiler artefact).
        private final STREAMING_STATE_TRANSFER this$0;

        private StateProviderHandler(STREAMING_STATE_TRANSFER streaming_state_transfer) {
            this.this$0 = streaming_state_transfer;
        }

        /**
         * Handles a single connection. If the request cannot be read, the socket is
         * closed here, because nobody else ever receives a reference to it. Once the
         * stream has been passed up, closing is left to its consumer, except for the
         * original "not connected" cleanup.
         *
         * @param socket the accepted connection
         */
        public void process(Socket socket) {
            StreamingOutputStreamWrapper out = null;
            boolean handedOff = false;
            try {
                int originalBufferSize = socket.getSendBufferSize();
                socket.setSendBufferSize(this.this$0.socket_buffer_size);
                if (this.this$0.log.isDebugEnabled()) {
                    this.this$0.log.debug("Running on " + Thread.currentThread() + ". Accepted request for state transfer from " + socket.getInetAddress() + ":" + socket.getPort() + ", original buffer size was " + originalBufferSize + " and was reset to " + socket.getSendBufferSize() + ", passing outputstream up... ");
                }
                // NOTE(security): this deserializes Java objects read straight from a
                // network peer; consider restricting the accepted classes.
                ObjectInputStream request = new ObjectInputStream(socket.getInputStream());
                String stateId = (String) request.readObject();
                Address requester = (Address) request.readObject();
                this.this$0.removeFromStateRequesters(requester, stateId);
                out = new StreamingOutputStreamWrapper(this.this$0, socket);
                handedOff = true;
                // 72 is presumably the "output stream available" event — TODO confirm against Event.
                this.this$0.passUp(new Event(72, new StateTransferInfo(requester, out, stateId)));
            } catch (IOException e) {
                if (this.this$0.warn) {
                    this.this$0.log.warn("State writer socket thread spawned abnormaly", e);
                }
            } catch (ClassNotFoundException e) {
                // Previously swallowed without a trace.
                if (this.this$0.warn) {
                    this.this$0.log.warn("State request could not be deserialized", e);
                }
            } finally {
                if (!handedOff) {
                    // Socket.isConnected() stays true once a connection was made, so the
                    // original check below never released a socket on the failure paths.
                    closeQuietly(socket);
                } else if (!socket.isConnected()) {
                    if (this.this$0.warn) {
                        this.this$0.log.warn("Accepted request for state transfer but socket " + socket + " not connected properly. Closing it...");
                    }
                    try {
                        out.close();
                    } catch (IOException ignored) {
                        // best-effort cleanup; nothing useful can be done here
                    }
                }
            }
        }

        // Closes the socket, ignoring a null argument and any failure while closing.
        private void closeQuietly(Socket socket) {
            if (socket == null) {
                return;
            }
            try {
                socket.close();
            } catch (IOException ignored) {
                // best-effort cleanup; nothing useful can be done here
            }
        }

        // Synthetic accessor constructor; the second argument is unused.
        StateProviderHandler(STREAMING_STATE_TRANSFER streaming_state_transfer, AnonymousClass1 anonymousClass1) {
            this(streaming_state_transfer);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER$StateProviderThreadSpawner.class */
    /**
     * Accept loop for state-transfer connections. Every accepted socket is handed
     * to the thread pool, where a {@code StateProviderHandler} processes it. The
     * loop runs until {@link #stop()} is called.
     */
    public class StateProviderThreadSpawner implements Runnable {
        PooledExecutor pool;
        ServerSocket serverSocket;
        IpAddress address;
        volatile boolean running = true;
        // Explicit reference to the enclosing protocol instance (decompiler artefact).
        private final STREAMING_STATE_TRANSFER this$0;

        /**
         * @param streaming_state_transfer the enclosing protocol instance
         * @param pooledExecutor           pool that runs the per-connection handlers
         * @param serverSocket             listening socket to accept connections on
         */
        public StateProviderThreadSpawner(STREAMING_STATE_TRANSFER streaming_state_transfer, PooledExecutor pooledExecutor, ServerSocket serverSocket) {
            this.this$0 = streaming_state_transfer;
            this.pool = pooledExecutor;
            this.serverSocket = serverSocket;
            this.address = new IpAddress(streaming_state_transfer.bind_addr, this.serverSocket.getLocalPort());
        }

        /** Accepts connections and dispatches them to the pool while {@code running} is true. */
        @Override // java.lang.Runnable
        public void run() {
            while (this.running) {
                Socket accepted = null;
                try {
                    if (this.this$0.log.isDebugEnabled()) {
                        this.this$0.log.debug("StateProviderThreadSpawner listening at " + getServerSocketAddress() + "...");
                    }
                    if (this.this$0.log.isDebugEnabled()) {
                        this.this$0.log.debug("Pool has " + this.pool.getPoolSize() + " active threads");
                    }
                    accepted = this.serverSocket.accept();
                    final Socket socket = accepted;
                    this.pool.execute(new Runnable() {
                        @Override // java.lang.Runnable
                        public void run() {
                            if (this$0.log.isDebugEnabled()) {
                                this$0.log.debug("Accepted request for state transfer from " + socket.getInetAddress() + ":" + socket.getPort() + " handing of to PooledExecutor thread");
                            }
                            new StateProviderHandler(this$0, null).process(socket);
                        }
                    });
                } catch (IOException e) {
                    // A closed server socket is the normal shutdown path; stay quiet then.
                    if (this.this$0.warn && this.serverSocket != null && !this.serverSocket.isClosed()) {
                        this.this$0.log.warn("Spawning socket from server socket finished abnormaly", e);
                    }
                } catch (InterruptedException e2) {
                    // The pool did not take the task, so no handler will ever close this
                    // connection; release it here instead of leaking it. The interrupt
                    // status is deliberately not restored: the loop keeps running as
                    // before and is ended only through stop().
                    closeQuietly(accepted);
                }
            }
        }

        /** @return the address (bind address and local port) this spawner listens on */
        public IpAddress getServerSocketAddress() {
            return this.address;
        }

        /**
         * Ends the accept loop: clears the running flag, closes the server socket
         * and shuts the pool down whether or not closing succeeded.
         */
        public void stop() {
            this.running = false;
            try {
                if (this.serverSocket != null && !this.serverSocket.isClosed()) {
                    this.serverSocket.close();
                }
            } catch (IOException ignored) {
                // a failed close must not prevent the pool shutdown below
            } finally {
                if (this.this$0.log.isDebugEnabled()) {
                    this.this$0.log.debug("Shutting the thread pool down... ");
                }
                this.pool.shutdownNow();
            }
        }

        // Closes the socket, ignoring a null argument and any failure while closing.
        private void closeQuietly(Socket socket) {
            if (socket == null) {
                return;
            }
            try {
                socket.close();
            } catch (IOException ignored) {
                // best-effort cleanup; nothing useful can be done here
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER$StreamingInputStreamWrapper.class */
    /**
     * Input stream over a state-transfer socket. All read operations are forwarded
     * to the socket's input stream. Closing the wrapper sends a
     * STATE_TRANSFER_INPUTSTREAM_CLOSED event down the owning channel (when it is
     * connected) and closes the socket.
     */
    public class StreamingInputStreamWrapper extends InputStream {
        private Socket inputStreamOwner;
        private InputStream delegate;
        private Channel channelOwner;
        // Explicit reference to the enclosing protocol instance (decompiler artefact).
        private final STREAMING_STATE_TRANSFER this$0;

        /**
         * @param streaming_state_transfer the enclosing protocol instance
         * @param socket                   connected socket to read the state from
         * @throws IOException if the socket's input stream cannot be obtained
         */
        public StreamingInputStreamWrapper(STREAMING_STATE_TRANSFER streaming_state_transfer, Socket socket) throws IOException {
            this.this$0 = streaming_state_transfer;
            this.inputStreamOwner = socket;
            this.delegate = socket.getInputStream();
            this.channelOwner = streaming_state_transfer.stack.getChannel();
        }

        @Override // java.io.InputStream
        public int available() throws IOException {
            return this.delegate.available();
        }

        /**
         * Notifies the channel and closes the socket. The socket is closed in a
         * {@code finally} block so that a failure while notifying the channel can
         * no longer leave the connection open.
         */
        @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.this$0.log.isDebugEnabled()) {
                this.this$0.log.debug("State reader " + this.inputStreamOwner + " is closing the socket ");
            }
            try {
                if (this.channelOwner != null && this.channelOwner.isConnected()) {
                    this.channelOwner.down(new Event(Event.STATE_TRANSFER_INPUTSTREAM_CLOSED));
                }
            } finally {
                this.inputStreamOwner.close();
            }
        }

        @Override // java.io.InputStream
        public synchronized void mark(int i) {
            this.delegate.mark(i);
        }

        @Override // java.io.InputStream
        public boolean markSupported() {
            return this.delegate.markSupported();
        }

        @Override // java.io.InputStream
        public int read() throws IOException {
            return this.delegate.read();
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            return this.delegate.read(bArr, i, i2);
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr) throws IOException {
            return this.delegate.read(bArr);
        }

        @Override // java.io.InputStream
        public synchronized void reset() throws IOException {
            this.delegate.reset();
        }

        @Override // java.io.InputStream
        public long skip(long j) throws IOException {
            return this.delegate.skip(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jgroups/protocols/pbcast/STREAMING_STATE_TRANSFER$StreamingOutputStreamWrapper.class */
    /**
     * Output stream over a state-transfer socket. Writes are forwarded to the
     * socket's output stream and counted; on close the socket is closed and, when
     * statistics are enabled, the byte count is folded into the protocol's totals.
     */
    public class StreamingOutputStreamWrapper extends OutputStream {
        private Socket outputStreamOwner;
        private OutputStream delegate;
        private long bytesWrittenCounter = 0;
        // Explicit reference to the enclosing protocol instance (decompiler artefact).
        private final STREAMING_STATE_TRANSFER this$0;

        /**
         * @param streaming_state_transfer the enclosing protocol instance
         * @param socket                   connected socket to write the state to
         * @throws IOException if the socket's output stream cannot be obtained
         */
        public StreamingOutputStreamWrapper(STREAMING_STATE_TRANSFER streaming_state_transfer, Socket socket) throws IOException {
            this.this$0 = streaming_state_transfer;
            this.outputStreamOwner = socket;
            this.delegate = socket.getOutputStream();
        }

        /**
         * Closes the socket and updates the statistics, also when closing fails.
         *
         * @throws IOException if closing the socket fails
         */
        @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            if (this.this$0.log.isDebugEnabled()) {
                this.this$0.log.debug("State writer " + this.outputStreamOwner + " is closing the socket ");
            }
            try {
                this.outputStreamOwner.close();
            } finally {
                if (this.this$0.stats) {
                    synchronized (this.this$0.state_requesters) {
                        this.this$0.num_bytes_sent += this.bytesWrittenCounter;
                        // Count these bytes once only, even if close() is called again.
                        this.bytesWrittenCounter = 0;
                        int requests = this.this$0.num_state_reqs;
                        // The counter is 0 after resetStats(); dividing then used to throw
                        // ArithmeticException out of close(). The average is also computed
                        // in floating point now instead of truncating long division.
                        if (requests > 0) {
                            this.this$0.avg_state_size = (double) this.this$0.num_bytes_sent / requests;
                        }
                    }
                }
            }
        }

        @Override // java.io.OutputStream, java.io.Flushable
        public void flush() throws IOException {
            this.delegate.flush();
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            this.delegate.write(bArr, i, i2);
            this.bytesWrittenCounter += i2;
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr) throws IOException {
            this.delegate.write(bArr);
            if (bArr != null) {
                this.bytesWrittenCounter += bArr.length;
            }
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            this.delegate.write(i);
            this.bytesWrittenCounter++;
        }
    }

    /**
     * Returns this protocol's name constant, the same string under which
     * {@code StateHeader} instances are attached to messages.
     */
    @Override // org.jgroups.stack.Protocol
    public final String getName() {
        return STREAMING_STATE_TRANSFER.NAME;
    }

    /** @return the current value of the state-request counter */
    public int getNumberOfStateRequests() {
        return num_state_reqs;
    }

    /** @return the total number of state bytes recorded as sent */
    public long getNumberOfStateBytesSent() {
        return num_bytes_sent;
    }

    /** @return the most recently computed average state size */
    public double getAverageStateSize() {
        return avg_state_size;
    }

    /**
     * Lists the event types this protocol requires from the layers below it.
     *
     * @return a new vector holding the event codes 42 and 41, boxed as {@code Integer}
     */
    @Override // org.jgroups.stack.Protocol
    public Vector requiredDownServices() {
        Vector required = new Vector(2);
        // Integer.valueOf replaces the deprecated Integer(int) constructor; the
        // elements still compare equal to the previous ones.
        // 42 / 41 are presumably GET_DIGEST_STATE / SET_DIGEST — TODO confirm against Event.
        required.addElement(Integer.valueOf(42));
        required.addElement(Integer.valueOf(41));
        return required;
    }

    /**
     * Lists the event types this protocol requires from the layers above it.
     *
     * @return a new vector; empty unless {@code use_flush} is set, in which case it
     *         holds the event codes 68 and 70, boxed as {@code Integer}
     */
    @Override // org.jgroups.stack.Protocol
    public Vector requiredUpServices() {
        Vector required = new Vector();
        if (!this.use_flush) {
            return required;
        }
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        // 68 / 70 are presumably the flush-related SUSPEND / RESUME events — TODO confirm against Event.
        required.addElement(Integer.valueOf(68));
        required.addElement(Integer.valueOf(70));
        return required;
    }

    /** Resets the inherited statistics and zeroes the three state-transfer counters. */
    @Override // org.jgroups.stack.Protocol
    public void resetStats() {
        super.resetStats();
        avg_state_size = 0.0d;
        num_bytes_sent = 0L;
        num_state_reqs = 0;
    }

    /**
     * Reads this protocol's configuration from the given properties.
     *
     * @param properties the configuration; anything still present after parsing is
     *                   reported as unrecognized
     * @return {@code true} if every property was recognized and the bind address
     *         could be resolved, {@code false} otherwise
     */
    @Override // org.jgroups.stack.Protocol
    public boolean setProperties(Properties properties) {
        super.setProperties(properties);

        // Flush settings.
        use_flush = Util.parseBoolean(properties, "use_flush", false);
        if (use_flush) {
            flush_promise = new Promise();
        }
        flush_timeout = Util.parseLong(properties, "flush_timeout", 10000L);

        // Socket and thread-pool settings.
        try {
            bind_addr = Util.parseBindAddress(properties, "bind_addr");
            bind_port = Util.parseInt(properties, "start_port", 0);
            socket_buffer_size = Util.parseInt(properties, "socket_buffer_size", 8192);
            max_pool = Util.parseInt(properties, "max_pool", 5);
            pool_thread_keep_alive = Util.parseLong(properties, "pool_thread_keep_alive", GossipRouter.EXPIRY_TIME);
            use_reading_thread = Util.parseBoolean(properties, "use_reading_thread", false);
        } catch (UnknownHostException unknownHost) {
            log.error("(bind_addr): host " + unknownHost.getLocalizedMessage() + " not known");
            return false;
        }

        // Whatever is left over was not consumed by any parser above.
        if (properties.size() > 0) {
            log.error("the following properties are not recognized: " + properties);
            return false;
        }
        return true;
    }

    /** Fills the configuration map that {@code start()} later passes up the stack. */
    @Override // org.jgroups.stack.Protocol
    public void init() throws Exception {
        map.put("state_transfer", Boolean.TRUE);
        String protocolClass = getClass().getName();
        map.put("protocol_class", protocolClass);
    }

    /** Passes the configuration map up the stack in a CONFIG event. */
    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        Event config = new Event(Event.CONFIG, map);
        passUp(config);
    }

    /** Stops the protocol, clears the pending-response flag and stops the spawner if one exists. */
    @Override // org.jgroups.stack.Protocol
    public void stop() {
        super.stop();
        waiting_for_state_response = false;
        if (spawner == null) {
            return;
        }
        spawner.stop();
    }

    /**
     * Handles events travelling up the stack. Messages carrying a
     * {@code StateHeader} and GET_DIGEST_STATE_OK events are consumed here; every
     * other event is passed up after any local bookkeeping.
     *
     * @param event the event received from the layer below
     */
    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public void up(Event event) {
        final int kind = event.getType();

        if (kind == 1) {
            // Type 1 carries a Message; only those with our header are handled here.
            Message msg = (Message) event.getArg();
            StateHeader hdr = (StateHeader) msg.removeHeader(getName());
            if (hdr != null) {
                if (hdr.type == StateHeader.STATE_REQ) {
                    handleStateReq(hdr);
                } else if (hdr.type == StateHeader.STATE_RSP) {
                    handleStateRsp(hdr);
                } else if (hdr.type == StateHeader.STATE_REMOVE_REQUESTER) {
                    removeFromStateRequesters(hdr.sender, hdr.state_id);
                } else if (log.isErrorEnabled()) {
                    log.error("type " + hdr.type + " not known in StateHeader");
                }
                return;
            }
        } else if (kind == 6 || kind == Event.TMP_VIEW) {
            handleViewChange((View) event.getArg());
        } else if (kind == 8) {
            // Type 8 delivers this member's own address.
            local_addr = (Address) event.getArg();
        } else if (kind == Event.GET_DIGEST_STATE_OK) {
            synchronized (state_requesters) {
                digest = (Digest) event.getArg();
                if (log.isDebugEnabled()) {
                    log.debug("GET_DIGEST_STATE_OK: digest is " + digest);
                }
            }
            respondToStateRequester();
            return;
        } else if (kind == Event.CONFIG && bind_addr == null) {
            // Adopt the bind address from the configuration when none was set locally.
            Map config = (Map) event.getArg();
            bind_addr = (InetAddress) config.get("bind_addr");
            if (log.isDebugEnabled()) {
                log.debug("using bind_addr from CONFIG event " + bind_addr);
            }
        }

        passUp(event);
    }

    /**
     * Handles events travelling down the stack. GET_STATE and
     * STATE_TRANSFER_INPUTSTREAM_CLOSED are consumed here; every other event is
     * passed down after any local bookkeeping.
     *
     * @param event the event received from the layer above
     */
    @Override // org.jgroups.stack.Protocol
    public void down(Event event) {
        final int kind = event.getType();

        if (kind == Event.GET_STATE) {
            StateTransferInfo info = (StateTransferInfo) event.getArg();

            // Pick the member to ask: the explicit target, or else the coordinator.
            Address provider;
            if (info.target == null) {
                provider = determineCoordinator();
            } else {
                provider = info.target;
                if (provider.equals(local_addr)) {
                    if (log.isErrorEnabled()) {
                        log.error("GET_STATE: cannot fetch state from myself !");
                    }
                    provider = null;
                }
            }

            if (provider == null) {
                // Nobody to ask: answer immediately with an empty StateTransferInfo.
                if (log.isDebugEnabled()) {
                    log.debug("GET_STATE: first member (no state)");
                }
                passUp(new Event(20, new StateTransferInfo()));
                return;
            }

            if (use_flush) {
                startFlush(flush_timeout);
            }

            Message request = new Message(provider, (Address) null, (byte[]) null);
            request.putHeader(NAME, new StateHeader(StateHeader.STATE_REQ, local_addr, info.state_id));
            if (log.isDebugEnabled()) {
                log.debug("GET_STATE: asking " + provider + " for state");
            }
            if (log.isDebugEnabled()) {
                log.debug("passing down a SUSPEND_STABLE event");
            }
            // Event 65 is SUSPEND_STABLE according to the log text above.
            passDown(new Event(65, new Long(info.timeout)));
            waiting_for_state_response = true;
            start = System.currentTimeMillis();
            passDown(new Event(1, request));
            return;
        }

        if (kind == Event.STATE_TRANSFER_INPUTSTREAM_CLOSED) {
            if (use_flush) {
                stopFlush();
            }
            if (log.isDebugEnabled()) {
                log.debug("STATE_TRANSFER_INPUTSTREAM_CLOSED received");
            }
            if (log.isDebugEnabled()) {
                log.debug("passing down a RESUME_STABLE event");
            }
            // Event 66 is RESUME_STABLE according to the log text above.
            passDown(new Event(66));
            return;
        }

        if (kind == 6 || kind == Event.TMP_VIEW) {
            handleViewChange((View) event.getArg());
        } else if (kind == Event.SUSPEND_OK && use_flush) {
            flush_promise.setResult(Boolean.TRUE);
        }

        passDown(event);
    }

    /**
     * Tells whether a digest has to be exchanged as part of the state transfer.
     *
     * @return {@code true} exactly when {@code use_flush} is {@code false}
     */
    private boolean isDigestNeeded() {
        final boolean flushEnabled = this.use_flush;
        return !flushEnabled;
    }

    /**
     * Sends a state response (header type 2) to every address currently registered in
     * {@code state_requesters}.
     *
     * <p>On first use this creates the {@code StateProviderThreadSpawner} together with
     * its server socket and thread pool, and starts it on its own thread. Each response
     * carries the spawner's server socket address, the current digest and the state id
     * the requester asked for.
     *
     * <p>If a digest is required but not available, a warning is logged (when warnings
     * are enabled) and the responses are still sent with a {@code null} digest.
     */
    private void respondToStateRequester() {
        // Lazily set up the serving side the first time a response has to be sent.
        if (this.spawner == null) {
            this.spawner = new StateProviderThreadSpawner(this, setupThreadPool(), Util.createServerSocket(this.bind_addr, this.bind_port));
            new Thread(Util.getGlobalThreadGroup(), this.spawner, "StateProviderThreadSpawner").start();
        }
        synchronized (this.state_requesters) {
            if (this.state_requesters.isEmpty()) {
                if (this.warn) {
                    this.log.warn("Should be responding to state requester, but there are no requesters !");
                }
                return;
            }
            // The digest is known to be null here, so it must only be reported,
            // never dereferenced (the former else-branch called digest.copy() on null).
            if (this.digest == null && isDigestNeeded() && this.warn) {
                this.log.warn("Should be responding to state requester, but there is no digest !");
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug(new StringBuffer().append("Iterating state requesters ").append(this.state_requesters).toString());
            }
            // state_requesters is a raw Map of state id (String) -> Set of Address,
            // hence the explicit casts.
            for (Object o : this.state_requesters.entrySet()) {
                Map.Entry entry = (Map.Entry) o;
                String stateId = (String) entry.getKey();
                for (Object member : (Set) entry.getValue()) {
                    Address requester = (Address) member;
                    Message response = new Message(requester);
                    response.putHeader(NAME, new StateHeader((byte) 2, this.local_addr, this.spawner.getServerSocketAddress(), this.digest, stateId));
                    if (this.log.isDebugEnabled()) {
                        this.log.debug(new StringBuffer().append("Responding to state requester ").append(requester).append(" with address ").append(this.spawner.getServerSocketAddress()).append(" and digest ").append(this.digest).toString());
                    }
                    passDown(new Event(1, response));
                    if (this.stats) {
                        this.num_state_reqs++;
                    }
                }
            }
        }
    }

    /**
     * Requests a flush by passing event 68 up the stack and waits until
     * {@code flush_promise} is completed (done by {@code down()} on SUSPEND_OK).
     *
     * @param j maximum time to wait for the acknowledgement, in the unit expected by
     *          {@code Promise.getResultWithTimeout} (presumably milliseconds)
     * @return {@code true} if the promise was completed in time, {@code false} if the
     *         wait timed out
     */
    private boolean startFlush(long j) {
        // Reset BEFORE requesting the flush: if the acknowledgement arrives while
        // passUp() is still running, resetting afterwards would discard it and the
        // caller would block for the whole timeout and report a failure.
        this.flush_promise.reset();
        passUp(new Event(68));
        try {
            this.flush_promise.getResultWithTimeout(j);
            return true;
        } catch (TimeoutException e) {
            // Best effort: the caller proceeds regardless, but the timeout should be visible.
            if (this.warn) {
                this.log.warn("flush was not acknowledged within timeout of " + j);
            }
            return false;
        }
    }

    /**
     * Ends a flush by passing event 70 up the stack
     * (presumably RESUME -- TODO confirm against the Event constants).
     */
    private void stopFlush() {
        final Event resume = new Event(70);
        passUp(resume);
    }

    /**
     * Builds the thread pool handed to the state provider spawner.
     *
     * <p>The pool is capped at {@code max_pool} threads, keeps at least one thread,
     * is configured with {@code waitWhenBlocked()}, expires idle threads after
     * {@code pool_thread_keep_alive} and creates its threads through
     * {@code AnonymousClass1}.
     *
     * @return the configured executor
     */
    private PooledExecutor setupThreadPool() {
        final PooledExecutor pool = new PooledExecutor(this.max_pool);

        pool.waitWhenBlocked();
        pool.setMinimumPoolSize(1);
        pool.setKeepAliveTime(this.pool_thread_keep_alive);
        pool.setThreadFactory(new AnonymousClass1(this));

        return pool;
    }

    /**
     * Picks the member to request state from when no explicit target was given.
     *
     * @return the first entry of {@code members} that is not equal to
     *         {@code local_addr}, or {@code null} if there is none
     */
    private Address determineCoordinator() {
        synchronized (this.members) {
            // members is a final, always-initialised Vector, so only emptiness matters;
            // iterating an empty Vector simply falls through to the null result.
            for (Object candidate : this.members) {
                if (!this.local_addr.equals(candidate)) {
                    return (Address) candidate;
                }
            }
            return null;
        }
    }

    /**
     * Replaces the local membership with the members of the given view.
     *
     * <p>If a state response is still outstanding and the member that was first in
     * the previous membership is absent from the new one, a warning is logged.
     * NOTE(review): the message announces that null state will be returned to the
     * application, but this method only logs -- confirm whether that is intended.
     *
     * @param view the newly installed view
     */
    private void handleViewChange(View view) {
        final Vector incoming = view.getMembers();
        Address previousFirst = null;
        boolean providerGone;

        synchronized (this.members) {
            if (this.members.size() > 0) {
                previousFirst = (Address) this.members.firstElement();
            }
            this.members.clear();
            this.members.addAll(incoming);

            providerGone = this.waiting_for_state_response
                    && previousFirst != null
                    && !this.members.contains(previousFirst);
        }

        if (providerGone) {
            this.log.warn("discovered that the state provider (" + previousFirst + ") crashed; will return null state to application");
        }
    }

    /**
     * Registers the sender of a state request under the requested state id and
     * triggers the response.
     *
     * <p>When no digest is needed the response is sent right away. Otherwise, and
     * only if no other request was pending before this one, the cached digest is
     * cleared and event 42 (logged as GET_DIGEST_STATE) is passed down.
     *
     * @param stateHeader header of the received state request; a {@code null}
     *                    sender is logged as an error and the request is dropped
     */
    private void handleStateReq(StateHeader stateHeader) {
        final Address requester = stateHeader.sender;
        if (requester == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error("sender is null !");
            }
            return;
        }

        final String stateId = stateHeader.state_id;
        synchronized (this.state_requesters) {
            // Must be sampled before this request is added.
            final boolean nothingPending = this.state_requesters.isEmpty();

            Set requestersForState = (Set) this.state_requesters.get(stateId);
            if (requestersForState == null) {
                requestersForState = new HashSet();
            }
            requestersForState.add(requester);
            this.state_requesters.put(stateId, requestersForState);

            if (isDigestNeeded()) {
                if (nothingPending) {
                    this.digest = null;
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("passing down GET_DIGEST_STATE");
                    }
                    passDown(new Event(42));
                }
            } else {
                respondToStateRequester();
            }
        }
    }

    /**
     * Handles the state provider's response: clears the "waiting" flag, passes the
     * received digest down as event 41 when a digest is needed, records the stop
     * time and connects to the provider.
     *
     * @param stateHeader header of the state response
     */
    void handleStateRsp(StateHeader stateHeader) {
        final Digest providerDigest = stateHeader.my_digest;
        this.waiting_for_state_response = false;

        if (isDigestNeeded()) {
            if (providerDigest == null) {
                if (this.warn) {
                    this.log.warn("digest received from " + stateHeader.sender + " is null, skipping setting digest !");
                }
            } else {
                passDown(new Event(41, providerDigest));
            }
        }

        this.stop = System.currentTimeMillis();
        connectToStateProvider(stateHeader);
    }

    /**
     * Removes one requester from the set registered for a state id, dropping the
     * whole map entry once that set becomes empty.
     *
     * @param address the requester to remove
     * @param str     the state id the requester was registered under
     */
    void removeFromStateRequesters(Address address, String str) {
        synchronized (this.state_requesters) {
            final Set requesters = (Set) this.state_requesters.get(str);

            if (requesters == null || requesters.isEmpty()) {
                if (this.warn) {
                    this.log.warn("Attempted to clear " + address + " and state_id " + str + " from requesters,but the entry does not exist");
                }
                return;
            }

            final boolean removed = requesters.remove(address);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Attempted to clear " + address + " from requesters, successful=" + removed);
            }

            if (requesters.isEmpty()) {
                this.state_requesters.remove(str);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Cleared all requesters for state " + str + ",state_requesters=" + this.state_requesters);
                }
            }
        }
    }

    /**
     * Opens a socket to the state provider named in the response header, identifies
     * this requester to it and passes the resulting input stream up the stack.
     *
     * <p>After connecting, the state id and the local address are written to the
     * provider. The socket is then wrapped in a {@code StreamingInputStreamWrapper}
     * and passed up inside event 71, either on a dedicated thread
     * ({@code use_reading_thread}) or on the calling thread.
     *
     * <p>Whatever happens, if the socket never connected it is closed and a message
     * with header type 3 is sent to the provider carrying this member's address and
     * the state id.
     *
     * @param stateHeader the state response header; supplies the provider's socket
     *                    address, its group address and the state id
     */
    private void connectToStateProvider(StateHeader stateHeader) {
        final IpAddress providerAddress = stateHeader.bind_addr;
        final String stateId = stateHeader.getStateId();
        // Declared outside the try so the cleanup below always has a valid reference.
        final Socket socket = new Socket();
        StreamingInputStreamWrapper wrapper = null;
        try {
            int originalBufferSize = socket.getReceiveBufferSize();
            socket.setReceiveBufferSize(this.socket_buffer_size);
            if (this.log.isDebugEnabled()) {
                this.log.debug(new StringBuffer().append("Connecting to state provider ").append(providerAddress.getIpAddress()).append(":").append(providerAddress.getPort()).append(", original buffer size was ").append(originalBufferSize).append(" and was reset to ").append(socket.getReceiveBufferSize()).toString());
            }
            socket.connect(new InetSocketAddress(providerAddress.getIpAddress(), providerAddress.getPort()));
            if (this.log.isDebugEnabled()) {
                this.log.debug(new StringBuffer().append("Connected to state provider, my end of the socket is ").append(socket.getLocalAddress()).append(":").append(socket.getLocalPort()).append(" passing inputstream up...").toString());
            }

            // Tell the provider which state we want and who we are.
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            out.writeObject(stateId);
            out.writeObject(this.local_addr);

            wrapper = new StreamingInputStreamWrapper(this, socket);
            final StateTransferInfo sti = new StateTransferInfo(stateHeader.sender, wrapper, stateId);
            Runnable readerTask = new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    passUp(new Event(71, sti));
                }
            };
            if (this.use_reading_thread) {
                new Thread(Util.getGlobalThreadGroup(), readerTask, "STREAMING_STATE_TRANSFER.reader").start();
            } else {
                readerTask.run();
            }
        } catch (IOException e) {
            if (this.warn) {
                this.log.warn("State reader socket thread spawned abnormaly", e);
            }
            // NOTE(review): a failure after connect() leaves the connected socket open,
            // as in the original logic -- confirm whether it should be closed here.
        } finally {
            // Runs on success, on IOException and on any other Throwable (which is
            // then propagated unchanged).
            if (!socket.isConnected()) {
                if (this.warn) {
                    this.log.warn("Could not connect to state provider. Closing socket...");
                }
                try {
                    if (wrapper != null) {
                        wrapper.close();
                    } else {
                        socket.close();
                    }
                } catch (IOException ignored) {
                    // Best-effort close of a socket that never connected.
                }
                Message removal = new Message(stateHeader.sender);
                removal.putHeader(NAME, new StateHeader((byte) 3, this.local_addr, stateId));
                passDown(new Event(1, removal));
            }
        }
    }

    /**
     * Post-increments the private {@code threadCounter} of the given instance.
     * Appears to be a compiler-generated accessor used by nested classes.
     *
     * @param streaming_state_transfer the instance whose counter is incremented
     * @return the counter value before the increment
     */
    static int access$108(STREAMING_STATE_TRANSFER streaming_state_transfer) {
        return streaming_state_transfer.threadCounter++;
    }

    /**
     * Resolves a class by its fully qualified name.
     *
     * @param str fully qualified class name
     * @return the loaded class
     * @throws NoClassDefFoundError if the class cannot be found; the original
     *         {@link ClassNotFoundException} is attached as the cause
     */
    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            // initCause() returns Throwable, which cannot be thrown from a method
            // without a throws clause, so attach the cause and throw the error itself.
            NoClassDefFoundError error = new NoClassDefFoundError(e.getMessage());
            error.initCause(e);
            throw error;
        }
    }
}
