/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceStatus;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;

/**
 * Housekeeping task for Flint indices that have auto refresh enabled.
 *
 * <p>On each run the task fetches the metadata of every index matching {@code flint_*}, keeps the
 * ones whose options report auto refresh, and then, per index:
 *
 * <ul>
 *   <li>if the owning datasource has status {@link DataSourceStatus#DISABLED}, alters the index so
 *       that its {@code auto_refresh} option is {@code "false"};
 *   <li>if looking up the datasource raises {@link DataSourceNotFoundException}, drops the index.
 * </ul>
 *
 * <p>A failure while handling one index is logged and counted, and does not stop the remaining
 * indices from being processed.
 */
public class FlintStreamingJobHouseKeeperTask implements Runnable {
    private final DataSourceService dataSourceService;
    private final FlintIndexMetadataService flintIndexMetadataService;
    private final FlintIndexOpFactory flintIndexOpFactory;
    private final NullAsyncQueryRequestContext nullAsyncQueryRequestContext = new NullAsyncQueryRequestContext();
    private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobHouseKeeperTask.class);

    /**
     * Set to {@code true} for the duration of a run. The flag is static, so it is shared by every
     * instance of this task in the JVM and prevents overlapping executions.
     */
    protected static final AtomicBoolean isRunning = new AtomicBoolean(false);

    /**
     * Executes one housekeeping pass. Returns immediately if another pass is still in progress.
     * Never propagates a failure to the caller: any {@link Throwable} is logged (with its stack
     * trace) and recorded in the failure-count metric.
     */
    @Override
    public void run() {
        if (!isRunning.compareAndSet(false, true)) {
            LOGGER.info("Previous task is still running. Skipping this execution.");
            return;
        }
        try {
            LOGGER.info("Starting housekeeping task for auto refresh streaming jobs.");
            Map<String, FlintIndexMetadata> autoRefreshFlintIndicesMap = this.getAllAutoRefreshIndices();
            autoRefreshFlintIndicesMap.forEach(this::processAutoRefreshIndex);
            LOGGER.info("Finished housekeeping task for auto refresh streaming jobs.");
        } catch (Throwable error) {
            // Pass the throwable as the trailing argument so the stack trace is logged as well;
            // the message alone is frequently null or too terse to diagnose the failure.
            LOGGER.error("Error while running the streaming job cleaner task: {}", error.getMessage(), error);
            Metrics.getInstance().getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT).increment();
        } finally {
            // Always release the guard, otherwise every later execution would be skipped.
            isRunning.set(false);
        }
    }

    /**
     * Reconciles a single auto refresh index with the state of its datasource.
     *
     * <p>Any exception other than the ones handled explicitly below is logged and counted in the
     * failure metric instead of being rethrown, so one bad index cannot abort the whole pass.
     *
     * @param autoRefreshIndex name of the index, used for logging
     * @param flintIndexMetadata metadata of the index
     */
    private void processAutoRefreshIndex(String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata) {
        try {
            String datasourceName = this.getDataSourceName(flintIndexMetadata);
            try {
                DataSourceMetadata dataSourceMetadata = this.dataSourceService.getDataSourceMetadata(datasourceName);
                if (dataSourceMetadata.getStatus() == DataSourceStatus.DISABLED) {
                    LOGGER.info("Datasource is disabled for autoRefreshIndex: {}", autoRefreshIndex);
                    this.alterAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName);
                } else {
                    LOGGER.debug("Datasource is enabled for autoRefreshIndex : {}", autoRefreshIndex);
                }
            } catch (DataSourceNotFoundException exception) {
                LOGGER.info("Datasource is deleted for autoRefreshIndex: {}", autoRefreshIndex);
                try {
                    this.dropAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName);
                } catch (IllegalStateException ignored) {
                    // Deliberately best-effort: the drop operation rejected the index's current
                    // state, so the index is left untouched on this pass.
                    LOGGER.debug("AutoRefresh index: {} is not in valid state for deletion.", autoRefreshIndex);
                }
            }
        } catch (Exception exception) {
            LOGGER.error("Failed to alter/cancel index {}: {}", autoRefreshIndex, exception.getMessage(), exception);
            Metrics.getInstance().getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT).increment();
        }
    }

    /**
     * Applies the drop operation obtained from the factory to the given index.
     *
     * @param autoRefreshIndex name of the index, used for logging
     * @param flintIndexMetadata metadata of the index to drop
     * @param datasourceName datasource the drop operation is created for
     */
    private void dropAutoRefreshIndex(String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
        LOGGER.info("Attempting to drop auto refresh index: {}", autoRefreshIndex);
        this.flintIndexOpFactory.getDrop(datasourceName).apply(flintIndexMetadata, this.nullAsyncQueryRequestContext);
        LOGGER.info("Successfully dropped index: {}", autoRefreshIndex);
    }

    /**
     * Applies an alter operation that sets the {@code auto_refresh} option of the index to
     * {@code "false"}.
     *
     * @param autoRefreshIndex name of the index, used for logging
     * @param flintIndexMetadata metadata of the index to alter
     * @param datasourceName datasource the alter operation is created for
     */
    private void alterAutoRefreshIndex(String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
        LOGGER.info("Attempting to alter index: {}", autoRefreshIndex);
        FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
        flintIndexOptions.setOption("auto_refresh", "false");
        this.flintIndexOpFactory.getAlter(flintIndexOptions, datasourceName).apply(flintIndexMetadata, this.nullAsyncQueryRequestContext);
        LOGGER.info("Successfully altered index: {}", autoRefreshIndex);
    }

    /**
     * Derives the datasource name as the text before the first {@code '.'} of a qualified name:
     * the index's own name for kind {@code "mv"}, and its source for kinds {@code "skipping"} and
     * {@code "covering"}.
     *
     * @param flintIndexMetadata metadata of the index
     * @return the first dot-separated segment of the relevant qualified name
     * @throws IllegalArgumentException if the index kind is none of the three known kinds
     */
    private String getDataSourceName(FlintIndexMetadata flintIndexMetadata) {
        final String kind = flintIndexMetadata.getKind();
        switch (kind) {
            case "mv":
                return flintIndexMetadata.getName().split("\\.")[0];
            case "skipping":
            case "covering":
                return flintIndexMetadata.getSource().split("\\.")[0];
            default:
                throw new IllegalArgumentException(String.format("Unknown flint index kind: %s", kind));
        }
    }

    /**
     * Fetches metadata for all indices matching {@code flint_*} and keeps only those whose index
     * options report auto refresh.
     *
     * @return map from index name to metadata, restricted to auto refresh indices
     */
    private Map<String, FlintIndexMetadata> getAllAutoRefreshIndices() {
        Map<String, FlintIndexMetadata> flintIndexMetadataHashMap = this.flintIndexMetadataService.getFlintIndexMetadata("flint_*", this.nullAsyncQueryRequestContext);
        return flintIndexMetadataHashMap.entrySet().stream()
                .filter(entry -> entry.getValue().getFlintIndexOptions().autoRefresh())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Creates the task with the collaborators it needs.
     *
     * @param dataSourceService used to look up datasource metadata by name
     * @param flintIndexMetadataService used to list Flint index metadata
     * @param flintIndexOpFactory used to create the alter and drop operations
     */
    @Generated
    public FlintStreamingJobHouseKeeperTask(DataSourceService dataSourceService, FlintIndexMetadataService flintIndexMetadataService, FlintIndexOpFactory flintIndexOpFactory) {
        this.dataSourceService = dataSourceService;
        this.flintIndexMetadataService = flintIndexMetadataService;
        this.flintIndexOpFactory = flintIndexOpFactory;
    }
}

