/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.datastream;

import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;

/**
 * Static helpers that apply an {@link AsyncFunction} to a {@link DataStream} by appending an
 * "async wait operator" transformation to it.
 *
 * <p>Every public method converts the caller's timeout to milliseconds and delegates to a single
 * private helper; the methods differ only in the {@link OutputMode} they select and in whether
 * the capacity is supplied by the caller or defaulted to {@code DEFAULT_QUEUE_CAPACITY}.
 */
@PublicEvolving
public class AsyncDataStream {
    /** Capacity handed to the operator factory when the caller does not specify one. */
    private static final int DEFAULT_QUEUE_CAPACITY = 100;

    /**
     * Builds the async wait operator factory for {@code func} and appends it to {@code in}.
     *
     * @param in stream whose elements are passed to the async function
     * @param func the async function to apply
     * @param timeout timeout in milliseconds, forwarded unchanged to the operator factory
     * @param bufSize capacity forwarded unchanged to the operator factory
     * @param mode output mode forwarded unchanged to the operator factory
     * @param <IN> element type of the input stream
     * @param <OUT> element type of the resulting stream
     * @return the stream produced by the added transformation
     */
    private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            int bufSize,
            OutputMode mode) {
        // Keep the OUT type parameter (no raw type) so the transform call below is fully
        // type-checked instead of relying on an unchecked conversion.
        TypeInformation<OUT> outTypeInfo =
                TypeExtractor.getUnaryOperatorReturnType(
                        func,
                        AsyncFunction.class,
                        0,
                        1,
                        new int[] {1, 0},
                        in.getType(),
                        Utils.getCallLocationName(),
                        true);

        // The function is passed through the execution environment's clean() before it is
        // handed to the operator factory.
        AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
                new AsyncWaitOperatorFactory<>(
                        in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);

        return in.transform("async wait operator", outTypeInfo, operatorFactory);
    }

    /**
     * Applies {@code func} to {@code in} using {@link OutputMode#UNORDERED} and an explicit
     * capacity.
     *
     * @param in stream whose elements are passed to the async function
     * @param func the async function to apply
     * @param timeout timeout duration, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code timeout}; the value is converted to milliseconds
     * @param capacity capacity forwarded to the async wait operator factory
     * @param <IN> element type of the input stream
     * @param <OUT> element type of the resulting stream
     * @return the stream produced by the added transformation
     * @throws NullPointerException if {@code timeUnit} is {@code null}
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
    }

    /**
     * Applies {@code func} to {@code in} using {@link OutputMode#UNORDERED} and the default
     * capacity of {@code DEFAULT_QUEUE_CAPACITY} (100).
     *
     * @param in stream whose elements are passed to the async function
     * @param func the async function to apply
     * @param timeout timeout duration, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code timeout}; the value is converted to milliseconds
     * @param <IN> element type of the input stream
     * @param <OUT> element type of the resulting stream
     * @return the stream produced by the added transformation
     * @throws NullPointerException if {@code timeUnit} is {@code null}
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {
        return addOperator(
                in,
                func,
                timeUnit.toMillis(timeout),
                DEFAULT_QUEUE_CAPACITY,
                OutputMode.UNORDERED);
    }

    /**
     * Applies {@code func} to {@code in} using {@link OutputMode#ORDERED} and an explicit
     * capacity.
     *
     * @param in stream whose elements are passed to the async function
     * @param func the async function to apply
     * @param timeout timeout duration, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code timeout}; the value is converted to milliseconds
     * @param capacity capacity forwarded to the async wait operator factory
     * @param <IN> element type of the input stream
     * @param <OUT> element type of the resulting stream
     * @return the stream produced by the added transformation
     * @throws NullPointerException if {@code timeUnit} is {@code null}
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
    }

    /**
     * Applies {@code func} to {@code in} using {@link OutputMode#ORDERED} and the default
     * capacity of {@code DEFAULT_QUEUE_CAPACITY} (100).
     *
     * @param in stream whose elements are passed to the async function
     * @param func the async function to apply
     * @param timeout timeout duration, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code timeout}; the value is converted to milliseconds
     * @param <IN> element type of the input stream
     * @param <OUT> element type of the resulting stream
     * @return the stream produced by the added transformation
     * @throws NullPointerException if {@code timeUnit} is {@code null}
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {
        return addOperator(
                in,
                func,
                timeUnit.toMillis(timeout),
                DEFAULT_QUEUE_CAPACITY,
                OutputMode.ORDERED);
    }

    /**
     * Output mode handed to the async wait operator factory.
     *
     * <p>NOTE(review): this class only forwards the value. Presumably {@code ORDERED} emits
     * results in input order and {@code UNORDERED} emits them as they complete; confirm against
     * the operator implementation.
     */
    public enum OutputMode {
        ORDERED,
        UNORDERED
    }
}

