/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.microprofile.reactive.messaging.tck.signatures.processors;

import jakarta.enterprise.context.ApplicationScoped;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

@ApplicationScoped
public class TransformerBean {
    private Map<String, List<String>> collector = new ConcurrentHashMap<String, List<String>>();
    private static final List<String> EXPECTED = Arrays.asList("1", "1", "2", "2", "3", "3", "4", "4", "5", "5", "6", "6", "7", "7", "8", "8", "9", "9", "10", "10");
    private static Map<String, AtomicInteger> counters = new ConcurrentHashMap<String, AtomicInteger>();

    private static void increment(String counter) {
        counters.computeIfAbsent(counter, x -> new AtomicInteger(0)).incrementAndGet();
    }

    @Outgoing(value="publisher-for-publisher-message")
    public PublisherBuilder<Integer> streamForProcessorOfMessages() {
        return ReactiveStreams.of((Object[])new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Outgoing(value="publisher-for-publisher-payload")
    public PublisherBuilder<Integer> streamForProcessorOfPayloads() {
        return ReactiveStreams.of((Object[])new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Outgoing(value="publisher-for-publisher-builder-message")
    public PublisherBuilder<Integer> streamForProcessorBuilderOfMessages() {
        return ReactiveStreams.of((Object[])new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Outgoing(value="publisher-for-publisher-builder-payload")
    public PublisherBuilder<Integer> streamForProcessorBuilderOfPayloads() {
        return ReactiveStreams.of((Object[])new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Incoming(value="publisher-message")
    public void getMessgesFromProcessorOfMessages(String value) {
        this.add("message", value);
    }

    @Incoming(value="publisher-payload")
    public void getMessgesFromProcessorOfPayloads(String value) {
        this.add("payload", value);
    }

    @Incoming(value="publisher-builder-message")
    public void getMessgesFromProcessorBuilderOfMessages(String value) {
        this.add("builder-message", value);
    }

    @Incoming(value="publisher-builder-payload")
    public void getMessgesFromProcessorBuilderOfPayloads(String value) {
        this.add("builder-payload", value);
    }

    @Incoming(value="publisher-for-publisher-message")
    @Outgoing(value="publisher-message")
    public Publisher<Message<String>> processorOfMessages(Publisher<Message<Integer>> stream) {
        TransformerBean.increment("publisher-message");
        return ReactiveStreams.fromPublisher(stream).map(Message::getPayload).map(i -> i + 1).flatMap(i -> ReactiveStreams.of((Object[])new Integer[]{i, i})).map(i -> Integer.toString(i)).map(Message::of).buildRs();
    }

    @Incoming(value="publisher-for-publisher-payload")
    @Outgoing(value="publisher-payload")
    public Publisher<String> processorOfPayloads(Publisher<Integer> stream) {
        TransformerBean.increment("publisher-payload");
        return ReactiveStreams.fromPublisher(stream).map(i -> i + 1).flatMap(i -> ReactiveStreams.of((Object[])new Integer[]{i, i})).map(i -> Integer.toString(i)).buildRs();
    }

    @Incoming(value="publisher-for-publisher-builder-message")
    @Outgoing(value="publisher-builder-message")
    public PublisherBuilder<Message<String>> processorBuilderOfMessages(PublisherBuilder<Message<Integer>> stream) {
        TransformerBean.increment("publisher-builder-message");
        return stream.map(Message::getPayload).map(i -> i + 1).flatMap(i -> ReactiveStreams.of((Object[])new Integer[]{i, i})).map(i -> Integer.toString(i)).map(Message::of);
    }

    @Incoming(value="publisher-for-publisher-builder-payload")
    @Outgoing(value="publisher-builder-payload")
    public PublisherBuilder<String> processorBuilderOfPayloads(PublisherBuilder<Integer> stream) {
        TransformerBean.increment("publisher-builder-payload");
        return stream.map(i -> i + 1).flatMap(i -> ReactiveStreams.of((Object[])new Integer[]{i, i})).map(i -> Integer.toString(i));
    }

    private void add(String key, String value) {
        this.collector.computeIfAbsent(key, x -> new CopyOnWriteArrayList()).add(value);
    }

    void verify() {
        Awaitility.await().until(() -> this.collector.size() == 4);
        ((MapAssert)Assertions.assertThat(this.collector).hasSize(4)).allSatisfy((k, v) -> Assertions.assertThat((List)v).containsExactlyElementsOf(EXPECTED));
        Assertions.assertThat((AtomicInteger)counters.get("publisher-message")).hasValue(1);
        Assertions.assertThat((AtomicInteger)counters.get("publisher-payload")).hasValue(1);
        Assertions.assertThat((AtomicInteger)counters.get("publisher-builder-message")).hasValue(1);
        Assertions.assertThat((AtomicInteger)counters.get("publisher-builder-payload")).hasValue(1);
    }
}

