/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.client.impl.nio;

import com.rabbitmq.client.impl.Environment;
import com.rabbitmq.client.impl.Frame;
import com.rabbitmq.client.impl.nio.NioLoopContext;
import com.rabbitmq.client.impl.nio.NioParams;
import com.rabbitmq.client.impl.nio.SelectorHolder;
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerState;
import com.rabbitmq.client.impl.nio.SocketChannelRegistration;
import com.rabbitmq.client.impl.nio.WriteRequest;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NioLoop
implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(NioLoop.class);
    private final NioLoopContext context;
    private final NioParams nioParams;

    public NioLoop(NioParams nioParams, NioLoopContext loopContext) {
        this.nioParams = nioParams;
        this.context = loopContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        SelectorHolder selectorState = this.context.readSelectorState;
        Selector selector = selectorState.selector;
        Set<SocketChannelRegistration> registrations = selectorState.registrations;
        ByteBuffer buffer = this.context.readBuffer;
        SelectorHolder writeSelectorState = this.context.writeSelectorState;
        Selector writeSelector = writeSelectorState.selector;
        Set<SocketChannelRegistration> writeRegistrations = writeSelectorState.registrations;
        boolean writeRegistered = false;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                int select;
                for (SelectionKey selectionKey : selector.keys()) {
                    long now;
                    SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState)selectionKey.attachment();
                    if (state.getConnection() == null || state.getConnection().getHeartbeat() <= 0 || (now = System.currentTimeMillis()) - state.getLastActivity() <= (long)(state.getConnection().getHeartbeat() * 1000 * 2)) continue;
                    try {
                        state.getConnection().handleHeartbeatFailure();
                    }
                    catch (Exception e) {
                        LOGGER.warn("Error after heartbeat failure of connection {}", (Object)state.getConnection());
                    }
                    finally {
                        selectionKey.cancel();
                    }
                }
                if (!writeRegistered && registrations.isEmpty() && writeRegistrations.isEmpty()) {
                    boolean clean;
                    select = selector.select(1000L);
                    if (selector.keys().size() == 0 && (clean = this.context.cleanUp())) {
                        return;
                    }
                } else {
                    select = selector.selectNow();
                }
                writeRegistered = false;
                Iterator<SocketChannelRegistration> registrationIterator = registrations.iterator();
                while (registrationIterator.hasNext()) {
                    SocketChannelRegistration registration = registrationIterator.next();
                    registrationIterator.remove();
                    int operations = registration.operations;
                    registration.state.getChannel().register(selector, operations, registration.state);
                }
                if (select > 0) {
                    Set<SelectionKey> readyKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = readyKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (!key.isValid() || !key.isReadable()) continue;
                        SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState)key.attachment();
                        try {
                            if (!state.getChannel().isOpen()) {
                                key.cancel();
                                continue;
                            }
                            if (state.getConnection() == null) continue;
                            DataInputStream inputStream = state.inputStream;
                            state.prepareForReadSequence();
                            while (state.continueReading()) {
                                Frame frame = Frame.readFrom(inputStream);
                                try {
                                    boolean noProblem = state.getConnection().handleReadFrame(frame);
                                    if (!noProblem || state.getConnection().isRunning() && !state.getConnection().hasBrokerInitiatedShutdown()) continue;
                                    this.dispatchShutdownToConnection(state);
                                    key.cancel();
                                }
                                catch (Throwable ex) {
                                    this.handleIoError(state, ex);
                                    key.cancel();
                                }
                                break;
                            }
                            state.setLastActivity(System.currentTimeMillis());
                        }
                        catch (Exception e) {
                            LOGGER.warn("Error during reading frames", (Throwable)e);
                            this.handleIoError(state, e);
                            key.cancel();
                        }
                        finally {
                            buffer.clear();
                        }
                    }
                }
                select = writeSelector.selectNow();
                Iterator<SocketChannelRegistration> writeRegistrationIterator = writeRegistrations.iterator();
                while (writeRegistrationIterator.hasNext()) {
                    SocketChannelRegistration writeRegistration = writeRegistrationIterator.next();
                    writeRegistrationIterator.remove();
                    int operations = writeRegistration.operations;
                    try {
                        if (!writeRegistration.state.getChannel().isOpen()) continue;
                        writeRegistration.state.getChannel().register(writeSelector, operations, writeRegistration.state);
                        writeRegistered = true;
                    }
                    catch (Exception e) {
                        LOGGER.info("Error while registering socket channel for write: {}", (Object)e.getMessage());
                    }
                }
                if (select <= 0) continue;
                Set<SelectionKey> readyKeys = writeSelector.selectedKeys();
                Iterator<SelectionKey> iterator = readyKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState)key.attachment();
                    if (!key.isValid() || !key.isWritable()) continue;
                    boolean cancelKey = true;
                    try {
                        WriteRequest request;
                        if (!state.getChannel().isOpen()) {
                            key.cancel();
                            continue;
                        }
                        state.prepareForWriteSequence();
                        int toBeWritten = state.getWriteQueue().size();
                        DataOutputStream outputStream = state.outputStream;
                        for (int written = 0; written <= toBeWritten && (request = state.getWriteQueue().poll()) != null; ++written) {
                            request.handle(outputStream);
                        }
                        outputStream.flush();
                        if (state.getWriteQueue().isEmpty()) continue;
                        cancelKey = true;
                    }
                    catch (Exception e) {
                        this.handleIoError(state, e);
                    }
                    finally {
                        state.endWriteSequence();
                        if (!cancelKey) continue;
                        key.cancel();
                    }
                }
            }
        }
        catch (Exception e) {
            LOGGER.error("Error in NIO loop", (Throwable)e);
        }
    }

    protected void handleIoError(SocketChannelFrameHandlerState state, Throwable ex) {
        if (this.needToDispatchIoError(state)) {
            this.dispatchIoErrorToConnection(state, ex);
        } else {
            try {
                state.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }

    protected boolean needToDispatchIoError(SocketChannelFrameHandlerState state) {
        return state.getConnection().isOpen();
    }

    protected void dispatchIoErrorToConnection(final SocketChannelFrameHandlerState state, final Throwable ex) {
        Runnable shutdown = new Runnable(){

            @Override
            public void run() {
                state.getConnection().handleIoError(ex);
            }
        };
        if (this.executorService() == null) {
            String name = "rabbitmq-connection-shutdown-" + state.getConnection();
            Thread shutdownThread = Environment.newThread(this.threadFactory(), shutdown, name);
            shutdownThread.start();
        } else {
            this.executorService().submit(shutdown);
        }
    }

    protected void dispatchShutdownToConnection(final SocketChannelFrameHandlerState state) {
        Runnable shutdown = new Runnable(){

            @Override
            public void run() {
                state.getConnection().doFinalShutdown();
            }
        };
        if (this.executorService() == null) {
            String name = "rabbitmq-connection-shutdown-" + state.getConnection();
            Thread shutdownThread = Environment.newThread(this.threadFactory(), shutdown, name);
            shutdownThread.start();
        } else {
            this.executorService().submit(shutdown);
        }
    }

    private ExecutorService executorService() {
        return this.nioParams.getNioExecutor();
    }

    private ThreadFactory threadFactory() {
        return this.nioParams.getThreadFactory();
    }
}

