/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.spi.impl.operationparker.impl;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.instance.HazelcastThreadGroup;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.internal.metrics.MetricsProvider;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.partition.MigrationInfo;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.BlockingOperation;
import com.hazelcast.spi.LiveOperations;
import com.hazelcast.spi.LiveOperationsTracker;
import com.hazelcast.spi.Notifier;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationResponseHandler;
import com.hazelcast.spi.WaitNotifyKey;
import com.hazelcast.spi.exception.PartitionMigratingException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationparker.OperationParker;
import com.hazelcast.spi.impl.operationparker.impl.ParkedOperation;
import com.hazelcast.util.ConcurrencyUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.executor.SingleExecutorThreadFactory;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class OperationParkerImpl
implements OperationParker,
LiveOperationsTracker,
MetricsProvider {
    private static final long FIRST_WAIT_TIME = 1000L;
    private static final long TIMEOUT_UPPER_BOUND = 1500L;
    private final ConcurrentMap<WaitNotifyKey, Queue<ParkedOperation>> parkQueueMap = new ConcurrentHashMap<WaitNotifyKey, Queue<ParkedOperation>>(100);
    private final DelayQueue delayQueue = new DelayQueue();
    private final ExecutorService expirationExecutor;
    private final Future expirationTaskFuture;
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final ConstructorFunction<WaitNotifyKey, Queue<ParkedOperation>> parkQueueConstructor = new ConstructorFunction<WaitNotifyKey, Queue<ParkedOperation>>(){

        @Override
        public Queue<ParkedOperation> createNew(WaitNotifyKey key) {
            return new ConcurrentLinkedQueue<ParkedOperation>();
        }
    };

    public OperationParkerImpl(NodeEngineImpl nodeEngine) {
        this.nodeEngine = nodeEngine;
        Node node = nodeEngine.getNode();
        this.logger = node.getLogger(OperationParker.class.getName());
        HazelcastThreadGroup threadGroup = node.getHazelcastThreadGroup();
        this.expirationExecutor = Executors.newSingleThreadExecutor(new SingleExecutorThreadFactory(threadGroup.getInternalThreadGroup(), threadGroup.getClassLoader(), threadGroup.getThreadNamePrefix("operation-parker")));
        this.expirationTaskFuture = this.expirationExecutor.submit(new ExpirationTask());
    }

    @Override
    public void provideMetrics(MetricsRegistry registry) {
        this.nodeEngine.getMetricsRegistry().scanAndRegister(this, "operation-parker");
    }

    @Override
    public void populate(LiveOperations liveOperations) {
        for (Queue parkQueue : this.parkQueueMap.values()) {
            for (ParkedOperation parkedOperation : parkQueue) {
                Operation operation = parkedOperation.getOperation();
                liveOperations.add(operation.getCallerAddress(), operation.getCallId());
            }
        }
    }

    private void invalidate(ParkedOperation parkedOperation) throws Exception {
        this.nodeEngine.getOperationService().execute(parkedOperation);
    }

    @Override
    public void park(BlockingOperation op) {
        WaitNotifyKey key = op.getWaitKey();
        Queue<ParkedOperation> parkQueue = ConcurrencyUtil.getOrPutIfAbsent(this.parkQueueMap, key, this.parkQueueConstructor);
        long timeout = op.getWaitTimeout();
        ParkedOperation parkedOperation = new ParkedOperation(parkQueue, op);
        parkedOperation.setNodeEngine(this.nodeEngine);
        parkQueue.offer(parkedOperation);
        if (timeout > -1L && timeout < 1500L) {
            this.delayQueue.offer(parkedOperation);
        }
    }

    @Override
    public void unpark(Notifier notifier) {
        WaitNotifyKey key = notifier.getNotifiedKey();
        Queue parkQueue = (Queue)this.parkQueueMap.get(key);
        if (parkQueue == null) {
            return;
        }
        ParkedOperation parkedOp = (ParkedOperation)parkQueue.peek();
        while (parkedOp != null) {
            Operation op = parkedOp.getOperation();
            if (notifier == op) {
                throw new IllegalStateException("Found cyclic wait-notify! -> " + notifier);
            }
            if (parkedOp.isValid()) {
                if (parkedOp.isExpired()) {
                    parkedOp.onExpire();
                } else {
                    if (parkedOp.shouldWait()) {
                        return;
                    }
                    this.nodeEngine.getOperationService().run(op);
                }
                parkedOp.setValid(false);
            }
            parkQueue.poll();
            parkedOp = (ParkedOperation)parkQueue.peek();
            if (parkedOp != null) continue;
            this.parkQueueMap.remove(key);
        }
    }

    @Probe
    public int getParkQueueCount() {
        return this.parkQueueMap.size();
    }

    @Probe
    public int getTotalParkedOperationCount() {
        int count = 0;
        for (Queue parkQueue : this.parkQueueMap.values()) {
            count += parkQueue.size();
        }
        return count;
    }

    public int getTotalValidWaitingOperationCount() {
        int count = 0;
        for (Queue parkQueue : this.parkQueueMap.values()) {
            for (ParkedOperation parkedOperation : parkQueue) {
                if (!parkedOperation.valid) continue;
                ++count;
            }
        }
        return count;
    }

    public void onMemberLeft(MemberImpl leftMember) {
        this.invalidateWaitingOps(leftMember.getUuid());
    }

    public void onClientDisconnected(String clientUuid) {
        this.invalidateWaitingOps(clientUuid);
    }

    private void invalidateWaitingOps(String callerUuid) {
        for (Queue parkQueue : this.parkQueueMap.values()) {
            for (ParkedOperation parkedOperation : parkQueue) {
                Operation op;
                if (!parkedOperation.isValid() || !callerUuid.equals((op = parkedOperation.getOperation()).getCallerUuid())) continue;
                parkedOperation.setValid(false);
            }
        }
    }

    public void onPartitionMigrate(Address thisAddress, MigrationInfo migrationInfo) {
        if (!thisAddress.equals(migrationInfo.getSource())) {
            return;
        }
        int partitionId = migrationInfo.getPartitionId();
        for (Queue parkQueue : this.parkQueueMap.values()) {
            Iterator it = parkQueue.iterator();
            while (it.hasNext()) {
                Operation op;
                if (Thread.interrupted()) {
                    return;
                }
                ParkedOperation parkedOperation = (ParkedOperation)it.next();
                if (!parkedOperation.isValid() || partitionId != (op = parkedOperation.getOperation()).getPartitionId()) continue;
                parkedOperation.setValid(false);
                PartitionMigratingException pme = new PartitionMigratingException(thisAddress, partitionId, op.getClass().getName(), op.getServiceName());
                OperationResponseHandler responseHandler = op.getOperationResponseHandler();
                responseHandler.sendResponse(op, pme);
                it.remove();
            }
        }
    }

    @Override
    public void cancelParkedOperations(String serviceName, Object objectId, Throwable cause) {
        for (Queue parkQueue : this.parkQueueMap.values()) {
            for (ParkedOperation parkedOperation : parkQueue) {
                WaitNotifyKey wnk;
                if (!parkedOperation.isValid() || !serviceName.equals((wnk = parkedOperation.blockingOperation.getWaitKey()).getServiceName()) || !objectId.equals(wnk.getObjectName())) continue;
                parkedOperation.cancel(cause);
            }
        }
    }

    public void reset() {
        this.delayQueue.clear();
        this.parkQueueMap.clear();
    }

    public void shutdown() {
        this.logger.finest("Stopping tasks...");
        this.expirationTaskFuture.cancel(true);
        this.expirationExecutor.shutdown();
        HazelcastInstanceNotActiveException response = new HazelcastInstanceNotActiveException();
        Address thisAddress = this.nodeEngine.getThisAddress();
        for (Queue parkQueue : this.parkQueueMap.values()) {
            for (ParkedOperation parkedOperation : parkQueue) {
                Operation op;
                if (!parkedOperation.isValid() || !thisAddress.equals((op = parkedOperation.getOperation()).getCallerAddress())) continue;
                try {
                    OperationResponseHandler responseHandler = op.getOperationResponseHandler();
                    responseHandler.sendResponse(op, response);
                }
                catch (Exception e) {
                    this.logger.finest("While sending HazelcastInstanceNotActiveException response...", e);
                }
            }
            parkQueue.clear();
        }
        this.parkQueueMap.clear();
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("OperationParker{");
        sb.append("delayQueue=");
        sb.append(this.delayQueue.size());
        sb.append(" \n[");
        for (Queue scheduledOps : this.parkQueueMap.values()) {
            sb.append("\t");
            sb.append(scheduledOps.size());
            sb.append(", ");
        }
        sb.append("]\n}");
        return sb.toString();
    }

    private class ExpirationTask
    implements Runnable {
        private ExpirationTask() {
        }

        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    if (!this.doRun()) continue;
                    return;
                }
                catch (InterruptedException e) {
                    return;
                }
                catch (Throwable t) {
                    OperationParkerImpl.this.logger.warning(t);
                    continue;
                }
                break;
            }
            return;
        }

        private boolean doRun() throws Exception {
            long waitTime = 1000L;
            while (waitTime > 0L) {
                long end;
                long begin = System.currentTimeMillis();
                ParkedOperation parkedOperation = (ParkedOperation)OperationParkerImpl.this.delayQueue.poll(waitTime, TimeUnit.MILLISECONDS);
                if (parkedOperation != null && parkedOperation.isValid()) {
                    OperationParkerImpl.this.invalidate(parkedOperation);
                }
                if ((waitTime -= (end = System.currentTimeMillis()) - begin) <= 1000L) continue;
                waitTime = 1000L;
            }
            for (Queue parkQueue : OperationParkerImpl.this.parkQueueMap.values()) {
                for (ParkedOperation parkedOperation : parkQueue) {
                    if (Thread.interrupted()) {
                        return true;
                    }
                    if (!parkedOperation.isValid() || !parkedOperation.needsInvalidation()) continue;
                    OperationParkerImpl.this.invalidate(parkedOperation);
                }
            }
            return false;
        }
    }
}

