/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.resp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.RespServer;
import org.infinispan.server.resp.logging.Log;

public class SubscriberHandler
implements RespRequestHandler {
    private static final Log log = (Log)LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);
    public static final byte[] PREFIX_CHANNEL_BYTES = new byte[]{-114, 16, 78, -3, 127};
    private final Resp3Handler handler;
    private final RespServer respServer;
    Map<WrappedByteArray, PubSubListener> specificChannelSubscribers = new HashMap<WrappedByteArray, PubSubListener>();

    public SubscriberHandler(RespServer respServer, Resp3Handler prevHandler) {
        this.respServer = respServer;
        this.handler = prevHandler;
    }

    public static byte[] keyToChannel(byte[] keyBytes) {
        byte[] result = new byte[keyBytes.length + PREFIX_CHANNEL_BYTES.length];
        System.arraycopy(PREFIX_CHANNEL_BYTES, 0, result, 0, PREFIX_CHANNEL_BYTES.length);
        System.arraycopy(keyBytes, 0, result, PREFIX_CHANNEL_BYTES.length, keyBytes.length);
        return result;
    }

    public static byte[] channelToKey(byte[] channelBytes) {
        return Arrays.copyOfRange(channelBytes, PREFIX_CHANNEL_BYTES.length, channelBytes.length);
    }

    @Override
    public RespRequestHandler handleRequest(ChannelHandlerContext ctx, String type, List<byte[]> arguments) {
        switch (type) {
            case "SUBSCRIBE": {
                for (byte[] keyChannel : arguments) {
                    WrappedByteArray wrappedByteArray;
                    if (log.isTraceEnabled()) {
                        log.tracef("Subscriber for channel: " + CharsetUtil.UTF_8.decode(ByteBuffer.wrap(keyChannel)), new Object[0]);
                    }
                    if (this.specificChannelSubscribers.get(wrappedByteArray = new WrappedByteArray(keyChannel)) != null) continue;
                    PubSubListener pubSubListener = new PubSubListener(ctx.channel());
                    this.specificChannelSubscribers.put(wrappedByteArray, pubSubListener);
                    byte[] channel = SubscriberHandler.keyToChannel(keyChannel);
                    this.respServer.getCache().addListenerAsync((Object)pubSubListener, (key, prevValue, prevMetadata, value, metadata, eventType) -> Arrays.equals(key, channel), null).whenComplete((ignore, t) -> {
                        if (t != null) {
                            log.exceptionWhileRegisteringListener((Throwable)t, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(channel)));
                            ctx.writeAndFlush((Object)"-ERR Failure adding client listener");
                        } else {
                            ByteBuf subscribeBuffer = ctx.alloc().buffer(20 + (int)Math.log10(keyChannel.length) + 1 + keyChannel.length + 2);
                            subscribeBuffer.writeCharSequence((CharSequence)("*2\r\n$9\r\nsubscribe\r\n$" + keyChannel.length + "\r\n"), CharsetUtil.UTF_8);
                            subscribeBuffer.writeBytes(keyChannel);
                            subscribeBuffer.writeCharSequence((CharSequence)"\r\n", CharsetUtil.UTF_8);
                            ctx.writeAndFlush((Object)subscribeBuffer);
                        }
                    });
                }
                break;
            }
            case "UNSUBSCRIBE": {
                if (arguments.size() == 0) {
                    Iterator<Map.Entry<WrappedByteArray, PubSubListener>> iterator = this.specificChannelSubscribers.entrySet().iterator();
                    while (iterator.hasNext()) {
                        Map.Entry<WrappedByteArray, PubSubListener> entry = iterator.next();
                        PubSubListener listener = entry.getValue();
                        this.respServer.getCache().removeListenerAsync((Object)listener);
                        iterator.remove();
                        this.sendUnsubscribe(ctx, entry.getKey().getBytes());
                    }
                } else {
                    for (byte[] keyChannel : arguments) {
                        WrappedByteArray wrappedByteArray = new WrappedByteArray(keyChannel);
                        PubSubListener listener = this.specificChannelSubscribers.remove(wrappedByteArray);
                        if (listener != null) {
                            this.respServer.getCache().removeListenerAsync((Object)listener).whenComplete((ignore, t) -> {
                                if (t != null) {
                                    log.exceptionWhileRemovingListener((Throwable)t, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(keyChannel)));
                                    ctx.writeAndFlush((Object)"-ERR Failure unsubscribing client listener");
                                } else {
                                    this.sendUnsubscribe(ctx, keyChannel);
                                }
                            });
                            continue;
                        }
                        this.sendUnsubscribe(ctx, keyChannel);
                    }
                }
                break;
            }
            case "PING": {
                this.handler.handleRequest(ctx, type, arguments);
                break;
            }
            case "RESET": {
                Iterator<Map.Entry<WrappedByteArray, PubSubListener>> iterator = this.specificChannelSubscribers.entrySet().iterator();
                while (iterator.hasNext()) {
                    Map.Entry<WrappedByteArray, PubSubListener> entry = iterator.next();
                    PubSubListener listener = entry.getValue();
                    this.respServer.getCache().removeListenerAsync((Object)listener);
                    iterator.remove();
                    this.sendUnsubscribe(ctx, entry.getKey().getBytes());
                }
                return this.handler.handleRequest(ctx, type, arguments);
            }
            case "QUIT": {
                ctx.close();
                break;
            }
            case "PSUBSCRIBE": 
            case "PUNSUBSCRIBE": {
                ctx.writeAndFlush((Object)RespRequestHandler.stringToByteBuf("-ERR not implemented yet\r\n", ctx.alloc()));
                break;
            }
            default: {
                return RespRequestHandler.super.handleRequest(ctx, type, arguments);
            }
        }
        return this;
    }

    private void sendUnsubscribe(ChannelHandlerContext ctx, byte[] keyChannel) {
        ByteBuf subscribeBuffer = ctx.alloc().buffer(22 + (int)Math.log10(keyChannel.length) + 1 + keyChannel.length + 2);
        subscribeBuffer.writeCharSequence((CharSequence)("*2\r\n$11\r\nunsubscribe\r\n$" + keyChannel.length + "\r\n"), CharsetUtil.UTF_8);
        subscribeBuffer.writeBytes(keyChannel);
        subscribeBuffer.writeCharSequence((CharSequence)"\r\n", CharsetUtil.UTF_8);
        ctx.writeAndFlush((Object)subscribeBuffer);
    }

    @Listener(clustered=true)
    static class PubSubListener {
        private final Channel channel;

        PubSubListener(Channel channel) {
            this.channel = channel;
        }

        @CacheEntryCreated
        @CacheEntryModified
        public CompletionStage<Void> onEvent(CacheEntryEvent<byte[], byte[]> entryEvent) {
            byte[] key = SubscriberHandler.channelToKey((byte[])entryEvent.getKey());
            byte[] value = (byte[])entryEvent.getValue();
            if (key.length > 0 && value != null && value.length > 0) {
                ByteBuf byteBuf = this.channel.alloc().buffer(14 + (int)Math.log10(key.length) + 1 + 2 + key.length + 2 + 1 + (int)Math.log10(value.length) + 1 + 2 + value.length + 2);
                byteBuf.writeCharSequence((CharSequence)("*3\r\n$7\r\nmessage\r\n$" + key.length + "\r\n"), CharsetUtil.UTF_8);
                byteBuf.writeBytes(key);
                byteBuf.writeCharSequence((CharSequence)("\r\n$" + value.length + "\r\n"), CharsetUtil.UTF_8);
                byteBuf.writeBytes(value);
                byteBuf.writeCharSequence((CharSequence)"\r\n", CharsetUtil.UTF_8);
                this.channel.writeAndFlush((Object)byteBuf);
            }
            return CompletableFutures.completedNull();
        }
    }
}

