/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;

public class NioEchoServer
extends Thread {
    private static final double EPS = 1.0E-4;
    private final int port;
    private final ServerSocketChannel serverSocketChannel;
    private final List<SocketChannel> newChannels;
    private final List<SocketChannel> socketChannels;
    private final AcceptorThread acceptorThread;
    private final Selector selector;
    private volatile WritableByteChannel outputChannel;
    private final CredentialCache credentialCache;
    private final Metrics metrics;
    private volatile int numSent = 0;
    private volatile boolean closeKafkaChannels;
    private final DelegationTokenCache tokenCache;

    public NioEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig config, String serverHost, ChannelBuilder channelBuilder, CredentialCache credentialCache) throws Exception {
        super("echoserver");
        this.setDaemon(true);
        this.serverSocketChannel = ServerSocketChannel.open();
        this.serverSocketChannel.configureBlocking(false);
        this.serverSocketChannel.socket().bind(new InetSocketAddress(serverHost, 0));
        this.port = this.serverSocketChannel.socket().getLocalPort();
        this.socketChannels = Collections.synchronizedList(new ArrayList());
        this.newChannels = Collections.synchronizedList(new ArrayList());
        this.credentialCache = credentialCache;
        this.tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames());
        if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
            for (String mechanism : ScramMechanism.mechanismNames()) {
                if (credentialCache.cache(mechanism, ScramCredential.class) != null) continue;
                credentialCache.createCache(mechanism, ScramCredential.class);
            }
        }
        if (channelBuilder == null) {
            channelBuilder = ChannelBuilders.serverChannelBuilder((ListenerName)listenerName, (boolean)false, (SecurityProtocol)securityProtocol, (AbstractConfig)config, (CredentialCache)credentialCache, (DelegationTokenCache)this.tokenCache);
        }
        this.metrics = new Metrics();
        this.selector = new Selector(5000L, this.metrics, (Time)new MockTime(), "MetricGroup", channelBuilder, new LogContext());
        this.acceptorThread = new AcceptorThread();
    }

    public int port() {
        return this.port;
    }

    public CredentialCache credentialCache() {
        return this.credentialCache;
    }

    public DelegationTokenCache tokenCache() {
        return this.tokenCache;
    }

    public double metricValue(String name) {
        for (Map.Entry entry : this.metrics.metrics().entrySet()) {
            if (!((MetricName)entry.getKey()).name().equals(name)) continue;
            return ((KafkaMetric)entry.getValue()).value();
        }
        throw new IllegalStateException("Metric not found, " + name + ", found=" + this.metrics.metrics().keySet());
    }

    public void verifyAuthenticationMetrics(int successfulAuthentications, int failedAuthentications) throws InterruptedException {
        this.waitForMetric("successful-authentication", successfulAuthentications);
        this.waitForMetric("failed-authentication", failedAuthentications);
    }

    private void waitForMetric(String name, final double expectedValue) throws InterruptedException {
        final String totalName = name + "-total";
        final String rateName = name + "-rate";
        if (expectedValue == 0.0) {
            Assert.assertEquals((double)expectedValue, (double)this.metricValue(totalName), (double)1.0E-4);
            Assert.assertEquals((double)expectedValue, (double)this.metricValue(rateName), (double)1.0E-4);
        } else {
            TestUtils.waitForCondition(new TestCondition(){

                @Override
                public boolean conditionMet() {
                    return Math.abs(NioEchoServer.this.metricValue(totalName) - expectedValue) <= 1.0E-4;
                }
            }, "Metric not updated " + totalName);
            TestUtils.waitForCondition(new TestCondition(){

                @Override
                public boolean conditionMet() {
                    return NioEchoServer.this.metricValue(rateName) > 0.0;
                }
            }, "Metric not updated " + rateName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            this.acceptorThread.start();
            while (this.serverSocketChannel.isOpen()) {
                this.selector.poll(1000L);
                List<SocketChannel> list = this.newChannels;
                synchronized (list) {
                    for (SocketChannel socketChannel : this.newChannels) {
                        String id = this.id(socketChannel);
                        this.selector.register(id, socketChannel);
                        this.socketChannels.add(socketChannel);
                    }
                    this.newChannels.clear();
                }
                if (this.closeKafkaChannels) {
                    for (KafkaChannel channel : this.selector.channels()) {
                        this.selector.close(channel.id());
                    }
                }
                List completedReceives = this.selector.completedReceives();
                for (NetworkReceive rcv : completedReceives) {
                    KafkaChannel channel = this.channel(rcv.source());
                    String channelId = channel.id();
                    this.selector.mute(channelId);
                    NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
                    if (this.outputChannel == null) {
                        this.selector.send((Send)send);
                        continue;
                    }
                    for (ByteBuffer buffer : send.buffers) {
                        this.outputChannel.write(buffer);
                    }
                    this.selector.unmute(channelId);
                }
                for (Send send : this.selector.completedSends()) {
                    this.selector.unmute(send.destination());
                    ++this.numSent;
                }
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    public int numSent() {
        return this.numSent;
    }

    private String id(SocketChannel channel) {
        return channel.socket().getLocalAddress().getHostAddress() + ":" + channel.socket().getLocalPort() + "-" + channel.socket().getInetAddress().getHostAddress() + ":" + channel.socket().getPort();
    }

    private KafkaChannel channel(String id) {
        KafkaChannel channel = this.selector.channel(id);
        return channel == null ? this.selector.closingChannel(id) : channel;
    }

    public void outputChannel(WritableByteChannel channel) {
        this.outputChannel = channel;
    }

    public Selector selector() {
        return this.selector;
    }

    public void closeKafkaChannels() throws IOException {
        this.closeKafkaChannels = true;
        this.selector.wakeup();
        try {
            TestUtils.waitForCondition(() -> this.selector.channels().isEmpty(), "Channels not closed");
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        finally {
            this.closeKafkaChannels = false;
        }
    }

    public void closeSocketChannels() throws IOException {
        for (SocketChannel channel : this.socketChannels) {
            channel.close();
        }
        this.socketChannels.clear();
    }

    public void close() throws IOException, InterruptedException {
        this.serverSocketChannel.close();
        this.closeSocketChannels();
        this.acceptorThread.interrupt();
        this.acceptorThread.join();
        this.interrupt();
        this.join();
    }

    private class AcceptorThread
    extends Thread {
        public AcceptorThread() throws IOException {
            this.setName("acceptor");
        }

        @Override
        public void run() {
            try {
                java.nio.channels.Selector acceptSelector = java.nio.channels.Selector.open();
                NioEchoServer.this.serverSocketChannel.register(acceptSelector, 16);
                while (NioEchoServer.this.serverSocketChannel.isOpen()) {
                    if (acceptSelector.select(1000L) <= 0) continue;
                    Iterator<SelectionKey> it = acceptSelector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        if (key.isAcceptable()) {
                            SocketChannel socketChannel = ((ServerSocketChannel)key.channel()).accept();
                            socketChannel.configureBlocking(false);
                            NioEchoServer.this.newChannels.add(socketChannel);
                            NioEchoServer.this.selector.wakeup();
                        }
                        it.remove();
                    }
                }
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }
}

