/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventhandling.replay;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import org.axonframework.common.Assert;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.Cluster;
import org.axonframework.eventhandling.replay.IncomingMessageHandler;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

/**
 * {@link IncomingMessageHandler} that, while a replay is in progress, stores incoming messages in a
 * backlog instead of publishing them, and publishes whatever is left in the backlog once the replay
 * has finished.
 * <p>
 * Messages whose timestamp is not after the backlog threshold (the moment the replay was prepared,
 * minus the configured time margin) are not backlogged at all. Identifiers of replayed messages
 * newer than that threshold are remembered, so backlogged copies of them are not published again.
 * <p>
 * Every method that reads or modifies the replay state is synchronized on this instance, because
 * messages may arrive through {@link #onIncomingMessages} while another thread drives the replay
 * through {@link #releaseMessage}.
 */
public class BackloggingIncomingMessageHandler
implements IncomingMessageHandler {
    /** Whether a replay is currently in progress. Guarded by {@code this}. */
    private boolean inReplay = false;
    /** Messages received during the replay that still have to be published. Guarded by {@code this}. */
    private final Queue<EventMessage> backlog;
    /** Identifiers of replayed messages newer than the backlog threshold. Guarded by {@code this}. */
    private final Set<String> replayedMessages = new HashSet<String>();
    /** Messages with a timestamp at or before this instant are never backlogged. Guarded by {@code this}. */
    private DateTime backlogThreshold;
    /** Margin subtracted from "now" to obtain the threshold, and added when scanning the backlog. */
    private final Duration timeMargin;

    /**
     * Creates a handler with a time margin of 5 seconds and a {@link LinkedList} as backlog.
     */
    public BackloggingIncomingMessageHandler() {
        this(Duration.standardSeconds(5L));
    }

    /**
     * Creates a handler with the given time margin and a {@link LinkedList} as backlog.
     *
     * @param backlogThresholdMargin the margin used to compute the backlog threshold
     */
    public BackloggingIncomingMessageHandler(Duration backlogThresholdMargin) {
        this(backlogThresholdMargin, new LinkedList<EventMessage>());
    }

    /**
     * Creates a handler with the given time margin that stores backlogged messages in the given queue.
     * The queue is used as-is; it is not copied.
     *
     * @param backlogThresholdMargin the margin used to compute the backlog threshold
     * @param backlog                the queue in which messages received during a replay are kept
     */
    public BackloggingIncomingMessageHandler(Duration backlogThresholdMargin, Queue<EventMessage> backlog) {
        this.timeMargin = backlogThresholdMargin;
        this.backlog = backlog;
    }

    /**
     * Marks the start of a replay and fixes the backlog threshold at the current time minus the
     * configured margin. Fails the assertion when a replay is already in progress on this instance.
     *
     * @param destination the cluster about to be replayed (not used by this implementation)
     */
    @Override
    public synchronized void prepareForReplay(Cluster destination) {
        Assert.isFalse(this.inReplay, "This message handler is already performing a replay. Are you using the same instances on multiple clusters?");
        this.inReplay = true;
        this.backlogThreshold = new DateTime().minus(this.timeMargin);
    }

    /**
     * Handles messages that arrive outside of the replay stream.
     * <p>
     * When no replay is in progress the messages are published to the destination immediately.
     * During a replay, messages newer than the backlog threshold are added to the backlog, unless a
     * message with the same identifier has already been replayed; messages at or before the
     * threshold are ignored.
     *
     * @param destination the cluster to publish to when no replay is in progress
     * @param messages    the incoming messages
     * @return the messages that were skipped because they had already been replayed, or
     *         {@code null} when there are none or when no replay is in progress
     */
    @Override
    public synchronized List<EventMessage> onIncomingMessages(Cluster destination, EventMessage ... messages) {
        if (!this.inReplay) {
            destination.publish(messages);
            return null;
        }
        // Created lazily: callers receive null when nothing was skipped.
        List<EventMessage> alreadyReplayed = null;
        for (EventMessage message : messages) {
            if (!message.getTimestamp().isAfter(this.backlogThreshold)) {
                // At or before the threshold: not backlogged.
                continue;
            }
            if (this.replayedMessages.contains(message.getIdentifier())) {
                if (alreadyReplayed == null) {
                    alreadyReplayed = new ArrayList<EventMessage>();
                }
                alreadyReplayed.add(message);
            } else {
                this.backlog.add(message);
            }
        }
        return alreadyReplayed;
    }

    /**
     * Notifies this handler that the given message is being replayed.
     * <p>
     * Only messages newer than the backlog threshold are considered. For such a message its
     * identifier is recorded, and the backlog is scanned in iteration order:
     * <ul>
     * <li>a backlogged message that is not a {@link DomainEventMessage} and whose timestamp is not
     * after that of the replayed message is published to the destination and removed;</li>
     * <li>otherwise, a backlogged message with the same identifier as the replayed message is
     * removed without being published;</li>
     * <li>the scan stops after the first backlogged message whose timestamp is later than the
     * replayed message's timestamp plus the time margin.</li>
     * </ul>
     * This method is synchronized because it iterates and modifies the same backlog and identifier
     * set that {@link #onIncomingMessages} updates from other threads.
     *
     * @param destination the cluster to which released backlog messages are published
     * @param message     the message being replayed
     * @return the messages removed from the backlog by this call; never {@code null}
     */
    @Override
    public synchronized List<EventMessage> releaseMessage(Cluster destination, DomainEventMessage message) {
        List<EventMessage> processedMessages = new LinkedList<EventMessage>();
        DateTime replayedAt = message.getTimestamp();
        if (replayedAt.isAfter(this.backlogThreshold)) {
            this.replayedMessages.add(message.getIdentifier());
            DateTime scanLimit = replayedAt.plus(this.timeMargin);
            for (EventMessage backloggedMessage : this.backlog) {
                boolean notAfterReplayed = !backloggedMessage.getTimestamp().isAfter(replayedAt);
                if (notAfterReplayed && !(backloggedMessage instanceof DomainEventMessage)) {
                    processedMessages.add(backloggedMessage);
                    destination.publish(backloggedMessage);
                } else if (backloggedMessage.getIdentifier().equals(message.getIdentifier())) {
                    // Same message as the one being replayed: drop it from the backlog unpublished.
                    processedMessages.add(backloggedMessage);
                }
                if (backloggedMessage.getTimestamp().isAfter(scanLimit)) {
                    break;
                }
            }
        }
        // Removal happens after the scan so the backlog is not modified while it is being iterated.
        for (EventMessage processedMessage : processedMessages) {
            this.backlog.remove(processedMessage);
        }
        return processedMessages;
    }

    /**
     * Ends the replay after a failure, discarding the backlog and the recorded identifiers without
     * publishing anything.
     *
     * @param destination the cluster that was being replayed (not used by this implementation)
     * @param cause       the cause of the failure (not used by this implementation)
     */
    @Override
    public synchronized void onReplayFailed(Cluster destination, Throwable cause) {
        this.inReplay = false;
        this.replayedMessages.clear();
        this.backlog.clear();
    }

    /**
     * Ends the replay and drains the backlog, publishing every remaining message whose identifier
     * was not recorded as replayed. The recorded identifiers are cleared afterwards.
     *
     * @param destination the cluster to which the remaining backlog messages are published
     */
    @Override
    public synchronized void processBacklog(Cluster destination) {
        this.inReplay = false;
        while (!this.backlog.isEmpty()) {
            EventMessage message = this.backlog.poll();
            if (message == null || this.replayedMessages.contains(message.getIdentifier())) {
                continue;
            }
            destination.publish(message);
        }
        this.replayedMessages.clear();
    }
}

