/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.turbine.aggregator;

import com.netflix.turbine.aggregator.AggregateString;
import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.NumberList;
import com.netflix.turbine.aggregator.TypeAndNameKey;
import com.netflix.turbine.internal.OperatorPivot;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import rx.Observable;
import rx.observables.GroupedObservable;

/**
 * Static helpers that turn a stream of per-instance metric streams (keyed by
 * {@link InstanceKey}) into a stream of aggregate streams keyed by
 * {@link TypeAndNameKey}.
 *
 * <p>Aggregation works on deltas: every pair of consecutive events from one instance is
 * reduced to a delta map by {@link #previousAndCurrentToDelta(Map, Map)}, and the deltas of
 * all instances are folded into one running state map by {@link #sumOfDelta(Map, Map)}.
 *
 * <p>Delta encoding used between those two methods:
 * <ul>
 *   <li>{@code Long} - numeric change to add to the running total;</li>
 *   <li>{@link NumberList} - change for a nested map value;</li>
 *   <li>{@code String[1]} - first value seen for a non-summed key;</li>
 *   <li>{@code String[2]} - {@code {previous, current}} for a non-summed key, where a
 *       {@code null} element means "no value on that side".</li>
 * </ul>
 */
public class StreamAggregator {
    /** Numeric values under keys with this prefix are tracked as strings instead of being summed. */
    private static final String PROPERTY_VALUE_PREFIX = "propertyValue_";

    /**
     * Aggregates the given per-instance streams into one stream per type/name pair.
     *
     * @param stream stream of per-instance event streams; the events are modified in place
     *               (two bookkeeping entries are added to each map), so they must be mutable
     * @return one grouped stream per {@link TypeAndNameKey} emitting the running aggregate
     */
    public static Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> aggregateGroupedStreams(Observable<GroupedObservable<InstanceKey, Map<String, Object>>> stream) {
        return StreamAggregator.aggregateUsingFlattenedGroupBy(stream);
    }

    /** Not instantiable: the class only exposes static methods. */
    private StreamAggregator() {
    }

    /**
     * Alternative aggregation strategy built on {@link OperatorPivot}. Nothing in this class
     * calls it; {@link #aggregateGroupedStreams(Observable)} uses the flattened group-by
     * variant instead.
     *
     * <p>NOTE(review): unlike the flattened variant there is no tombstone handling here, so
     * the contribution of an instance whose stream ends is presumably never subtracted -
     * confirm before switching strategies.
     */
    private static Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> aggregateUsingPivot(Observable<GroupedObservable<InstanceKey, Map<String, Object>>> instanceStreams) {
        return instanceStreams
                // Split every instance stream into sub-streams keyed by type/name.
                .map(instanceStream -> GroupedObservable.from(
                        (Object)instanceStream.getKey(),
                        (Observable)instanceStream.groupBy(json -> TypeAndNameKey.from(
                                String.valueOf(json.get("type")),
                                String.valueOf(json.get("name"))))))
                // Presumably swaps the nesting so the outer key becomes the TypeAndNameKey
                // (see OperatorPivot) - TODO confirm.
                .lift(OperatorPivot.create())
                .map(commandGroup -> GroupedObservable.from(
                        (Object)commandGroup.getKey(),
                        (Observable)commandGroup
                                .flatMap(instanceGroup -> instanceGroup
                                        // The empty map makes the first real event pair up as (empty, first).
                                        .startWith(Collections.emptyMap())
                                        .buffer(2, 1)
                                        // Drop any buffer that does not hold a full (previous, current) pair.
                                        .filter(list -> list.size() == 2)
                                        .map(StreamAggregator::previousAndCurrentToDelta)
                                        .filter(data -> data != null && !data.isEmpty()))
                                .scan(new HashMap(), StreamAggregator::sumOfDelta)
                                // scan emits its seed (the empty HashMap) first; skip it.
                                .skip(1)));
    }

    /**
     * Aggregation strategy used by {@link #aggregateGroupedStreams(Observable)}.
     *
     * <p>All instance streams are flattened into one stream of events tagged with
     * {@code "InstanceKey"} and {@code "TypeAndName"}, then regrouped by type/name and, inside
     * each of those groups, by instance. For every type/name an instance reported, a
     * {@code "tombstone"} event followed by a {@code "tombstone-end"} event is appended so the
     * instance's contribution can be subtracted and its per-instance group terminated.
     */
    private static Observable<GroupedObservable<TypeAndNameKey, Map<String, Object>>> aggregateUsingFlattenedGroupBy(Observable<GroupedObservable<InstanceKey, Map<String, Object>>> stream) {
        Observable allData = stream.flatMap(instanceStream -> instanceStream.map(event -> {
            // Tag the event in place; this mutates the map supplied by the caller.
            event.put("InstanceKey", instanceStream.getKey());
            event.put("TypeAndName", TypeAndNameKey.from(String.valueOf(event.get("type")), String.valueOf(event.get("name"))));
            return event;
        }).publish(is -> {
            // Collect every type/name this instance reported, then emit one "tombstone" per
            // key followed by one "tombstone-end" per key.
            Observable tombstone = is.collect(() -> new HashSet(), (listOfTypeAndName, event) -> listOfTypeAndName.add((TypeAndNameKey)event.get("TypeAndName"))).flatMap(listOfTypeAndName -> Observable.from((Iterable)listOfTypeAndName).map(typeAndName -> {
                LinkedHashMap<String, Object> tombstoneForTypeAndName = new LinkedHashMap<String, Object>();
                tombstoneForTypeAndName.put("tombstone", "true");
                tombstoneForTypeAndName.put("InstanceKey", instanceStream.getKey());
                tombstoneForTypeAndName.put("TypeAndName", typeAndName);
                tombstoneForTypeAndName.put("name", typeAndName.getName());
                tombstoneForTypeAndName.put("type", typeAndName.getType());
                return tombstoneForTypeAndName;
            }).concatWith(Observable.defer(() -> Observable.from((Iterable)listOfTypeAndName).map(typeAndName -> {
                LinkedHashMap<String, Object> tombstoneForTypeAndName = new LinkedHashMap<String, Object>();
                tombstoneForTypeAndName.put("tombstone-end", "true");
                tombstoneForTypeAndName.put("InstanceKey", instanceStream.getKey());
                tombstoneForTypeAndName.put("TypeAndName", typeAndName);
                return tombstoneForTypeAndName;
            }))));
            return is.mergeWith(tombstone);
        }));
        Observable byCommand = allData.groupBy(event -> (TypeAndNameKey)event.get("TypeAndName"));
        return byCommand.map(commandGroup -> {
            Observable sumOfDeltasForAllInstancesForCommand = commandGroup
                    .groupBy(json -> json.get("InstanceKey"))
                    .flatMap(instanceGroup -> instanceGroup
                            // "tombstone-end" terminates this instance's group.
                            .takeWhile(d -> !d.containsKey("tombstone-end"))
                            // The empty map makes the first real event pair up as (empty, first).
                            .startWith(Collections.emptyMap())
                            .map(data -> {
                                // A tombstone is replaced by an empty map so that the pair
                                // (last event, empty) produces the inverse of the last event.
                                if (data.containsKey("tombstone")) {
                                    return Collections.emptyMap();
                                }
                                return data;
                            })
                            .buffer(2, 1)
                            // Drop any buffer that does not hold a full (previous, current) pair.
                            .filter(list -> list.size() == 2)
                            .map(StreamAggregator::previousAndCurrentToDelta)
                            .filter(data -> data != null && !data.isEmpty()))
                    .scan(new HashMap(), StreamAggregator::sumOfDelta)
                    // scan emits its seed (the empty HashMap) first; skip it.
                    .skip(1);
            return GroupedObservable.from((Object)commandGroup.getKey(), (Observable)sumOfDeltasForAllInstancesForCommand);
        });
    }

    /**
     * List-based overload of {@link #previousAndCurrentToDelta(Map, Map)}.
     *
     * @param data exactly two maps: the previous event at index 0, the current one at index 1
     * @return the delta between the two events
     * @throws IllegalArgumentException if {@code data} does not hold exactly two items
     */
    static final Map<String, Object> previousAndCurrentToDelta(List<Map<String, Object>> data) {
        if (data.size() != 2) {
            throw new IllegalArgumentException("Must be list of 2 items");
        }
        return StreamAggregator.previousAndCurrentToDelta(data.get(0), data.get(1));
    }

    /**
     * Computes the delta that takes one instance's contribution from {@code previous} to
     * {@code current}.
     *
     * <ul>
     *   <li>{@code previous} empty: every value of {@code current} is emitted as an initial
     *       contribution.</li>
     *   <li>{@code current} empty or holding only identifier keys: every value of
     *       {@code previous} is emitted negated/removed.</li>
     *   <li>otherwise: a per-key difference over the keys of {@code current}. Keys present
     *       only in {@code previous} are not part of the result.</li>
     * </ul>
     *
     * <p>A previous value whose type does not match the current value is treated as absent
     * rather than failing with a {@link ClassCastException}.
     *
     * @param previous the event seen before {@code current}; may be empty
     * @param current  the latest event; may be empty
     * @return a new map holding the identifier entries plus one delta entry per metric key
     */
    static final Map<String, Object> previousAndCurrentToDelta(Map<String, Object> previous, Map<String, Object> current) {
        if (previous.isEmpty()) {
            return StreamAggregator.initialContribution(current);
        }
        if (current.isEmpty() || StreamAggregator.containsOnlyIdentifiers(current)) {
            return StreamAggregator.inverseContribution(previous);
        }
        return StreamAggregator.difference(previous, current);
    }

    /** Builds the delta for the first event of an instance: every value counts in full. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // nested metric maps arrive as untyped Map values
    private static Map<String, Object> initialContribution(Map<String, Object> current) {
        LinkedHashMap<String, Object> seed = new LinkedHashMap<String, Object>();
        StreamAggregator.initMapWithIdentifiers(current, seed);
        for (Map.Entry<String, Object> entry : current.entrySet()) {
            String key = entry.getKey();
            if (StreamAggregator.isIdentifierKey(key)) {
                continue;
            }
            Object currentValue = entry.getValue();
            if (StreamAggregator.isSummable(key, currentValue)) {
                seed.put(key, ((Number)currentValue).longValue());
            } else if (currentValue instanceof Map) {
                seed.put(key, NumberList.create((Map)currentValue));
            } else {
                // A one-element array marks "first value, nothing to replace".
                seed.put(key, new String[]{String.valueOf(currentValue)});
            }
        }
        return seed;
    }

    /** Builds the delta that removes everything {@code previous} contributed. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // nested metric maps arrive as untyped Map values
    private static Map<String, Object> inverseContribution(Map<String, Object> previous) {
        LinkedHashMap<String, Object> delta = new LinkedHashMap<String, Object>();
        StreamAggregator.initMapWithIdentifiers(previous, delta);
        for (Map.Entry<String, Object> entry : previous.entrySet()) {
            String key = entry.getKey();
            if (StreamAggregator.isIdentifierKey(key)) {
                continue;
            }
            Object previousValue = entry.getValue();
            if (StreamAggregator.isSummable(key, previousValue)) {
                delta.put(key, -((Number)previousValue).longValue());
            } else if (previousValue instanceof Map) {
                delta.put(key, NumberList.deltaInverse((Map)previousValue));
            } else {
                // {previous, null}: the value is withdrawn with no replacement.
                delta.put(key, new String[]{String.valueOf(previousValue), null});
            }
        }
        return delta;
    }

    /** Builds the per-key difference between two non-empty events, driven by the keys of {@code current}. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // nested metric maps arrive as untyped Map values
    private static Map<String, Object> difference(Map<String, Object> previous, Map<String, Object> current) {
        LinkedHashMap<String, Object> delta = new LinkedHashMap<String, Object>();
        StreamAggregator.initMapWithIdentifiers(current, delta);
        for (Map.Entry<String, Object> entry : current.entrySet()) {
            String key = entry.getKey();
            if (StreamAggregator.isIdentifierKey(key)) {
                continue;
            }
            Object currentValue = entry.getValue();
            Object previousValue = previous.get(key);
            if (StreamAggregator.isSummable(key, currentValue)) {
                // A missing or non-numeric previous value contributes nothing.
                long before = previousValue instanceof Number ? ((Number)previousValue).longValue() : 0L;
                delta.put(key, ((Number)currentValue).longValue() - before);
            } else if (currentValue instanceof Map) {
                if (previousValue instanceof Map) {
                    delta.put(key, NumberList.delta((Map<String, Object>)((Map)currentValue), (Map)previousValue));
                } else {
                    delta.put(key, NumberList.create((Map)currentValue));
                }
            } else {
                // Use a real null (not the text "null") when there was no previous value, so
                // the aggregate is not asked to withdraw a value that was never recorded.
                String before = previousValue == null ? null : String.valueOf(previousValue);
                delta.put(key, new String[]{before, String.valueOf(currentValue)});
            }
        }
        return delta;
    }

    /** Returns whether {@code value} is a number that should be summed rather than tracked as a string. */
    private static boolean isSummable(String key, Object value) {
        return value instanceof Number && !key.startsWith(PROPERTY_VALUE_PREFIX);
    }

    /** Returns whether {@code key} names bookkeeping data rather than a metric value. */
    private static boolean isIdentifierKey(String key) {
        switch (key) {
            case "InstanceKey":
            case "TypeAndName":
            case "instanceId":
            case "currentTime":
            case "name":
            case "type":
                return true;
            default:
                return false;
        }
    }

    /** Returns whether every key of {@code m} is an identifier key (trivially true for an empty map). */
    private static boolean containsOnlyIdentifiers(Map<String, Object> m) {
        for (String candidate : m.keySet()) {
            if (!StreamAggregator.isIdentifierKey(candidate)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Copies the identifier entries from {@code source} into {@code toInit}. An entry missing
     * from {@code source} is copied as {@code null}. {@code "currentTime"} is deliberately not
     * part of this list even though {@link #isIdentifierKey(String)} treats it as an
     * identifier, so it never reaches a delta.
     */
    private static void initMapWithIdentifiers(Map<String, Object> source, Map<String, Object> toInit) {
        toInit.put("InstanceKey", source.get("InstanceKey"));
        toInit.put("TypeAndName", source.get("TypeAndName"));
        toInit.put("instanceId", source.get("instanceId"));
        toInit.put("name", source.get("name"));
        toInit.put("type", source.get("type"));
    }

    /**
     * Folds one delta (as produced by {@link #previousAndCurrentToDelta(Map, Map)}) into the
     * running aggregate.
     *
     * <p>An existing state entry whose type does not match the incoming delta is treated as
     * absent instead of failing with a {@link ClassCastException} or
     * {@link NullPointerException}.
     *
     * @param state running aggregate; modified in place and returned
     * @param delta change to apply; must carry a non-null {@code "InstanceKey"} entry
     * @return {@code state}, after the delta has been applied
     * @throws RuntimeException if {@code delta} has no {@code "InstanceKey"} entry
     */
    static Map<String, Object> sumOfDelta(Map<String, Object> state, Map<String, Object> delta) {
        InstanceKey instanceId = (InstanceKey)delta.get("InstanceKey");
        if (instanceId == null) {
            throw new RuntimeException("InstanceKey can not be null");
        }
        for (Map.Entry<String, Object> entry : delta.entrySet()) {
            String key = entry.getKey();
            Object change = entry.getValue();
            Object existing = state.get(key);
            if (change instanceof Number) {
                long base = existing instanceof Number ? ((Number)existing).longValue() : 0L;
                state.put(key, base + ((Number)change).longValue());
            } else if (change instanceof NumberList) {
                if (existing instanceof NumberList) {
                    state.put(key, ((NumberList)existing).sum((NumberList)change));
                } else {
                    state.put(key, change);
                }
            } else if (change instanceof String[]) {
                String[] values = (String[])change;
                if (values.length == 1) {
                    // First value reported by this instance for the key.
                    if (existing instanceof AggregateString) {
                        state.put(key, ((AggregateString)existing).update(null, values[0], instanceId));
                    } else {
                        state.put(key, AggregateString.create(values[0], instanceId));
                    }
                } else if (existing instanceof AggregateString) {
                    state.put(key, ((AggregateString)existing).update(values[0], values[1], instanceId));
                } else if (values[1] != null) {
                    // The key first appeared in a later event, so nothing was seeded yet:
                    // start the aggregate from the current value.
                    state.put(key, AggregateString.create(values[1], instanceId));
                }
                // Otherwise this is a withdrawal of a value that was never recorded: nothing to do.
            } else {
                // Identifier entries (and anything unrecognised) are stored as text.
                state.put(key, String.valueOf(change));
            }
        }
        return state;
    }
}

