/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.consumer;

import io.quarkus.runtime.annotations.RegisterForReflection;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.common.metrics.Metrics;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumerHelper;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.LifecycleStatus;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RegisterForReflection(targets={KafkaReadStreamImpl.class})
public class HonoKafkaConsumer<V>
implements Lifecycle,
ServiceClient {
    /** Threshold (in milliseconds) related to the "metadata.max.age.ms" setting; its use is not visible in this part of the file. */
    public static final int THRESHOLD_METADATA_MAX_AGE_MS = 500;
    /** Poll timeout in milliseconds; same value as the initial {@code pollTimeout} below. */
    public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 250L;
    /** Delay before metrics of topics that dropped out of the pattern subscription get removed. */
    private static final long OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(30L);
    private static final String MSG_CONSUMER_NOT_INITIALIZED_STARTED = "consumer not initialized/started";
    private static final Logger LOG = LoggerFactory.getLogger(HonoKafkaConsumer.class);
    protected final Vertx vertx;
    /** The consumer configuration map as passed to the constructor (not copied). */
    protected final Map<String, String> consumerConfig;
    /** The explicit topics to subscribe to; {@code null} if a topic pattern is used instead. */
    protected final Set<String> topics;
    /** The pattern of topics to subscribe to; {@code null} if explicit topics are used instead. */
    protected final Pattern topicPattern;
    protected final LifecycleStatus lifecycleStatus = new LifecycleStatus();
    /** Holds the promise completed on the first partition assignment; reset to {@code null} afterwards. */
    private final AtomicReference<Promise<Void>> initialPartitionAssignmentDonePromiseRef = new AtomicReference();
    /** Guards against scheduling more than one subscription update at a time. */
    private final AtomicBoolean subscriptionUpdateTriggered = new AtomicBoolean();
    /** Trackers keyed by topic name; all accesses synchronize on this map. */
    private final Map<String, SubscriptionUpdateTracker> subscriptionUpdateTrackersForToBeAddedTopics = new HashMap<String, SubscriptionUpdateTracker>();
    /** Parsed "metadata.max.age.ms" config value; empty if absent or not a valid integer. */
    private final Optional<Integer> metadataMaxAge;
    private final AtomicBoolean pollingPaused = new AtomicBoolean();
    private final AtomicBoolean recordFetchingPaused = new AtomicBoolean();
    private Handler<KafkaConsumerRecord<String, V>> recordHandler;
    /** The Vert.x Kafka consumer; {@code null} until it has been created during start-up. */
    private KafkaConsumer<String, V> kafkaConsumer;
    /** The Vert.x context obtained in {@code start()}. */
    private Context context;
    /** Executor obtained from the Kafka consumer once the subscription has been initiated. */
    private ExecutorService kafkaConsumerWorker;
    /** Topics currently matched by the topic pattern subscription; replaced as a whole on rebalance. */
    private volatile Set<String> subscribedTopicPatternTopics = new HashSet<String>();
    private ConsumerRebalanceListener rebalanceListener;
    private Handler<Set<TopicPartition>> onPartitionsAssignedHandler;
    private Handler<Set<TopicPartition>> onRebalanceDoneHandler;
    private Handler<Set<TopicPartition>> onPartitionsRevokedHandler;
    private Handler<Set<TopicPartition>> onPartitionsLostHandler;
    /** Whether records with an elapsed TTL are skipped instead of being passed to the record handler. */
    private boolean respectTtl = true;
    private Duration pollTimeout = Duration.ofMillis(250L);
    /** Optional supplier of the underlying Kafka consumer; if unset the consumer is created from the config. */
    private Supplier<Consumer<String, V>> kafkaConsumerSupplier;
    private KafkaClientMetricsSupport metricsSupport;
    /** Id of the timer that automatically resumes polling; {@code null} if no such timer is active. */
    private Long pollPauseTimeoutTimerId;
    private Duration consumerCreationRetriesTimeout = Duration.ZERO;

    /**
     * Creates a consumer that subscribes to an explicit set of topics.
     *
     * @param vertx The Vert.x instance to use.
     * @param topics The names of the topics to subscribe to (copied by the delegated constructor).
     * @param recordHandler The handler to invoke for each received record.
     * @param consumerConfig The Kafka consumer configuration; may be modified by the delegated constructor.
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if auto-commit is enabled in the config but no "group.id" is set.
     */
    public HonoKafkaConsumer(Vertx vertx, Set<String> topics, Handler<KafkaConsumerRecord<String, V>> recordHandler, Map<String, String> consumerConfig) {
        this(vertx, Objects.requireNonNull(topics), (Pattern)null, consumerConfig);
        this.setRecordHandler(Objects.requireNonNull(recordHandler));
    }

    /**
     * Creates a consumer that subscribes to all topics matching a pattern.
     *
     * @param vertx The Vert.x instance to use.
     * @param topicPattern The pattern of the topic names to subscribe to.
     * @param recordHandler The handler to invoke for each received record.
     * @param consumerConfig The Kafka consumer configuration; may be modified by the delegated constructor.
     * @throws NullPointerException if any of the parameters is {@code null}.
     * @throws IllegalArgumentException if auto-commit is enabled in the config but no "group.id" is set.
     */
    public HonoKafkaConsumer(Vertx vertx, Pattern topicPattern, Handler<KafkaConsumerRecord<String, V>> recordHandler, Map<String, String> consumerConfig) {
        this(vertx, null, Objects.requireNonNull(topicPattern), consumerConfig);
        this.setRecordHandler(Objects.requireNonNull(recordHandler));
    }

    protected HonoKafkaConsumer(Vertx vertx, Set<String> topics, Pattern topicPattern, Map<String, String> consumerConfig) {
        this.vertx = Objects.requireNonNull(vertx);
        if (topics == null == (topicPattern == null)) {
            throw new NullPointerException("exactly one of topics or topicPattern has to be set");
        }
        this.topicPattern = topicPattern;
        this.topics = Optional.ofNullable(topics).map(HashSet::new).orElse(null);
        this.consumerConfig = Objects.requireNonNull(consumerConfig);
        this.metadataMaxAge = Optional.ofNullable(consumerConfig.get("metadata.max.age.ms")).map(s -> {
            try {
                return Integer.parseInt(s);
            }
            catch (NumberFormatException e) {
                return null;
            }
        });
        if (!consumerConfig.containsKey("group.id")) {
            if ("true".equals(consumerConfig.get("enable.auto.commit"))) {
                throw new IllegalArgumentException("%s config entry has to be set if auto-commit is enabled".formatted("group.id"));
            }
            LOG.trace("no group.id set, using a random UUID as default and disabling auto-commit");
            consumerConfig.put("group.id", UUID.randomUUID().toString());
            consumerConfig.put("enable.auto.commit", "false");
        }
    }

    /**
     * Sets the handler to be invoked for each received record.
     *
     * @param handler The record handler.
     * @throws NullPointerException if handler is {@code null}.
     * @throws IllegalStateException if the lifecycle status is anything other than "stopped".
     */
    public final void setRecordHandler(Handler<KafkaConsumerRecord<String, V>> handler) {
        Objects.requireNonNull(handler);
        if (this.lifecycleStatus.isStopped()) {
            this.recordHandler = handler;
            return;
        }
        throw new IllegalStateException("Record handler can only be set if consumer has not been started yet");
    }

    /**
     * Adds a topic to the set of topics this consumer subscribes to when it gets started.
     *
     * @param topicName The name of the topic to add.
     * @throws NullPointerException if topicName is {@code null}.
     * @throws IllegalStateException if the lifecycle status is anything other than "stopped" or if
     *                               this consumer has been created with a topic pattern.
     */
    protected final void addTopic(String topicName) {
        Objects.requireNonNull(topicName);
        final boolean notStartedYet = this.lifecycleStatus.isStopped();
        if (!notStartedYet) {
            throw new IllegalStateException("Topics can only be set if consumer has not been started yet");
        }
        final Set<String> configuredTopics = this.topics;
        if (configuredTopics == null) {
            throw new IllegalStateException("Cannot add topic on consumer which has been created with a topic pattern");
        }
        configuredTopics.add(topicName);
    }

    /**
     * Registers a handler with the lifecycle status that is notified once the consumer has started.
     *
     * @param handler The handler to register; a {@code null} handler is ignored.
     */
    public final void addOnKafkaConsumerReadyHandler(Handler<AsyncResult<Void>> handler) {
        if (handler == null) {
            return;
        }
        this.lifecycleStatus.addOnStartedHandler(handler);
    }

    /**
     * Sets the handler to be invoked on the Vert.x context with the newly assigned partitions.
     *
     * @param onPartitionsAssignedHandler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnPartitionsAssignedHandler(Handler<Set<TopicPartition>> onPartitionsAssignedHandler) {
        Objects.requireNonNull(onPartitionsAssignedHandler);
        this.onPartitionsAssignedHandler = onPartitionsAssignedHandler;
    }

    /**
     * Sets the handler to be invoked after a partition assignment with all currently assigned partitions.
     *
     * @param handler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnRebalanceDoneHandler(Handler<Set<TopicPartition>> handler) {
        Objects.requireNonNull(handler);
        this.onRebalanceDoneHandler = handler;
    }

    /**
     * Sets the handler to be invoked on the Vert.x context with the revoked partitions.
     *
     * @param onPartitionsRevokedHandler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnPartitionsRevokedHandler(Handler<Set<TopicPartition>> onPartitionsRevokedHandler) {
        Objects.requireNonNull(onPartitionsRevokedHandler);
        this.onPartitionsRevokedHandler = onPartitionsRevokedHandler;
    }

    /**
     * Sets the handler to be invoked on the Vert.x context with the lost partitions.
     *
     * @param onPartitionsLostHandler The handler.
     * @throws NullPointerException if the handler is {@code null}.
     */
    public final void setOnPartitionsLostHandler(Handler<Set<TopicPartition>> onPartitionsLostHandler) {
        Objects.requireNonNull(onPartitionsLostHandler);
        this.onPartitionsLostHandler = onPartitionsLostHandler;
    }

    /**
     * Sets the metrics support with which the Kafka consumer gets registered during initialization
     * and unregistered when the consumer is stopped.
     *
     * @param metricsSupport The metrics support; {@code null} means no (un)registration is done.
     */
    public final void setMetricsSupport(KafkaClientMetricsSupport metricsSupport) {
        this.metricsSupport = metricsSupport;
    }

    /**
     * Sets the timeout that is passed to the client factory when creating the Kafka consumer with
     * retries during start-up. The initial value is {@link Duration#ZERO}.
     *
     * @param consumerCreationRetriesTimeout The retries timeout; not validated here.
     *        NOTE(review): how {@code null} or negative values are interpreted is decided by
     *        {@code KafkaClientFactory.createClientWithRetries} - confirm there.
     */
    public final void setConsumerCreationRetriesTimeout(Duration consumerCreationRetriesTimeout) {
        this.consumerCreationRetriesTimeout = consumerCreationRetriesTimeout;
    }

    /**
     * Sets whether records whose TTL has elapsed are skipped instead of being passed to the
     * record handler. The initial value is {@code true}.
     *
     * @param respectTtl {@code true} if expired records shall be skipped.
     */
    public final void setRespectTtl(boolean respectTtl) {
        this.respectTtl = respectTtl;
    }

    /**
     * Sets the poll timeout and, if the Kafka consumer already exists, applies it right away.
     *
     * @param pollTimeout The poll timeout to use.
     * @throws NullPointerException if pollTimeout is {@code null}.
     */
    public final void setPollTimeout(Duration pollTimeout) {
        Objects.requireNonNull(pollTimeout);
        this.pollTimeout = pollTimeout;
        final KafkaConsumer<String, V> consumer = this.kafkaConsumer;
        if (consumer == null) {
            // not created yet: the stored value gets applied when the consumer is initialized
            return;
        }
        consumer.asStream().pollTimeout(pollTimeout);
    }

    /**
     * Sets a supplier for the underlying Kafka consumer. If set, {@code start()} wraps the supplied
     * consumer instead of creating one from the consumer configuration.
     *
     * @param supplier The supplier; {@code null} means the consumer is created from the config.
     */
    public void setKafkaConsumerSupplier(Supplier<Consumer<String, V>> supplier) {
        this.kafkaConsumerSupplier = supplier;
    }

    /**
     * Marks record fetching as paused and schedules pausing all currently assigned partitions
     * on the Kafka worker thread.
     *
     * @return {@code true} if the state was changed to paused, {@code false} if fetching was paused already.
     * @throws IllegalStateException if the Kafka worker has not been initialized yet.
     */
    public final boolean pauseRecordFetching() {
        final boolean stateChanged = this.recordFetchingPaused.compareAndSet(false, true);
        if (stateChanged) {
            this.runOnKafkaWorkerThread(v -> {
                final Consumer<String, V> underlying = this.getUnderlyingConsumer();
                final Set<org.apache.kafka.common.TopicPartition> assigned = underlying.assignment();
                if (assigned.isEmpty()) {
                    return;
                }
                underlying.pause(assigned);
            });
        }
        return stateChanged;
    }

    /**
     * Marks record fetching as resumed and schedules resuming all currently assigned partitions
     * on the Kafka worker thread.
     *
     * @return {@code true} if the state was changed to resumed, {@code false} if fetching was not paused.
     * @throws IllegalStateException if the Kafka worker has not been initialized yet.
     */
    public final boolean resumeRecordFetching() {
        final boolean stateChanged = this.recordFetchingPaused.compareAndSet(true, false);
        if (stateChanged) {
            this.runOnKafkaWorkerThread(v -> {
                final Consumer<String, V> underlying = this.getUnderlyingConsumer();
                final Set<org.apache.kafka.common.TopicPartition> assigned = underlying.assignment();
                if (assigned.isEmpty()) {
                    return;
                }
                underlying.resume(assigned);
            });
        }
        return stateChanged;
    }

    /**
     * Checks whether record fetching is currently marked as paused.
     *
     * @return {@code true} if {@code pauseRecordFetching()} changed the state to paused and it
     *         has not been resumed since.
     */
    public final boolean isRecordFetchingPaused() {
        return this.recordFetchingPaused.get();
    }

    /**
     * Pauses the Kafka consumer (record handling and polling) and arms a timer that resumes it
     * automatically once the given timeout has elapsed.
     * <p>
     * If pausing fails (consumer not created yet, invalid timeout), the paused flag and the timer
     * are rolled back before the exception is propagated. The consumer is therefore never left in
     * a "paused" state with nothing actually paused.
     *
     * @param timeout The time after which polling gets resumed automatically.
     * @return {@code true} if the state was changed to paused, {@code false} if polling was paused already.
     * @throws IllegalStateException if the Kafka consumer has not been created yet.
     * @throws NullPointerException if timeout is {@code null}.
     */
    public final boolean pauseRecordHandlingAndPolling(Duration timeout) {
        if (!this.pollingPaused.compareAndSet(false, true)) {
            return false;
        }
        try {
            // resolve everything that may fail before the auto-resume timer gets armed
            final KafkaConsumer<String, V> consumer = this.getKafkaConsumer();
            final long timeoutMillis = timeout.toMillis();
            this.pollPauseTimeoutTimerId = this.vertx.setTimer(timeoutMillis, tid -> {
                this.pollPauseTimeoutTimerId = null;
                if (this.resumeRecordHandlingAndPolling()) {
                    LOG.debug("resumed consumer record polling - timeout of {}ms was reached [client-id: {}]", timeoutMillis, this.getClientId());
                }
            });
            consumer.pause();
            return true;
        } catch (RuntimeException e) {
            // undo the state change so that a later pause/resume attempt starts from a clean state
            final Long timerId = this.pollPauseTimeoutTimerId;
            if (timerId != null) {
                this.vertx.cancelTimer(timerId);
                this.pollPauseTimeoutTimerId = null;
            }
            this.pollingPaused.set(false);
            throw e;
        }
    }

    /**
     * Resumes the Kafka consumer (record handling and polling) and cancels a pending
     * auto-resume timer, if any.
     *
     * @return {@code true} if the state was changed to resumed, {@code false} if polling was not paused.
     * @throws IllegalStateException if the Kafka consumer has not been created yet.
     */
    public final boolean resumeRecordHandlingAndPolling() {
        final boolean stateChanged = this.pollingPaused.compareAndSet(true, false);
        if (!stateChanged) {
            return false;
        }
        final Long timerId = this.pollPauseTimeoutTimerId;
        if (timerId != null) {
            this.vertx.cancelTimer(timerId);
            this.pollPauseTimeoutTimerId = null;
        }
        this.getKafkaConsumer().resume();
        return true;
    }

    /**
     * Checks whether record handling and polling is currently marked as paused.
     *
     * @return {@code true} if polling has been paused and not been resumed since.
     */
    public final boolean isRecordHandlingAndPollingPaused() {
        return this.pollingPaused.get();
    }

    /**
     * Gets the Vert.x Kafka consumer.
     *
     * @return The consumer.
     * @throws IllegalStateException if the consumer has not been created yet.
     */
    protected final KafkaConsumer<String, V> getKafkaConsumer() {
        final KafkaConsumer<String, V> consumer = this.kafkaConsumer;
        if (consumer != null) {
            return consumer;
        }
        throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
    }

    /**
     * Gets the native Kafka consumer wrapped by the Vert.x Kafka consumer.
     *
     * @return The underlying consumer.
     * @throws IllegalStateException if the consumer has not been created yet.
     */
    protected final Consumer<String, V> getUnderlyingConsumer() {
        final KafkaConsumer<String, V> consumer = this.kafkaConsumer;
        if (consumer != null) {
            return consumer.asStream().unwrap();
        }
        throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
    }

    /**
     * Gets the client identifier from the consumer configuration.
     *
     * @return The value of the "client.id" config entry, or {@code null} if the map has no such entry.
     */
    protected final String getClientId() {
        return this.consumerConfig.get("client.id");
    }

    /**
     * Registers a readiness check that succeeds once this consumer has been started.
     * <p>
     * While the consumer is still starting, the failed check result carries a "status" entry
     * telling whether the Kafka consumer has not been created yet or is still being initialized.
     *
     * @param readinessHandler The handler to register the check with.
     */
    @Override
    public void registerReadinessChecks(HealthCheckHandler readinessHandler) {
        final String checkName = "kafka-consumer[%s]-creation".formatted(this.getClientId());
        readinessHandler.register(checkName, status -> {
            if (this.lifecycleStatus.isStarted()) {
                status.tryComplete(Status.OK());
                return;
            }
            final JsonObject details = new JsonObject();
            if (this.lifecycleStatus.isStarting()) {
                final boolean consumerCreated = this.kafkaConsumer != null;
                if (consumerCreated) {
                    LOG.debug("readiness check failed, consumer initialization not finished yet [client-id: {}]", (Object)this.getClientId());
                    details.put("status", "consumer initialization not finished yet");
                } else {
                    LOG.debug("readiness check failed, consumer not created yet (Kafka server URL possibly not resolvable (yet)) [client-id: {}]", (Object)this.getClientId());
                    details.put("status", "consumer not created yet (Kafka server URL possibly not resolvable (yet))");
                }
            }
            status.tryComplete(Status.KO(details));
        });
    }

    /**
     * Creates a health check response named "kafka-consumer-status".
     *
     * @return A response whose status is "up" if this consumer has been started, "down" otherwise.
     */
    public final HealthCheckResponse checkReadiness() {
        final boolean started = this.lifecycleStatus.isStarted();
        return HealthCheckResponse.builder()
                .name("kafka-consumer-status")
                .status(started)
                .build();
    }

    /**
     * Wires up the given consumer (metrics, record/batch/exception handlers, rebalance listener)
     * and subscribes it.
     * <p>
     * Records received before the initialization has finished are handled only once it has
     * completed successfully. A short poll timeout of 10ms is used until then; afterwards the
     * configured poll timeout gets applied.
     *
     * @param consumer The newly created consumer.
     * @return A future that is completed with the consumer once subscription and initial partition
     *         assignment are done, or failed if that did not succeed.
     */
    private Future<KafkaConsumer<String, V>> initConsumer(KafkaConsumer<String, V> consumer) {
        final Promise<KafkaConsumer<String, V>> initialized = Promise.promise();
        final KafkaClientMetricsSupport metrics = this.metricsSupport;
        if (metrics != null) {
            metrics.registerKafkaConsumer(consumer.unwrap());
        }
        consumer.handler(kafkaRecord -> {
            final boolean stillInitializing = !initialized.future().isComplete();
            if (stillInitializing && LOG.isDebugEnabled()) {
                LOG.debug("postponing record handling until consumer has been initialized [topic: {}, partition: {}, offset: {}]", kafkaRecord.topic(), kafkaRecord.partition(), kafkaRecord.offset());
            }
            // runs immediately if initialization is already done, otherwise once it succeeds
            initialized.future().onSuccess(ok -> {
                if (this.respectTtl && KafkaRecordHelper.isTtlElapsed(kafkaRecord.headers())) {
                    this.onRecordHandlerSkippedForExpiredRecord(kafkaRecord);
                    return;
                }
                try {
                    this.recordHandler.handle(kafkaRecord);
                } catch (Exception e) {
                    LOG.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}]", kafkaRecord.topic(), kafkaRecord.partition(), kafkaRecord.offset(), kafkaRecord.headers(), e);
                }
            });
        });
        consumer.batchHandler(this::onBatchOfRecordsReceived);
        consumer.exceptionHandler(error -> LOG.error("consumer error occurred [client-id: {}]", (Object)this.getClientId(), error));
        this.installRebalanceListeners();
        // short poll timeout while waiting for the initial partition assignment
        consumer.asStream().pollTimeout(Duration.ofMillis(10L));
        this.initSubscriptionAndWaitForRebalance()
                .onSuccess(ok -> {
                    consumer.asStream().pollTimeout(this.pollTimeout);
                    this.logSubscribedTopicsWhenConsumerIsReady();
                    initialized.complete(consumer);
                })
                .onFailure(initialized::fail);
        return initialized.future();
    }

    /**
     * Starts the asynchronous creation and initialization of the Kafka consumer.
     * <p>
     * The returned future only tells that start-up has been triggered. Completion of the actual
     * start-up is signalled via the lifecycle status, which is set to "started" once the consumer
     * has been created, subscribed and got its initial partition assignment. Failures to create or
     * to initialize the consumer are logged.
     *
     * @return A succeeded future if start-up has been triggered or is already in progress, a
     *         failed future if the consumer is already started or stopping.
     * @throws IllegalStateException if no record handler has been set.
     */
    @Override
    public Future<Void> start() {
        if (this.recordHandler == null) {
            throw new IllegalStateException("Record handler must be set");
        }
        if (this.lifecycleStatus.isStarting()) {
            LOG.debug("already starting consumer");
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStarting()) {
            return Future.failedFuture(new IllegalStateException("consumer is already started/stopping"));
        }
        this.context = this.vertx.getOrCreateContext();
        // prefer the explicitly set supplier, fall back to creating the consumer from the config
        final Supplier<KafkaConsumer<String, V>> consumerSupplier = () -> Optional.ofNullable(this.kafkaConsumerSupplier)
                .map(s -> KafkaConsumer.create(this.vertx, s.get()))
                .orElseGet(() -> KafkaConsumer.<String, V>create(this.vertx, this.consumerConfig));
        this.runOnContext(v -> {
            final KafkaClientFactory kafkaClientFactory = new KafkaClientFactory(this.vertx);
            kafkaClientFactory.createClientWithRetries(consumerSupplier, this.lifecycleStatus::isStarting, this.consumerConfig.get("bootstrap.servers"), this.consumerCreationRetriesTimeout)
                    .onFailure(t -> LOG.error("error creating consumer [client-id: {}]", (Object)this.getClientId(), t))
                    .onSuccess(consumer -> {
                        this.kafkaConsumer = consumer;
                    })
                    // log initialization failures as well; they would otherwise go unnoticed
                    .compose(consumer -> this.initConsumer(consumer)
                            .onFailure(t -> LOG.error("error initializing consumer [client-id: {}]", (Object)this.getClientId(), t)))
                    .onSuccess(c -> this.lifecycleStatus.setStarted());
        });
        return Future.succeededFuture();
    }

    /**
     * Logs (on DEBUG level) what the started consumer is subscribed to. For a topic pattern the
     * matching topic names are listed only if there are at most 5 of them; otherwise just their number.
     */
    private void logSubscribedTopicsWhenConsumerIsReady() {
        if (this.topicPattern == null) {
            LOG.debug("consumer started, subscribed to topics {}", (Object)this.topics);
            return;
        }
        final boolean fewEnoughToList = this.subscribedTopicPatternTopics.size() <= 5;
        if (fewEnoughToList) {
            LOG.debug("consumer started, subscribed to topic pattern [{}], matching topics: {}", (Object)this.topicPattern, (Object)this.subscribedTopicPatternTopics);
        } else {
            LOG.debug("consumer started, subscribed to topic pattern [{}], matching {} topics", (Object)this.topicPattern, (Object)this.subscribedTopicPatternTopics.size());
        }
    }

    /**
     * Hook registered as batch handler on the Kafka consumer.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param records The batch of records passed to the batch handler.
     */
    protected void onBatchOfRecordsReceived(KafkaConsumerRecords<String, V> records) {
    }

    /**
     * Hook invoked instead of the record handler for a record whose TTL has elapsed
     * (only if TTL handling is enabled).
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param record The expired record that was not passed to the record handler.
     */
    protected void onRecordHandlerSkippedForExpiredRecord(KafkaConsumerRecord<String, V> record) {
    }

    /**
     * Creates the rebalance listener of this consumer and installs it on the Kafka consumer
     * via {@code replaceRebalanceListener}.
     * <p>
     * Each callback first does its work synchronously on the thread invoking the listener
     * (presumably the Kafka polling thread - TODO confirm) including the corresponding
     * {@code ...Blocking} hook, and then dispatches the non-blocking handler to the Vert.x context.
     */
    private void installRebalanceListeners() {
        this.rebalanceListener = new ConsumerRebalanceListener(){

            @Override
            public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Set<TopicPartition> partitionsSet = Helper.from(partitions);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("partitions assigned: [{}] [client-id: {}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), (Object)HonoKafkaConsumer.this.getClientId());
                }
                HonoKafkaConsumer.this.ensurePositionsHaveBeenSetIfNeeded(partitionsSet);
                HonoKafkaConsumer.this.updateSubscribedTopicPatternTopicsAndRemoveMetrics();
                // newly assigned partitions have to be paused explicitly if fetching is paused
                if (HonoKafkaConsumer.this.recordFetchingPaused.get()) {
                    HonoKafkaConsumer.this.getUnderlyingConsumer().pause(partitions);
                }
                HonoKafkaConsumer.this.onPartitionsAssignedBlocking(partitionsSet);
                // the overall assignment is only determined if a rebalance-done handler is set;
                // it is read here, before switching to the Vert.x context
                Set allAssignedPartitions = Optional.ofNullable(HonoKafkaConsumer.this.onRebalanceDoneHandler).map(h -> Helper.from(HonoKafkaConsumer.this.getUnderlyingConsumer().assignment())).orElse(null);
                HonoKafkaConsumer.this.context.runOnContext(v -> {
                    HonoKafkaConsumer.this.onPartitionsAssigned(partitionsSet);
                    if (HonoKafkaConsumer.this.onRebalanceDoneHandler != null) {
                        HonoKafkaConsumer.this.onRebalanceDoneHandler.handle(allAssignedPartitions);
                    }
                });
            }

            @Override
            public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Set<TopicPartition> partitionsSet = Helper.from(partitions);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("partitions revoked: [{}] [client-id: {}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), (Object)HonoKafkaConsumer.this.getClientId());
                }
                HonoKafkaConsumer.this.onPartitionsRevokedBlocking(partitionsSet);
                HonoKafkaConsumer.this.context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsRevoked(partitionsSet));
            }

            @Override
            public void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Set<TopicPartition> partitionsSet = Helper.from(partitions);
                if (LOG.isInfoEnabled()) {
                    LOG.info("partitions lost: [{}] [client-id: {}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), (Object)HonoKafkaConsumer.this.getClientId());
                }
                // pending subscription update requests are failed when partitions get lost
                HonoKafkaConsumer.this.failAllSubscriptionUpdateTrackers(new ServerErrorException(503, "consumer error occurred"));
                HonoKafkaConsumer.this.onPartitionsLostBlocking(partitionsSet);
                HonoKafkaConsumer.this.context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsLost(partitionsSet));
            }
        };
        this.replaceRebalanceListener(this.kafkaConsumer, this.rebalanceListener);
    }

    /**
     * Checks the positions of newly assigned partitions and moves a position that lies before the
     * beginning offset of its partition to that beginning offset.
     * <p>
     * The check is only done if "auto.offset.reset" is "latest" (or unset). Errors are logged
     * and otherwise ignored.
     *
     * @param assignedPartitions The newly assigned partitions.
     */
    private void ensurePositionsHaveBeenSetIfNeeded(Set<TopicPartition> assignedPartitions) {
        if (assignedPartitions.isEmpty() || !this.isAutoOffsetResetConfigLatest()) {
            return;
        }
        LOG.trace("checking positions for {} newly assigned partitions...", (Object)assignedPartitions.size());
        final Set<org.apache.kafka.common.TopicPartition> kafkaPartitions = Helper.to(assignedPartitions);
        try {
            final ArrayList<org.apache.kafka.common.TopicPartition> resetPartitions = new ArrayList<org.apache.kafka.common.TopicPartition>();
            final Consumer<String, V> underlying = this.getUnderlyingConsumer();
            final Map<org.apache.kafka.common.TopicPartition, Long> earliestOffsets = underlying.beginningOffsets(kafkaPartitions);
            for (final org.apache.kafka.common.TopicPartition partition : kafkaPartitions) {
                final long position = underlying.position(partition);
                final Long earliest = earliestOffsets.get(partition);
                if (earliest == null || position >= earliest) {
                    continue;
                }
                LOG.debug("committed offset {} for [{}] is smaller than beginning offset, resetting it to the beginning offset {}", position, partition, earliest);
                underlying.seek(partition, earliest);
                resetPartitions.add(partition);
            }
            if (!resetPartitions.isEmpty() && LOG.isInfoEnabled()) {
                LOG.info("found out-of-range committed offsets, corresponding records having already been deleted; positions were reset to beginning offsets; partitions: [{}] [client-id: {}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(resetPartitions), (Object)this.getClientId());
            }
        } catch (Exception e) {
            LOG.error("error checking positions for {} newly assigned partitions [client-id: {}]", assignedPartitions.size(), this.getClientId(), e);
        }
        LOG.trace("done checking positions for {} newly assigned partitions", (Object)assignedPartitions.size());
    }

    /**
     * Checks whether the consumer config names the cooperative sticky assignor as the
     * (only) partition assignment strategy.
     *
     * @return {@code true} if "partition.assignment.strategy" equals the class name of
     *         {@link CooperativeStickyAssignor}.
     */
    protected final boolean isCooperativeRebalancingConfigured() {
        final String configuredStrategy = this.consumerConfig.get("partition.assignment.strategy");
        return CooperativeStickyAssignor.class.getName().equals(configuredStrategy);
    }

    /**
     * Checks whether the "auto.offset.reset" config entry is "latest".
     *
     * @return {@code true} if the entry is "latest" or not set at all.
     */
    protected final boolean isAutoOffsetResetConfigLatest() {
        final String resetPolicy = this.consumerConfig.get("auto.offset.reset");
        return resetPolicy == null || resetPolicy.equals("latest");
    }

    /**
     * Refreshes the set of topics matched by the topic pattern subscription, resolves pending
     * subscription update trackers and schedules the removal of metrics for topics that are no
     * longer subscribed.
     * <p>
     * Does nothing if this consumer does not use a topic pattern. Called from the rebalance
     * listener when partitions have been assigned.
     */
    private void updateSubscribedTopicPatternTopicsAndRemoveMetrics() {
        if (this.topicPattern != null) {
            // keep the previous set in order to determine the topics that have disappeared
            Set<String> oldSubscribedTopicPatternTopics = this.subscribedTopicPatternTopics;
            try {
                this.subscribedTopicPatternTopics = new HashSet<String>(this.getUnderlyingConsumer().subscription());
                if (LOG.isTraceEnabled()) {
                    LOG.trace("subscribed topics: {}", (Object)String.join((CharSequence)", ", this.subscribedTopicPatternTopics));
                }
            }
            catch (Exception e) {
                // the previously known set of topics stays in place in this case
                LOG.warn("error getting subscription", e);
            }
            Map<String, SubscriptionUpdateTracker> e = this.subscriptionUpdateTrackersForToBeAddedTopics;
            synchronized (e) {
                if (!this.subscriptionUpdateTrackersForToBeAddedTopics.isEmpty()) {
                    // completion/failure of trackers is collected first and dispatched to the vert.x context below
                    ArrayList<Handler<Void>> trackerCompletionHandlers = new ArrayList<Handler<Void>>();
                    Iterator<SubscriptionUpdateTracker> iter = this.subscriptionUpdateTrackersForToBeAddedTopics.values().iterator();
                    while (iter.hasNext()) {
                        SubscriptionUpdateTracker tracker = iter.next();
                        if (tracker.isContainedInSubscribedTopicsAfterRebalance(this.subscribedTopicPatternTopics)) {
                            LOG.trace("topic [{}] is now in subscribed topics list", (Object)tracker.getTopicName());
                            trackerCompletionHandlers.add(v -> tracker.complete());
                            iter.remove();
                            continue;
                        }
                        // topic not subscribed yet: keep the tracker only if it allows waiting for another rebalance
                        if (tracker.hasRebalancesLeft()) {
                            LOG.debug("topic [{}] is not in subscribed topics list, will wait for another rebalance", (Object)tracker.getTopicName());
                            continue;
                        }
                        LOG.info("topic [{}] is still not in subscribed topics list, not waiting for additional rebalance anymore", (Object)tracker.getTopicName());
                        trackerCompletionHandlers.add(v -> tracker.fail());
                        iter.remove();
                    }
                    if (!trackerCompletionHandlers.isEmpty()) {
                        trackerCompletionHandlers.forEach(handler -> this.runOnContext(v -> handler.handle(null)));
                    }
                    // trackers still waiting require another subscription update
                    if (!this.subscriptionUpdateTrackersForToBeAddedTopics.isEmpty()) {
                        this.triggerTopicPatternSubscriptionUpdate();
                    }
                }
            }
            Set deletedTopics = oldSubscribedTopicPatternTopics.stream().filter(t -> !this.subscribedTopicPatternTopics.contains(t)).collect(Collectors.toSet());
            if (!deletedTopics.isEmpty()) {
                // metrics are removed with a delay and only for topics that are still not subscribed by then
                this.runOnContext(v -> this.vertx.setTimer(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, tid -> this.runOnKafkaWorkerThread(v2 -> this.removeMetricsForDeletedTopics(deletedTopics.stream().filter(t -> !this.subscribedTopicPatternTopics.contains(t))))));
            }
        }
    }

    /**
     * Subscribes the Kafka consumer to the topic pattern or the set of topics and obtains the
     * Kafka consumer worker.
     *
     * @return A future that is completed once the subscription is done and the initial partition
     *         assignment has been handled, or a future failed with a 503 error if this consumer
     *         is already stopping/stopped.
     */
    private Future<Void> initSubscriptionAndWaitForRebalance() {
        final boolean stoppingOrStopped = this.lifecycleStatus.isStopping() || this.lifecycleStatus.isStopped();
        if (stoppingOrStopped) {
            return Future.failedFuture(new ServerErrorException(503, "already stopped"));
        }
        // completed by onPartitionsAssigned() on the first assignment
        final Promise<Void> assignmentDone = Promise.promise();
        this.initialPartitionAssignmentDonePromiseRef.set(assignmentDone);
        final Promise<Void> subscribeDone = Promise.promise();
        if (this.topicPattern == null) {
            // informational only: log topics that do not exist (yet)
            for (final String topic : this.topics) {
                this.kafkaConsumer.partitionsFor(topic).onSuccess(partitions -> {
                    if (partitions.isEmpty()) {
                        LOG.info("subscription topic doesn't exist as of now: {} [client-id: {}]", topic, (Object)this.getClientId());
                    }
                });
            }
            this.kafkaConsumer.subscribe(this.topics, subscribeDone);
        } else {
            LOG.debug("subscribing to topics matching pattern: {}", (Object)this.topicPattern.pattern());
            this.kafkaConsumer.subscribe(this.topicPattern, subscribeDone);
        }
        this.kafkaConsumerWorker = this.getKafkaConsumerWorker(this.kafkaConsumer);
        return Future.all(subscribeDone.future(), assignmentDone.future()).mapEmpty();
    }

    /**
     * Hook invoked synchronously from the rebalance listener when partitions have been assigned,
     * before the non-blocking handlers are dispatched to the Vert.x context.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param partitionsSet The newly assigned partitions.
     */
    protected void onPartitionsAssignedBlocking(Set<TopicPartition> partitionsSet) {
    }

    /**
     * Completes the pending initial-partition-assignment promise (if any) and notifies the
     * partitions-assigned handler (if set).
     *
     * @param partitionsSet The newly assigned partitions.
     */
    private void onPartitionsAssigned(Set<TopicPartition> partitionsSet) {
        // the promise is only present until the first assignment has been handled
        final Promise<Void> initialAssignmentDone = this.initialPartitionAssignmentDonePromiseRef.getAndSet(null);
        if (initialAssignmentDone != null) {
            initialAssignmentDone.tryComplete();
        }
        if (this.onPartitionsAssignedHandler == null) {
            return;
        }
        this.onPartitionsAssignedHandler.handle(partitionsSet);
    }

    /**
     * Hook invoked synchronously from the rebalance listener when partitions have been revoked,
     * before the partitions-revoked handler is dispatched to the Vert.x context.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param partitionsSet The revoked partitions.
     */
    protected void onPartitionsRevokedBlocking(Set<TopicPartition> partitionsSet) {
    }

    /**
     * Hook invoked synchronously from the rebalance listener when partitions have been lost,
     * before the partitions-lost handler is dispatched to the Vert.x context.
     * <p>
     * This default implementation does nothing; subclasses may override it.
     *
     * @param partitionsSet The lost partitions.
     */
    protected void onPartitionsLostBlocking(Set<TopicPartition> partitionsSet) {
    }

    /**
     * Notifies the partitions-revoked handler, if one is set.
     *
     * @param partitionsSet The revoked partitions.
     */
    private void onPartitionsRevoked(Set<TopicPartition> partitionsSet) {
        if (this.onPartitionsRevokedHandler == null) {
            return;
        }
        this.onPartitionsRevokedHandler.handle(partitionsSet);
    }

    /**
     * Notifies the partitions-lost handler, if one is set.
     *
     * @param partitionsSet The lost partitions.
     */
    private void onPartitionsLost(Set<TopicPartition> partitionsSet) {
        if (this.onPartitionsLostHandler == null) {
            return;
        }
        this.onPartitionsLostHandler.handle(partitionsSet);
    }

    /**
     * Stops this consumer: cancels a pending poll-resume timer, closes the Kafka consumer (if it
     * has been created) and unregisters it from the metrics support (if set).
     *
     * @return The outcome of the stop attempt as reported by the lifecycle status.
     */
    @Override
    public Future<Void> stop() {
        return this.lifecycleStatus.runStopAttempt(() -> {
            final Long timerId = this.pollPauseTimeoutTimerId;
            if (timerId != null) {
                this.vertx.cancelTimer(timerId);
                this.pollPauseTimeoutTimerId = null;
            }
            final KafkaConsumer<String, V> consumer = this.kafkaConsumer;
            final Future<Void> closeResult;
            if (consumer == null) {
                // nothing has been created yet, so there is nothing to close
                closeResult = Future.succeededFuture();
            } else {
                closeResult = consumer.close()
                        .onSuccess(ok -> LOG.info("Kafka consumer stopped successfully"))
                        .onComplete(ar -> {
                            // unregister regardless of whether closing succeeded
                            if (this.metricsSupport != null) {
                                this.metricsSupport.unregisterKafkaConsumer(this.kafkaConsumer.unwrap());
                            }
                        });
            }
            return closeResult.onFailure(t -> LOG.info("error stopping Kafka consumer", (Throwable)t));
        });
    }

    /**
     * Runs the given code on the Vert.x context of this consumer: directly if the current thread
     * is already running on that context, otherwise by scheduling it on the context.
     *
     * @param codeToRun The code to execute; it is invoked with a {@code null} argument.
     * @throws NullPointerException if codeToRun is {@code null}.
     */
    protected void runOnContext(Handler<Void> codeToRun) {
        Objects.requireNonNull(codeToRun);
        final boolean alreadyOnContext = this.context == Vertx.currentContext();
        if (alreadyOnContext) {
            codeToRun.handle(null);
            return;
        }
        this.context.runOnContext(go -> codeToRun.handle(null));
    }

    /**
     * Submits the given handler to the Kafka consumer worker.
     * <p>
     * The handler is only submitted, and later only invoked, if this consumer is in state
     * "started" at the respective point in time; otherwise it is silently dropped. Exceptions
     * thrown by the handler are logged.
     *
     * @param handler The code to execute; it is invoked with a {@code null} argument.
     * @throws NullPointerException if handler is {@code null}.
     * @throws IllegalStateException if the Kafka worker has not been initialized yet.
     */
    protected void runOnKafkaWorkerThread(Handler<Void> handler) {
        Objects.requireNonNull(handler);
        final ExecutorService worker = this.kafkaConsumerWorker;
        if (worker == null) {
            throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
        }
        if (!this.lifecycleStatus.isStarted()) {
            return;
        }
        worker.submit(() -> {
            // re-check: the consumer may have been stopped before the task got executed
            if (!this.lifecycleStatus.isStarted()) {
                return;
            }
            try {
                handler.handle(null);
            } catch (Exception ex) {
                LOG.error("error running task on Kafka worker thread [client-id: {}]", (Object)this.getClientId(), (Object)ex);
            }
        });
    }

    /**
     * Gets the topics currently matched by the topic pattern subscription.
     *
     * @return A new set with the subscribed topics, or an empty (immutable) set if this consumer
     *         does not use a topic pattern.
     */
    public final Set<String> getSubscribedTopicPatternTopics() {
        if (this.topicPattern != null) {
            // hand out a copy so that callers cannot modify the internal state
            return new HashSet<String>(this.subscribedTopicPatternTopics);
        }
        return Set.of();
    }

    /**
     * Checks whether the given topic is one of the explicitly configured topics or, if a topic
     * pattern is used, one of the topics currently known to be matched by the pattern subscription.
     *
     * @param topic The topic name to check.
     * @return {@code true} if the topic is among the known subscribed topics.
     * @throws NullPointerException if topic is {@code null}.
     */
    public final boolean isAmongKnownSubscribedTopics(String topic) {
        Objects.requireNonNull(topic);
        final Set<String> knownTopics = this.topics != null ? this.topics : this.subscribedTopicPatternTopics;
        return knownTopics.contains(topic);
    }

    /**
     * Makes sure that the given topic becomes part of the topic pattern subscription of this
     * consumer, triggering a subscription update if the topic is not known to be subscribed yet.
     * <p>
     * Concurrent invocations for the same topic share one tracker and thus the same outcome.
     *
     * @param topic The name of the topic; it has to match the topic pattern of this consumer.
     * @return A succeeded future if the topic is already subscribed, a future failed with a 500
     *         error if this consumer is not started, otherwise the outcome of the tracker
     *         registered for the topic.
     * @throws NullPointerException if topic is {@code null}.
     * @throws IllegalStateException if this consumer does not use a topic pattern.
     * @throws IllegalArgumentException if the topic does not match the topic pattern.
     */
    public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(String topic) {
        Objects.requireNonNull(topic);
        if (this.topics != null) {
            throw new IllegalStateException("consumer doesn't use topic pattern");
        }
        if (!this.topicPattern.matcher(topic).find()) {
            throw new IllegalArgumentException("topic doesn't match pattern");
        }
        if (!this.lifecycleStatus.isStarted()) {
            return Future.failedFuture(new ServerErrorException(500, "not started"));
        }
        if (this.subscribedTopicPatternTopics.contains(topic)) {
            LOG.debug("ensureTopicIsAmongSubscribedTopics: topic is already subscribed [{}]", (Object)topic);
            return Future.succeededFuture();
        }
        Map<String, SubscriptionUpdateTracker> map = this.subscriptionUpdateTrackersForToBeAddedTopics;
        synchronized (map) {
            SubscriptionUpdateTracker tracker = new SubscriptionUpdateTracker(topic);
            // only the first request for a topic registers its tracker and triggers the update;
            // subsequent requests wait for the outcome of the already registered tracker
            return Optional.ofNullable(this.subscriptionUpdateTrackersForToBeAddedTopics.putIfAbsent(topic, tracker)).map(previousTopicTracker -> {
                LOG.debug("ensureTopicIsAmongSubscribedTopics: will wait for ongoing invocation to complete [{}]", (Object)topic);
                return previousTopicTracker.outcome();
            }).orElseGet(() -> {
                this.triggerTopicPatternSubscriptionUpdate();
                return tracker.outcome();
            });
        }
    }

    /**
     * Schedules an update of the topic pattern subscription on the Kafka worker, unless an
     * update has already been triggered and has not started running yet.
     * <p>
     * The scheduled task looks up the partitions of each topic that has a registered tracker.
     * Trackers for which that lookup throws are removed and failed. If any trackers remain
     * afterwards, the consumer re-subscribes to the topic pattern and, if no metadata max age
     * is set or it exceeds 500, enforces a rebalance. If re-subscribing fails, all
     * remaining trackers are failed.
     */
    private void triggerTopicPatternSubscriptionUpdate() {
        final boolean firstToTrigger = this.subscriptionUpdateTriggered.compareAndSet(false, true);
        if (!firstToTrigger) {
            LOG.debug("ensureTopicIsAmongSubscribedTopics: subscription update already triggered");
            return;
        }
        this.runOnKafkaWorkerThread(v -> {
            // from here on, new requests have to trigger a new update
            this.subscriptionUpdateTriggered.set(false);
            synchronized (this.subscriptionUpdateTrackersForToBeAddedTopics) {
                if (this.subscriptionUpdateTrackersForToBeAddedTopics.isEmpty()) {
                    return;
                }
                for (final Iterator<SubscriptionUpdateTracker> trackers =
                        this.subscriptionUpdateTrackersForToBeAddedTopics.values().iterator(); trackers.hasNext();) {
                    final SubscriptionUpdateTracker tracker = trackers.next();
                    final String topicName = tracker.getTopicName();
                    LOG.trace("triggerTopicPatternSubscriptionUpdate: check for topic [{}]", topicName);
                    try {
                        final boolean noPartitionsFound =
                                this.getUnderlyingConsumer().partitionsFor(topicName).isEmpty();
                        if (noPartitionsFound) {
                            // the tracker is kept in this case, only a log entry is written
                            LOG.debug("triggerTopicPatternSubscriptionUpdate: topic doesn't exist yet: {}", topicName);
                        }
                    } catch (Exception lookupFailure) {
                        LOG.warn("triggerTopicPatternSubscriptionUpdate: error getting partitions for topic [{}]", topicName, lookupFailure);
                        trackers.remove();
                        this.runOnContext(v2 -> tracker.fail(lookupFailure));
                    }
                }
                if (this.subscriptionUpdateTrackersForToBeAddedTopics.isEmpty()) {
                    // every tracker got removed above, no subscription update needed
                    return;
                }
                LOG.trace("triggerTopicPatternSubscriptionUpdate: subscribe");
                try {
                    LOG.info("triggering refresh of subscribed topic list ...");
                    this.getUnderlyingConsumer().subscribe(this.topicPattern, this.rebalanceListener);
                    final boolean rebalanceToBeEnforced =
                            !this.metadataMaxAge.isPresent() || this.metadataMaxAge.get() > 500;
                    if (rebalanceToBeEnforced) {
                        LOG.info("enforcing rebalance on next poll()");
                        this.getUnderlyingConsumer().enforceRebalance("trigger assignment of new topic partitions");
                    }
                } catch (Exception subscribeFailure) {
                    LOG.warn("triggerTopicPatternSubscriptionUpdate: error updating subscription", subscribeFailure);
                    this.failAllSubscriptionUpdateTrackers(subscribeFailure);
                }
            }
        });
    }

    /**
     * Removes all registered subscription update trackers and fails each of them with the
     * given exception.
     * <p>
     * The trackers are collected and removed while holding the lock on the tracker map; failing
     * them is done afterwards, outside of the lock, via {@code runOnContext}.
     *
     * @param failure The exception to fail the trackers with.
     */
    private void failAllSubscriptionUpdateTrackers(Exception failure) {
        final Collection<SubscriptionUpdateTracker> drainedTrackers;
        synchronized (this.subscriptionUpdateTrackersForToBeAddedTopics) {
            drainedTrackers = new ArrayList<SubscriptionUpdateTracker>(
                    this.subscriptionUpdateTrackersForToBeAddedTopics.values());
            this.subscriptionUpdateTrackersForToBeAddedTopics.clear();
        }
        for (final SubscriptionUpdateTracker tracker : drainedTrackers) {
            this.runOnContext(v -> tracker.fail(failure));
        }
    }

    /**
     * Removes the per-topic {@code bytes-fetched} and {@code records-fetched} sensors of the
     * given topics from the metrics object of the wrapped Kafka consumer.
     * <p>
     * Does nothing (and doesn't consume the stream) if the metrics object can't be obtained.
     *
     * @param deletedTopics The names of the topics to remove the sensors for.
     */
    private void removeMetricsForDeletedTopics(Stream<String> deletedTopics) {
        final Metrics consumerMetrics = this.getInternalMetricsObject(this.kafkaConsumer.unwrap());
        if (consumerMetrics == null) {
            return;
        }
        deletedTopics.forEach(deletedTopic -> {
            consumerMetrics.removeSensor("topic." + deletedTopic + ".bytes-fetched");
            consumerMetrics.removeSensor("topic." + deletedTopic + ".records-fetched");
        });
    }

    /**
     * Reads the value of the private {@code metrics} field of the given consumer via reflection.
     *
     * @param consumer The consumer to get the metrics object from.
     * @return The metrics object, or {@code null} if the consumer is not an
     *         {@code org.apache.kafka.clients.consumer.KafkaConsumer} or if reflective access
     *         failed (in which case a warning is logged).
     */
    private Metrics getInternalMetricsObject(Consumer<String, V> consumer) {
        if (!(consumer instanceof org.apache.kafka.clients.consumer.KafkaConsumer)) {
            return null;
        }
        try {
            final Field metricsField =
                    org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField("metrics");
            metricsField.setAccessible(true);
            return (Metrics) metricsField.get(consumer);
        } catch (Exception reflectionFailure) {
            LOG.warn("failed to get metrics object", reflectionFailure);
            return null;
        }
    }

    /**
     * Sets the private {@code rebalanceListener} field of the stream backing the given consumer
     * to the given listener, using reflection.
     *
     * @param consumer The consumer whose stream gets the listener assigned.
     * @param listener The listener to set.
     * @throws IllegalArgumentException if the field could not be set; the original exception
     *                                  is kept as the cause.
     */
    private void replaceRebalanceListener(KafkaConsumer<String, V> consumer, ConsumerRebalanceListener listener) {
        try {
            final Field listenerField = KafkaReadStreamImpl.class.getDeclaredField("rebalanceListener");
            // the field is not accessible by default
            listenerField.setAccessible(true);
            listenerField.set(consumer.asStream(), listener);
        } catch (Exception reflectionFailure) {
            throw new IllegalArgumentException("Failed to adapt rebalance listener", reflectionFailure);
        }
    }

    /**
     * Reads the private {@code worker} field of the stream backing the given consumer, using
     * reflection.
     *
     * @param consumer The consumer to get the worker of.
     * @return The worker executor service, never {@code null}.
     * @throws IllegalArgumentException if the field could not be read; the original exception
     *                                  is kept as the cause.
     * @throws IllegalStateException if the field value is {@code null}.
     */
    private ExecutorService getKafkaConsumerWorker(KafkaConsumer<String, V> consumer) {
        final ExecutorService workerFieldValue;
        try {
            final Field workerField = KafkaReadStreamImpl.class.getDeclaredField("worker");
            workerField.setAccessible(true);
            workerFieldValue = (ExecutorService) workerField.get(consumer.asStream());
        } catch (Exception reflectionFailure) {
            throw new IllegalArgumentException("Failed to get worker", reflectionFailure);
        }
        // checked outside of the try block so that this exception doesn't get wrapped
        return Optional.ofNullable(workerFieldValue)
                .orElseThrow(() -> new IllegalStateException("worker not set"));
    }

    /**
     * Tracks the outcome of the attempt to get a single topic added to the subscribed topics.
     * <p>
     * Each tracker starts with a budget of 10 rebalances, which is decreased on every
     * invocation of {@link #isContainedInSubscribedTopicsAfterRebalance(Set)}.
     */
    private final class SubscriptionUpdateTracker {
        private final Promise<Void> outcome = Promise.promise();
        private final String topicName;
        private final AtomicInteger rebalancesLeft = new AtomicInteger(10);

        /**
         * Creates a tracker for the given topic.
         *
         * @param topicName The name of the topic.
         * @throws NullPointerException if topicName is {@code null}.
         */
        SubscriptionUpdateTracker(String topicName) {
            Objects.requireNonNull(topicName);
            this.topicName = topicName;
        }

        /**
         * Gets the future that reflects the outcome of the tracked subscription update.
         *
         * @return The future.
         */
        Future<Void> outcome() {
            return this.outcome.future();
        }

        /**
         * Gets the name of the tracked topic.
         *
         * @return The topic name.
         */
        String getTopicName() {
            return this.topicName;
        }

        /**
         * Uses up one of the remaining rebalances and checks whether the tracked topic is
         * contained in the given topics.
         *
         * @param topics The topics to look for the tracked topic in.
         * @return {@code true} if the given set contains the tracked topic.
         */
        boolean isContainedInSubscribedTopicsAfterRebalance(Set<String> topics) {
            // the budget is reduced regardless of the outcome of the check
            this.rebalancesLeft.decrementAndGet();
            final boolean contained = topics.contains(this.topicName);
            return contained;
        }

        /**
         * Checks whether the rebalance budget has not been used up yet.
         *
         * @return {@code true} if the number of rebalances left is greater than zero.
         */
        boolean hasRebalancesLeft() {
            final int remaining = this.rebalancesLeft.get();
            return remaining > 0;
        }

        /**
         * Completes the outcome, unless it has already been completed or failed.
         */
        void complete() {
            this.outcome.tryComplete();
        }

        /**
         * Fails the outcome with a {@link ServerErrorException} (status 503) naming the tracked
         * topic, unless the outcome has already been completed or failed.
         */
        void fail() {
            final String message = String.format("could not create topic %s", this.topicName);
            this.outcome.tryFail(new ServerErrorException(503, message));
        }

        /**
         * Fails the outcome with the given cause, unless it has already been completed or failed.
         *
         * @param cause The cause of the failure.
         */
        void fail(Throwable cause) {
            this.outcome.tryFail(cause);
        }
    }
}

