/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.util;

import com.fasterxml.jackson.core.type.TypeReference;
import io.modelcontextprotocol.spec.McpSession;
import io.modelcontextprotocol.util.Assert;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Periodically sends a {@code "ping"} request to every session produced by a
 * supplier, so that idle connections are kept alive.
 *
 * <p>Instances are created through {@link #builder(Supplier)}. A scheduler is
 * started with {@link #start()}, stopped with {@link #stop()} and may be started
 * again afterwards. {@link #shutdown()} additionally disposes the underlying
 * Reactor {@link Scheduler}.
 *
 * <p>The running flag is an {@link AtomicBoolean} and the active subscription is
 * held in a {@code volatile} field so that {@code start()} and {@code stop()} may
 * be called from different threads.
 */
public class KeepAliveScheduler {
    private static final Logger logger = LoggerFactory.getLogger(KeepAliveScheduler.class);

    /** Result type requested for the ping response; the response itself is ignored. */
    private static final TypeReference<Object> OBJECT_TYPE_REF = new TypeReference<Object>(){};

    private final Duration initialDelay;
    private final Duration interval;
    private final Scheduler scheduler;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);

    /** Written by {@link #start()}, read by {@link #stop()}; volatile for cross-thread visibility. */
    private volatile Disposable currentSubscription;

    private final Supplier<Flux<McpSession>> mcpSessions;

    /**
     * Creates a scheduler. Prefer {@link #builder(Supplier)}, which validates its arguments.
     *
     * @param scheduler    Reactor scheduler that drives the periodic ticks
     * @param initialDelay delay before the first ping round
     * @param interval     delay between subsequent ping rounds
     * @param mcpSessions  supplier queried on every tick for the sessions to ping
     */
    KeepAliveScheduler(Scheduler scheduler, Duration initialDelay, Duration interval, Supplier<Flux<McpSession>> mcpSessions) {
        this.scheduler = scheduler;
        this.initialDelay = initialDelay;
        this.interval = interval;
        this.mcpSessions = mcpSessions;
    }

    /**
     * Creates a builder for a keep-alive scheduler.
     *
     * @param mcpSessions supplier of the sessions to ping; must not be {@code null}
     * @return a new builder
     */
    public static Builder builder(Supplier<Flux<McpSession>> mcpSessions) {
        return new Builder(mcpSessions);
    }

    /**
     * Starts the periodic ping rounds.
     *
     * <p>An error raised by the tick pipeline itself (for example the session
     * supplier throwing) is logged, stops the scheduler and resets the running
     * flag. Failures of individual pings do not stop the scheduler.
     *
     * @return a handle that cancels the periodic pings when disposed
     * @throws IllegalStateException if the scheduler is already running
     */
    public Disposable start() {
        if (!this.isRunning.compareAndSet(false, true)) {
            throw new IllegalStateException("KeepAlive scheduler is already running. Stop it first.");
        }
        try {
            Disposable subscription = Flux.interval(this.initialDelay, this.interval, this.scheduler)
                .doOnNext(tick -> this.pingAllSessions())
                .doOnCancel(() -> this.isRunning.set(false))
                .doOnComplete(() -> this.isRunning.set(false))
                .onErrorComplete(error -> {
                    logger.error("KeepAlive scheduler error", error);
                    this.isRunning.set(false);
                    return true;
                })
                .subscribe();
            this.currentSubscription = subscription;
            return subscription;
        }
        catch (RuntimeException | Error failure) {
            // Assembly or subscription failed synchronously: without this reset the
            // flag would stay true and every later start() would be rejected.
            this.isRunning.set(false);
            throw failure;
        }
    }

    /**
     * Sends one fire-and-forget ping to every session currently supplied.
     *
     * <p>A failing ping is logged and swallowed so that the remaining sessions are
     * still pinged. An error emitted by the session stream itself is likewise
     * logged and swallowed instead of being left to an error-handler-less
     * {@code subscribe()}.
     */
    private void pingAllSessions() {
        this.mcpSessions.get()
            .flatMap(session -> session.sendRequest("ping", null, OBJECT_TYPE_REF)
                .doOnError(e -> logger.warn("Failed to send keep-alive ping to session {}: {}", session, e.getMessage()))
                .onErrorComplete())
            .doOnError(e -> logger.warn("Failed to obtain sessions for keep-alive ping: {}", e.getMessage()))
            .onErrorComplete()
            .subscribe();
    }

    /**
     * Stops the periodic pings if they are running and clears the running flag.
     * Calling this on a scheduler that is not running has no further effect.
     */
    public void stop() {
        // Read the volatile field once so the null check and dispose() see the same value.
        Disposable subscription = this.currentSubscription;
        if (subscription != null && !subscription.isDisposed()) {
            subscription.dispose();
        }
        this.isRunning.set(false);
    }

    /**
     * Reports whether the scheduler is currently marked as running.
     *
     * @return {@code true} between a successful {@link #start()} and the next stop,
     *         cancellation, completion or pipeline error
     */
    public boolean isRunning() {
        return this.isRunning.get();
    }

    /**
     * Stops the pings and disposes the underlying {@link Scheduler}.
     */
    public void shutdown() {
        this.stop();
        // The original instanceof-Disposable test only ever guarded against null.
        if (this.scheduler != null) {
            this.scheduler.dispose();
        }
    }

    /**
     * Builder for {@link KeepAliveScheduler}. Defaults: the
     * {@code Schedulers.boundedElastic()} scheduler, no initial delay and a
     * 30 second interval.
     */
    public static class Builder {
        private Scheduler scheduler = Schedulers.boundedElastic();
        private Duration initialDelay = Duration.ofSeconds(0);
        private Duration interval = Duration.ofSeconds(30);
        private final Supplier<Flux<McpSession>> mcpSessions;

        /**
         * @param mcpSessions supplier of the sessions to ping; must not be {@code null}
         */
        Builder(Supplier<Flux<McpSession>> mcpSessions) {
            Assert.notNull(mcpSessions, "McpSessions supplier must not be null");
            this.mcpSessions = mcpSessions;
        }

        /**
         * Sets the Reactor scheduler that drives the periodic ticks.
         *
         * @param scheduler the scheduler; must not be {@code null}
         * @return this builder
         */
        public Builder scheduler(Scheduler scheduler) {
            Assert.notNull(scheduler, "Scheduler must not be null");
            this.scheduler = scheduler;
            return this;
        }

        /**
         * Sets the delay before the first ping round.
         *
         * @param initialDelay the delay; must not be {@code null}
         * @return this builder
         */
        public Builder initialDelay(Duration initialDelay) {
            Assert.notNull(initialDelay, "Initial delay must not be null");
            this.initialDelay = initialDelay;
            return this;
        }

        /**
         * Sets the delay between ping rounds.
         *
         * @param interval the interval; must not be {@code null} and must be positive
         * @return this builder
         * @throws IllegalArgumentException if {@code interval} is zero or negative
         */
        public Builder interval(Duration interval) {
            Assert.notNull(interval, "Interval must not be null");
            // A zero or negative period is not a meaningful keep-alive cadence.
            if (interval.isZero() || interval.isNegative()) {
                throw new IllegalArgumentException("Interval must be positive");
            }
            this.interval = interval;
            return this;
        }

        /**
         * Builds the scheduler from the configured values. The scheduler is not started.
         *
         * @return a new, stopped {@link KeepAliveScheduler}
         */
        public KeepAliveScheduler build() {
            return new KeepAliveScheduler(this.scheduler, this.initialDelay, this.interval, this.mcpSessions);
        }
    }
}

