/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.ConstrainedPrioritySet;

/**
 * A pending relocation of one task, plus the static routines that apply such relocations to a
 * set of {@link ClientState}s.
 *
 * <p>A movement is recorded for a task that is currently assigned to {@code destination} although
 * it is not caught up there and some other client has a smaller lag for it. Applying the movement
 * assigns the task to another ("source") client and removes it from {@code destination}; while
 * warm-up replicas remain, {@code destination} keeps (or receives) the task as a standby.
 *
 * <p>Naming note: throughout this class the <em>source</em> client is the one that receives the
 * task now, and the <em>destination</em> is the client the task is taken away from.
 */
final class TaskMovement {
    private final TaskId task;
    private final ProcessId destination;
    private final SortedSet<ProcessId> caughtUpClients;

    private TaskMovement(TaskId task, ProcessId destination, SortedSet<ProcessId> caughtUpClients) {
        this.task = task;
        this.destination = destination;
        this.caughtUpClients = caughtUpClients;
    }

    private TaskId task() {
        return this.task;
    }

    private int numCaughtUpClients() {
        return this.caughtUpClients.size();
    }

    /**
     * Returns {@code true} when {@code task} is not in the caught-up set of {@code client} and the
     * first client in {@code tasksToClientByLag} for that task has a strictly smaller lag.
     *
     * @throws NullPointerException if either map has no entry for {@code task}
     */
    private static boolean taskIsNotCaughtUpOnClientAndOtherMoreCaughtUpClientsExist(TaskId task, ProcessId client, Map<ProcessId, ClientState> clientStates, Map<TaskId, SortedSet<ProcessId>> tasksToCaughtUpClients, Map<TaskId, SortedSet<ProcessId>> tasksToClientByLag) {
        // Validate the by-lag entry up front, before the caught-up check, so a missing entry fails fast.
        final SortedSet<ProcessId> taskClients = Objects.requireNonNull(tasksToClientByLag.get(task), "uninitialized set");
        if (taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)) {
            return false;
        }
        final long mostCaughtUpLag = clientStates.get(taskClients.first()).lagFor(task);
        final long clientLag = clientStates.get(client).lagFor(task);
        return mostCaughtUpLag < clientLag;
    }

    /**
     * Returns whether {@code client} is a member of the caught-up set registered for {@code task}.
     *
     * @throws NullPointerException if {@code tasksToCaughtUpClients} has no entry for {@code task}
     */
    private static boolean taskIsCaughtUpOnClient(TaskId task, ProcessId client, Map<TaskId, SortedSet<ProcessId>> tasksToCaughtUpClients) {
        final Set<ProcessId> caughtUpClients = Objects.requireNonNull(tasksToCaughtUpClients.get(task), "uninitialized set");
        return caughtUpClients.contains(client);
    }

    /**
     * Builds the priority set shared by both entry points: clients are weighted by
     * {@link ClientState#assignedTaskLoad()} and constrained to those caught up on the polled task.
     */
    private static ConstrainedPrioritySet newCaughtUpClientsByTaskLoad(Map<TaskId, SortedSet<ProcessId>> tasksToCaughtUpClients, Map<ProcessId, ClientState> clientStates) {
        final BiFunction<ProcessId, TaskId, Boolean> caughtUpPredicate = (client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients);
        return new ConstrainedPrioritySet(caughtUpPredicate, client -> clientStates.get(client).assignedTaskLoad());
    }

    /** Creates the movement queue, ordered by fewest caught-up clients first, then by task id. */
    private static PriorityQueue<TaskMovement> newMovementQueue() {
        return new PriorityQueue<>(Comparator.comparingInt(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task));
    }

    /**
     * Finds every active task that sits on a client where it is not caught up while a more
     * caught-up client exists, and reassigns each one by trying, in order: swapping with a
     * caught-up standby, moving to a caught-up client, moving to the most caught-up client.
     *
     * @param tasksToCaughtUpClients caught-up clients per task; must contain every active task
     * @param tasksToClientByLag clients per task, iterated from the front as "most caught up"
     * @param clientStates mutable assignment state per client; updated in place
     * @param warmups per-client warm-up tasks; updated in place when a warm-up is handed out
     * @param remainingWarmupReplicas warm-up budget; decremented once per non-swap movement
     * @return the number of movements that were scheduled
     * @throws IllegalStateException if a scheduled movement cannot be carried out by any strategy
     */
    static int assignActiveTaskMovements(Map<TaskId, SortedSet<ProcessId>> tasksToCaughtUpClients, Map<TaskId, SortedSet<ProcessId>> tasksToClientByLag, Map<ProcessId, ClientState> clientStates, Map<ProcessId, Set<TaskId>> warmups, AtomicInteger remainingWarmupReplicas) {
        final ConstrainedPrioritySet caughtUpClientsByTaskLoad = newCaughtUpClientsByTaskLoad(tasksToCaughtUpClients, clientStates);
        final PriorityQueue<TaskMovement> taskMovements = newMovementQueue();
        for (final Map.Entry<ProcessId, ClientState> clientStateEntry : clientStates.entrySet()) {
            final ProcessId client = clientStateEntry.getKey();
            final ClientState state = clientStateEntry.getValue();
            for (final TaskId task : state.activeTasks()) {
                if (taskIsNotCaughtUpOnClientAndOtherMoreCaughtUpClientsExist(task, client, clientStates, tasksToCaughtUpClients, tasksToClientByLag)) {
                    taskMovements.add(new TaskMovement(task, client, tasksToCaughtUpClients.get(task)));
                }
            }
            caughtUpClientsByTaskLoad.offer(client);
        }
        // Every scheduled movement must succeed (or throw), so the count is fixed before processing.
        final int movementsNeeded = taskMovements.size();
        while (!taskMovements.isEmpty()) {
            final TaskMovement movement = taskMovements.poll();
            // Short-circuit: later strategies only run when the earlier ones found no client.
            final boolean moved = tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, caughtUpClientsByTaskLoad, movement)
                || tryToMoveActiveToCaughtUpClientAndTryToWarmUp(clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement)
                || tryToMoveActiveToMostCaughtUpClient(tasksToClientByLag, clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement);
            if (!moved) {
                throw new IllegalStateException("Tried to move task to more caught-up client as scheduled before but none exist");
            }
        }
        return movementsNeeded;
    }

    /**
     * Finds every standby task (excluding those already recorded as warm-ups for that client) that
     * sits on a client where it is not caught up while a more caught-up client exists, and assigns
     * the standby to a client that does not yet have the task. Movements with no eligible client
     * are silently skipped.
     *
     * @param tasksToCaughtUpClients caught-up clients per task; must contain every standby task considered
     * @param tasksToClientByLag clients per task, iterated from the front as "most caught up"
     * @param clientStates mutable assignment state per client; updated in place
     * @param remainingWarmupReplicas warm-up budget; decremented once per performed movement
     * @param warmups per-client warm-up tasks; only read here
     * @return the number of movements actually performed
     */
    static int assignStandbyTaskMovements(Map<TaskId, SortedSet<ProcessId>> tasksToCaughtUpClients, Map<TaskId, SortedSet<ProcessId>> tasksToClientByLag, Map<ProcessId, ClientState> clientStates, AtomicInteger remainingWarmupReplicas, Map<ProcessId, Set<TaskId>> warmups) {
        final ConstrainedPrioritySet caughtUpClientsByTaskLoad = newCaughtUpClientsByTaskLoad(tasksToCaughtUpClients, clientStates);
        final PriorityQueue<TaskMovement> taskMovements = newMovementQueue();
        for (final Map.Entry<ProcessId, ClientState> clientStateEntry : clientStates.entrySet()) {
            final ProcessId destination = clientStateEntry.getKey();
            final ClientState state = clientStateEntry.getValue();
            for (final TaskId task : state.standbyTasks()) {
                // The warm-up lookup is repeated per task and evaluated before the lag check.
                if (warmups.getOrDefault(destination, Collections.emptySet()).contains(task)) {
                    continue;
                }
                if (taskIsNotCaughtUpOnClientAndOtherMoreCaughtUpClientsExist(task, destination, clientStates, tasksToCaughtUpClients, tasksToClientByLag)) {
                    taskMovements.add(new TaskMovement(task, destination, tasksToCaughtUpClients.get(task)));
                }
            }
            caughtUpClientsByTaskLoad.offer(destination);
        }
        int movementsNeeded = 0;
        // Drain with poll() so movements are handled in comparator order.
        while (!taskMovements.isEmpty()) {
            final TaskMovement movement = taskMovements.poll();
            // A client is eligible only if it holds the task in no form yet.
            final Function<ProcessId, Boolean> eligibleClientPredicate = clientId -> !clientStates.get(clientId).hasAssignedTask(movement.task);
            ProcessId sourceClient = caughtUpClientsByTaskLoad.poll(movement.task, eligibleClientPredicate);
            if (sourceClient == null) {
                sourceClient = mostCaughtUpEligibleClient(tasksToClientByLag, eligibleClientPredicate, movement.task, movement.destination);
            }
            if (sourceClient == null) {
                // Nowhere better to put this standby; leave the assignment as it is.
                continue;
            }
            moveStandbyAndTryToWarmUp(remainingWarmupReplicas, movement.task, clientStates.get(sourceClient), clientStates.get(movement.destination));
            caughtUpClientsByTaskLoad.offerAll(Arrays.asList(sourceClient, movement.destination));
            ++movementsNeeded;
        }
        return movementsNeeded;
    }

    /**
     * Polls for a caught-up client that already holds the task as a standby and, if one is found,
     * swaps roles: that client becomes active and the destination becomes standby.
     *
     * @return {@code true} if a swap was performed
     */
    private static boolean tryToSwapStandbyAndActiveOnCaughtUpClient(Map<ProcessId, ClientState> clientStates, ConstrainedPrioritySet caughtUpClientsByTaskLoad, TaskMovement movement) {
        final ProcessId caughtUpStandbySourceClient = caughtUpClientsByTaskLoad.poll(movement.task, c -> clientStates.get(c).hasStandbyTask(movement.task));
        if (caughtUpStandbySourceClient == null) {
            return false;
        }
        swapStandbyAndActive(movement.task, clientStates.get(caughtUpStandbySourceClient), clientStates.get(movement.destination));
        // Both clients' state changed, so both are offered to the priority set again.
        caughtUpClientsByTaskLoad.offerAll(Arrays.asList(caughtUpStandbySourceClient, movement.destination));
        return true;
    }

    /**
     * Polls for any caught-up client for the task and, if one is found, makes it the active owner
     * while the destination is left with a warm-up standby if the budget allows.
     *
     * @return {@code true} if the task was moved
     */
    private static boolean tryToMoveActiveToCaughtUpClientAndTryToWarmUp(Map<ProcessId, ClientState> clientStates, Map<ProcessId, Set<TaskId>> warmups, AtomicInteger remainingWarmupReplicas, ConstrainedPrioritySet caughtUpClientsByTaskLoad, TaskMovement movement) {
        final ProcessId caughtUpSourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
        if (caughtUpSourceClient == null) {
            return false;
        }
        moveActiveAndTryToWarmUp(remainingWarmupReplicas, movement.task, clientStates.get(caughtUpSourceClient), clientStates.get(movement.destination), warmups.computeIfAbsent(movement.destination, x -> new TreeSet<>()));
        caughtUpClientsByTaskLoad.offerAll(Arrays.asList(caughtUpSourceClient, movement.destination));
        return true;
    }

    /**
     * Falls back to the first client in lag order that precedes the destination. If that client
     * already has the task as a standby the roles are swapped; otherwise the active task is moved
     * and a warm-up is attempted on the destination.
     *
     * @return {@code true} if the task was moved
     */
    private static boolean tryToMoveActiveToMostCaughtUpClient(Map<TaskId, SortedSet<ProcessId>> tasksToClientByLag, Map<ProcessId, ClientState> clientStates, Map<ProcessId, Set<TaskId>> warmups, AtomicInteger remainingWarmupReplicas, ConstrainedPrioritySet caughtUpClientsByTaskLoad, TaskMovement movement) {
        final ProcessId mostCaughtUpSourceClient = mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination);
        if (mostCaughtUpSourceClient == null) {
            return false;
        }
        if (clientStates.get(mostCaughtUpSourceClient).hasStandbyTask(movement.task)) {
            swapStandbyAndActive(movement.task, clientStates.get(mostCaughtUpSourceClient), clientStates.get(movement.destination));
        } else {
            moveActiveAndTryToWarmUp(remainingWarmupReplicas, movement.task, clientStates.get(mostCaughtUpSourceClient), clientStates.get(movement.destination), warmups.computeIfAbsent(movement.destination, x -> new TreeSet<>()));
        }
        caughtUpClientsByTaskLoad.offerAll(Arrays.asList(mostCaughtUpSourceClient, movement.destination));
        return true;
    }

    /**
     * Assigns the active task to the source client and removes it from the destination. If the
     * warm-up budget was still positive before this call, the destination additionally receives
     * the task as a standby and the task is recorded in {@code warmups}.
     *
     * <p>The budget is decremented unconditionally, so it may drop below zero.
     */
    private static void moveActiveAndTryToWarmUp(AtomicInteger remainingWarmupReplicas, TaskId task, ClientState sourceClientState, ClientState destinationClientState, Set<TaskId> warmups) {
        sourceClientState.assignActive(task);
        final boolean warmupAvailable = remainingWarmupReplicas.getAndDecrement() > 0;
        destinationClientState.unassignActive(task);
        if (warmupAvailable) {
            destinationClientState.assignStandby(task);
            warmups.add(task);
        }
    }

    /**
     * Assigns the standby to the source client. The destination keeps its standby only if the
     * warm-up budget was still positive before this call; otherwise it is unassigned there.
     *
     * <p>The budget is decremented unconditionally, so it may drop below zero.
     */
    private static void moveStandbyAndTryToWarmUp(AtomicInteger remainingWarmupReplicas, TaskId task, ClientState sourceClientState, ClientState destinationClientState) {
        sourceClientState.assignStandby(task);
        if (remainingWarmupReplicas.getAndDecrement() <= 0) {
            destinationClientState.unassignStandby(task);
        }
    }

    /** Exchanges roles for {@code task}: source goes standby to active, destination active to standby. */
    private static void swapStandbyAndActive(TaskId task, ClientState sourceClientState, ClientState destinationClientState) {
        sourceClientState.unassignStandby(task);
        sourceClientState.assignActive(task);
        destinationClientState.unassignActive(task);
        destinationClientState.assignStandby(task);
    }

    /** Unconstrained variant of {@link #mostCaughtUpEligibleClient(Map, Function, TaskId, ProcessId)}. */
    private static ProcessId mostCaughtUpEligibleClient(Map<TaskId, SortedSet<ProcessId>> tasksToClientByLag, TaskId task, ProcessId destinationClient) {
        return mostCaughtUpEligibleClient(tasksToClientByLag, client -> true, task, destinationClient);
    }

    /**
     * Walks the clients registered for {@code task} in set order and returns the first one that
     * satisfies {@code constraint}. The walk stops as soon as {@code destinationClient} is reached,
     * so only clients ordered strictly before the destination are considered.
     *
     * @return the first eligible client ahead of the destination, or {@code null} if there is none
     */
    private static ProcessId mostCaughtUpEligibleClient(Map<TaskId, SortedSet<ProcessId>> tasksToClientByLag, Function<ProcessId, Boolean> constraint, TaskId task, ProcessId destinationClient) {
        for (final ProcessId client : tasksToClientByLag.get(task)) {
            if (destinationClient.equals(client)) {
                break;
            }
            if (constraint.apply(client)) {
                return client;
            }
        }
        return null;
    }
}

