/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.cassandra.impl;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import io.vertx.cassandra.CassandraClient;
import io.vertx.cassandra.CassandraClientOptions;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.cassandra.ResultSet;
import io.vertx.cassandra.impl.CassandraRowStreamImpl;
import io.vertx.cassandra.impl.ResultSetImpl;
import io.vertx.cassandra.impl.SessionHolder;
import io.vertx.cassandra.impl.Util;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collector;

/**
 * {@link CassandraClient} implementation backed by a {@link SessionHolder} that is shared,
 * through a Vert.x local map, between all clients created with the same client name.
 *
 * <p>The holder entry is created or incremented in the constructor and decremented in
 * {@link #close(Handler)}; the underlying {@link Session} is connected lazily on first use
 * and closed when the holder's reference count reaches zero.
 */
public class CassandraClientImpl implements CassandraClient {
    static final String HOLDERS_LOCAL_MAP_NAME = "__vertx.cassandraClient.sessionHolders";
    final VertxInternal vertx;
    private final String clientName;
    private final CassandraClientOptions options;
    private final Map<String, SessionHolder> holders;
    // Guarded by "this": read and written only from synchronized methods.
    private boolean closed;

    /**
     * Registers this client against the shared session holder for {@code clientName}.
     *
     * @param vertx      the Vert.x instance, must not be {@code null}
     * @param clientName the name identifying the shared session holder, must not be {@code null}
     * @param options    the client options, must not be {@code null}
     * @throws NullPointerException if any argument is {@code null}
     */
    public CassandraClientImpl(Vertx vertx, String clientName, CassandraClientOptions options) {
        Objects.requireNonNull(vertx, "vertx");
        Objects.requireNonNull(clientName, "clientName");
        Objects.requireNonNull(options, "options");
        this.vertx = (VertxInternal) vertx;
        this.clientName = clientName;
        this.options = options;
        this.holders = vertx.sharedData().getLocalMap(HOLDERS_LOCAL_MAP_NAME);
        // Create the holder for this name, or take one more reference on the existing one.
        this.holders.compute(clientName, (k, h) -> h == null ? new SessionHolder() : h.increment());
        Context context = Vertx.currentContext();
        if (context != null && context.owner() == vertx) {
            // Close this client when the context it was created on is closed.
            context.addCloseHook(this::close);
        }
    }

    /**
     * @return {@code true} when this client has not been closed and the shared holder has a
     *         session that is not closed
     */
    @Override
    public synchronized boolean isConnected() {
        if (this.closed) {
            return false;
        }
        Session s = this.holders.get(this.clientName).session;
        return s != null && !s.isClosed();
    }

    /** Wraps {@code query} in a {@link SimpleStatement} and delegates to the statement variant. */
    @Override
    public CassandraClient executeWithFullFetch(String query, Handler<AsyncResult<List<Row>>> resultHandler) {
        return this.executeWithFullFetch(new SimpleStatement(query), resultHandler);
    }

    /**
     * Executes {@code statement} and, on success, asks the result set for all of its rows;
     * an execution failure is forwarded to {@code resultHandler}.
     */
    @Override
    public CassandraClient executeWithFullFetch(Statement statement, Handler<AsyncResult<List<Row>>> resultHandler) {
        this.execute(statement, exec -> {
            if (exec.succeeded()) {
                ResultSet resultSet = exec.result();
                resultSet.all(resultHandler);
            } else {
                resultHandler.handle(Future.failedFuture(exec.cause()));
            }
        });
        return this;
    }

    /** Wraps {@code query} in a {@link SimpleStatement} and delegates to the statement variant. */
    @Override
    public CassandraClient execute(String query, Handler<AsyncResult<ResultSet>> resultHandler) {
        return this.execute(new SimpleStatement(query), resultHandler);
    }

    /** Wraps {@code query} in a {@link SimpleStatement} and delegates to the statement variant. */
    @Override
    public <R> CassandraClient execute(String query, Collector<Row, ?, R> collector, Handler<AsyncResult<R>> asyncResultHandler) {
        return this.execute(new SimpleStatement(query), collector, asyncResultHandler);
    }

    /**
     * Obtains the session, executes {@code statement} asynchronously and hands the driver
     * result, wrapped in a {@link ResultSetImpl}, to {@code resultHandler}. A failure to
     * obtain the session is reported as a failed result.
     */
    @Override
    public CassandraClient execute(Statement statement, Handler<AsyncResult<ResultSet>> resultHandler) {
        ContextInternal context = this.vertx.getOrCreateContext();
        this.getSession(context, sess -> {
            if (sess.succeeded()) {
                Util.handleOnContext(sess.result().executeAsync(statement), context, rs -> new ResultSetImpl(rs, this.vertx), resultHandler);
            } else {
                resultHandler.handle(Future.failedFuture(sess.cause()));
            }
        });
        return this;
    }

    /** Executes {@code statement} and reduces the streamed rows with {@code collector}. */
    @Override
    public <R> CassandraClient execute(Statement statement, Collector<Row, ?, R> collector, Handler<AsyncResult<R>> asyncResultHandler) {
        this.executeAndCollect(statement, collector, asyncResultHandler);
        return this;
    }

    /**
     * Streams the rows of {@code statement} into the collector's container and completes
     * {@code asyncResultHandler} with the finished value when the stream ends, or with the
     * failure reported by the stream or by the query.
     */
    private <C, R> void executeAndCollect(Statement statement, Collector<Row, C, R> collector, Handler<AsyncResult<R>> asyncResultHandler) {
        Promise<CassandraRowStream> rowStreamPromise = Promise.promise();
        this.queryStream(statement, rowStreamPromise);
        C container = collector.supplier().get();
        BiConsumer<C, Row> accumulator = collector.accumulator();
        Function<C, R> finisher = collector.finisher();
        rowStreamPromise.future().compose(rowStream -> {
            Promise<R> resultPromise = Promise.promise();
            rowStream.endHandler(end -> {
                R result = finisher.apply(container);
                resultPromise.complete(result);
            });
            rowStream.handler(row -> accumulator.accept(container, row));
            rowStream.exceptionHandler(resultPromise::fail);
            return resultPromise.future();
        }).setHandler(asyncResultHandler);
    }

    /**
     * Obtains the session and prepares {@code query} asynchronously; a failure to obtain the
     * session is reported as a failed result.
     */
    @Override
    public CassandraClient prepare(String query, Handler<AsyncResult<PreparedStatement>> resultHandler) {
        ContextInternal context = this.vertx.getOrCreateContext();
        this.getSession(context, sess -> {
            if (sess.succeeded()) {
                Util.handleOnContext(sess.result().prepareAsync(query), context, resultHandler);
            } else {
                resultHandler.handle(Future.failedFuture(sess.cause()));
            }
        });
        return this;
    }

    /** Wraps {@code sql} in a {@link SimpleStatement} and delegates to the statement variant. */
    @Override
    public CassandraClient queryStream(String sql, Handler<AsyncResult<CassandraRowStream>> rowStreamHandler) {
        return this.queryStream(new SimpleStatement(sql), rowStreamHandler);
    }

    /**
     * Obtains the session, executes {@code statement} asynchronously and hands the result,
     * wrapped in a {@link CassandraRowStreamImpl}, to {@code rowStreamHandler}. A failure to
     * obtain the session is reported as a failed result.
     */
    @Override
    public CassandraClient queryStream(Statement statement, Handler<AsyncResult<CassandraRowStream>> rowStreamHandler) {
        ContextInternal context = this.vertx.getOrCreateContext();
        this.getSession(context, sess -> {
            if (sess.succeeded()) {
                Util.handleOnContext(sess.result().executeAsync(statement), context, rs -> {
                    ResultSetImpl resultSet = new ResultSetImpl(rs, this.vertx);
                    return new CassandraRowStreamImpl(context, resultSet);
                }, rowStreamHandler);
            } else {
                rowStreamHandler.handle(Future.failedFuture(sess.cause()));
            }
        });
        return this;
    }

    /** Closes this client without a completion handler. */
    @Override
    public CassandraClient close() {
        return this.close(null);
    }

    /**
     * Marks this client closed and releases its reference on the shared holder. When the
     * reference count drops to zero the holder entry is removed and, if a session exists,
     * it is closed asynchronously with {@code closeHandler} passed along to that close.
     * In every other case a non-null {@code closeHandler} is completed successfully right away
     * (including when the client was already closed).
     *
     * @param closeHandler handler notified of the outcome, may be {@code null}
     */
    @Override
    public CassandraClient close(Handler<AsyncResult<Void>> closeHandler) {
        if (this.raiseCloseFlag()) {
            // Retry loop: remove/replace only succeed if the mapping is still "current",
            // so a concurrent update of the holder makes us re-read and try again.
            while (true) {
                SessionHolder current = this.holders.get(this.clientName);
                SessionHolder next = current.decrement();
                if (next.refCount == 0) {
                    if (this.holders.remove(this.clientName, current)) {
                        if (current.session != null) {
                            Util.handleOnContext(current.session.closeAsync(), this.vertx.getOrCreateContext(), closeHandler);
                            return this;
                        }
                        break;
                    }
                } else if (this.holders.replace(this.clientName, current, next)) {
                    break;
                }
            }
        }
        if (closeHandler != null) {
            closeHandler.handle(Future.succeededFuture());
        }
        return this;
    }

    /**
     * Sets the closed flag.
     *
     * @return {@code true} if this call performed the transition, {@code false} if the client
     *         was already closed
     */
    private synchronized boolean raiseCloseFlag() {
        if (!this.closed) {
            this.closed = true;
            return true;
        }
        return false;
    }

    /**
     * Supplies the shared session to {@code handler}: fails if this client is closed, succeeds
     * immediately if the holder already has a session, and otherwise runs {@link #connect}
     * as blocking code on the holder's connection queue.
     */
    synchronized void getSession(ContextInternal context, Handler<AsyncResult<Session>> handler) {
        if (this.closed) {
            handler.handle(Future.failedFuture("Client is closed"));
        } else {
            SessionHolder holder = this.holders.get(this.clientName);
            if (holder.session != null) {
                handler.handle(Future.succeededFuture(holder.session));
            } else {
                context.executeBlocking(this::connect, holder.connectionQueue, handler);
            }
        }
    }

    /**
     * Connects a new session unless the holder already has one, then publishes it on the
     * holder. If the holder entry is gone (before or after connecting) the promise fails with
     * "Client closed while connecting".
     */
    private void connect(Promise<Session> promise) {
        SessionHolder current = this.holders.get(this.clientName);
        if (current == null) {
            promise.fail("Client closed while connecting");
            return;
        }
        if (current.session != null) {
            promise.complete(current.session);
            return;
        }
        Cluster.Builder builder = this.options.dataStaxClusterBuilder();
        if (builder.getContactPoints().isEmpty()) {
            builder.addContactPoint("localhost");
        }
        Cluster cluster = builder.build();
        Session session = cluster.connect(this.options.getKeyspace());
        // Attach the session only if the holder entry still exists; a null mapping stays absent.
        current = this.holders.compute(this.clientName, (k, h) -> h == null ? null : h.connected(session));
        if (current != null) {
            promise.complete(current.session);
        } else {
            try {
                session.close();
            } catch (Exception ignored) {
                // Best-effort cleanup: the connect is reported as failed below regardless.
                // NOTE(review): only the session is closed here, not the Cluster - confirm intended.
            }
            promise.fail("Client closed while connecting");
        }
    }
}

