/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.proton.streams.impl;

import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.proton.ProtonLinkOptions;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.impl.ProtonConnectionImpl;
import io.vertx.proton.streams.Delivery;
import io.vertx.proton.streams.ProtonPublisher;
import io.vertx.proton.streams.ProtonPublisherOptions;
import io.vertx.proton.streams.impl.DeliveryImpl;
import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.Target;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * {@link ProtonPublisher} of {@link Delivery} items backed by a single {@link ProtonReceiver}.
 *
 * <p>The receiver is created and configured in the constructor but is only opened when
 * {@link #subscribe(Subscriber)} is called. Only one subscriber is accepted for the lifetime of
 * the publisher. Link credit is granted explicitly in response to
 * {@link Subscription#request(long)} and is capped at {@code maxOutstandingCredit}.
 *
 * <p>{@code subscribe}, {@code request} and {@code cancel} hand their receiver work to the
 * connection's context via {@code runOnContext} rather than touching the receiver on the
 * calling thread.
 */
public class ProtonPublisherImpl
implements ProtonPublisher<Delivery> {
    private static final Logger LOG = LoggerFactory.getLogger(ProtonPublisherImpl.class);
    private static final Symbol SHARED = Symbol.valueOf("shared");
    private static final Symbol GLOBAL = Symbol.valueOf("global");
    /** Credit cap used when the options do not supply a positive value. */
    private static final int DEFAULT_MAX_OUTSTANDING_CREDIT = 1000;

    private final ContextInternal connCtx;
    private final ProtonConnectionImpl conn;
    /** Flipped to {@code true} by the first {@code subscribe} call; guards against a second one. */
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private final ProtonReceiver receiver;
    /** Upper bound on the credit this publisher keeps outstanding on the receiver. */
    private final int maxOutstandingCredit;
    /** When {@code true}, cancelling detaches the receiver instead of closing it. */
    private final boolean durable;
    private AmqpSubscription subscription;
    /** Whether the end of the connection is reported to the subscriber as an error. */
    private boolean emitOnConnectionEnd = true;

    /**
     * Creates the receiver for {@code address} on {@code conn} and applies {@code options} to it.
     * The receiver is not opened here.
     *
     * @param address address passed to {@code conn.createReceiver}
     * @param conn connection that owns the receiver and supplies the context
     * @param options link name, credit cap, durability, dynamic and shared/global settings
     */
    public ProtonPublisherImpl(String address, ProtonConnectionImpl conn, ProtonPublisherOptions options) {
        this.connCtx = conn.getContext();
        this.conn = conn;

        ProtonLinkOptions linkOptions = new ProtonLinkOptions();
        String linkName = options.getLinkName();
        if (linkName != null) {
            linkOptions.setLinkName(linkName);
        }

        this.receiver = conn.createReceiver(address, linkOptions);
        // Dispositions are not applied automatically and no credit is prefetched; credit is
        // flowed explicitly by AmqpSubscription (presumably so it tracks subscriber demand).
        this.receiver.setAutoAccept(false);
        this.receiver.setPrefetch(0);

        int configuredCredit = options.getMaxOutstandingCredit();
        this.maxOutstandingCredit = configuredCredit > 0 ? configuredCredit : DEFAULT_MAX_OUTSTANDING_CREDIT;

        Source source = (Source) this.receiver.getSource();
        this.durable = options.isDurable();
        if (this.durable) {
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
        }
        if (options.isDynamic()) {
            source.setAddress(null);
            source.setDynamic(true);
        }

        ArrayList<Symbol> capabilities = new ArrayList<>();
        if (options.isShared()) {
            capabilities.add(SHARED);
        }
        if (options.isGlobal()) {
            capabilities.add(GLOBAL);
        }
        if (!capabilities.isEmpty()) {
            source.setCapabilities(capabilities.toArray(new Symbol[0]));
        }
    }

    /**
     * Registers the single subscriber and schedules the opening of the receiver on the
     * connection's context. {@code onSubscribe} is signalled from the receiver's open handler.
     *
     * @param subscriber the subscriber to deliver to; must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     * @throws IllegalStateException if {@code subscribe} has already been called
     */
    @Override
    public void subscribe(Subscriber<? super Delivery> subscriber) {
        LOG.trace("Subscribe called");
        Objects.requireNonNull(subscriber, "A subscriber must be supplied");
        if (this.subscribed.getAndSet(true)) {
            // Message text (including its spelling) is left untouched in case callers match on it.
            throw new IllegalStateException("Only a single susbcriber supported, and subscribe already called.");
        }
        this.subscription = new AmqpSubscription(subscriber);

        this.connCtx.runOnContext(x -> {
            this.conn.addEndHandler(v -> {
                if (this.emitOnConnectionEnd) {
                    this.subscription.indicateError(new Exception("Connection closed: " + this.conn.getContainer()));
                }
            });
            // Until cancel() swaps these handlers, a close/detach is treated as an error.
            this.receiver.closeHandler(res -> {
                this.subscription.indicateError(new Exception("Link closed unexpectedly"));
                this.receiver.close();
            });
            this.receiver.detachHandler(res -> {
                this.subscription.indicateError(new Exception("Link detached unexpectedly"));
                this.receiver.detach();
            });
            this.receiver.openHandler(res -> this.subscription.indicateSubscribed());
            this.receiver.handler((delivery, message) -> {
                Delivery envelope = new DeliveryImpl(message, delivery, this.connCtx);
                // A delivery the subscription refused (completed or cancelled) is released and settled.
                if (!this.subscription.onNextWrapper(envelope)) {
                    delivery.disposition(Released.getInstance(), true);
                }
            });
            this.receiver.open();
        });
    }

    /** @return whether the end of the connection is signalled to the subscriber as an error */
    public boolean isEmitOnConnectionEnd() {
        return this.emitOnConnectionEnd;
    }

    /**
     * @param emitOnConnectionEnd {@code true} to signal an error to the subscriber when the
     *     connection's end handler fires, {@code false} to stay silent
     */
    public void setEmitOnConnectionEnd(boolean emitOnConnectionEnd) {
        this.emitOnConnectionEnd = emitOnConnectionEnd;
    }

    /** @return the receiver backing this publisher */
    public ProtonReceiver getLink() {
        return this.receiver;
    }

    /** Sets the source on the underlying receiver and returns this publisher. */
    @Override
    public ProtonPublisher<Delivery> setSource(org.apache.qpid.proton.amqp.transport.Source source) {
        this.receiver.setSource(source);
        return this;
    }

    /** @return the source currently held by the underlying receiver */
    @Override
    public org.apache.qpid.proton.amqp.transport.Source getSource() {
        return this.receiver.getSource();
    }

    /** Sets the target on the underlying receiver and returns this publisher. */
    @Override
    public ProtonPublisher<Delivery> setTarget(Target target) {
        this.receiver.setTarget(target);
        return this;
    }

    /** @return the target currently held by the underlying receiver */
    @Override
    public Target getTarget() {
        return this.receiver.getTarget();
    }

    /** @return the remote source reported by the underlying receiver */
    @Override
    public org.apache.qpid.proton.amqp.transport.Source getRemoteSource() {
        return this.receiver.getRemoteSource();
    }

    /** @return the remote target reported by the underlying receiver */
    @Override
    public Target getRemoteTarget() {
        return this.receiver.getRemoteTarget();
    }

    /** @return the address of the remote source, or {@code null} when there is no remote source */
    @Override
    public String getRemoteAddress() {
        org.apache.qpid.proton.amqp.transport.Source remoteSource = this.getRemoteSource();
        if (remoteSource == null) {
            return null;
        }
        return remoteSource.getAddress();
    }

    /**
     * The {@link Subscription} handed to the subscriber. It tracks the requested-but-undelivered
     * count, converts it into receiver credit, and makes sure at most one terminal signal
     * ({@code onError} or {@code onComplete}) is emitted.
     */
    public class AmqpSubscription
    implements Subscription {
        /** Cleared once a terminal signal has been decided, so no further signals can reach it. */
        private Subscriber<? super Delivery> subscriber;
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private final AtomicBoolean completed = new AtomicBoolean();
        // Plain (non-atomic) counter. NOTE(review): appears to rely on every access happening
        // on the connection context - confirm.
        private long outstandingRequests = 0L;

        public AmqpSubscription(Subscriber<? super Delivery> sub) {
            this.subscriber = sub;
        }

        /**
         * Passes {@code next} to the subscriber unless the subscription is completed or
         * cancelled, then tops up receiver credit if it has dropped below half the cap.
         *
         * @return {@code true} if {@code onNext} was called, {@code false} if the item was skipped
         */
        private boolean onNextWrapper(Delivery next) {
            if (this.completed.get() || this.cancelled.get()) {
                LOG.trace("skipped calling onNext, already completed or cancelled");
                return false;
            }

            LOG.trace("calling onNext");
            this.subscriber.onNext(next);
            --this.outstandingRequests;

            // Re-check: the subscriber may have cancelled from within onNext.
            if (!this.cancelled.get()) {
                int currentCredit = receiver.getCredit();
                boolean creditRunningLow = currentCredit < maxOutstandingCredit * 0.5;
                if (creditRunningLow && this.outstandingRequests > currentCredit) {
                    int creditLimit = (int) Math.min(this.outstandingRequests, (long) maxOutstandingCredit);
                    int credits = creditLimit - currentCredit;
                    if (credits > 0) {
                        LOG.trace("Updating credit for outstanding requests: {0}", credits);
                        this.flowCreditIfNeeded(credits);
                    }
                }
            }
            return true;
        }

        /**
         * Records {@code n} additional requested items and flows credit for them, on the
         * connection's context. A non-positive {@code n} is reported to the subscriber as an
         * {@link IllegalArgumentException}. Does nothing once cancelled.
         *
         * @param n number of additional items requested; {@link Long#MAX_VALUE} and overflowing
         *     sums both pin the outstanding count at {@code Long.MAX_VALUE}
         */
        @Override
        public void request(long n) {
            LOG.trace("Request called: {0}", n);
            if (this.cancelled.get()) {
                return;
            }

            if (n <= 0L) {
                LOG.warn("non-positive subscription request, requests must be > 0");
                connCtx.runOnContext(x -> this.indicateError(new IllegalArgumentException("non-positive subscription request, requests must be > 0")));
                return;
            }

            connCtx.runOnContext(x -> {
                LOG.trace("Processing request: {0}", n);
                if (n == Long.MAX_VALUE) {
                    this.outstandingRequests = Long.MAX_VALUE;
                } else {
                    try {
                        this.outstandingRequests = Math.addExact(n, this.outstandingRequests);
                    } catch (ArithmeticException ae) {
                        // Sum overflowed a long: saturate rather than wrap.
                        this.outstandingRequests = Long.MAX_VALUE;
                    }
                }
                if (this.cancelled.get()) {
                    LOG.trace("Not sending more credit, subscription cancelled since request was originally scheduled");
                    return;
                }
                this.flowCreditIfNeeded(n);
            });
        }

        /**
         * Flows up to {@code n} credits to the receiver without letting its credit exceed
         * {@code maxOutstandingCredit}. Nothing is flowed once the subscription has completed.
         */
        private void flowCreditIfNeeded(long n) {
            int currentCredit = receiver.getCredit();
            if (currentCredit >= maxOutstandingCredit) {
                return;
            }

            int headroom = maxOutstandingCredit - currentCredit;
            int addedCredit = (int) Math.min(n, (long) headroom);
            if (addedCredit <= 0) {
                return;
            }

            if (this.completed.get()) {
                LOG.trace("Skipping flowing additional credits as already completed: {0}", addedCredit);
            } else {
                LOG.trace("Flowing additional credits : {0}", addedCredit);
                receiver.flow(addedCredit);
            }
        }

        /**
         * Marks the subscription cancelled and, on the connection's context, replaces the
         * receiver's close/detach handlers with ones that indicate completion, then detaches
         * (durable) or closes (non-durable) the receiver. Subsequent calls are no-ops.
         */
        @Override
        public void cancel() {
            LOG.trace("Cancel called");
            if (this.cancelled.getAndSet(true)) {
                LOG.trace("Cancel no-op, already called.");
                return;
            }

            LOG.trace("Cancellation scheduled");
            connCtx.runOnContext(x -> {
                LOG.trace("Cancelling");
                receiver.closeHandler(y -> {
                    this.indicateCompletion();
                    receiver.close();
                });
                receiver.detachHandler(y -> {
                    this.indicateCompletion();
                    receiver.detach();
                });
                if (durable) {
                    receiver.detach();
                } else {
                    receiver.close();
                }
            });
        }

        /**
         * Marks the subscription completed and, unless it was already cancelled, signals
         * {@code onError(t)}. Only the first terminal indication has any effect.
         */
        private void indicateError(Throwable t) {
            if (this.completed.getAndSet(true)) {
                LOG.trace("indicateError no-op, already completed");
                return;
            }

            Subscriber<? super Delivery> sub = this.subscriber;
            this.subscriber = null;
            if (sub != null && !this.cancelled.get()) {
                LOG.trace("Indicating error");
                sub.onError(t);
            } else {
                LOG.trace("Skipping error indication, no sub or already cancelled");
            }
        }

        /** Signals {@code onSubscribe(this)} unless the subscription has already completed. */
        private void indicateSubscribed() {
            if (this.completed.get()) {
                LOG.trace("indicateSubscribed no-op, already completed");
                return;
            }

            LOG.trace("Indicating subscribed");
            if (this.subscriber != null) {
                this.subscriber.onSubscribe(this);
            }
        }

        /**
         * Marks the subscription completed and signals {@code onComplete}, except when it was
         * cancelled with no requests outstanding. Only the first terminal indication has any
         * effect.
         */
        private void indicateCompletion() {
            if (this.completed.getAndSet(true)) {
                LOG.trace("indicateCompletion no-op, already completed");
                return;
            }

            Subscriber<? super Delivery> sub = this.subscriber;
            this.subscriber = null;
            boolean canned = this.cancelled.get();
            // Equivalent to (outstandingRequests > 0 && canned) || !canned.
            boolean shouldSignal = !canned || this.outstandingRequests > 0L;
            if (sub != null && shouldSignal) {
                LOG.trace("Indicating completion");
                sub.onComplete();
            } else {
                LOG.trace("Skipping completion indication");
            }
        }
    }
}

