/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.consumer;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumer;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumerHelper;
import org.eclipse.hono.util.Pair;
import org.eclipse.hono.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncHandlingAutoCommitKafkaConsumer<V>
extends HonoKafkaConsumer<V> {
    public static final String CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS = "hono.offsets.skip.recommit.period.seconds";
    public static final String CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS = "hono.offsets.commit.record.completion.timeout.millis";
    public static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5L);
    public static final Duration DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD = Duration.ofHours(1L);
    public static final Duration DEFAULT_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT = Duration.ofMillis(300L);
    public static final Duration MAX_POLL_PAUSE = Duration.ofMillis(200L);
    public static final int THROTTLING_THRESHOLD_PERCENTAGE_OF_MAX_POLL_RECORDS = 50;
    private static final Logger LOG = LoggerFactory.getLogger(AsyncHandlingAutoCommitKafkaConsumer.class);
    private final int throttlingThreshold;
    private final int throttlingResumeDelta;
    private final long commitIntervalMillis;
    private final long skipOffsetRecommitPeriodSeconds;
    private final long offsetsCommitRecordCompletionTimeoutMillis;
    private final Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> offsetsMap = new HashMap<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets>();
    private final Map<org.apache.kafka.common.TopicPartition, Long> lastKnownCommittedOffsets = new HashMap<org.apache.kafka.common.TopicPartition, Long>();
    private final AtomicBoolean periodicCommitInvocationInProgress = new AtomicBoolean();
    private final AtomicBoolean periodicCommitRetryAfterRebalanceNeeded = new AtomicBoolean();
    private final AtomicBoolean skipPeriodicCommit = new AtomicBoolean();
    private final AtomicInteger recordsInProcessingCounter = new AtomicInteger();
    private final AtomicInteger recordsLeftInBatchCounter = new AtomicInteger();
    private final AtomicReference<UncompletedRecordsCompletionLatch> uncompletedRecordsCompletionLatchRef = new AtomicReference();
    private Instant fetchingPauseStartTime = Instant.MAX;
    private Long periodicCommitTimerId;
    private Instant lastPollInstant = Instant.EPOCH;

    public AsyncHandlingAutoCommitKafkaConsumer(Vertx vertx, Set<String> topics, Function<KafkaConsumerRecord<String, V>, Future<Void>> recordHandler, Map<String, String> consumerConfig) {
        this(vertx, topics, null, recordHandler, consumerConfig);
    }

    public AsyncHandlingAutoCommitKafkaConsumer(Vertx vertx, Pattern topicPattern, Function<KafkaConsumerRecord<String, V>, Future<Void>> recordHandler, Map<String, String> consumerConfig) {
        this(vertx, null, topicPattern, recordHandler, consumerConfig);
    }

    private AsyncHandlingAutoCommitKafkaConsumer(Vertx vertx, Set<String> topics, Pattern topicPattern, Function<KafkaConsumerRecord<String, V>, Future<Void>> recordHandler, Map<String, String> consumerConfig) {
        super(vertx, topics, topicPattern, AsyncHandlingAutoCommitKafkaConsumer.validateAndAdaptConsumerConfig(consumerConfig));
        this.setRecordHandler(record -> this.handleRecord((KafkaConsumerRecord<String, V>)record, recordHandler));
        int maxPollRecords = this.getMaxPollRecordsConfig(consumerConfig);
        this.throttlingThreshold = Math.max(maxPollRecords * 50 / 100, 1);
        this.throttlingResumeDelta = this.throttlingThreshold * 5 / 100;
        this.commitIntervalMillis = AsyncHandlingAutoCommitKafkaConsumer.getCommitInterval(consumerConfig);
        this.skipOffsetRecommitPeriodSeconds = AsyncHandlingAutoCommitKafkaConsumer.getSkipOffsetRecommitPeriodSeconds(consumerConfig);
        this.offsetsCommitRecordCompletionTimeoutMillis = AsyncHandlingAutoCommitKafkaConsumer.getOffsetsCommitRecordCompletionTimeoutMillis(consumerConfig);
    }

    @Override
    protected final void onBatchOfRecordsReceived(KafkaConsumerRecords<String, V> records) {
        this.recordsLeftInBatchCounter.set(records.size());
        this.lastPollInstant = Instant.now();
    }

    private void handleRecord(KafkaConsumerRecord<String, V> record, Function<KafkaConsumerRecord<String, V>, Future<Void>> recordHandler) {
        int recordsLeftInBatch = this.recordsLeftInBatchCounter.decrementAndGet();
        int recordsInProcessing = this.recordsInProcessingCounter.incrementAndGet();
        if (recordsInProcessing >= this.throttlingThreshold) {
            if (this.lastPollInstant.plus(MAX_POLL_PAUSE).isAfter(Instant.now()) && (recordsLeftInBatch > 0 || recordsInProcessing == this.throttlingThreshold) && this.pauseRecordHandlingAndPolling(MAX_POLL_PAUSE)) {
                LOG.debug("paused consumer record handling/polling; no. of records in processing: {}, throttling threshold: {} [client-id: {}]", recordsInProcessing, this.throttlingThreshold, this.getClientId());
            } else if (recordsLeftInBatch == 0 && this.pauseRecordFetching()) {
                LOG.info("suspending record fetching; no. of records in processing: {}, throttling threshold: {} [client-id: {}]", recordsInProcessing, this.throttlingThreshold, this.getClientId());
                this.fetchingPauseStartTime = Instant.now();
            }
        }
        org.apache.kafka.common.TopicPartition topicPartition = new org.apache.kafka.common.TopicPartition(record.topic(), record.partition());
        OffsetsQueueEntry offsetsQueueEntry = this.setRecordReceived(record.offset(), topicPartition);
        try {
            recordHandler.apply(record).onComplete(ar -> this.setRecordHandlingComplete(offsetsQueueEntry, topicPartition));
        }
        catch (Exception e) {
            LOG.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}] [client-id: {}]", record.topic(), record.partition(), record.offset(), record.headers(), this.getClientId(), e);
            this.setRecordHandlingComplete(offsetsQueueEntry, topicPartition);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setRecordHandlingComplete(OffsetsQueueEntry offsetsQueueEntry, org.apache.kafka.common.TopicPartition topicPartition) {
        offsetsQueueEntry.setHandlingComplete();
        AtomicReference<UncompletedRecordsCompletionLatch> atomicReference = this.uncompletedRecordsCompletionLatchRef;
        synchronized (atomicReference) {
            Optional.ofNullable(this.uncompletedRecordsCompletionLatchRef.get()).ifPresent(latch -> latch.onRecordHandlingCompleted(topicPartition));
        }
        int recordsInProcessing = this.recordsInProcessingCounter.decrementAndGet();
        if (recordsInProcessing <= this.throttlingThreshold && this.resumeRecordFetching()) {
            LOG.info("resumed consumer record fetching after {}ms; current no. of records in processing: {} [client-id: {}]", Duration.between(this.fetchingPauseStartTime, Instant.now()).toMillis(), recordsInProcessing, this.getClientId());
        } else if (recordsInProcessing <= this.throttlingThreshold - this.throttlingResumeDelta && this.resumeRecordHandlingAndPolling()) {
            LOG.debug("resumed consumer record polling; current no. of records in processing: {} [client-id: {}]", (Object)recordsInProcessing, (Object)this.getClientId());
        }
    }

    private static Map<String, String> validateAndAdaptConsumerConfig(Map<String, String> consumerConfig) {
        if (Strings.isNullOrEmpty(consumerConfig.get("group.id"))) {
            throw new IllegalArgumentException("group.id config entry has to be set");
        }
        consumerConfig.put("enable.auto.commit", "false");
        return consumerConfig;
    }

    private int getMaxPollRecordsConfig(Map<String, String> consumerConfig) {
        return Optional.ofNullable(consumerConfig.get("max.poll.records")).map(Integer::parseInt).orElse(500);
    }

    private static long getCommitInterval(Map<String, String> consumerConfig) {
        return Optional.ofNullable(consumerConfig.get("auto.commit.interval.ms")).map(Long::parseLong).orElse(DEFAULT_COMMIT_INTERVAL.toMillis());
    }

    private static long getSkipOffsetRecommitPeriodSeconds(Map<String, String> consumerConfig) {
        return Optional.ofNullable(consumerConfig.get(CONFIG_HONO_OFFSETS_SKIP_RECOMMIT_PERIOD_SECONDS)).map(Long::parseLong).orElse(DEFAULT_OFFSETS_SKIP_RECOMMIT_PERIOD.toSeconds());
    }

    private static long getOffsetsCommitRecordCompletionTimeoutMillis(Map<String, String> consumerConfig) {
        return Optional.ofNullable(consumerConfig.get(CONFIG_HONO_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT_MILLIS)).map(Long::parseUnsignedLong).orElse(DEFAULT_OFFSETS_COMMIT_RECORD_COMPLETION_TIMEOUT.toMillis());
    }

    @Override
    protected void onRecordHandlerSkippedForExpiredRecord(KafkaConsumerRecord<String, V> record) {
        OffsetsQueueEntry queueEntry = this.setRecordReceived(record.offset(), new org.apache.kafka.common.TopicPartition(record.topic(), record.partition()));
        queueEntry.setHandlingComplete();
    }

    @Override
    public Future<Void> start() {
        this.addOnKafkaConsumerReadyHandler(ready -> {
            this.periodicCommitTimerId = this.vertx.setPeriodic(this.commitIntervalMillis, tid -> this.doPeriodicCommit());
        });
        return super.start();
    }

    @Override
    public Future<Void> stop() {
        if (this.periodicCommitTimerId != null) {
            this.vertx.cancelTimer(this.periodicCommitTimerId);
        }
        return super.stop().onComplete(v -> this.clearObsoleteTopicPartitionOffsets(List.of()));
    }

    @Override
    protected void onPartitionsAssignedBlocking(Set<TopicPartition> partitionsSet) {
        this.clearObsoleteTopicPartitionOffsets(this.getUnderlyingConsumer().assignment());
        if (this.topicPattern != null) {
            Set<String> subscribedTopicPatternTopics = this.getSubscribedTopicPatternTopics();
            this.lastKnownCommittedOffsets.entrySet().removeIf(entry -> !subscribedTopicPatternTopics.contains(((org.apache.kafka.common.TopicPartition)entry.getKey()).topic()));
        }
        if (!partitionsSet.isEmpty() && this.isAutoOffsetResetConfigLatest()) {
            this.ensureOffsetCommitsExistForNewlyAssignedPartitions(partitionsSet);
        }
        this.skipPeriodicCommit.set(false);
        if (this.periodicCommitRetryAfterRebalanceNeeded.get()) {
            this.runOnContext(v -> {
                if (this.periodicCommitRetryAfterRebalanceNeeded.compareAndSet(true, false)) {
                    this.doPeriodicCommit();
                }
            });
        }
    }

    private synchronized void ensureOffsetCommitsExistForNewlyAssignedPartitions(Set<TopicPartition> partitionsSet) {
        ArrayList<org.apache.kafka.common.TopicPartition> partitionsForNextCommit = new ArrayList<org.apache.kafka.common.TopicPartition>();
        Set<org.apache.kafka.common.TopicPartition> partitionsToCheckCommittedOffsetsFor = partitionsSet.stream().map(Helper::to).filter(partition -> !this.offsetsMap.containsKey(partition) && this.lastKnownCommittedOffsets.get(partition) == null).collect(Collectors.toSet());
        this.fetchCommittedOffsetsOnPartitionsAssigned(partitionsToCheckCommittedOffsetsFor);
        partitionsSet.stream().map(Helper::to).filter(partition -> !this.offsetsMap.containsKey(partition)).forEach(partition -> {
            try {
                long position = this.getUnderlyingConsumer().position((org.apache.kafka.common.TopicPartition)partition);
                boolean positionCommitted = Optional.ofNullable(this.lastKnownCommittedOffsets.get(partition)).map(committedPos -> committedPos.equals(position)).orElse(false);
                if (!positionCommitted) {
                    partitionsForNextCommit.add((org.apache.kafka.common.TopicPartition)partition);
                }
                this.offsetsMap.put((org.apache.kafka.common.TopicPartition)partition, new TopicPartitionOffsets((org.apache.kafka.common.TopicPartition)partition, position, positionCommitted));
            }
            catch (Exception ex) {
                LOG.warn("error fetching position for newly assigned partition [{}] [client-id: {}]", partition, this.getClientId(), ex);
            }
        });
        if (LOG.isDebugEnabled() && !partitionsForNextCommit.isEmpty()) {
            LOG.debug("onPartitionsAssigned: partitions to be part of next offset commit: [{}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitionsForNextCommit));
        }
    }

    private void fetchCommittedOffsetsOnPartitionsAssigned(Set<org.apache.kafka.common.TopicPartition> partitions) {
        if (!partitions.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("onPartitionsAssigned: fetching committed offsets for [{}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
            }
            try {
                this.getUnderlyingConsumer().committed(partitions).forEach((partition, position) -> {
                    if (position != null) {
                        this.lastKnownCommittedOffsets.put((org.apache.kafka.common.TopicPartition)partition, position.offset());
                    } else {
                        this.lastKnownCommittedOffsets.remove(partition);
                    }
                });
            }
            catch (Exception ex) {
                LOG.warn("error fetching committed offsets for newly assigned partitions [{}] [client-id: {}]", HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), this.getClientId(), ex);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void onPartitionsRevokedBlocking(Set<TopicPartition> partitionsSet) {
        this.skipPeriodicCommit.set(true);
        if (!partitionsSet.isEmpty() && this.offsetsCommitRecordCompletionTimeoutMillis > 0L) {
            UncompletedRecordsCompletionLatch latch = null;
            AtomicReference<UncompletedRecordsCompletionLatch> atomicReference = this.uncompletedRecordsCompletionLatchRef;
            synchronized (atomicReference) {
                Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> uncompletedRecordsPartitions = this.getUncompletedRecordsPartitions(Helper.to(partitionsSet));
                if (!uncompletedRecordsPartitions.isEmpty()) {
                    LOG.info("init latch to wait up to {}ms for the completion of record handling concerning {} [client-id: {}]", this.offsetsCommitRecordCompletionTimeoutMillis, uncompletedRecordsPartitions.size() <= 10 ? uncompletedRecordsPartitions.keySet() : uncompletedRecordsPartitions.size() + " partitions", this.getClientId());
                    latch = new UncompletedRecordsCompletionLatch(uncompletedRecordsPartitions);
                    this.uncompletedRecordsCompletionLatchRef.set(latch);
                }
            }
            if (latch != null) {
                try {
                    if (latch.await(this.offsetsCommitRecordCompletionTimeoutMillis, TimeUnit.MILLISECONDS)) {
                        LOG.trace("latch to wait for the completion of record handling was released in time");
                    } else {
                        LOG.info("timed out waiting for record handling to finish after {}ms [client-id: {}]", (Object)this.offsetsCommitRecordCompletionTimeoutMillis, (Object)this.getClientId());
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finally {
                    this.uncompletedRecordsCompletionLatchRef.set(null);
                }
            }
        }
        this.commitOffsetsSync();
    }

    private void commitOffsetsSync() {
        if (Vertx.currentContext() != null) {
            throw new IllegalStateException("must be run on the polling thread");
        }
        Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets = this.getOffsetsToCommit();
        if (!offsets.isEmpty()) {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("commitSync; offsets: [{}]", (Object)HonoKafkaConsumerHelper.getOffsetsDebugString(offsets));
                }
                this.getUnderlyingConsumer().commitSync(offsets);
                this.setCommittedOffsets(offsets);
                LOG.trace("commitSync succeeded");
            }
            catch (Exception e) {
                LOG.warn("commit failed: {} [client-id: {}]", (Object)e, (Object)this.getClientId());
            }
        } else {
            LOG.trace("skip commitSync - no offsets to commit");
        }
    }

    private void doPeriodicCommit() {
        if (this.skipPeriodicCommit.get()) {
            return;
        }
        this.periodicCommitRetryAfterRebalanceNeeded.set(false);
        if (!this.periodicCommitInvocationInProgress.compareAndSet(false, true)) {
            LOG.trace("periodic commit already triggered, skipping invocation");
            return;
        }
        this.runOnKafkaWorkerThread(v -> {
            Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets = this.getOffsetsToCommit();
            if (!offsets.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("do periodic commit; offsets: [{}]", (Object)HonoKafkaConsumerHelper.getOffsetsDebugString(offsets));
                }
                try {
                    this.getUnderlyingConsumer().commitAsync(offsets, (committedOffsets, error) -> {
                        if (error instanceof RebalanceInProgressException) {
                            LOG.debug("could not do periodic commit: {} [client-id: {}]", (Object)error, (Object)this.getClientId());
                            if (this.isCooperativeRebalancingConfigured()) {
                                this.periodicCommitRetryAfterRebalanceNeeded.set(true);
                            }
                        } else if (error != null) {
                            LOG.info("periodic commit failed: {} [client-id: {}]", (Object)error, (Object)this.getClientId());
                        } else {
                            LOG.trace("periodic commit succeeded");
                            this.setCommittedOffsets(committedOffsets);
                        }
                    });
                }
                catch (Exception ex) {
                    LOG.error("error doing periodic commit [client-id: {}]", (Object)this.getClientId(), (Object)ex);
                }
            } else {
                LOG.trace("skip periodic commit - no offsets to commit");
            }
            this.periodicCommitInvocationInProgress.set(false);
        });
    }

    private synchronized Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> getUncompletedRecordsPartitions(Set<org.apache.kafka.common.TopicPartition> partitions) {
        return this.offsetsMap.entrySet().stream().filter(entry -> partitions.contains(entry.getKey()) && !((TopicPartitionOffsets)entry.getValue()).allCompleted()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private synchronized OffsetsQueueEntry setRecordReceived(long recordOffset, org.apache.kafka.common.TopicPartition topicPartition) {
        return this.offsetsMap.computeIfAbsent(topicPartition, k -> new TopicPartitionOffsets(topicPartition)).addOffset(recordOffset);
    }

    private synchronized void clearObsoleteTopicPartitionOffsets(Collection<org.apache.kafka.common.TopicPartition> currentlyAssignedPartitions) {
        Objects.requireNonNull(currentlyAssignedPartitions);
        Iterator<Map.Entry<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets>> partitionOffsetsIterator = this.offsetsMap.entrySet().iterator();
        while (partitionOffsetsIterator.hasNext()) {
            Map.Entry<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> topicPartitionOffsetsEntry = partitionOffsetsIterator.next();
            if (currentlyAssignedPartitions.contains(topicPartitionOffsetsEntry.getKey())) continue;
            if (topicPartitionOffsetsEntry.getValue().needsCommit()) {
                LOG.warn("partition [{}] not assigned to consumer [{}] anymore but latest handled record offset hasn't been committed yet! {}", topicPartitionOffsetsEntry.getKey(), this.getClientId(), topicPartitionOffsetsEntry.getValue().getStateInfo());
            } else if (!topicPartitionOffsetsEntry.getValue().allCompleted()) {
                LOG.debug("partition [{}] not assigned to consumer [{}] anymore but not all read records have been fully processed yet! {}", topicPartitionOffsetsEntry.getKey(), this.getClientId(), topicPartitionOffsetsEntry.getValue().getStateInfo());
            } else {
                LOG.trace("partition [{}] not assigned to consumer anymore; no still outstanding offset commits there", (Object)topicPartitionOffsetsEntry.getKey());
            }
            partitionOffsetsIterator.remove();
        }
    }

    public boolean isOffsetsCommitNeededForTopic(String topic) {
        return this.getOffsetsToCommit().keySet().stream().anyMatch(tp -> tp.topic().equals(topic));
    }

    private synchronized Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
        return this.offsetsMap.entrySet().stream().flatMap(entry -> ((TopicPartitionOffsets)entry.getValue()).getLastSequentiallyCompletedOffsetForCommit().stream().map(uncommittedOffset -> Pair.of((org.apache.kafka.common.TopicPartition)entry.getKey(), new OffsetAndMetadata(uncommittedOffset + 1L, "")))).collect(Collectors.toMap(Pair::one, Pair::two));
    }

    private synchronized void setCommittedOffsets(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((partition, offsetAndMetadata) -> {
            Optional.ofNullable(this.offsetsMap.get(partition)).ifPresent(queue -> queue.setLastCommittedOffset(offsetAndMetadata.offset() - 1L));
            this.lastKnownCommittedOffsets.put((org.apache.kafka.common.TopicPartition)partition, offsetAndMetadata.offset());
        });
    }

    static class OffsetsQueueEntry {
        private final long offset;
        private final AtomicBoolean handlingComplete = new AtomicBoolean();

        OffsetsQueueEntry(long offset) {
            this.offset = offset;
        }

        public long getOffset() {
            return this.offset;
        }

        public void setHandlingComplete() {
            this.handlingComplete.set(true);
        }

        public boolean isHandlingComplete() {
            return this.handlingComplete.get();
        }

        public String toString() {
            return this.offset + (this.handlingComplete.get() ? " (completed)" : "");
        }
    }

    class UncompletedRecordsCompletionLatch {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> uncompletedRecordsPartitions;

        UncompletedRecordsCompletionLatch(Map<org.apache.kafka.common.TopicPartition, TopicPartitionOffsets> uncompletedRecordsPartitions) {
            this.uncompletedRecordsPartitions = uncompletedRecordsPartitions;
        }

        public void onRecordHandlingCompleted(org.apache.kafka.common.TopicPartition partition) {
            TopicPartitionOffsets offsets = this.uncompletedRecordsPartitions.get(partition);
            if (offsets != null && offsets.allCompleted()) {
                this.uncompletedRecordsPartitions.remove(partition);
                if (this.uncompletedRecordsPartitions.isEmpty()) {
                    this.latch.countDown();
                }
            }
        }

        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return this.latch.await(timeout, unit);
        }
    }

    class TopicPartitionOffsets {
        private static final long UNDEFINED_OFFSET = -2L;
        private final org.apache.kafka.common.TopicPartition topicPartition;
        private final Deque<OffsetsQueueEntry> queue = new ArrayDeque<OffsetsQueueEntry>();
        private long lastSequentiallyCompletedOffset = -2L;
        private long lastCommittedOffset = -2L;
        private Instant lastCommitTime;

        TopicPartitionOffsets(org.apache.kafka.common.TopicPartition topicPartition) {
            this.topicPartition = Objects.requireNonNull(topicPartition);
        }

        TopicPartitionOffsets(org.apache.kafka.common.TopicPartition topicPartition, long initialPosition, boolean initialPositionCommitted) {
            this(topicPartition);
            this.lastSequentiallyCompletedOffset = initialPosition - 1L;
            this.lastCommittedOffset = initialPositionCommitted ? this.lastSequentiallyCompletedOffset : -2L;
        }

        public OffsetsQueueEntry addOffset(long offset) {
            this.cleanupAndUpdateLastCompletedOffset();
            OffsetsQueueEntry queueEntry = new OffsetsQueueEntry(offset);
            this.queue.add(queueEntry);
            return queueEntry;
        }

        public Optional<Long> getLastSequentiallyCompletedOffsetForCommit() {
            this.cleanupAndUpdateLastCompletedOffset();
            if (this.lastSequentiallyCompletedOffset == -2L) {
                return Optional.empty();
            }
            if (!this.queue.isEmpty()) {
                LOG.trace("getOffsetsToCommit: offset {} to use for commit is {} entries behind last received offset {}; partition [{}]", this.lastSequentiallyCompletedOffset, this.queue.size(), this.queue.getLast().getOffset(), this.topicPartition);
            }
            if (this.lastSequentiallyCompletedOffset != this.lastCommittedOffset) {
                return Optional.of(this.lastSequentiallyCompletedOffset);
            }
            if (this.lastCommitTime != null && this.lastCommitTime.isBefore(Instant.now().minusSeconds(AsyncHandlingAutoCommitKafkaConsumer.this.skipOffsetRecommitPeriodSeconds))) {
                LOG.trace("getOffsetsToCommit: offset {} will be recommitted (last commit {} too long ago); partition [{}]", this.lastSequentiallyCompletedOffset, this.lastCommitTime, this.topicPartition);
                return Optional.of(this.lastSequentiallyCompletedOffset);
            }
            return Optional.empty();
        }

        private void cleanupAndUpdateLastCompletedOffset() {
            while (Optional.ofNullable(this.queue.peek()).map(OffsetsQueueEntry::isHandlingComplete).orElse(false).booleanValue()) {
                this.lastSequentiallyCompletedOffset = this.queue.remove().getOffset();
            }
        }

        public void setLastCommittedOffset(long offset) {
            if (offset >= this.lastCommittedOffset) {
                this.lastCommitTime = Instant.now();
                this.lastCommittedOffset = offset;
            }
        }

        public boolean allCompleted() {
            this.cleanupAndUpdateLastCompletedOffset();
            return this.queue.isEmpty();
        }

        public boolean needsCommit() {
            this.cleanupAndUpdateLastCompletedOffset();
            return this.lastSequentiallyCompletedOffset != -2L && this.lastSequentiallyCompletedOffset != this.lastCommittedOffset;
        }

        public String getStateInfo() {
            this.cleanupAndUpdateLastCompletedOffset();
            return "{lastSequentiallyCompletedOffset=" + this.getOffsetString(this.lastSequentiallyCompletedOffset) + ", lastCommittedOffset=" + this.getOffsetString(this.lastCommittedOffset) + (this.queue.size() <= 20 ? ", queue=" + this.queue : ", queue.size=" + this.queue.size()) + "}";
        }

        private String getOffsetString(long offset) {
            return offset == -2L ? "undefined" : Long.toString(offset);
        }
    }
}

