/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.spi.cluster.ignite.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.TaskQueue;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.spi.cluster.ignite.impl.ChoosableIterableImpl;
import io.vertx.spi.cluster.ignite.impl.ClusterSerializationUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;

public class AsyncMultiMapImpl<K, V>
implements AsyncMultiMap<K, V> {
    private final IgniteCache<K, Set<V>> cache;
    private final VertxInternal vertx;
    private final TaskQueue taskQueue = new TaskQueue();
    private final ConcurrentMap<K, ChoosableIterableImpl<V>> subs = new ConcurrentHashMap<K, ChoosableIterableImpl<V>>();

    public AsyncMultiMapImpl(IgniteCache<K, Set<V>> cache, Vertx vertx) {
        ((Ignite)cache.unwrap(Ignite.class)).events().localListen((IgnitePredicate & Serializable)event -> {
            if (!(event instanceof CacheEvent)) {
                throw new IllegalArgumentException("Unknown event received: " + event);
            }
            CacheEvent cacheEvent = (CacheEvent)event;
            if (Objects.equals(cacheEvent.cacheName(), cache.getName()) && ((IgniteCacheProxy)cache).context().localNodeId().equals(cacheEvent.eventNode().id())) {
                Object key = ClusterSerializationUtils.unmarshal(cacheEvent.key());
                switch (cacheEvent.type()) {
                    case 65: {
                        this.subs.remove(key);
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("Unknown event received: " + event);
                    }
                }
            }
            return true;
        }, new int[]{65});
        this.cache = cache;
        this.vertx = (VertxInternal)vertx;
    }

    public void add(K key, V value, Handler<AsyncResult<Void>> handler) {
        Object val0 = ClusterSerializationUtils.marshal(value);
        this.execute(cache -> cache.invokeAsync(ClusterSerializationUtils.marshal(key), (CacheEntryProcessor & Serializable)(entry, arguments) -> {
            HashSet<Object> values = (HashSet<Object>)entry.getValue();
            if (values == null) {
                values = new HashSet<Object>();
            }
            values.add(val0);
            entry.setValue(values);
            return null;
        }, new Object[0]), handler);
    }

    public void get(K key, Handler<AsyncResult<ChoosableIterable<V>>> handler) {
        this.execute(cache -> cache.getAsync(ClusterSerializationUtils.marshal(key)), items -> {
            Set items0;
            ChoosableIterableImpl it;
            Set unmarshalledItems = null;
            if (items != null) {
                unmarshalledItems = items.stream().map(ClusterSerializationUtils::unmarshal).collect(Collectors.toSet());
            }
            return (it = this.subs.compute(key, (arg_0, arg_1) -> AsyncMultiMapImpl.lambda$null$2(items0 = unmarshalledItems, arg_0, arg_1))) == null ? ChoosableIterableImpl.empty() : it;
        }, handler);
    }

    public void remove(K key, V value, Handler<AsyncResult<Boolean>> handler) {
        this.execute(cache -> cache.invokeAsync(ClusterSerializationUtils.marshal(key), (CacheEntryProcessor & Serializable)(entry, arguments) -> {
            Set values = (Set)entry.getValue();
            if (values != null) {
                values = values.stream().map(ClusterSerializationUtils::unmarshal).collect(Collectors.toSet());
                boolean removed = values.remove(value);
                if (values.isEmpty()) {
                    entry.remove();
                } else {
                    values = values.stream().map(ClusterSerializationUtils::marshal).collect(Collectors.toSet());
                    entry.setValue(values);
                }
                return removed;
            }
            return false;
        }, new Object[0]), handler);
    }

    public void removeAllForValue(V value, Handler<AsyncResult<Void>> handler) {
        this.removeAllMatching(obj -> value.equals(ClusterSerializationUtils.unmarshal(obj)), handler);
    }

    public void removeAllMatching(Predicate<V> p, Handler<AsyncResult<Void>> handler) {
        this.vertx.getOrCreateContext().executeBlocking(fut -> {
            boolean success = false;
            CacheException err = null;
            for (int i = 0; i < 5; ++i) {
                try {
                    for (Cache.Entry entry : this.cache) {
                        this.cache.invokeAsync(entry.getKey(), (CacheEntryProcessor & Serializable)(e, args) -> {
                            Set values = (Set)e.getValue();
                            if (values != null) {
                                values.removeIf(p);
                                if (values.isEmpty()) {
                                    e.remove();
                                } else {
                                    e.setValue((Object)values);
                                }
                            }
                            return null;
                        }, new Object[0]);
                    }
                    success = true;
                    fut.complete();
                    break;
                }
                catch (CacheException e2) {
                    err = e2;
                    continue;
                }
            }
            if (!success) {
                fut.fail((Throwable)err);
            }
        }, this.taskQueue, handler);
    }

    private <T> void execute(Function<IgniteCache<K, Set<V>>, IgniteFuture<T>> cacheOp, Handler<AsyncResult<T>> handler) {
        this.execute(cacheOp, UnaryOperator.identity(), handler);
    }

    private <T, R> void execute(Function<IgniteCache<K, Set<V>>, IgniteFuture<T>> cacheOp, Function<T, R> mapper, Handler<AsyncResult<R>> handler) {
        this.vertx.getOrCreateContext().executeBlocking(f -> {
            IgniteFuture future = (IgniteFuture)cacheOp.apply(this.cache);
            f.complete(mapper.apply(future.get()));
        }, this.taskQueue, handler);
    }

    private static /* synthetic */ ChoosableIterableImpl lambda$null$2(Set items0, Object k, ChoosableIterableImpl oldValue) {
        if (items0 == null || items0.isEmpty()) {
            return null;
        }
        if (oldValue == null) {
            return new ChoosableIterableImpl(new ArrayList(items0));
        }
        oldValue.update(new ArrayList(items0));
        return oldValue;
    }
}

