/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.eventstore.jpa;

import java.io.Closeable;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import org.axonframework.common.Assert;
import org.axonframework.common.IdentifierValidator;
import org.axonframework.common.io.IOUtils;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.domain.GenericDomainEventMessage;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.PartialStreamSupport;
import org.axonframework.eventstore.SnapshotEventStore;
import org.axonframework.eventstore.jpa.DefaultEventEntryStore;
import org.axonframework.eventstore.jpa.EventEntryStore;
import org.axonframework.eventstore.jpa.SQLErrorCodesResolver;
import org.axonframework.eventstore.jpa.criteria.JpaCriteria;
import org.axonframework.eventstore.jpa.criteria.JpaCriteriaBuilder;
import org.axonframework.eventstore.jpa.criteria.ParameterRegistry;
import org.axonframework.eventstore.management.Criteria;
import org.axonframework.eventstore.management.CriteriaBuilder;
import org.axonframework.eventstore.management.EventStoreManagement;
import org.axonframework.repository.ConcurrencyException;
import org.axonframework.serializer.MessageSerializer;
import org.axonframework.serializer.SerializedDomainEventData;
import org.axonframework.serializer.SerializedObject;
import org.axonframework.serializer.Serializer;
import org.axonframework.serializer.xml.XStreamSerializer;
import org.axonframework.upcasting.SimpleUpcasterChain;
import org.axonframework.upcasting.UpcastUtils;
import org.axonframework.upcasting.UpcasterAware;
import org.axonframework.upcasting.UpcasterChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event store that persists domain events and snapshot events through JPA.
 *
 * <p>Serialization is done by a {@link MessageSerializer} wrapping the configured {@link Serializer};
 * the actual reading and writing of entries is delegated to an {@link EventEntryStore}. Events that are
 * read back are passed through the configured {@link UpcasterChain} before being handed to the caller.
 *
 * <p>The configuration setters write plain, unsynchronized fields; this class adds no synchronization
 * of its own.
 */
public class JpaEventStore
        implements SnapshotEventStore, EventStoreManagement, UpcasterAware, PartialStreamSupport {

    private static final Logger logger = LoggerFactory.getLogger(JpaEventStore.class);
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final int DEFAULT_MAX_SNAPSHOTS_ARCHIVED = 1;

    private final MessageSerializer serializer;
    private final EventEntryStore<?> eventEntryStore;
    private final JpaCriteriaBuilder criteriaBuilder = new JpaCriteriaBuilder();
    private final EntityManagerProvider entityManagerProvider;
    private int batchSize = DEFAULT_BATCH_SIZE;
    private UpcasterChain upcasterChain = SimpleUpcasterChain.EMPTY;
    private int maxSnapshotsArchived = DEFAULT_MAX_SNAPSHOTS_ARCHIVED;
    private PersistenceExceptionResolver persistenceExceptionResolver;

    /**
     * Creates an event store using an {@link XStreamSerializer} and a {@link DefaultEventEntryStore}.
     *
     * @param entityManagerProvider supplies the EntityManager used for every operation; may not be null
     */
    public JpaEventStore(EntityManagerProvider entityManagerProvider) {
        this(entityManagerProvider, new XStreamSerializer(), new DefaultEventEntryStore());
    }

    /**
     * Creates an event store using an {@link XStreamSerializer} and the given entry store.
     *
     * @param entityManagerProvider supplies the EntityManager used for every operation; may not be null
     * @param eventEntryStore       reads and writes the event entries; may not be null
     */
    public JpaEventStore(EntityManagerProvider entityManagerProvider, EventEntryStore eventEntryStore) {
        this(entityManagerProvider, new XStreamSerializer(), eventEntryStore);
    }

    /**
     * Creates an event store using the given serializer and a {@link DefaultEventEntryStore}.
     *
     * @param entityManagerProvider supplies the EntityManager used for every operation; may not be null
     * @param serializer            serializes payload and meta data of events; may not be null
     */
    public JpaEventStore(EntityManagerProvider entityManagerProvider, Serializer serializer) {
        this(entityManagerProvider, serializer, new DefaultEventEntryStore());
    }

    /**
     * Creates an event store using the given serializer and entry store.
     *
     * @param entityManagerProvider supplies the EntityManager used for every operation; may not be null
     * @param serializer            serializes payload and meta data of events; may not be null
     * @param eventEntryStore       reads and writes the event entries; may not be null
     */
    public JpaEventStore(EntityManagerProvider entityManagerProvider, Serializer serializer, EventEntryStore eventEntryStore) {
        Assert.notNull(entityManagerProvider, "entityManagerProvider may not be null");
        Assert.notNull(serializer, "serializer may not be null");
        Assert.notNull(eventEntryStore, "eventEntryStore may not be null");
        this.entityManagerProvider = entityManagerProvider;
        this.serializer = new MessageSerializer(serializer);
        this.eventEntryStore = eventEntryStore;
    }

    /**
     * Serializes and persists every event in the given stream, then flushes the EntityManager.
     *
     * @param type   the aggregate type the events belong to
     * @param events the events to append
     * @throws ConcurrencyException if persisting fails and the configured
     *                              {@link PersistenceExceptionResolver} reports a duplicate key violation
     */
    @Override
    public void appendEvents(String type, DomainEventStream events) {
        // Tracks the event being handled so that a failure can be reported against it.
        DomainEventMessage event = null;
        try {
            EntityManager entityManager = this.entityManagerProvider.getEntityManager();
            Class<?> dataType = this.eventEntryStore.getDataType();
            while (events.hasNext()) {
                event = events.next();
                IdentifierValidator.validateIdentifier(event.getAggregateIdentifier().getClass());
                SerializedObject<?> serializedPayload = this.serializer.serializePayload(event, dataType);
                SerializedObject<?> serializedMetaData = this.serializer.serializeMetaData(event, dataType);
                this.eventEntryStore.persistEvent(type, event, serializedPayload, serializedMetaData, entityManager);
            }
            entityManager.flush();
        }
        catch (RuntimeException exception) {
            if (event != null && this.persistenceExceptionResolver != null && this.persistenceExceptionResolver.isDuplicateKeyViolation(exception)) {
                throw new ConcurrencyException(String.format("Concurrent modification detected for Aggregate identifier [%s], sequence: [%s]", event.getAggregateIdentifier(), event.getSequenceNumber()), exception);
            }
            throw exception;
        }
    }

    /**
     * Reads the event stream of an aggregate, starting from its last snapshot when one can be read.
     *
     * <p>If the last snapshot entry cannot be deserialized, a warning is logged and the stream is read
     * from sequence number 0 instead.
     *
     * @param type       the aggregate type
     * @param identifier the aggregate identifier
     * @return a stream holding the snapshot event (if any) followed by the subsequent events
     * @throws EventStreamNotFoundException if there is neither a usable snapshot nor any event
     */
    @Override
    public DomainEventStream readEvents(String type, Object identifier) {
        long snapshotSequenceNumber = -1L;
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        SerializedDomainEventData<?> lastSnapshotEvent = this.eventEntryStore.loadLastSnapshotEvent(type, identifier, entityManager);
        GenericDomainEventMessage snapshotEvent = null;
        if (lastSnapshotEvent != null) {
            try {
                snapshotEvent = new GenericDomainEventMessage(identifier, lastSnapshotEvent.getSequenceNumber(), this.serializer.deserialize(lastSnapshotEvent.getPayload()), (Map)this.serializer.deserialize(lastSnapshotEvent.getMetaData()));
                snapshotSequenceNumber = snapshotEvent.getSequenceNumber();
            }
            catch (RuntimeException ex) {
                warnSnapshotUnreadable(ex);
            }
            catch (LinkageError error) {
                warnSnapshotUnreadable(error);
            }
        }
        Iterator<SerializedDomainEventData<?>> entries = this.eventEntryStore.fetchAggregateStream(type, identifier, snapshotSequenceNumber + 1L, this.batchSize, entityManager);
        if (snapshotEvent == null && !entries.hasNext()) {
            throw new EventStreamNotFoundException(type, identifier);
        }
        return new CursorBackedDomainEventStream(snapshotEvent, entries, identifier, false);
    }

    /**
     * Reads the events of an aggregate from the given sequence number onwards, ignoring snapshots.
     *
     * @param type                the aggregate type
     * @param identifier          the aggregate identifier
     * @param firstSequenceNumber sequence number of the first event to return
     * @return a stream of the matching events
     * @throws EventStreamNotFoundException if no event is found from {@code firstSequenceNumber} onwards
     */
    @Override
    public DomainEventStream readEvents(String type, Object identifier, long firstSequenceNumber) {
        return this.readEvents(type, identifier, firstSequenceNumber, Long.MAX_VALUE);
    }

    /**
     * Reads the events of an aggregate within a sequence number range (both bounds inclusive),
     * ignoring snapshots.
     *
     * @param type                the aggregate type
     * @param identifier          the aggregate identifier
     * @param firstSequenceNumber sequence number of the first event to return
     * @param lastSequenceNumber  sequence number of the last event to return
     * @return a stream of the matching events
     * @throws EventStreamNotFoundException if no event is found from {@code firstSequenceNumber} onwards
     */
    @Override
    public DomainEventStream readEvents(String type, Object identifier, long firstSequenceNumber, long lastSequenceNumber) {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        int minimalBatchSize = this.minimalBatchSize(firstSequenceNumber, lastSequenceNumber);
        Iterator<SerializedDomainEventData<?>> entries = this.eventEntryStore.fetchAggregateStream(type, identifier, firstSequenceNumber, minimalBatchSize, entityManager);
        if (!entries.hasNext()) {
            throw new EventStreamNotFoundException(type, identifier);
        }
        return new CursorBackedDomainEventStream(null, entries, identifier, lastSequenceNumber, false);
    }

    /**
     * Serializes and persists a snapshot event, prunes older snapshots when
     * {@code maxSnapshotsArchived} is positive, and flushes the EntityManager.
     *
     * @param type          the aggregate type
     * @param snapshotEvent the snapshot event to store
     * @throws ConcurrencyException if persisting fails and the configured
     *                              {@link PersistenceExceptionResolver} reports a duplicate key violation
     */
    @Override
    public void appendSnapshotEvent(String type, DomainEventMessage snapshotEvent) {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        Class<?> dataType = this.eventEntryStore.getDataType();
        SerializedObject<?> serializedPayload = this.serializer.serializePayload(snapshotEvent, dataType);
        SerializedObject<?> serializedMetaData = this.serializer.serializeMetaData(snapshotEvent, dataType);
        try {
            this.eventEntryStore.persistSnapshot(type, snapshotEvent, serializedPayload, serializedMetaData, entityManager);
            if (this.maxSnapshotsArchived > 0) {
                // Persist, prune and flush all go through the same EntityManager instance.
                this.eventEntryStore.pruneSnapshots(type, snapshotEvent, this.maxSnapshotsArchived, entityManager);
            }
            entityManager.flush();
        }
        catch (RuntimeException exception) {
            if (snapshotEvent != null && this.persistenceExceptionResolver != null && this.persistenceExceptionResolver.isDuplicateKeyViolation(exception)) {
                throw new ConcurrencyException(String.format("A snapshot for aggregate [%s] at sequence: [%s] was already inserted", snapshotEvent.getAggregateIdentifier(), snapshotEvent.getSequenceNumber()), exception);
            }
            throw exception;
        }
    }

    /**
     * Passes every stored event to the given visitor.
     *
     * @param visitor receives each event
     */
    @Override
    public void visitEvents(EventVisitor visitor) {
        this.doVisitEvents(visitor, null, Collections.<String, Object>emptyMap());
    }

    /**
     * Passes every stored event matching the criteria to the given visitor.
     *
     * @param criteria the selection criteria; must be a {@link JpaCriteria}, e.g. one built through
     *                 {@link #newCriteriaBuilder()}, otherwise a {@link ClassCastException} is thrown
     * @param visitor  receives each matching event
     */
    @Override
    public void visitEvents(Criteria criteria, EventVisitor visitor) {
        StringBuilder sb = new StringBuilder();
        ParameterRegistry parameters = new ParameterRegistry();
        ((JpaCriteria)criteria).parse("e", sb, parameters);
        this.doVisitEvents(visitor, sb.toString(), parameters.getParameters());
    }

    /**
     * Returns the criteria builder of this event store. The same builder instance is returned on
     * every call.
     *
     * @return the builder used to construct criteria for {@link #visitEvents(Criteria, EventVisitor)}
     */
    @Override
    public CriteriaBuilder newCriteriaBuilder() {
        return this.criteriaBuilder;
    }

    /**
     * Fetches the filtered entries from the entry store and feeds the resulting events to the visitor.
     * Entries of unknown types are skipped. The underlying cursor is closed when visiting ends,
     * whether normally or through an exception thrown by the visitor.
     */
    private void doVisitEvents(EventVisitor visitor, String whereClause, Map<String, Object> parameters) {
        EntityManager entityManager = this.entityManagerProvider.getEntityManager();
        Iterator<SerializedDomainEventData<?>> batch = this.eventEntryStore.fetchFiltered(whereClause, parameters, this.batchSize, entityManager);
        CursorBackedDomainEventStream eventStream = new CursorBackedDomainEventStream(null, batch, null, true);
        try {
            while (eventStream.hasNext()) {
                visitor.doWithEvent(eventStream.next());
            }
        }
        finally {
            try {
                eventStream.close();
            }
            catch (IOException closeFailure) {
                // Best-effort cleanup: never let a close failure mask the outcome of the visit itself.
                logger.warn("Failed to close the event cursor after visiting events", closeFailure);
            }
        }
    }

    /**
     * Computes the batch size for a ranged read: the number of events in the inclusive range plus
     * one, capped at the configured batch size.
     *
     * <p>The computation is overflow-safe. An open-ended read passes {@code Long.MAX_VALUE} as the
     * upper bound, for which the naive {@code last - first + 2} wraps around and, after narrowing to
     * {@code int}, produces a batch size of 0 or 1. For an inverted (empty) range a single-row batch
     * is used, as is the case for {@code last == first - 1}.
     */
    private int minimalBatchSize(long firstSequenceNumber, long lastSequenceNumber) {
        if (lastSequenceNumber < firstSequenceNumber) {
            return Math.min(this.batchSize, 1);
        }
        long span = lastSequenceNumber - firstSequenceNumber;
        // span < 0 here means the subtraction overflowed, i.e. the range is larger than any batch.
        if (span < 0L || span >= (long)this.batchSize - 2L) {
            return this.batchSize;
        }
        return (int)(span + 2L);
    }

    /** Logs that the last snapshot entry could not be read and that the full stream is used instead. */
    private static void warnSnapshotUnreadable(Throwable cause) {
        logger.warn("Error while reading snapshot event entry. Reconstructing aggregate on entire event stream. Caused by: {} {}", (Object)cause.getClass().getName(), (Object)cause.getMessage());
    }

    /**
     * Registers a {@link SQLErrorCodesResolver} based on the given data source, unless a
     * {@link PersistenceExceptionResolver} has already been set.
     *
     * @param dataSource the data source handed to the {@link SQLErrorCodesResolver}
     * @throws SQLException if creating the resolver fails
     */
    public void setDataSource(DataSource dataSource) throws SQLException {
        if (this.persistenceExceptionResolver == null) {
            this.persistenceExceptionResolver = new SQLErrorCodesResolver(dataSource);
        }
    }

    /**
     * Sets the resolver used to detect duplicate key violations when appending events or snapshots.
     *
     * @param persistenceExceptionResolver the resolver; when {@code null}, persistence exceptions are
     *                                     rethrown without being translated
     */
    public void setPersistenceExceptionResolver(PersistenceExceptionResolver persistenceExceptionResolver) {
        this.persistenceExceptionResolver = persistenceExceptionResolver;
    }

    /**
     * Sets the batch size passed to the entry store when fetching events. Defaults to 100.
     *
     * @param batchSize the batch size; the value is not validated here
     */
    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Sets the upcaster chain applied to entries read from the store.
     *
     * @param upcasterChain the chain to use
     */
    @Override
    public void setUpcasterChain(UpcasterChain upcasterChain) {
        this.upcasterChain = upcasterChain;
    }

    /**
     * Sets the snapshot count passed to the entry store when pruning. Defaults to 1. Pruning is
     * skipped entirely when the value is zero or negative.
     *
     * @param maxSnapshotsArchived the value handed to {@code pruneSnapshots}
     */
    public void setMaxSnapshotsArchived(int maxSnapshotsArchived) {
        this.maxSnapshotsArchived = maxSnapshotsArchived;
    }

    /**
     * Event stream that lazily pulls serialized entries from a cursor, upcasts and deserializes them,
     * and optionally starts with a snapshot event. The stream ends at the first event whose sequence
     * number exceeds {@code lastSequenceNumber}.
     */
    private final class CursorBackedDomainEventStream
            implements DomainEventStream, Closeable {

        // Events produced from the most recently consumed entry (one entry may yield several events).
        private Iterator<DomainEventMessage> currentBatch;
        // Look-ahead element; null once both the current batch and the cursor are exhausted.
        private DomainEventMessage next;
        private final Iterator<? extends SerializedDomainEventData> cursor;
        private final Object aggregateIdentifier;
        private final long lastSequenceNumber;
        private final boolean skipUnknownTypes;

        /** Creates a stream without an upper bound on the sequence number. */
        public CursorBackedDomainEventStream(DomainEventMessage snapshotEvent, Iterator<? extends SerializedDomainEventData> cursor, Object aggregateIdentifier, boolean skipUnknownTypes) {
            this(snapshotEvent, cursor, aggregateIdentifier, Long.MAX_VALUE, skipUnknownTypes);
        }

        /**
         * @param snapshotEvent       event returned before any cursor entry, or {@code null} for none
         * @param cursor              source of the serialized entries
         * @param aggregateIdentifier identifier passed on to the deserialization of each entry
         * @param lastSequenceNumber  highest sequence number this stream will return
         * @param skipUnknownTypes    passed on to {@link UpcastUtils#upcastAndDeserialize}
         */
        public CursorBackedDomainEventStream(DomainEventMessage snapshotEvent, Iterator<? extends SerializedDomainEventData> cursor, Object aggregateIdentifier, long lastSequenceNumber, boolean skipUnknownTypes) {
            this.aggregateIdentifier = aggregateIdentifier;
            this.lastSequenceNumber = lastSequenceNumber;
            this.skipUnknownTypes = skipUnknownTypes;
            // Explicit type witnesses: without them the element type is inferred as Object.
            if (snapshotEvent != null) {
                this.currentBatch = Collections.<DomainEventMessage>singletonList(snapshotEvent).iterator();
            } else {
                this.currentBatch = Collections.<DomainEventMessage>emptyList().iterator();
            }
            this.cursor = cursor;
            this.initializeNextItem();
        }

        @Override
        public boolean hasNext() {
            return this.next != null && this.next.getSequenceNumber() <= this.lastSequenceNumber;
        }

        @Override
        public DomainEventMessage next() {
            DomainEventMessage current = this.next;
            this.initializeNextItem();
            return current;
        }

        /**
         * Advances the look-ahead element, pulling entries from the cursor until one yields at least
         * one event or the cursor is exhausted.
         */
        private void initializeNextItem() {
            while (!this.currentBatch.hasNext() && this.cursor.hasNext()) {
                SerializedDomainEventData entry = this.cursor.next();
                this.currentBatch = UpcastUtils.upcastAndDeserialize(entry, this.aggregateIdentifier, JpaEventStore.this.serializer, JpaEventStore.this.upcasterChain, this.skipUnknownTypes).iterator();
            }
            this.next = this.currentBatch.hasNext() ? this.currentBatch.next() : null;
        }

        @Override
        public DomainEventMessage peek() {
            return this.next;
        }

        /**
         * Closes the entry cursor if it is closeable. The cursor is the object obtained from the
         * entry store; {@code currentBatch} only iterates an in-memory list.
         */
        @Override
        public void close() throws IOException {
            IOUtils.closeIfCloseable(this.cursor);
        }
    }
}

