/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup;

import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.VertexGroupComputeUtil;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup;

public class ForwardGroupComputeUtil {
    public static Map<JobVertexID, ForwardGroup> computeForwardGroups(Iterable<JobVertex> topologicallySortedVertices, Function<JobVertexID, ExecutionJobVertex> executionJobVertexRetriever) {
        IdentityHashMap vertexToGroup = new IdentityHashMap();
        for (JobVertex vertex2 : topologicallySortedVertices) {
            Set<JobVertex> currentGroup = new HashSet<JobVertex>();
            currentGroup.add(vertex2);
            vertexToGroup.put(vertex2, currentGroup);
            for (JobEdge input : ForwardGroupComputeUtil.getForwardInputs(vertex2)) {
                JobVertex producerVertex = input.getSource().getProducer();
                Set producerGroup = (Set)vertexToGroup.get(producerVertex);
                if (producerGroup == null) {
                    throw new IllegalStateException("Producer task " + producerVertex.getID() + " forward group is null while calculating forward group for the consumer task " + vertex2.getID() + ". This should be a forward group building bug.");
                }
                if (currentGroup == producerGroup) continue;
                currentGroup = VertexGroupComputeUtil.mergeVertexGroups(currentGroup, producerGroup, vertexToGroup);
            }
        }
        HashMap<JobVertexID, ForwardGroup> ret = new HashMap<JobVertexID, ForwardGroup>();
        for (Set vertexGroup : VertexGroupComputeUtil.uniqueVertexGroups(vertexToGroup)) {
            if (vertexGroup.size() <= 1) continue;
            ForwardGroup forwardGroup = new ForwardGroup(vertexGroup.stream().map(vertex -> (ExecutionJobVertex)executionJobVertexRetriever.apply(vertex.getID())).collect(Collectors.toSet()));
            for (JobVertexID jobVertexId : forwardGroup.getJobVertexIds()) {
                ret.put(jobVertexId, forwardGroup);
            }
        }
        return ret;
    }

    static Iterable<JobEdge> getForwardInputs(JobVertex jobVertex) {
        return jobVertex.getInputs().stream().filter(JobEdge::isForward).collect(Collectors.toSet());
    }
}

