/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.opensearch.executor;

import com.google.common.base.Suppliers;
import com.google.common.collect.Multimap;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.ListSqlOperatorTable;
import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.locationtech.jts.geom.Point;
import org.opensearch.sql.ast.statement.ExplainMode;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper;
import org.opensearch.sql.calcite.utils.DynamicFieldsResultProcessor;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionContext;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.Explain;
import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.PPLFuncImpTable;
import org.opensearch.sql.monitor.profile.MetricName;
import org.opensearch.sql.monitor.profile.ProfileMetric;
import org.opensearch.sql.monitor.profile.QueryProfiling;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprGeoPointValue;
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction;
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.transport.client.node.NodeClient;

public class OpenSearchExecutionEngine
implements ExecutionEngine {
    /** Logger shared by all instances of this engine. */
    private static final Logger logger = LogManager.getLogger(OpenSearchExecutionEngine.class);
    /** Client used to schedule work and to look up the optional node client. */
    private final OpenSearchClient client;
    /** Applied to every physical plan (via {@code protect}) before it is executed. */
    private final ExecutionProtector executionProtector;
    /** Turns an executed physical plan into the cursor attached to the query response. */
    private final PlanSerializer planSerializer;

    /**
     * Creates the engine and, as a side effect, registers the OpenSearch-specific functions
     * (see {@code registerOpenSearchFunctions()}).
     *
     * @param client client used for scheduling and function registration
     * @param executionProtector wrapper applied to physical plans before execution
     * @param planSerializer serializer used to produce the response cursor
     */
    public OpenSearchExecutionEngine(OpenSearchClient client, ExecutionProtector executionProtector, PlanSerializer planSerializer) {
        this.planSerializer = planSerializer;
        this.executionProtector = executionProtector;
        // Must be assigned before registration: registerOpenSearchFunctions() reads this.client.
        this.client = client;
        this.registerOpenSearchFunctions();
    }

    /**
     * Executes a physical plan with an empty execution context.
     *
     * @param physicalPlan plan to run
     * @param listener receives the query response or the failure
     */
    public void execute(PhysicalPlan physicalPlan, ResponseListener<ExecutionEngine.QueryResponse> listener) {
        ExecutionContext emptyContext = ExecutionContext.emptyExecutionContext();
        this.execute(physicalPlan, emptyContext, listener);
    }

    /**
     * Executes a physical plan on the client's scheduler and reports the outcome to the listener.
     *
     * <p>The plan is first wrapped by the execution protector. Rows are collected until the plan is
     * exhausted or the context's query size limit (when non-null) is reached. The protected plan is
     * always closed, whether execution succeeds or fails.
     *
     * @param physicalPlan plan to run; its schema is used for the response
     * @param context supplies the optional split and the optional query size limit
     * @param listener receives the response, or any exception raised while running the plan
     */
    public void execute(PhysicalPlan physicalPlan, ExecutionContext context, ResponseListener<ExecutionEngine.QueryResponse> listener) {
        PhysicalPlan plan = this.executionProtector.protect(physicalPlan);
        this.client.schedule(() -> {
            try {
                List<ExprValue> rows = new ArrayList<>();
                context.getSplit().ifPresent(plan::add);
                plan.open();
                Integer sizeLimit = context.getQuerySizeLimit();
                // hasNext() is consulted before the limit check, exactly as the loop condition did.
                while (plan.hasNext()) {
                    if (sizeLimit != null && rows.size() >= sizeLimit) {
                        break;
                    }
                    rows.add((ExprValue) plan.next());
                }
                listener.onResponse(
                    new ExecutionEngine.QueryResponse(
                        physicalPlan.schema(), rows, this.planSerializer.convertToCursor(plan)));
            } catch (Exception e) {
                listener.onFailure(e);
            } finally {
                plan.close();
            }
        });
    }

    /**
     * Explains a physical plan on the client's scheduler.
     *
     * <p>Uses an {@code Explain} visitor whose table-scan handling attaches the scan's own
     * {@code explain()} output under the {@code "request"} key of the node description.
     *
     * @param plan plan to explain
     * @param listener receives the explain response, or any exception raised while explaining
     */
    public void explain(PhysicalPlan plan, ResponseListener<ExecutionEngine.ExplainResponse> listener) {
        this.client.schedule(() -> {
            try {
                // NOTE(review): the `this` constructor argument looks like a decompiler artifact of
                // the anonymous class capturing its outer instance -- confirm against Explain.
                Explain explainer = new Explain(this){

                    /** Describes a table scan by the request it reports through {@code explain()}. */
                    public ExecutionEngine.ExplainResponseNode visitTableScan(TableScanOperator node, Object context) {
                        return this.explain(
                            node,
                            context,
                            described -> described.setDescription(Map.of("request", node.explain())));
                    }
                };
                var explanation = explainer.apply(plan);
                listener.onResponse(explanation);
            } catch (Exception e) {
                listener.onFailure(e);
            }
        });
    }

    /**
     * Registers a thread-scoped {@code PLAN_BEFORE_IMPLEMENTATION} hook that renders the plan it is
     * handed and stores the text in {@code physical}.
     *
     * @param physical holder that receives the rendered plan
     * @param level explain level used for rendering
     * @return handle for the registered hook; the caller is responsible for closing it
     */
    private Hook.Closeable getPhysicalPlanInHook(AtomicReference<String> physical, SqlExplainLevel level) {
        return Hook.PLAN_BEFORE_IMPLEMENTATION.addThread(hookArg -> {
            // The hook argument is treated as a RelRoot; only its root relational node is rendered.
            RelNode root = ((RelRoot) hookArg).rel;
            String rendered = RelOptUtil.toString(root, level);
            physical.set(rendered);
        });
    }

    /**
     * Registers a thread-scoped {@code JAVA_PLAN} hook that stores its argument, cast to
     * {@code String}, in {@code codegen}.
     *
     * @param codegen holder that receives the hook's string argument
     * @return handle for the registered hook; the caller is responsible for closing it
     */
    private Hook.Closeable getCodegenInHook(AtomicReference<String> codegen) {
        return Hook.JAVA_PLAN.addThread(hookArg -> {
            String generated = (String) hookArg;
            codegen.set(generated);
        });
    }

    /**
     * Explains a Calcite relational plan on the client's scheduler.
     *
     * <ul>
     *   <li>{@code SIMPLE}: responds with only the logical plan rendered with
     *       {@code NO_ATTRIBUTES}.
     *   <li>Any other mode: renders the logical plan ({@code ALL_ATTRIBUTES} for {@code COST},
     *       otherwise {@code EXPPLAN_ATTRIBUTES}), then runs the plan through
     *       {@code OpenSearchRelRunners.run} with hooks installed to capture the physical plan and,
     *       for {@code EXTENDED}, the value reported to the {@code JAVA_PLAN} hook.
     * </ul>
     *
     * <p>Every hook registered here and the prepared statement returned by the runner are closed
     * before the response is sent, so nothing stays attached to the worker thread.
     * {@code CalcitePlanContext.skipEncoding} is always removed on exit.
     *
     * @param rel relational plan to explain
     * @param mode explain mode selecting the level of detail
     * @param context Calcite plan context passed to the runner
     * @param listener receives the explain response, or any exception raised while explaining
     */
    public void explain(RelNode rel, ExplainMode mode, CalcitePlanContext context, ResponseListener<ExecutionEngine.ExplainResponse> listener) {
        this.client.schedule(() -> {
            try {
                if (mode == ExplainMode.SIMPLE) {
                    String logical = RelOptUtil.toString(rel, SqlExplainLevel.NO_ATTRIBUTES);
                    listener.onResponse(new ExecutionEngine.ExplainResponse(new ExecutionEngine.ExplainResponseNodeV2(logical, null, null)));
                    return;
                }
                boolean extended = mode == ExplainMode.EXTENDED;
                SqlExplainLevel level = mode == ExplainMode.COST ? SqlExplainLevel.ALL_ATTRIBUTES : SqlExplainLevel.EXPPLAN_ATTRIBUTES;
                String logical = RelOptUtil.toString(rel, level);
                AtomicReference<String> physical = new AtomicReference<>();
                AtomicReference<String> javaCode = new AtomicReference<>();
                // Both hooks are thread-scoped registrations; each handle must be closed or the
                // handler stays installed on this worker thread. A null resource is permitted by
                // try-with-resources and is simply skipped on close.
                try (Hook.Closeable physicalHook = this.getPhysicalPlanInHook(physical, level);
                        Hook.Closeable codegenHook = extended ? this.getCodegenInHook(javaCode) : null) {
                    if (extended) {
                        CalcitePlanContext.skipEncoding.set(true);
                    }
                    try (PreparedStatement ignored = CalciteToolsHelper.OpenSearchRelRunners.run(context, rel)) {
                        // Nothing to execute: preparing the statement is presumably what fires the
                        // hooks above. The statement is only held so that it gets closed, matching
                        // how execute(RelNode, ...) treats the same runner result.
                    }
                }
                listener.onResponse(new ExecutionEngine.ExplainResponse(new ExecutionEngine.ExplainResponseNodeV2(logical, physical.get(), javaCode.get())));
            } catch (Exception e) {
                listener.onFailure(e);
            } finally {
                CalcitePlanContext.skipEncoding.remove();
            }
        });
    }

    /**
     * Executes a Calcite relational plan on the client's scheduler and reports the result.
     *
     * <p>The time spent executing the query and building the result is added to the
     * {@code EXECUTE} profiling metric. The prepared statement is closed when the task finishes.
     *
     * <p>Unlike the {@code PhysicalPlan} overloads, failures are not routed to
     * {@code listener.onFailure}: a {@link SQLException} is rethrown wrapped in a
     * {@link RuntimeException}, and other unchecked exceptions propagate out of the scheduled task.
     *
     * @param rel relational plan to run; its row type drives the response schema
     * @param context Calcite plan context; also supplies the query size limit
     * @param listener receives the query response on success
     */
    public void execute(RelNode rel, CalcitePlanContext context, ResponseListener<ExecutionEngine.QueryResponse> listener) {
        this.client.schedule(() -> {
            try (PreparedStatement statement = CalciteToolsHelper.OpenSearchRelRunners.run(context, rel)) {
                ProfileMetric executeMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
                long startedAt = System.nanoTime();
                ResultSet rows = statement.executeQuery();
                RelDataType rowType = rel.getRowType();
                Integer sizeLimit = context.sysLimit.querySizeLimit();
                ExecutionEngine.QueryResponse response = this.buildResultSet(rows, rowType, sizeLimit);
                long elapsedNanos = System.nanoTime() - startedAt;
                executeMetric.add(elapsedNanos);
                listener.onResponse(response);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Recursively normalizes a raw result value before it is turned into an {@code ExprValue}.
     *
     * <ul>
     *   <li>A {@code Point} becomes an {@code OpenSearchExprGeoPointValue} built from
     *       {@code (point.getY(), point.getX())}.
     *   <li>A {@code Map} becomes a new {@code HashMap} with the same keys (cast to
     *       {@code String}) and recursively processed values.
     *   <li>A {@code List} becomes a new {@code ArrayList} of recursively processed elements.
     *   <li>Anything else, including {@code null}, is returned unchanged.
     * </ul>
     *
     * @param value raw value, may be {@code null}
     * @return the normalized value, or {@code null} when {@code value} is {@code null}
     * @throws ClassCastException if a map key is not a {@code String}
     */
    private static Object processValue(Object value) {
        if (value instanceof Point point) {
            return new OpenSearchExprGeoPointValue(point.getY(), point.getX());
        }
        if (value instanceof Map<?, ?> source) {
            Map<String, Object> converted = new HashMap<>();
            source.forEach((key, nested) -> converted.put((String) key, OpenSearchExecutionEngine.processValue(nested)));
            return converted;
        }
        if (value instanceof List<?> items) {
            List<Object> converted = new ArrayList<>();
            items.forEach(item -> converted.add(OpenSearchExecutionEngine.processValue(item)));
            return converted;
        }
        // Covers null as well: instanceof is false for null, so it falls through untouched.
        return value;
    }

    /**
     * Materializes a JDBC result set into a {@code QueryResponse}.
     *
     * <p>Rows are read until the result set is exhausted or {@code querySizeLimit} rows have been
     * collected. The limit is checked before the cursor is advanced, so no row beyond the limit is
     * pulled from the result set. Each cell is normalized with {@code processValue} and converted
     * with {@code ExprValueUtils.fromObjectValue}.
     *
     * <p>The schema has one column per result-set column. A column whose Calcite type is
     * {@code ANY} takes the type of the first row's value for that column, or
     * {@code ExprCoreType.UNDEFINED} when there are no rows; every other column is converted with
     * {@code OpenSearchTypeFactory.convertRelDataTypeToExprType}. The response is finally passed
     * through {@code DynamicFieldsResultProcessor.expandDynamicFields}.
     *
     * @param resultSet result set positioned before its first row
     * @param rowTypes row type whose fields line up positionally with the result-set columns
     * @param querySizeLimit maximum number of rows to collect, or {@code null} for no limit
     * @return the processed query response (its cursor is {@code null})
     * @throws SQLException if reading metadata or rows fails
     */
    private ExecutionEngine.QueryResponse buildResultSet(ResultSet resultSet, RelDataType rowTypes, Integer querySizeLimit) throws SQLException {
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();
        // Column names do not change between rows, so resolve them once up front.
        List<String> columnNames = new ArrayList<>(columnCount);
        for (int i = 1; i <= columnCount; ++i) {
            columnNames.add(metaData.getColumnName(i));
        }
        List<RelDataType> fieldTypes = rowTypes.getFieldList().stream().map(RelDataTypeField::getType).toList();

        List<ExprValue> values = new ArrayList<>();
        // Limit check comes first so the cursor is not advanced past the last row we keep.
        while ((querySizeLimit == null || values.size() < querySizeLimit) && resultSet.next()) {
            Map<String, ExprValue> row = new LinkedHashMap<>();
            for (String columnName : columnNames) {
                // NOTE(review): lookup stays by name on purpose; the driver's index-based getter may
                // convert values differently. JDBC name lookup is case-insensitive and first-match,
                // so columns differing only by case would alias -- confirm whether that can occur.
                Object converted = OpenSearchExecutionEngine.processValue(resultSet.getObject(columnName));
                row.put(columnName, ExprValueUtils.fromObjectValue(converted));
            }
            values.add(ExprTupleValue.fromExprValueMap(row));
        }

        List<ExecutionEngine.Schema.Column> columns = new ArrayList<>(columnCount);
        for (int i = 0; i < columnCount; ++i) {
            String columnName = columnNames.get(i);
            RelDataType fieldType = fieldTypes.get(i);
            boolean untyped = fieldType.getSqlTypeName() == SqlTypeName.ANY;
            // For ANY, fall back to the runtime type observed in the first row (if there is one).
            var exprType = !untyped
                ? OpenSearchTypeFactory.convertRelDataTypeToExprType(fieldType)
                : values.isEmpty()
                    ? ExprCoreType.UNDEFINED
                    : values.getFirst().tupleValue().get(columnName).type();
            columns.add(new ExecutionEngine.Schema.Column(columnName, null, exprType));
        }

        ExecutionEngine.Schema schema = new ExecutionEngine.Schema(columns);
        ExecutionEngine.QueryResponse response = new ExecutionEngine.QueryResponse(schema, values, null);
        return DynamicFieldsResultProcessor.expandDynamicFields(response);
    }

    /**
     * Registers the OpenSearch-specific functions with both {@code PPLFuncImpTable} and
     * {@code OperatorTable}.
     *
     * <p>{@code GEOIP} is registered only when the client exposes a node client; otherwise an
     * info message naming the client class is logged. {@code DISTINCT_COUNT_APPROX} is always
     * registered.
     */
    private void registerOpenSearchFunctions() {
        this.client.getNodeClient().ifPresentOrElse(
            nodeClient -> {
                String geoIpName = BuiltinFunctionName.GEOIP.name();
                SqlUserDefinedFunction geoIp = new GeoIpFunction(nodeClient).toUDF(geoIpName);
                PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, (SqlOperator) geoIp);
                OperatorTable.addOperator(geoIpName, (SqlOperator) geoIp);
            },
            () -> logger.info("Function [GEOIP] not registered: incompatible client type {}", (Object) this.client.getClass().getName()));

        String approxName = BuiltinFunctionName.DISTINCT_COUNT_APPROX.name();
        SqlUserDefinedAggFunction approxDistinctCount = UserDefinedFunctionUtils.createUserDefinedAggFunction(DistinctCountApproxAggFunction.class, (String) approxName, (SqlReturnTypeInference) ReturnTypes.BIGINT_FORCE_NULLABLE, null);
        PPLFuncImpTable.INSTANCE.registerExternalAggOperator(BuiltinFunctionName.DISTINCT_COUNT_APPROX, approxDistinctCount);
        OperatorTable.addOperator(approxName, (SqlOperator) approxDistinctCount);
    }

    /**
     * Operator table backed by the operators registered through
     * {@link #addOperator(String, SqlOperator)}.
     *
     * <p>The shared instance is created through a memoized supplier: it is built on the first
     * {@link #instance()} call from whatever operators are registered at that moment.
     * NOTE(review): operators added after that first call are presumably not reflected in the
     * already-built instance -- confirm callers register before first use.
     */
    public static class OperatorTable
    extends ListSqlOperatorTable {
        /** Registered operators keyed by name; re-registering a name replaces the earlier entry. */
        private static final Map<String, SqlOperator> operators = new ConcurrentHashMap<String, SqlOperator>();
        /** Lazily created, memoized shared instance. */
        private static final Supplier<OperatorTable> INSTANCE = Suppliers.memoize(OperatorTable::createInitialized);

        /** Returns the shared operator table, creating it on first use. */
        public static SqlOperatorTable instance() {
            return INSTANCE.get();
        }

        /** Registers {@code operator} under {@code name}. */
        public static synchronized void addOperator(String name, SqlOperator operator) {
            operators.put(name, operator);
        }

        /** Builds a new table and loads it with the currently registered operators. */
        private static OperatorTable createInitialized() {
            return (OperatorTable) new OperatorTable().init();
        }

        /** Indexes the currently registered operators into this table and returns it. */
        private ListSqlOperatorTable init() {
            this.setOperators(OperatorTable.buildIndex(operators.values()));
            return this;
        }
    }
}

