/*
 * Decompiled with CFR 0.152.
 */
package org.modeshape.clustering;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelException;
import org.jgroups.ChannelListener;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.conf.ProtocolStackConfigurator;
import org.jgroups.conf.XmlConfigurator;
import org.jgroups.util.Util;
import org.modeshape.clustering.ClusteringI18n;
import org.modeshape.common.SystemFailureException;
import org.modeshape.common.util.CheckArg;
import org.modeshape.common.util.Logger;
import org.modeshape.graph.observe.ChangeObservers;
import org.modeshape.graph.observe.Changes;
import org.modeshape.graph.observe.ObservationBus;
import org.modeshape.graph.observe.Observer;

public class ClusteredObservationBus
implements ObservationBus {
    protected static final Logger LOGGER = Logger.getLogger(ClusteredObservationBus.class);
    protected final ChangeObservers observers = new ChangeObservers();
    private final Listener listener = new Listener();
    private final Receiver receiver = new Receiver();
    protected final AtomicBoolean isOpen = new AtomicBoolean(false);
    protected final AtomicBoolean multipleAddressesInCluster = new AtomicBoolean(false);
    private String configuration;
    private String clusterName;
    private JChannel channel;

    public String getConfiguration() {
        return this.configuration;
    }

    public void setConfiguration(String configuration) {
        if (this.channel != null) {
            String name = this.clusterName;
            throw new IllegalStateException(ClusteringI18n.clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown.text(new Object[]{name}));
        }
        this.configuration = configuration;
    }

    public String getClusterName() {
        return this.clusterName;
    }

    public void setClusterName(String clusterName) {
        CheckArg.isNotNull((Object)clusterName, (String)"clusterName");
        if (this.channel != null) {
            String name = this.clusterName;
            throw new IllegalStateException(ClusteringI18n.clusteringChannelIsRunningAndCannotBeChangedUnlessShutdown.text(new Object[]{name}));
        }
        this.clusterName = clusterName;
    }

    public synchronized void start() {
        if (this.clusterName == null) {
            throw new IllegalStateException(ClusteringI18n.clusterNameRequired.text(new Object[0]));
        }
        if (this.channel != null) {
            this.channel.removeChannelListener((ChannelListener)this.listener);
            this.channel.setReceiver(null);
        }
        try {
            this.channel = this.newChannel(this.configuration);
            assert (this.channel != null);
            this.channel.addChannelListener((ChannelListener)this.listener);
            this.channel.setReceiver((org.jgroups.Receiver)this.receiver);
            this.channel.connect(this.clusterName);
        }
        catch (ChannelException e) {
            throw new IllegalStateException(ClusteringI18n.errorWhileStartingJGroups.text(new Object[]{this.configuration}), e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected JChannel newChannel(String configuration) throws ChannelException {
        if (configuration == null) {
            return new JChannel();
        }
        XmlConfigurator configurator = null;
        ByteArrayInputStream stream = new ByteArrayInputStream(configuration.getBytes());
        try {
            configurator = XmlConfigurator.getInstance((InputStream)stream);
        }
        catch (IOException e) {
        }
        finally {
            try {
                ((InputStream)stream).close();
            }
            catch (IOException e) {}
        }
        if (configurator != null) {
            return new JChannel((ProtocolStackConfigurator)configurator);
        }
        return new JChannel(configuration);
    }

    public void notify(Changes changes) {
        if (changes == null) {
            return;
        }
        if (!this.isOpen.get()) {
            return;
        }
        if (!this.multipleAddressesInCluster.get()) {
            if (!this.observers.isEmpty()) {
                this.observers.broadcast(changes);
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Received on cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}", new Object[]{this.getClusterName(), changes.getChangeRequests().size(), changes.getSourceName(), changes.getUserName(), changes.getProcessId(), changes.getTimestamp()});
                }
            }
            return;
        }
        try {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Sending to cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}", new Object[]{this.clusterName, changes.getChangeRequests().size(), changes.getSourceName(), changes.getUserName(), changes.getProcessId(), changes.getTimestamp()});
            }
            byte[] data = ClusteredObservationBus.serialize(changes);
            Message message = new Message(null, null, data);
            this.channel.send(message);
        }
        catch (ChannelClosedException e) {
            LOGGER.warn(ClusteringI18n.unableToNotifyChangesBecauseClusterChannelHasClosed, new Object[]{this.clusterName, changes.getChangeRequests().size(), changes.getSourceName(), changes.getUserName(), changes.getProcessId(), changes.getTimestamp()});
        }
        catch (ChannelNotConnectedException e) {
            LOGGER.warn(ClusteringI18n.unableToNotifyChangesBecauseClusterChannelIsNotConnected, new Object[]{this.clusterName, changes.getChangeRequests().size(), changes.getSourceName(), changes.getUserName(), changes.getProcessId(), changes.getTimestamp()});
        }
        catch (Exception e) {
            String msg = ClusteringI18n.errorSerializingChanges.text(new Object[]{this.clusterName, changes.getChangeRequests().size(), changes.getSourceName(), changes.getUserName(), changes.getProcessId(), changes.getTimestamp(), changes});
            throw new SystemFailureException(msg, (Throwable)e);
        }
    }

    public boolean register(Observer observer) {
        return this.observers.register(observer);
    }

    public boolean unregister(Observer observer) {
        return this.observers.unregister(observer);
    }

    public boolean hasObservers() {
        return !this.observers.isEmpty();
    }

    public boolean isStarted() {
        return this.channel != null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void shutdown() {
        if (this.channel != null) {
            this.isOpen.set(false);
            try {
                this.channel.removeChannelListener((ChannelListener)this.listener);
                this.channel.setReceiver(null);
                this.channel.close();
            }
            finally {
                this.channel = null;
                this.observers.shutdown();
            }
        }
    }

    protected static byte[] serialize(Changes changes) throws Exception {
        return Util.objectToByteBuffer((Object)changes);
    }

    protected static Changes deserialize(byte[] data) throws Exception {
        return (Changes)Util.objectFromByteBuffer((byte[])data);
    }

    protected class Listener
    implements ChannelListener {
        protected Listener() {
        }

        public void channelClosed(Channel channel) {
            ClusteredObservationBus.this.isOpen.set(false);
        }

        public void channelConnected(Channel channel) {
            ClusteredObservationBus.this.isOpen.set(true);
        }

        public void channelDisconnected(Channel channel) {
            ClusteredObservationBus.this.isOpen.set(false);
        }

        public void channelReconnected(Address addr) {
            ClusteredObservationBus.this.isOpen.set(true);
        }

        public void channelShunned() {
            ClusteredObservationBus.this.isOpen.set(false);
        }
    }

    protected class Receiver
    implements org.jgroups.Receiver {
        private byte[] state;

        protected Receiver() {
        }

        public void block() {
            ClusteredObservationBus.this.isOpen.set(false);
        }

        public void receive(Message message) {
            if (!ClusteredObservationBus.this.observers.isEmpty()) {
                try {
                    Changes changes = ClusteredObservationBus.deserialize(message.getBuffer());
                    ClusteredObservationBus.this.observers.broadcast(changes);
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace("Received on cluster '{0}' {1} changes in source '{2}' made by {3} from process '{4}' at {5}", new Object[]{ClusteredObservationBus.this.getClusterName(), changes.getChangeRequests().size(), changes.getSourceName(), changes.getUserName(), changes.getProcessId(), changes.getTimestamp()});
                    }
                }
                catch (Exception e) {
                    String msg = ClusteringI18n.errorDeserializingChanges.text(new Object[]{ClusteredObservationBus.this.getClusterName()});
                    throw new SystemFailureException(msg, (Throwable)e);
                }
            }
        }

        public byte[] getState() {
            return this.state;
        }

        public void setState(byte[] state) {
            this.state = state;
        }

        public void suspect(Address suspectedMbr) {
            LOGGER.error(ClusteringI18n.memberOfClusterIsSuspect, new Object[]{ClusteredObservationBus.this.getClusterName(), suspectedMbr});
        }

        public void viewAccepted(View newView) {
            LOGGER.trace("Members of '{0}' cluster have changed: {1}", new Object[]{ClusteredObservationBus.this.getClusterName(), newView});
            if (newView.getMembers().size() > 1) {
                if (ClusteredObservationBus.this.multipleAddressesInCluster.compareAndSet(false, true)) {
                    LOGGER.debug("There are now multiple members of cluster '{0}'; changes will be propagated throughout the cluster", new Object[]{ClusteredObservationBus.this.getClusterName()});
                }
            } else if (ClusteredObservationBus.this.multipleAddressesInCluster.compareAndSet(true, false)) {
                LOGGER.debug("There is only one member of cluster '{0}'; changes will be propagated locally only", new Object[]{ClusteredObservationBus.this.getClusterName()});
            }
        }
    }
}

