/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.share;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import kafka.cluster.PartitionListener;
import kafka.server.ReplicaManager;
import kafka.server.share.DelayedShareFetch;
import kafka.server.share.ShareFetchUtils;
import kafka.server.share.SharePartition;
import kafka.server.share.SharePartitionCache;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ShareGroupListener;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
import org.apache.kafka.server.share.fetch.PartitionRotateStrategy;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SharePartitionManager
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SharePartitionManager.class);
    private final SharePartitionCache partitionCache;
    private final ReplicaManager replicaManager;
    private final Time time;
    private final ShareSessionCache cache;
    private final GroupConfigManager groupConfigManager;
    private final int defaultRecordLockDurationMs;
    private final Timer timer;
    private final int maxInFlightMessages;
    private final int maxDeliveryCount;
    private final long remoteFetchMaxWaitMs;
    private final Persister persister;
    private final ShareGroupMetrics shareGroupMetrics;
    private final BrokerTopicStats brokerTopicStats;

    public SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache cache, int defaultRecordLockDurationMs, int maxDeliveryCount, int maxInFlightMessages, long remoteFetchMaxWaitMs, Persister persister, GroupConfigManager groupConfigManager, BrokerTopicStats brokerTopicStats) {
        this(replicaManager, time, cache, new SharePartitionCache(), defaultRecordLockDurationMs, maxDeliveryCount, maxInFlightMessages, remoteFetchMaxWaitMs, persister, groupConfigManager, new ShareGroupMetrics(time), brokerTopicStats);
    }

    private SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache cache, SharePartitionCache partitionCache, int defaultRecordLockDurationMs, int maxDeliveryCount, int maxInFlightMessages, long remoteFetchMaxWaitMs, Persister persister, GroupConfigManager groupConfigManager, ShareGroupMetrics shareGroupMetrics, BrokerTopicStats brokerTopicStats) {
        this(replicaManager, time, cache, partitionCache, defaultRecordLockDurationMs, (Timer)new SystemTimerReaper("share-group-lock-timeout-reaper", (Timer)new SystemTimer("share-group-lock-timeout")), maxDeliveryCount, maxInFlightMessages, remoteFetchMaxWaitMs, persister, groupConfigManager, shareGroupMetrics, brokerTopicStats);
    }

    SharePartitionManager(ReplicaManager replicaManager, Time time, ShareSessionCache cache, SharePartitionCache partitionCache, int defaultRecordLockDurationMs, Timer timer, int maxDeliveryCount, int maxInFlightMessages, long remoteFetchMaxWaitMs, Persister persister, GroupConfigManager groupConfigManager, ShareGroupMetrics shareGroupMetrics, BrokerTopicStats brokerTopicStats) {
        this.replicaManager = replicaManager;
        this.time = time;
        this.cache = cache;
        this.partitionCache = partitionCache;
        this.defaultRecordLockDurationMs = defaultRecordLockDurationMs;
        this.timer = timer;
        this.maxDeliveryCount = maxDeliveryCount;
        this.maxInFlightMessages = maxInFlightMessages;
        this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
        this.persister = persister;
        this.groupConfigManager = groupConfigManager;
        this.shareGroupMetrics = shareGroupMetrics;
        this.brokerTopicStats = brokerTopicStats;
        this.cache.registerShareGroupListener((ShareGroupListener)new ShareGroupListenerImpl());
    }

    public CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> fetchMessages(String groupId, String memberId, FetchParams fetchParams, int sessionEpoch, int maxFetchRecords, int batchSize, List<TopicIdPartition> topicIdPartitions) {
        log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}", new Object[]{topicIdPartitions, groupId, fetchParams});
        List rotatedTopicIdPartitions = PartitionRotateStrategy.type((PartitionRotateStrategy.StrategyType)PartitionRotateStrategy.StrategyType.ROUND_ROBIN).rotate(topicIdPartitions, new PartitionRotateStrategy.PartitionRotateMetadata(sessionEpoch));
        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>();
        this.processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, rotatedTopicIdPartitions, batchSize, maxFetchRecords, this.brokerTopicStats));
        return future;
    }

    public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> acknowledge(String memberId, String groupId, Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics) {
        log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}", acknowledgeTopics.keySet(), (Object)groupId);
        HashMap<TopicIdPartition, CompletableFuture<Throwable>> futures = new HashMap<TopicIdPartition, CompletableFuture<Throwable>>();
        HashSet topics = new HashSet();
        acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
            topics.add(topicIdPartition.topic());
            SharePartitionKey sharePartitionKey = this.sharePartitionKey(groupId, (TopicIdPartition)topicIdPartition);
            SharePartition sharePartition = this.partitionCache.get(sharePartitionKey);
            if (sharePartition != null) {
                CompletableFuture future = new CompletableFuture();
                sharePartition.acknowledge(memberId, (List<ShareAcknowledgementBatch>)acknowledgePartitionBatches).whenComplete((result, throwable) -> {
                    if (throwable != null) {
                        this.fencedSharePartitionHandler().accept(sharePartitionKey, (Throwable)throwable);
                        future.complete(throwable);
                        return;
                    }
                    acknowledgePartitionBatches.forEach(batch -> {
                        if (batch.acknowledgeTypes().size() == 1) {
                            this.shareGroupMetrics.recordAcknowledgement(((Byte)batch.acknowledgeTypes().get(0)).byteValue(), batch.lastOffset() - batch.firstOffset() + 1L);
                        } else {
                            batch.acknowledgeTypes().forEach(arg_0 -> ((ShareGroupMetrics)this.shareGroupMetrics).recordAcknowledgement(arg_0));
                        }
                    });
                    future.complete(null);
                });
                DelayedShareFetchGroupKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
                this.replicaManager.completeDelayedShareFetchRequest((DelayedShareFetchKey)delayedShareFetchKey);
                futures.put((TopicIdPartition)topicIdPartition, future);
            } else {
                futures.put((TopicIdPartition)topicIdPartition, (CompletableFuture<Throwable>)CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
            }
        });
        topics.forEach(topic -> {
            this.brokerTopicStats.allTopicsStats().totalShareAcknowledgementRequestRate().mark();
            this.brokerTopicStats.topicStats(topic).totalShareAcknowledgementRequestRate().mark();
        });
        return this.mapAcknowledgementFutures(futures, Optional.of(this.failedShareAcknowledgeMetricsHandler()));
    }

    public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> releaseSession(String groupId, String memberId) {
        log.trace("Release session request for groupId: {}, memberId: {}", (Object)groupId, (Object)memberId);
        Uuid memberIdUuid = Uuid.fromString((String)memberId);
        List<TopicIdPartition> topicIdPartitions = this.cachedTopicIdPartitionsInShareSession(groupId, memberIdUuid);
        ShareSessionKey key = this.shareSessionKey(groupId, memberIdUuid);
        if (this.cache.remove(key) == null) {
            log.error("Share session error for {}: no such share session found", (Object)key);
            return FutureUtils.failedFuture((Throwable)Errors.SHARE_SESSION_NOT_FOUND.exception());
        }
        log.debug("Removed share session with key {}", (Object)key);
        if (topicIdPartitions.isEmpty()) {
            return CompletableFuture.completedFuture(Map.of());
        }
        HashMap<TopicIdPartition, CompletableFuture<Throwable>> futuresMap = new HashMap<TopicIdPartition, CompletableFuture<Throwable>>();
        topicIdPartitions.forEach(topicIdPartition -> {
            SharePartitionKey sharePartitionKey = this.sharePartitionKey(groupId, (TopicIdPartition)topicIdPartition);
            SharePartition sharePartition = this.partitionCache.get(sharePartitionKey);
            if (sharePartition == null) {
                log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", (Object)groupId, topicIdPartition);
                futuresMap.put((TopicIdPartition)topicIdPartition, (CompletableFuture<Throwable>)CompletableFuture.completedFuture(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()));
            } else {
                CompletableFuture future = new CompletableFuture();
                sharePartition.releaseAcquiredRecords(memberId).whenComplete((result, throwable) -> {
                    if (throwable != null) {
                        this.fencedSharePartitionHandler().accept(sharePartitionKey, (Throwable)throwable);
                        future.complete(throwable);
                        return;
                    }
                    future.complete(null);
                });
                DelayedShareFetchGroupKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());
                this.replicaManager.completeDelayedShareFetchRequest((DelayedShareFetchKey)delayedShareFetchKey);
                futuresMap.put((TopicIdPartition)topicIdPartition, future);
            }
        });
        return this.mapAcknowledgementFutures(futuresMap, Optional.empty());
    }

    public CompletableFuture<Void> createIdleShareFetchTimerTask(long maxWaitMs) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        IdleShareFetchTimerTask idleShareFetchTimerTask = new IdleShareFetchTimerTask(maxWaitMs, future);
        this.replicaManager.addShareFetchTimerRequest(idleShareFetchTimerTask);
        return future;
    }

    private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap, Optional<Consumer<Set<String>>> failedMetricsHandler) {
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futuresMap.values().toArray(new CompletableFuture[0]));
        return allFutures.thenApply(v -> {
            HashMap result = new HashMap();
            HashSet failedTopics = new HashSet();
            futuresMap.forEach((topicIdPartition, future) -> {
                ShareAcknowledgeResponseData.PartitionData partitionData = new ShareAcknowledgeResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition());
                Throwable t = (Throwable)future.join();
                if (t != null) {
                    partitionData.setErrorCode(Errors.forException((Throwable)t).code()).setErrorMessage(t.getMessage());
                    failedTopics.add(topicIdPartition.topic());
                }
                result.put(topicIdPartition, partitionData);
            });
            failedMetricsHandler.ifPresent(handler -> handler.accept(failedTopics));
            return result;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ShareFetchContext newContext(String groupId, List<TopicIdPartition> shareFetchData, List<TopicIdPartition> toForget, ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent, String clientConnectionId) {
        FinalContext context;
        if (reqMetadata.isFull()) {
            ShareSessionKey key = this.shareSessionKey(groupId, reqMetadata.memberId());
            if (reqMetadata.epoch() == -1) {
                if (this.cache.get(key) == null) {
                    log.error("Share session error for {}: no such share session found", (Object)key);
                    throw Errors.SHARE_SESSION_NOT_FOUND.exception();
                }
                context = new FinalContext();
            } else {
                if (isAcknowledgeDataPresent.booleanValue()) {
                    log.error("Acknowledge data present in Initial Fetch Request for group {} member {}", (Object)groupId, (Object)reqMetadata.memberId());
                    throw Errors.INVALID_REQUEST.exception();
                }
                if (this.cache.remove(key) != null) {
                    log.debug("Removed share session with key {}", (Object)key);
                }
                ImplicitLinkedHashCollection cachedSharePartitions = new ImplicitLinkedHashCollection(shareFetchData.size());
                shareFetchData.forEach(topicIdPartition -> cachedSharePartitions.mustAdd((ImplicitLinkedHashCollection.Element)new CachedSharePartition(topicIdPartition, false)));
                ShareSessionKey responseShareSessionKey = this.cache.maybeCreateSession(groupId, reqMetadata.memberId(), cachedSharePartitions, clientConnectionId);
                if (responseShareSessionKey == null) {
                    log.error("Could not create a share session for group {} member {}", (Object)groupId, (Object)reqMetadata.memberId());
                    throw Errors.SHARE_SESSION_LIMIT_REACHED.exception();
                }
                context = new ShareSessionContext(reqMetadata, shareFetchData);
                log.debug("Created a new ShareSessionContext with key {} isSubsequent {} returning {}. A new share session will be started.", new Object[]{responseShareSessionKey, false, SharePartitionManager.partitionsToLogString(shareFetchData)});
            }
        } else {
            ShareSessionCache shareSessionCache = this.cache;
            synchronized (shareSessionCache) {
                ShareSessionKey key = this.shareSessionKey(groupId, reqMetadata.memberId());
                ShareSession shareSession = this.cache.get(key);
                if (shareSession == null) {
                    log.error("Share session error for {}: no such share session found", (Object)key);
                    throw Errors.SHARE_SESSION_NOT_FOUND.exception();
                }
                if (shareSession.epoch != reqMetadata.epoch()) {
                    log.debug("Share session error for {}: expected epoch {}, but got {} instead", new Object[]{key, shareSession.epoch, reqMetadata.epoch()});
                    throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
                }
                Map modifiedTopicIdPartitions = shareSession.update(shareFetchData, toForget);
                this.cache.updateNumPartitions(shareSession);
                shareSession.epoch = ShareRequestMetadata.nextEpoch((int)shareSession.epoch);
                log.debug("Created a new ShareSessionContext for session key {}, epoch {}: added {}, updated {}, removed {}", new Object[]{shareSession.key(), shareSession.epoch, SharePartitionManager.partitionsToLogString((Collection)modifiedTopicIdPartitions.get(ShareSession.ModifiedTopicIdPartitionType.ADDED)), SharePartitionManager.partitionsToLogString((Collection)modifiedTopicIdPartitions.get(ShareSession.ModifiedTopicIdPartitionType.UPDATED)), SharePartitionManager.partitionsToLogString((Collection)modifiedTopicIdPartitions.get(ShareSession.ModifiedTopicIdPartitionType.REMOVED))});
                context = new ShareSessionContext(reqMetadata, shareSession);
            }
        }
        return context;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void acknowledgeSessionUpdate(String groupId, ShareRequestMetadata reqMetadata) {
        if (reqMetadata.epoch() == 0) {
            throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
        }
        ShareSessionCache shareSessionCache = this.cache;
        synchronized (shareSessionCache) {
            ShareSessionKey key = this.shareSessionKey(groupId, reqMetadata.memberId());
            ShareSession shareSession = this.cache.get(key);
            if (shareSession == null) {
                log.debug("Share session error for {}: no such share session found", (Object)key);
                throw Errors.SHARE_SESSION_NOT_FOUND.exception();
            }
            if (reqMetadata.epoch() == -1) {
                return;
            }
            if (shareSession.epoch != reqMetadata.epoch()) {
                log.debug("Share session error for {}: expected epoch {}, but got {} instead", new Object[]{key, shareSession.epoch, reqMetadata.epoch()});
                throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
            }
            this.cache.updateNumPartitions(shareSession);
            shareSession.epoch = ShareRequestMetadata.nextEpoch((int)shareSession.epoch);
        }
    }

    public void onShareVersionToggle(ShareVersion shareVersion, boolean isEnabledFromConfig) {
        if (!shareVersion.supportsShareGroups() && !isEnabledFromConfig) {
            this.cache.removeAllSessions();
            Set<SharePartitionKey> sharePartitionKeys = this.partitionCache.cachedSharePartitionKeys();
            sharePartitionKeys.forEach(sharePartitionKey -> SharePartitionManager.removeSharePartitionFromCache(sharePartitionKey, this.partitionCache, this.replicaManager));
        }
    }

    List<TopicIdPartition> cachedTopicIdPartitionsInShareSession(String groupId, Uuid memberId) {
        ShareSessionKey key = this.shareSessionKey(groupId, memberId);
        ShareSession shareSession = this.cache.get(key);
        if (shareSession == null) {
            return List.of();
        }
        ArrayList<TopicIdPartition> cachedTopicIdPartitions = new ArrayList<TopicIdPartition>();
        shareSession.partitionMap().forEach(cachedSharePartition -> cachedTopicIdPartitions.add(new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()))));
        return cachedTopicIdPartitions;
    }

    private void addDelayedShareFetch(DelayedShareFetch delayedShareFetch, List<DelayedShareFetchKey> keys) {
        this.replicaManager.addDelayedShareFetchRequest(delayedShareFetch, keys);
    }

    @Override
    public void close() throws Exception {
        this.timer.close();
        this.shareGroupMetrics.close();
    }

    private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
        return new ShareSessionKey(groupId, memberId);
    }

    private static String partitionsToLogString(Collection<TopicIdPartition> partitions) {
        return ShareSession.partitionsToLogString(partitions, (Boolean)log.isTraceEnabled());
    }

    void processShareFetch(ShareFetch shareFetch) {
        if (shareFetch.topicIdPartitions().isEmpty()) {
            shareFetch.maybeComplete(Map.of());
            return;
        }
        ArrayList<DelayedShareFetchKey> delayedShareFetchWatchKeys = new ArrayList<DelayedShareFetchKey>();
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        HashSet<String> topics = new HashSet<String>();
        for (TopicIdPartition topicIdPartition : shareFetch.topicIdPartitions()) {
            SharePartition sharePartition;
            topics.add(topicIdPartition.topic());
            SharePartitionKey sharePartitionKey = this.sharePartitionKey(shareFetch.groupId(), topicIdPartition);
            try {
                sharePartition = this.getOrCreateSharePartition(sharePartitionKey);
            }
            catch (Exception e) {
                log.debug("Error processing share fetch request", (Throwable)e);
                shareFetch.addErroneous(topicIdPartition, (Throwable)e);
                continue;
            }
            DelayedShareFetchGroupKey delayedShareFetchKey = new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition());
            delayedShareFetchWatchKeys.add((DelayedShareFetchKey)delayedShareFetchKey);
            delayedShareFetchWatchKeys.add((DelayedShareFetchKey)new DelayedShareFetchPartitionKey(topicIdPartition.topicId(), topicIdPartition.partition()));
            CompletableFuture<Void> initializationFuture = sharePartition.maybeInitialize();
            boolean initialized = initializationFuture.isDone();
            initializationFuture.whenComplete((arg_0, arg_1) -> this.lambda$processShareFetch$12(sharePartitionKey, shareFetch, initialized, sharePartition, (DelayedShareFetchKey)delayedShareFetchKey, arg_0, arg_1));
            sharePartitions.put(topicIdPartition, sharePartition);
        }
        topics.forEach(topic -> {
            this.brokerTopicStats.allTopicsStats().totalShareFetchRequestRate().mark();
            this.brokerTopicStats.topicStats(topic).totalShareFetchRequestRate().mark();
        });
        if (shareFetch.errorInAllPartitions()) {
            shareFetch.maybeComplete(Map.of());
            return;
        }
        this.addDelayedShareFetch(new DelayedShareFetch(shareFetch, this.replicaManager, this.fencedSharePartitionHandler(), sharePartitions, this.shareGroupMetrics, this.time, this.remoteFetchMaxWaitMs), delayedShareFetchWatchKeys);
    }

    private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitionKey) {
        return this.partitionCache.computeIfAbsent(sharePartitionKey, k -> {
            int leaderEpoch = ShareFetchUtils.leaderEpoch(this.replicaManager, sharePartitionKey.topicIdPartition().topicPartition());
            SharePartitionListener listener = new SharePartitionListener(sharePartitionKey, this.replicaManager, this.partitionCache);
            this.replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), listener);
            return new SharePartition(sharePartitionKey.groupId(), sharePartitionKey.topicIdPartition(), leaderEpoch, this.maxInFlightMessages, this.maxDeliveryCount, this.defaultRecordLockDurationMs, this.timer, this.time, this.persister, this.replicaManager, this.groupConfigManager, listener);
        });
    }

    private void handleInitializationException(SharePartitionKey sharePartitionKey, ShareFetch shareFetch, Throwable throwable) {
        if (throwable instanceof LeaderNotAvailableException) {
            log.debug("The share partition with key {} is not initialized yet", (Object)sharePartitionKey);
            return;
        }
        SharePartitionManager.removeSharePartitionFromCache(sharePartitionKey, this.partitionCache, this.replicaManager);
        log.debug("Error initializing share partition with key {}", (Object)sharePartitionKey, (Object)throwable);
        shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), throwable);
    }

    private BiConsumer<SharePartitionKey, Throwable> fencedSharePartitionHandler() {
        return (sharePartitionKey, throwable) -> {
            if (throwable instanceof NotLeaderOrFollowerException || throwable instanceof FencedStateEpochException || throwable instanceof GroupIdNotFoundException || throwable instanceof UnknownTopicOrPartitionException) {
                log.info("The share partition with key {} is fenced: {}", sharePartitionKey, (Object)throwable.getMessage());
                SharePartitionManager.removeSharePartitionFromCache(sharePartitionKey, this.partitionCache, this.replicaManager);
            }
        };
    }

    private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) {
        return new SharePartitionKey(groupId, topicIdPartition);
    }

    private static void removeSharePartitionFromCache(SharePartitionKey sharePartitionKey, SharePartitionCache partitionCache, ReplicaManager replicaManager) {
        SharePartition sharePartition = partitionCache.remove(sharePartitionKey);
        if (sharePartition != null) {
            sharePartition.markFenced();
            replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(), sharePartition.listener());
        }
    }

    private Consumer<Set<String>> failedShareAcknowledgeMetricsHandler() {
        return failedTopics -> failedTopics.forEach(topic -> {
            this.brokerTopicStats.allTopicsStats().failedShareAcknowledgementRequestRate().mark();
            this.brokerTopicStats.topicStats(topic).failedShareAcknowledgementRequestRate().mark();
        });
    }

    private /* synthetic */ void lambda$processShareFetch$12(SharePartitionKey sharePartitionKey, ShareFetch shareFetch, boolean initialized, SharePartition sharePartition, DelayedShareFetchKey delayedShareFetchKey, Void result, Throwable throwable) {
        if (throwable != null) {
            this.handleInitializationException(sharePartitionKey, shareFetch, throwable);
        }
        if (!initialized) {
            this.shareGroupMetrics.partitionLoadTime(sharePartition.loadStartTimeMs());
            this.replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
        }
    }

    private class ShareGroupListenerImpl
    implements ShareGroupListener {
        private ShareGroupListenerImpl() {
        }

        public void onMemberLeave(String groupId, Uuid memberId) {
            SharePartitionManager.this.releaseSession(groupId, memberId.toString());
        }

        public void onGroupEmpty(String groupId) {
            Set<TopicIdPartition> topicIdPartitions = SharePartitionManager.this.partitionCache.topicIdPartitionsForGroup(groupId);
            if (topicIdPartitions != null) {
                topicIdPartitions.forEach(topicIdPartition -> SharePartitionManager.removeSharePartitionFromCache(new SharePartitionKey(groupId, topicIdPartition), SharePartitionManager.this.partitionCache, SharePartitionManager.this.replicaManager));
            }
        }
    }

    private static class IdleShareFetchTimerTask
    extends TimerTask {
        private final CompletableFuture<Void> future;

        public IdleShareFetchTimerTask(long delayMs, CompletableFuture<Void> future) {
            super(delayMs);
            this.future = future;
        }

        public void run() {
            this.future.complete(null);
        }
    }

    static class SharePartitionListener
    implements PartitionListener {
        private final SharePartitionKey sharePartitionKey;
        private final ReplicaManager replicaManager;
        private final SharePartitionCache partitionCache;

        SharePartitionListener(SharePartitionKey sharePartitionKey, ReplicaManager replicaManager, SharePartitionCache partitionCache) {
            this.sharePartitionKey = sharePartitionKey;
            this.replicaManager = replicaManager;
            this.partitionCache = partitionCache;
        }

        @Override
        public void onFailed(TopicPartition topicPartition) {
            log.debug("The share partition failed listener is invoked for the topic-partition: {}, share-partition: {}", (Object)topicPartition, (Object)this.sharePartitionKey);
            this.onUpdate(topicPartition);
        }

        @Override
        public void onDeleted(TopicPartition topicPartition) {
            log.debug("The share partition delete listener is invoked for the topic-partition: {}, share-partition: {}", (Object)topicPartition, (Object)this.sharePartitionKey);
            this.onUpdate(topicPartition);
        }

        @Override
        public void onBecomingFollower(TopicPartition topicPartition) {
            log.debug("The share partition becoming follower listener is invoked for the topic-partition: {}, share-partition: {}", (Object)topicPartition, (Object)this.sharePartitionKey);
            this.onUpdate(topicPartition);
        }

        private void onUpdate(TopicPartition topicPartition) {
            if (!this.sharePartitionKey.topicIdPartition().topicPartition().equals((Object)topicPartition)) {
                log.error("The share partition listener is invoked for the wrong topic-partition: {}, share-partition: {}", (Object)topicPartition, (Object)this.sharePartitionKey);
                return;
            }
            SharePartitionManager.removeSharePartitionFromCache(this.sharePartitionKey, this.partitionCache, this.replicaManager);
        }
    }
}

