/*
 * Decompiled with CFR 0.152.
 */
package org.hawkular.agent.monitor.cmd;

import com.squareup.okhttp.Response;
import com.squareup.okhttp.ws.WebSocket;
import com.squareup.okhttp.ws.WebSocketCall;
import com.squareup.okhttp.ws.WebSocketListener;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import okio.Buffer;
import okio.BufferedSink;
import okio.BufferedSource;
import org.hawkular.agent.monitor.cmd.AddDatasourceCommand;
import org.hawkular.agent.monitor.cmd.AddJdbcDriverCommand;
import org.hawkular.agent.monitor.cmd.Command;
import org.hawkular.agent.monitor.cmd.CommandContext;
import org.hawkular.agent.monitor.cmd.DeployApplicationCommand;
import org.hawkular.agent.monitor.cmd.EchoCommand;
import org.hawkular.agent.monitor.cmd.ExecuteOperationCommand;
import org.hawkular.agent.monitor.cmd.ExportJdrCommand;
import org.hawkular.agent.monitor.cmd.GenericErrorResponseCommand;
import org.hawkular.agent.monitor.cmd.RemoveDatasourceCommand;
import org.hawkular.agent.monitor.cmd.RemoveJdbcDriverCommand;
import org.hawkular.agent.monitor.cmd.UpdateDatasourceCommand;
import org.hawkular.agent.monitor.extension.MonitorServiceConfiguration;
import org.hawkular.agent.monitor.log.AgentLoggers;
import org.hawkular.agent.monitor.log.MsgLogger;
import org.hawkular.agent.monitor.service.MonitorService;
import org.hawkular.agent.monitor.storage.HttpClientBuilder;
import org.hawkular.agent.monitor.util.Util;
import org.hawkular.bus.common.BasicMessage;
import org.hawkular.bus.common.BasicMessageWithExtraData;
import org.hawkular.bus.common.BinaryData;
import org.hawkular.cmdgw.api.ApiDeserializer;
import org.hawkular.cmdgw.api.AuthMessage;
import org.hawkular.cmdgw.api.Authentication;
import org.hawkular.cmdgw.api.GenericErrorResponse;
import org.hawkular.cmdgw.api.GenericErrorResponseBuilder;

public class FeedCommProcessor
implements WebSocketListener {
    private static final MsgLogger log = AgentLoggers.getLogger(FeedCommProcessor.class);
    private static final Map<String, Class<? extends Command<?, ?>>> VALID_COMMANDS = new HashMap();
    private final int disconnectCode = 1000;
    private final String disconnectReason = "Shutting down FeedCommProcessor";
    private final HttpClientBuilder httpClientBuilder;
    private final MonitorServiceConfiguration config;
    private final MonitorService discoveryService;
    private final String feedcommUrl;
    private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
    private final AtomicReference<ReconnectJobThread> reconnectJobThread = new AtomicReference();
    private WebSocketCall webSocketCall;
    private WebSocket webSocket;
    private boolean destroyed = false;

    public FeedCommProcessor(HttpClientBuilder httpClientBuilder, MonitorServiceConfiguration config, String feedId, MonitorService discoveryService) {
        if (feedId == null || feedId.isEmpty()) {
            throw new IllegalArgumentException("Must have a valid feed ID to communicate with the server");
        }
        this.httpClientBuilder = httpClientBuilder;
        this.config = config;
        this.discoveryService = discoveryService;
        try {
            StringBuilder url = Util.getContextUrlString(config.getStorageAdapter().getUrl(), config.getStorageAdapter().getFeedcommContext());
            url.append("feed/").append(feedId);
            this.feedcommUrl = url.toString().replaceFirst("https?:", config.getStorageAdapter().isUseSSL() ? "wss:" : "ws:");
            log.infoFeedCommUrl(this.feedcommUrl);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot build URL to the server command-gateway endpoint", e);
        }
    }

    public boolean isConnected() {
        return this.webSocket != null;
    }

    public void connect() throws Exception {
        this.disconnect();
        if (this.destroyed) {
            return;
        }
        log.debugf("About to connect a feed WebSocket client to endpoint [%s]", this.feedcommUrl);
        this.webSocketCall = this.httpClientBuilder.createWebSocketCall(this.feedcommUrl, null);
        this.webSocketCall.enqueue((WebSocketListener)this);
    }

    public void disconnect() {
        if (this.webSocket != null) {
            try {
                this.webSocket.close(1000, "Shutting down FeedCommProcessor");
            }
            catch (Exception e) {
                log.warnFailedToCloseWebSocket(1000, "Shutting down FeedCommProcessor", e);
            }
            this.webSocket = null;
        }
        if (this.webSocketCall != null) {
            try {
                this.webSocketCall.cancel();
            }
            catch (Exception e) {
                log.errorCannotCloseWebSocketCall(e);
            }
            this.webSocketCall = null;
        }
    }

    public void destroy() {
        this.destroyed = true;
        this.stopReconnectJobThread();
        this.disconnect();
    }

    public void sendAsync(final BasicMessageWithExtraData<? extends BasicMessage> messageWithData) {
        if (this.webSocket == null) {
            throw new IllegalStateException("WebSocket connection was closed. Cannot send any messages");
        }
        final BasicMessage message = messageWithData.getBasicMessage();
        this.configurationAuthentication(message);
        this.sendExecutor.execute(new Runnable(){

            @Override
            public void run() {
                block6: {
                    try {
                        if (messageWithData.getBinaryData() == null) {
                            String messageString = ApiDeserializer.toHawkularFormat((BasicMessage)message);
                            Buffer buffer = new Buffer();
                            buffer.writeUtf8(messageString);
                            FeedCommProcessor.this.webSocket.sendMessage(WebSocket.PayloadType.TEXT, buffer);
                            break block6;
                        }
                        BinaryData messageData = ApiDeserializer.toHawkularFormat((BasicMessage)message, (InputStream)messageWithData.getBinaryData());
                        try (BufferedSink sink = FeedCommProcessor.this.webSocket.newMessageSink(WebSocket.PayloadType.BINARY);){
                            FeedCommProcessor.this.emitToSink(messageData, sink);
                        }
                    }
                    catch (Throwable t) {
                        log.errorFailedToSendOverFeedComm(message.getClass().getName(), t);
                    }
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendSync(BasicMessageWithExtraData<? extends BasicMessage> messageWithData) throws Exception {
        if (this.webSocket == null) {
            throw new IllegalStateException("WebSocket connection was closed. Cannot send any messages");
        }
        BasicMessage message = messageWithData.getBasicMessage();
        this.configurationAuthentication(message);
        if (messageWithData.getBinaryData() == null) {
            String messageString = ApiDeserializer.toHawkularFormat((BasicMessage)message);
            Buffer buffer = new Buffer();
            buffer.writeUtf8(messageString);
            this.webSocket.sendMessage(WebSocket.PayloadType.TEXT, buffer);
        } else {
            BinaryData messageData = ApiDeserializer.toHawkularFormat((BasicMessage)message, (InputStream)messageWithData.getBinaryData());
            try (BufferedSink sink = this.webSocket.newMessageSink(WebSocket.PayloadType.BINARY);){
                this.emitToSink(messageData, sink);
            }
        }
    }

    private void emitToSink(BinaryData in, BufferedSink out) throws RuntimeException {
        int bufferSize = 32768;
        try {
            BufferedInputStream input = new BufferedInputStream((InputStream)in, bufferSize);
            byte[] buffer = new byte[bufferSize];
            int bytesRead = ((InputStream)input).read(buffer);
            while (bytesRead != -1) {
                out.write(buffer, 0, bytesRead);
                out.flush();
                bytesRead = ((InputStream)input).read(buffer);
            }
        }
        catch (IOException ioe) {
            throw new RuntimeException("Failed to emit to sink", ioe);
        }
    }

    public void onOpen(WebSocket webSocket, Response response) {
        this.webSocket = webSocket;
        this.stopReconnectJobThread();
        log.infoOpenedFeedComm(this.feedcommUrl);
    }

    public void onClose(int reasonCode, String reason) {
        this.webSocket = null;
        log.infoClosedFeedComm(this.feedcommUrl, reasonCode, reason);
        if (1000 != reasonCode || !"Shutting down FeedCommProcessor".equals(reason)) {
            switch (reasonCode) {
                case 1008: {
                    break;
                }
                default: {
                    this.startReconnectJobThread();
                }
            }
        }
    }

    public void onFailure(IOException e, Response response) {
        if (response == null) {
            log.tracef("Feed communications had a failure - a reconnection is likely required: %s", e);
        } else {
            log.warnFeedCommFailure(response.toString(), e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onMessage(BufferedSource payload, WebSocket.PayloadType payloadType) throws IOException {
        BasicMessageWithExtraData response;
        String requestClassName = "?";
        try {
            try {
                BasicMessageWithExtraData msgWithData;
                switch (payloadType) {
                    case TEXT: {
                        String nameAndJsonStr = payload.readUtf8();
                        msgWithData = new ApiDeserializer().deserialize(nameAndJsonStr);
                        break;
                    }
                    case BINARY: {
                        InputStream input = payload.inputStream();
                        msgWithData = new ApiDeserializer().deserialize(input);
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("Unknown payload type, please report this bug: " + payloadType);
                    }
                }
                log.debug("Received message from server");
                BasicMessage msg = msgWithData.getBasicMessage();
                requestClassName = msg.getClass().getName();
                Class<Command<?, ?>> commandClass = VALID_COMMANDS.get(requestClassName);
                if (commandClass == null) {
                    log.errorInvalidCommandRequestFeed(requestClassName);
                    String errorMessage = "Invalid command request: " + requestClassName;
                    GenericErrorResponse errorMsg = new GenericErrorResponseBuilder().setErrorMessage(errorMessage).build();
                    response = new BasicMessageWithExtraData((BasicMessage)errorMsg, null);
                } else {
                    Command<?, ?> command = commandClass.newInstance();
                    CommandContext context = new CommandContext(this, this.config, this.discoveryService);
                    response = command.execute(msgWithData, context);
                }
            }
            finally {
                payload.close();
            }
        }
        catch (Throwable t) {
            log.errorCommandExecutionFailureFeed(requestClassName, t);
            String errorMessage = "Command failed [" + requestClassName + "]";
            GenericErrorResponse errorMsg = new GenericErrorResponseBuilder().setThrowable(t).setErrorMessage(errorMessage).build();
            response = new BasicMessageWithExtraData((BasicMessage)errorMsg, null);
        }
        if (response != null) {
            try {
                this.sendSync((BasicMessageWithExtraData<? extends BasicMessage>)response);
            }
            catch (Exception e) {
                log.errorFailedToSendOverFeedComm(response.getClass().getName(), e);
            }
        }
    }

    public void onPong(Buffer buffer) {
    }

    private void configurationAuthentication(BasicMessage message) {
        if (!(message instanceof AuthMessage)) {
            return;
        }
        AuthMessage authMessage = (AuthMessage)message;
        Authentication auth = authMessage.getAuthentication();
        if (auth != null) {
            return;
        }
        auth = new Authentication();
        if (this.config.getStorageAdapter().getSecurityKey() != null) {
            auth.setUsername(this.config.getStorageAdapter().getSecurityKey());
            auth.setPassword(this.config.getStorageAdapter().getSecuritySecret());
        } else {
            auth.setUsername(this.config.getStorageAdapter().getUsername());
            auth.setPassword(this.config.getStorageAdapter().getPassword());
        }
        authMessage.setAuthentication(auth);
    }

    private void startReconnectJobThread() {
        ReconnectJobThread newReconnectJob = !this.destroyed ? new ReconnectJobThread() : null;
        ReconnectJobThread oldReconnectJob = this.reconnectJobThread.getAndSet(newReconnectJob);
        if (oldReconnectJob != null) {
            oldReconnectJob.interrupt();
        }
        if (newReconnectJob != null) {
            newReconnectJob.start();
        }
    }

    private void stopReconnectJobThread() {
        ReconnectJobThread reconnectJob = this.reconnectJobThread.getAndSet(null);
        if (reconnectJob != null) {
            reconnectJob.interrupt();
        }
    }

    static {
        VALID_COMMANDS.put(EchoCommand.REQUEST_CLASS.getName(), EchoCommand.class);
        VALID_COMMANDS.put(GenericErrorResponseCommand.REQUEST_CLASS.getName(), GenericErrorResponseCommand.class);
        VALID_COMMANDS.put(ExecuteOperationCommand.REQUEST_CLASS.getName(), ExecuteOperationCommand.class);
        VALID_COMMANDS.put(DeployApplicationCommand.REQUEST_CLASS.getName(), DeployApplicationCommand.class);
        VALID_COMMANDS.put(AddJdbcDriverCommand.REQUEST_CLASS.getName(), AddJdbcDriverCommand.class);
        VALID_COMMANDS.put(AddDatasourceCommand.REQUEST_CLASS.getName(), AddDatasourceCommand.class);
        VALID_COMMANDS.put(ExportJdrCommand.REQUEST_CLASS.getName(), ExportJdrCommand.class);
        VALID_COMMANDS.put(RemoveDatasourceCommand.REQUEST_CLASS.getName(), RemoveDatasourceCommand.class);
        VALID_COMMANDS.put(RemoveJdbcDriverCommand.REQUEST_CLASS.getName(), RemoveJdbcDriverCommand.class);
        VALID_COMMANDS.put(UpdateDatasourceCommand.REQUEST_CLASS.getName(), UpdateDatasourceCommand.class);
    }

    private class ReconnectJobThread
    extends Thread {
        public ReconnectJobThread() {
            super("Hawkular WildFly Monitor Websocket Reconnect Thread");
            this.setDaemon(true);
        }

        @Override
        public void run() {
            int attemptCount = 0;
            long sleepInterval = 1000L;
            boolean keepTrying = true;
            while (keepTrying && !FeedCommProcessor.this.destroyed) {
                try {
                    ++attemptCount;
                    Thread.sleep(1000L);
                    if (!FeedCommProcessor.this.isConnected()) {
                        if (attemptCount % 60 == 0) {
                            log.errorCannotReconnectToWebSocket(new Exception("Attempt #" + attemptCount));
                        }
                        FeedCommProcessor.this.connect();
                        continue;
                    }
                    keepTrying = false;
                }
                catch (InterruptedException ie) {
                    keepTrying = false;
                }
                catch (Exception e) {
                    if (attemptCount % 60 != 0) continue;
                    log.errorCannotReconnectToWebSocket(e);
                }
            }
        }
    }
}

