/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.netty.channel.AllocatingTransformer;
import io.reactivex.netty.channel.AppendTransformerEvent;
import io.reactivex.netty.channel.ChannelOperations;
import io.reactivex.netty.channel.FlushSelectorOperator;
import io.reactivex.netty.channel.SubscriberToChannelFutureBridge;
import io.reactivex.netty.channel.events.ConnectionEventListener;
import io.reactivex.netty.events.Clock;
import io.reactivex.netty.events.EventPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Actions;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;

public class DefaultChannelOperations<W>
implements ChannelOperations<W> {
    private static final Logger logger = LoggerFactory.getLogger(DefaultChannelOperations.class);
    private static final AtomicIntegerFieldUpdater<DefaultChannelOperations> CLOSE_ISSUED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultChannelOperations.class, "closeIssued");
    private volatile int closeIssued;
    private final Channel nettyChannel;
    private final ConnectionEventListener eventListener;
    private final EventPublisher eventPublisher;
    private final Observable<Void> closeObservable;
    private final Observable<Void> flushAndCloseObservable;
    private final Func1<W, Boolean> flushOnEachSelector = new Func1<W, Boolean>(){

        public Boolean call(W w) {
            return true;
        }
    };

    public DefaultChannelOperations(Channel nettyChannel, ConnectionEventListener eventListener, EventPublisher eventPublisher) {
        this.nettyChannel = nettyChannel;
        this.eventListener = eventListener;
        this.eventPublisher = eventPublisher;
        this.closeObservable = Observable.create((Observable.OnSubscribe)new OnSubscribeForClose(nettyChannel));
        this.flushAndCloseObservable = this.closeObservable.doOnSubscribe(new Action0(){

            public void call() {
                DefaultChannelOperations.this.flush();
            }
        });
    }

    @Override
    public Observable<Void> write(Observable<W> msgs) {
        return this._write(msgs);
    }

    @Override
    public Observable<Void> write(Observable<W> msgs, Func1<W, Boolean> flushSelector) {
        return this._write(msgs, flushSelector);
    }

    @Override
    public Observable<Void> writeAndFlushOnEach(Observable<W> msgs) {
        return this._write(msgs, this.flushOnEachSelector);
    }

    @Override
    public Observable<Void> writeString(Observable<String> msgs) {
        return this._write(msgs);
    }

    @Override
    public Observable<Void> writeString(Observable<String> msgs, Func1<String, Boolean> flushSelector) {
        return this._write(msgs, flushSelector);
    }

    @Override
    public Observable<Void> writeStringAndFlushOnEach(Observable<String> msgs) {
        return this.writeString(msgs, (Func1<String, Boolean>)FLUSH_ON_EACH_STRING);
    }

    @Override
    public Observable<Void> writeBytes(Observable<byte[]> msgs) {
        return this._write(msgs);
    }

    @Override
    public Observable<Void> writeBytes(Observable<byte[]> msgs, Func1<byte[], Boolean> flushSelector) {
        return this._write(msgs, flushSelector);
    }

    @Override
    public Observable<Void> writeBytesAndFlushOnEach(Observable<byte[]> msgs) {
        return this._write(msgs, FLUSH_ON_EACH_BYTES);
    }

    @Override
    public Observable<Void> writeFileRegion(Observable<FileRegion> msgs) {
        return this._write(msgs);
    }

    @Override
    public Observable<Void> writeFileRegion(Observable<FileRegion> msgs, Func1<FileRegion, Boolean> flushSelector) {
        return this._write(msgs, flushSelector);
    }

    @Override
    public Observable<Void> writeFileRegionAndFlushOnEach(Observable<FileRegion> msgs) {
        return this.writeFileRegion(msgs, (Func1<FileRegion, Boolean>)FLUSH_ON_EACH_FILE_REGION);
    }

    @Override
    public <WW> ChannelOperations<WW> transformWrite(AllocatingTransformer<WW, W> transformer) {
        this.nettyChannel.pipeline().fireUserEventTriggered(new AppendTransformerEvent<WW, W>(transformer));
        return new DefaultChannelOperations<W>(this.nettyChannel, this.eventListener, this.eventPublisher);
    }

    @Override
    public void flush() {
        if (this.eventPublisher.publishingEnabled()) {
            final long startTimeNanos = Clock.newStartTimeNanos();
            this.eventListener.onFlushStart();
            if (this.nettyChannel.eventLoop().inEventLoop()) {
                this._flushInEventloop(startTimeNanos);
            } else {
                this.nettyChannel.eventLoop().execute(new Runnable(){

                    @Override
                    public void run() {
                        DefaultChannelOperations.this._flushInEventloop(startTimeNanos);
                    }
                });
            }
        } else {
            this.nettyChannel.flush();
        }
    }

    @Override
    public Observable<Void> close() {
        return this.close(true);
    }

    @Override
    public Observable<Void> close(boolean flush) {
        return flush ? this.flushAndCloseObservable : this.closeObservable;
    }

    @Override
    public void closeNow() {
        this.close().subscribe((Action1)Actions.empty(), (Action1)new Action1<Throwable>(){

            public void call(Throwable throwable) {
                logger.error("Error closing connection.", throwable);
            }
        });
    }

    @Override
    public Observable<Void> closeListener() {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<Void>(){

            public void call(final Subscriber<? super Void> subscriber) {
                SubscriberToChannelFutureBridge l = new SubscriberToChannelFutureBridge(){

                    @Override
                    protected void doOnSuccess(ChannelFuture future) {
                        subscriber.onCompleted();
                    }

                    @Override
                    protected void doOnFailure(ChannelFuture future, Throwable cause) {
                        subscriber.onCompleted();
                    }
                };
                l.bridge(DefaultChannelOperations.this.nettyChannel.closeFuture(), subscriber);
            }
        });
    }

    private <X> Observable<Void> _write(Observable<X> msgs, Func1<X, Boolean> flushSelector) {
        return this._write(msgs.lift(new FlushSelectorOperator<X>(flushSelector, this)));
    }

    private void _flushInEventloop(long startTimeNanos) {
        assert (this.nettyChannel.eventLoop().inEventLoop());
        this.nettyChannel.flush();
        this.eventListener.onFlushComplete(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS);
    }

    private Observable<Void> _write(final Observable<?> msgs) {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<Void>(){

            public void call(final Subscriber<? super Void> subscriber) {
                final long startTimeNanos = Clock.newStartTimeNanos();
                if (DefaultChannelOperations.this.eventPublisher.publishingEnabled()) {
                    DefaultChannelOperations.this.eventListener.onWriteStart();
                }
                if (DefaultChannelOperations.this.nettyChannel.eventLoop().inEventLoop()) {
                    this._writeStreamToChannel(subscriber, startTimeNanos);
                } else {
                    DefaultChannelOperations.this.nettyChannel.eventLoop().execute(new Runnable(){

                        @Override
                        public void run() {
                            this._writeStreamToChannel((Subscriber<? super Void>)subscriber, startTimeNanos);
                        }
                    });
                }
            }

            private void _writeStreamToChannel(final Subscriber<? super Void> subscriber, final long startTimeNanos) {
                final ChannelFuture writeFuture = DefaultChannelOperations.this.nettyChannel.write((Object)msgs.doOnCompleted(new Action0(){

                    public void call() {
                        Boolean shdNotFlush = (Boolean)DefaultChannelOperations.this.nettyChannel.attr(ChannelOperations.FLUSH_ONLY_ON_READ_COMPLETE).get();
                        if (null == shdNotFlush || !shdNotFlush.booleanValue()) {
                            DefaultChannelOperations.this.flush();
                        }
                    }
                }));
                subscriber.add(Subscriptions.create((Action0)new Action0(){

                    public void call() {
                        writeFuture.cancel(false);
                    }
                }));
                writeFuture.addListener((GenericFutureListener)new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (subscriber.isUnsubscribed()) {
                            return;
                        }
                        if (future.isSuccess()) {
                            if (DefaultChannelOperations.this.eventPublisher.publishingEnabled()) {
                                DefaultChannelOperations.this.eventListener.onWriteSuccess(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS);
                            }
                            subscriber.onCompleted();
                        } else {
                            if (DefaultChannelOperations.this.eventPublisher.publishingEnabled()) {
                                DefaultChannelOperations.this.eventListener.onWriteFailed(Clock.onEndNanos(startTimeNanos), TimeUnit.NANOSECONDS, future.cause());
                            }
                            subscriber.onError(future.cause());
                        }
                    }
                });
            }
        });
    }

    private class OnSubscribeForClose
    implements Observable.OnSubscribe<Void> {
        private final Channel nettyChannel;

        public OnSubscribeForClose(Channel nettyChannel) {
            this.nettyChannel = nettyChannel;
        }

        public void call(Subscriber<? super Void> subscriber) {
            ChannelCloseListener closeListener;
            long closeStartTimeNanos = Clock.newStartTimeNanos();
            if (CLOSE_ISSUED_UPDATER.compareAndSet(DefaultChannelOperations.this, 0, 1)) {
                if (DefaultChannelOperations.this.eventPublisher.publishingEnabled()) {
                    DefaultChannelOperations.this.eventListener.onConnectionCloseStart();
                }
                this.nettyChannel.close();
                closeListener = new ChannelCloseListener(DefaultChannelOperations.this.eventListener, DefaultChannelOperations.this.eventPublisher, closeStartTimeNanos, subscriber);
            } else {
                closeListener = new ChannelCloseListener(subscriber);
            }
            closeListener.bridge(this.nettyChannel.closeFuture(), subscriber);
        }

        private class ChannelCloseListener
        extends SubscriberToChannelFutureBridge {
            private final long closeStartTimeNanos;
            private final Subscriber<? super Void> subscriber;
            private final ConnectionEventListener eventListener;
            private final EventPublisher eventPublisher;

            public ChannelCloseListener(ConnectionEventListener eventListener, EventPublisher eventPublisher, long closeStartTimeNanos, Subscriber<? super Void> subscriber) {
                this.eventListener = eventListener;
                this.eventPublisher = eventPublisher;
                this.closeStartTimeNanos = closeStartTimeNanos;
                this.subscriber = subscriber;
            }

            public ChannelCloseListener(Subscriber<? super Void> subscriber) {
                this(null, null, -1L, subscriber);
            }

            @Override
            protected void doOnSuccess(ChannelFuture future) {
                if (null != this.eventListener && this.eventPublisher.publishingEnabled()) {
                    this.eventListener.onConnectionCloseSuccess(Clock.onEndNanos(this.closeStartTimeNanos), TimeUnit.NANOSECONDS);
                }
                if (!this.subscriber.isUnsubscribed()) {
                    this.subscriber.onCompleted();
                }
            }

            @Override
            protected void doOnFailure(ChannelFuture future, Throwable cause) {
                if (null != this.eventListener && this.eventPublisher.publishingEnabled()) {
                    this.eventListener.onConnectionCloseFailed(Clock.onEndNanos(this.closeStartTimeNanos), TimeUnit.NANOSECONDS, future.cause());
                }
                if (!this.subscriber.isUnsubscribed()) {
                    this.subscriber.onError(future.cause());
                }
            }
        }
    }
}

