/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.axonframework.common.Assert;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.EventProcessingMonitor;
import org.axonframework.eventhandling.EventProcessingMonitorSupport;

public class MultiplexingEventProcessingMonitor
implements EventProcessingMonitor {
    private final ConcurrentMap<String, Counter> eventCounters = new ConcurrentHashMap<String, Counter>();
    private final EventProcessingMonitor targetMonitor;

    public MultiplexingEventProcessingMonitor(EventProcessingMonitor targetMonitor) {
        this.targetMonitor = targetMonitor;
    }

    public void prepare(EventMessage eventMessage) {
        this.eventCounters.put(eventMessage.getIdentifier(), new Counter());
    }

    public void prepareForInvocation(EventMessage eventMessage, EventListener member) {
        if (member instanceof EventProcessingMonitorSupport) {
            Counter counter = (Counter)this.eventCounters.get(eventMessage.getIdentifier());
            Assert.notNull(counter, "You must prepare a message before registering async invocations");
            counter.expectAsyncInvocation();
        }
    }

    @Override
    public void onEventProcessingCompleted(List<? extends EventMessage> eventMessages) {
        ArrayList<EventMessage> messagesToAck = new ArrayList<EventMessage>(eventMessages.size());
        for (EventMessage eventMessage : eventMessages) {
            String eventIdentifier = eventMessage.getIdentifier();
            Counter counter = (Counter)this.eventCounters.get(eventIdentifier);
            if (counter != null && !counter.recordSuccess()) continue;
            if (counter != null && counter.hasFailed()) {
                this.targetMonitor.onEventProcessingFailed(Arrays.asList(eventMessage), counter.failureCause());
            } else {
                messagesToAck.add(eventMessage);
            }
            this.eventCounters.remove(eventIdentifier, counter);
        }
        if (!messagesToAck.isEmpty()) {
            this.targetMonitor.onEventProcessingCompleted(messagesToAck);
        }
    }

    @Override
    public void onEventProcessingFailed(List<? extends EventMessage> eventMessages, Throwable cause) {
        ArrayList<EventMessage> messagesToReport = new ArrayList<EventMessage>(eventMessages.size());
        for (EventMessage eventMessage : eventMessages) {
            String eventIdentifier = eventMessage.getIdentifier();
            Counter counter = (Counter)this.eventCounters.get(eventIdentifier);
            if (counter != null && !counter.recordFailure(cause)) continue;
            messagesToReport.add(eventMessage);
            this.eventCounters.remove(eventIdentifier, counter);
        }
        if (!messagesToReport.isEmpty()) {
            this.targetMonitor.onEventProcessingFailed(messagesToReport, cause);
        }
    }

    private static class Counter {
        private final AtomicInteger eventCounter = new AtomicInteger(1);
        private final AtomicInteger failureCounter = new AtomicInteger(0);
        private volatile Throwable cause;

        private Counter() {
        }

        public void expectAsyncInvocation() {
            this.eventCounter.incrementAndGet();
        }

        public boolean recordSuccess() {
            return this.eventCounter.decrementAndGet() == 0;
        }

        public boolean recordFailure(Throwable cause) {
            this.cause = cause;
            this.failureCounter.incrementAndGet();
            return this.eventCounter.decrementAndGet() == 0;
        }

        public Throwable failureCause() {
            return this.cause;
        }

        private boolean hasFailed() {
            return this.cause != null;
        }
    }
}

