package org.apache.camel.component.websocket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.StopWatch;

/* loaded from: input_file:org/apache/camel/component/websocket/WebsocketProducer.class */
/**
 * Producer that writes the body of an exchange to websocket connections held in a
 * {@link WebsocketStore}.
 *
 * <p>A message is sent either to every matching connection in the store ("send to all") or to the
 * single connection identified by the {@link WebsocketConstants#CONNECTION_KEY} header. Bodies
 * that are neither a {@code String} nor a {@code byte[]} are converted to {@code String} before
 * sending.
 */
public class WebsocketProducer extends DefaultProducer implements WebsocketProducerConsumer {
    /** Upper bound, in milliseconds, of a single sleep while polling pending broadcast sends. */
    private static final long MAX_POLL_INTERVAL_MILLIS = 1000L;

    private WebsocketStore store;
    private final Boolean sendToAll;
    private final WebsocketEndpoint endpoint;

    /**
     * Creates a producer bound to the given endpoint.
     *
     * @param websocketEndpoint endpoint supplying the send-to-all default, resource URI and send timeout
     */
    public WebsocketProducer(WebsocketEndpoint websocketEndpoint) {
        super(websocketEndpoint);
        this.sendToAll = websocketEndpoint.getSendToAll();
        this.endpoint = websocketEndpoint;
    }

    /**
     * Sends the exchange's IN body to all connections or to the single connection named by the
     * {@link WebsocketConstants#CONNECTION_KEY} header.
     *
     * @param exchange the exchange whose IN message is sent
     * @throws WebsocketSendException if no connection key is set for a single send, or if the
     *         single send does not complete within the endpoint's send timeout
     * @throws Exception any other failure raised while converting or sending the body
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        Object body = in.getMandatoryBody();
        if (body != null && !(body instanceof String) && !(body instanceof byte[])) {
            // Only String and byte[] can be written; everything else goes through type conversion.
            body = in.getMandatoryBody(String.class);
        }

        if (isSendToAllSet(in)) {
            sendToAll(this.store, body, exchange);
            return;
        }

        String connectionKey = in.getHeader(WebsocketConstants.CONNECTION_KEY, String.class);
        if (connectionKey == null) {
            throw new WebsocketSendException("Failed to send message to single connection; connection key not set.", exchange);
        }

        // The store key is the connection key followed by the endpoint's path spec, when it has one.
        DefaultWebsocket websocket = this.store.get(connectionKey
                + (this.endpoint.getResourceUri() != null ? WebsocketComponent.createPathSpec(this.endpoint.getResourceUri()) : ""));
        this.log.debug("Sending to connection key {} -> {}", connectionKey, body);

        Future<Void> pendingSend = sendMessage(websocket, body);
        if (pendingSend == null) {
            // Nothing was written (no such connection, closed session, or unsupported body type).
            return;
        }

        int timeoutMillis = this.endpoint.getSendTimeout().intValue();
        try {
            pendingSend.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Future.get signals a timeout by throwing, never by returning an unfinished future,
            // so this is the only place the timeout can be reported as a send failure.
            throw new WebsocketSendException("Failed to send message to the connection within " + timeoutMillis + " millis.", exchange, e);
        }
    }

    /**
     * Returns the websocket endpoint this producer was created for.
     *
     * @return the endpoint passed to the constructor
     */
    @Override // org.apache.camel.component.websocket.WebsocketProducerConsumer
    public WebsocketEndpoint getEndpoint() {
        return this.endpoint;
    }

    /**
     * Alias of {@link #getEndpoint()} under the synthetic name produced by decompilation; kept
     * only so code that already calls this name keeps compiling.
     *
     * @return the endpoint passed to the constructor
     * @deprecated use {@link #getEndpoint()}
     */
    @Deprecated
    public WebsocketEndpoint m4getEndpoint() {
        return getEndpoint();
    }

    /**
     * Starts the producer and then registers it with the endpoint via {@code connect}.
     *
     * @throws Exception if the superclass start or the endpoint connect fails
     */
    @Override
    public void doStart() throws Exception {
        super.doStart();
        this.endpoint.connect(this);
    }

    /**
     * Unregisters the producer from the endpoint via {@code disconnect} and then stops it.
     *
     * @throws Exception if the endpoint disconnect or the superclass stop fails
     */
    @Override
    public void doStop() throws Exception {
        this.endpoint.disconnect(this);
        super.doStop();
    }

    /**
     * Decides whether the message should be broadcast to all connections.
     *
     * @param message message whose {@link WebsocketConstants#SEND_TO_ALL} header is consulted; the
     *        endpoint-level setting is used as the default when the header is absent
     * @return {@code true} only when the resolved value is non-null and true
     */
    boolean isSendToAllSet(Message message) {
        Boolean value = message.getHeader(WebsocketConstants.SEND_TO_ALL, this.sendToAll, Boolean.class);
        return value != null && value.booleanValue();
    }

    /**
     * Sends the payload to every connection in the store that matches this endpoint's resource
     * URI, then waits up to the endpoint's send timeout for the sends to finish.
     *
     * <p>A failure to start a send on one connection does not stop delivery to the others; the
     * first such failure is reported after all sends were attempted.
     *
     * @param websocketStore store whose connections are iterated
     * @param obj payload to send ({@code String} or {@code byte[]})
     * @param exchange exchange attached to any exception raised
     * @throws WebsocketSendException if starting a send failed for at least one connection, or
     *         if sends are still pending when the timeout elapses
     * @throws Exception if waiting is interrupted (see
     *         {@link #handleSleepInterruptedException(InterruptedException, Exchange)})
     */
    void sendToAll(WebsocketStore websocketStore, Object obj, Exchange exchange) throws Exception {
        this.log.debug("Sending to all {}", obj);
        CamelExchangeException failure = null;
        Collection<Future<Void>> pending = new CopyOnWriteArrayList<>();

        for (DefaultWebsocket websocket : websocketStore.getAll()) {
            if (!matchesEndpointPath(websocket)) {
                continue;
            }
            try {
                Future<Void> send = sendMessage(websocket, obj);
                if (send != null) {
                    pending.add(send);
                }
            } catch (Exception e) {
                // Remember only the first failure, but keep delivering to the remaining connections.
                if (failure == null) {
                    failure = new WebsocketSendException("Failed to deliver message to one or more recipients.", exchange, e);
                }
            }
        }

        StopWatch watch = new StopWatch();
        int timeoutMillis = this.endpoint.getSendTimeout().intValue();
        removeFinished(pending);
        while (!pending.isEmpty()) {
            long remainingMillis = timeoutMillis - watch.taken();
            if (remainingMillis <= 0) {
                break;
            }
            // Never sleep past the deadline, so the overall wait honours the configured timeout.
            long sleepMillis = Math.min(MAX_POLL_INTERVAL_MILLIS, remainingMillis);
            this.log.debug("Sleeping {} millis waiting for sendToAll to complete sending with timeout {} millis", sleepMillis, timeoutMillis);
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                handleSleepInterruptedException(e, exchange);
            }
            // Re-check after every sleep so sends that finished during the final sleep are not
            // misreported as timed out.
            removeFinished(pending);
        }

        if (!pending.isEmpty()) {
            CamelExchangeException timeout = new WebsocketSendException(
                    "Failed to deliver message within " + this.endpoint.getSendTimeout() + " millis to one or more recipients.", exchange);
            if (failure != null) {
                // Keep the earlier send failure visible instead of silently discarding it.
                timeout.addSuppressed(failure);
            }
            failure = timeout;
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Tells whether the connection belongs to this endpoint's resource URI (always true when the endpoint has none). */
    private boolean matchesEndpointPath(DefaultWebsocket websocket) {
        return this.endpoint.getResourceUri() == null
                || websocket.getPathSpec().equals(WebsocketComponent.createPathSpec(this.endpoint.getResourceUri()));
    }

    /** Drops every future that has completed or been cancelled from the pending collection. */
    private static void removeFinished(Collection<Future<Void>> pending) {
        pending.removeIf(future -> future.isDone() || future.isCancelled());
    }

    /**
     * Starts an asynchronous send of the payload on the given connection.
     *
     * @param defaultWebsocket target connection; may be {@code null}
     * @param obj payload; only {@code String} and {@code byte[]} are written
     * @return the future of the started send, or {@code null} when nothing was sent because the
     *         connection is {@code null}, its session is not open, or the payload type is unsupported
     * @throws IOException declared for callers; raised by the underlying send if it fails
     */
    Future<Void> sendMessage(DefaultWebsocket defaultWebsocket, Object obj) throws IOException {
        if (defaultWebsocket == null || !defaultWebsocket.getSession().isOpen()) {
            return null;
        }
        this.log.trace("Sending to websocket {} -> {}", defaultWebsocket.getConnectionKey(), obj);
        if (obj instanceof String) {
            return defaultWebsocket.getSession().getRemote().sendStringByFuture((String) obj);
        }
        if (obj instanceof byte[]) {
            return defaultWebsocket.getSession().getRemote().sendBytesByFuture(ByteBuffer.wrap((byte[]) obj));
        }
        return null;
    }

    /**
     * Sets the store used to look up connections.
     *
     * @param websocketStore the store to send through
     */
    public void setStore(WebsocketStore websocketStore) {
        this.store = websocketStore;
    }

    /**
     * Handles an interruption of the wait in {@code sendToAll}: logs it, restores the thread's
     * interrupt flag and rethrows the exception.
     *
     * @param interruptedException the interruption that occurred
     * @param exchange the exchange being processed (unused here; available to overriding classes)
     * @throws InterruptedException always, in this implementation
     */
    protected void handleSleepInterruptedException(InterruptedException interruptedException, Exchange exchange) throws InterruptedException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sleep interrupted, are we stopping? {}", Boolean.valueOf(isStopping() || isStopped()));
        }
        Thread.currentThread().interrupt();
        throw interruptedException;
    }
}
