/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.operators.sort.CircularElement;
import org.apache.flink.runtime.operators.sort.StageRunner;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.MutableObjectIterator;

final class CircularQueues<E>
implements StageRunner.StageMessageDispatcher<E> {
    private final BlockingQueue<CircularElement<E>> empty;
    private final BlockingQueue<CircularElement<E>> sort;
    private final BlockingQueue<CircularElement<E>> spill;
    private volatile boolean isFinished = false;
    private final CompletableFuture<MutableObjectIterator<E>> iteratorFuture = new CompletableFuture();

    public CircularQueues() {
        this.empty = new LinkedBlockingQueue<CircularElement<E>>();
        this.sort = new LinkedBlockingQueue<CircularElement<E>>();
        this.spill = new LinkedBlockingQueue<CircularElement<E>>();
    }

    private BlockingQueue<CircularElement<E>> getQueue(StageRunner.SortStage stage) {
        switch (stage) {
            case READ: {
                return this.empty;
            }
            case SORT: {
                return this.sort;
            }
            case SPILL: {
                return this.spill;
            }
        }
        throw new IllegalArgumentException();
    }

    public CompletableFuture<MutableObjectIterator<E>> getIteratorFuture() {
        return this.iteratorFuture;
    }

    @Override
    public void send(StageRunner.SortStage stage, CircularElement<E> element) {
        this.getQueue(stage).add(element);
    }

    @Override
    public void sendResult(MutableObjectIterator<E> result) {
        this.iteratorFuture.complete(result);
    }

    @Override
    public CircularElement<E> take(StageRunner.SortStage stage) throws InterruptedException {
        while (!this.isFinished) {
            CircularElement<E> value = this.getQueue(stage).poll(1L, TimeUnit.SECONDS);
            if (value == null) continue;
            return value;
        }
        throw new FlinkRuntimeException("The sorter is closed already");
    }

    @Override
    public CircularElement<E> poll(StageRunner.SortStage stage) {
        return (CircularElement)this.getQueue(stage).poll();
    }

    @Override
    public void close() {
        this.isFinished = true;
    }
}

