/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.timeseries.transport;

import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.caching.CacheProvider;
import org.opensearch.timeseries.caching.TimeSeriesCache;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.indices.IndexManagement;
import org.opensearch.timeseries.ml.CheckpointDao;
import org.opensearch.timeseries.ml.IntermediateResult;
import org.opensearch.timeseries.ml.ModelColdStart;
import org.opensearch.timeseries.ml.ModelManager;
import org.opensearch.timeseries.ml.ModelState;
import org.opensearch.timeseries.ml.RealTimeInferencer;
import org.opensearch.timeseries.ml.Sample;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IndexableResult;
import org.opensearch.timeseries.model.TaskType;
import org.opensearch.timeseries.model.TimeSeriesTask;
import org.opensearch.timeseries.ratelimit.CheckpointReadWorker;
import org.opensearch.timeseries.ratelimit.CheckpointWriteWorker;
import org.opensearch.timeseries.ratelimit.ColdEntityWorker;
import org.opensearch.timeseries.ratelimit.ColdStartWorker;
import org.opensearch.timeseries.ratelimit.FeatureRequest;
import org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker;
import org.opensearch.timeseries.ratelimit.RequestPriority;
import org.opensearch.timeseries.ratelimit.SaveResultStrategy;
import org.opensearch.timeseries.task.TaskCacheManager;
import org.opensearch.timeseries.task.TaskManager;
import org.opensearch.timeseries.transport.EntityResultRequest;
import org.opensearch.timeseries.util.ActionListenerExecutor;
import org.opensearch.timeseries.util.ExceptionUtil;

public class EntityResultProcessor<RCFModelType extends ThresholdedRandomCutForest, IndexableResultType extends IndexableResult, IntermediateResultType extends IntermediateResult<IndexableResultType>, IndexType extends Enum<IndexType>, IndexManagementType extends IndexManagement<IndexType>, CheckpointDaoType extends CheckpointDao<RCFModelType, IndexType, IndexManagementType>, CheckpointWriteWorkerType extends CheckpointWriteWorker<RCFModelType, IndexType, IndexManagementType, CheckpointDaoType>, ModelColdStartType extends ModelColdStart<RCFModelType, IndexType, IndexManagementType, IndexableResultType>, ModelManagerType extends ModelManager<RCFModelType, IndexableResultType, IntermediateResultType, IndexType, IndexManagementType, CheckpointDaoType, ModelColdStartType>, CacheType extends TimeSeriesCache<RCFModelType>, SaveResultStrategyType extends SaveResultStrategy<IndexableResultType, IntermediateResultType>, TaskCacheManagerType extends TaskCacheManager, TaskTypeEnum extends TaskType, TaskClass extends TimeSeriesTask, TaskManagerType extends TaskManager<TaskCacheManagerType, TaskTypeEnum, TaskClass, IndexType, IndexManagementType>, ColdStartWorkerType extends ColdStartWorker<RCFModelType, IndexType, IndexManagementType, CheckpointDaoType, CheckpointWriteWorkerType, ModelColdStartType, CacheType, IndexableResultType, IntermediateResultType, ModelManagerType, SaveResultStrategyType, TaskCacheManagerType, TaskTypeEnum, TaskClass, TaskManagerType>, InferencerType extends RealTimeInferencer<RCFModelType, IndexableResultType, IntermediateResultType, IndexType, IndexManagementType, CheckpointDaoType, CheckpointWriteWorkerType, ModelColdStartType, ModelManagerType, SaveResultStrategyType, CacheType, TaskCacheManagerType, TaskTypeEnum, TaskClass, TaskManagerType, ColdStartWorkerType>, HCCheckpointReadWorkerType extends CheckpointReadWorker<RCFModelType, IndexableResultType, IntermediateResultType, IndexType, IndexManagementType, CheckpointDaoType, CheckpointWriteWorkerType, ModelColdStartType, ModelManagerType, CacheType, SaveResultStrategyType, TaskCacheManagerType, TaskTypeEnum, TaskClass, TaskManagerType, ColdStartWorkerType, InferencerType>, ColdEntityWorkerType extends ColdEntityWorker<RCFModelType, IndexableResultType, IndexType, IndexManagementType, CheckpointDaoType, IntermediateResultType, ModelManagerType, CheckpointWriteWorkerType, ModelColdStartType, CacheType, SaveResultStrategyType, TaskCacheManagerType, TaskTypeEnum, TaskClass, TaskManagerType, ColdStartWorkerType, InferencerType, HCCheckpointReadWorkerType>> {
    private static final Logger LOG = LogManager.getLogger(EntityResultProcessor.class);
    private CacheProvider<RCFModelType, CacheType> cache;
    private HCCheckpointReadWorkerType checkpointReadQueue;
    private ColdEntityWorkerType coldEntityQueue;
    private InferencerType inferencer;
    private ThreadPool threadPool;
    private String threadPoolName;
    private Clock clock;

    public EntityResultProcessor(CacheProvider<RCFModelType, CacheType> cache, HCCheckpointReadWorkerType checkpointReadQueue, ColdEntityWorkerType coldEntityQueue, InferencerType inferencer, ThreadPool threadPool, String threadPoolName, Clock clock) {
        this.cache = cache;
        this.checkpointReadQueue = checkpointReadQueue;
        this.coldEntityQueue = coldEntityQueue;
        this.inferencer = inferencer;
        this.threadPool = threadPool;
        this.threadPoolName = threadPoolName;
        this.clock = clock;
    }

    public ActionListener<Optional<? extends Config>> onGetConfig(ActionListener<AcknowledgedResponse> listener, String configId, EntityResultRequest request, Optional<Exception> prevException, AnalysisType analysisType) {
        return ActionListenerExecutor.wrap(configOptional -> {
            if (!configOptional.isPresent()) {
                listener.onFailure((Exception)new EndRunException(configId, "Config " + configId + " is not available.", false));
                return;
            }
            Config config = (Config)configOptional.get();
            if (request.getEntities() == null) {
                listener.onFailure((Exception)new EndRunException(configId, "Fail to get any entities from request.", false));
                return;
            }
            ConcurrentHashMap<Entity, double[]> cacheMissEntities = new ConcurrentHashMap<Entity, double[]>();
            ArrayList<Map.Entry<Entity, double[]>> entityEntries = new ArrayList<Map.Entry<Entity, double[]>>(request.getEntities().entrySet());
            long deadline = this.clock.millis() + config.getInferredFrequencyInMilliseconds();
            AtomicReference<Exception> processingException = new AtomicReference<Exception>(prevException.orElse(null));
            this.processEntitiesInParallel(entityEntries, cacheMissEntities, config, configId, request, listener, processingException, deadline);
        }, exception -> {
            LOG.error((Message)new ParameterizedMessage("fail to get entity's analysis result for config [{}]: start: [{}], end: [{}]", new Object[]{configId, request.getStart(), request.getEnd()}), (Throwable)exception);
            listener.onFailure(exception);
        }, this.threadPool.executor(this.threadPoolName));
    }

    private void processEntitiesInParallel(List<Map.Entry<Entity, double[]>> entityEntries, Map<Entity, double[]> cacheMissEntities, Config config, String configId, EntityResultRequest request, ActionListener<AcknowledgedResponse> listener, AtomicReference<Exception> processingException, long deadline) {
        if (entityEntries.isEmpty()) {
            this.processCacheMissEntities(cacheMissEntities, config, configId, request, listener, processingException);
            return;
        }
        GroupedActionListener groupedListener = new GroupedActionListener(ActionListener.wrap(r -> this.processCacheMissEntities(cacheMissEntities, config, configId, request, listener, processingException), e -> {
            LOG.error("Error occurred during parallel entity processing", (Throwable)e);
            processingException.updateAndGet(existing -> {
                if (existing == null) {
                    return e;
                }
                return ExceptionUtil.selectHigherPriorityException(e, existing);
            });
            this.processCacheMissEntities(cacheMissEntities, config, configId, request, listener, processingException);
        }), entityEntries.size());
        for (Map.Entry<Entity, double[]> entityEntry : entityEntries) {
            this.threadPool.executor(this.threadPoolName).execute(() -> this.processSingleEntity(entityEntry, cacheMissEntities, config, configId, request, (ActionListener<Void>)groupedListener, deadline));
        }
    }

    private void processSingleEntity(Map.Entry<Entity, double[]> entityEntry, Map<Entity, double[]> cacheMissEntities, Config config, String configId, EntityResultRequest request, ActionListener<Void> listener, long deadline) {
        if (this.clock.millis() >= deadline) {
            LOG.warn("Timeout reached for config [{}], skipping entity processing.", (Object)config.getId());
            listener.onResponse(null);
            return;
        }
        Entity entity = entityEntry.getKey();
        if (this.isEntityFromOldNodeMsg(entity) && config.getCategoryFields() != null && config.getCategoryFields().size() == 1) {
            Map<String, String> attrValues = entity.getAttributes();
            entity = Entity.createSingleAttributeEntity(config.getCategoryFields().get(0), attrValues.get(""));
        }
        Entity finalEntity = entity;
        Optional<String> modelIdOptional = entity.getModelId(configId);
        if (modelIdOptional.isEmpty()) {
            listener.onResponse(null);
            return;
        }
        try {
            String modelId = modelIdOptional.get();
            double[] datapoint = entityEntry.getValue();
            ModelState entityModel = this.cache.get().get(modelId, config);
            if (entityModel == null) {
                cacheMissEntities.put(finalEntity, datapoint);
                listener.onResponse(null);
                return;
            }
            ((RealTimeInferencer)this.inferencer).process(new Sample(datapoint, Instant.ofEpochMilli(request.getStart()), Instant.ofEpochMilli(request.getEnd())), entityModel, config, request.getTaskId(), (ActionListener<Boolean>)ActionListener.wrap(r -> listener.onResponse(null), e -> {
                LOG.error("Failed to process entity " + String.valueOf(finalEntity), (Throwable)e);
                listener.onFailure(e);
            }));
        }
        catch (Exception e2) {
            LOG.error("Failed to process entity " + String.valueOf(finalEntity), (Throwable)e2);
            listener.onFailure(e2);
        }
    }

    private void processCacheMissEntities(Map<Entity, double[]> cacheMissEntities, Config config, String configId, EntityResultRequest request, ActionListener<AcknowledgedResponse> listener, AtomicReference<Exception> processingException) {
        try {
            Pair<List<Entity>, List<Entity>> hotColdEntities = this.cache.get().selectUpdateCandidate(cacheMissEntities.keySet(), configId, config);
            ArrayList<FeatureRequest> hotEntityRequests = new ArrayList<FeatureRequest>();
            ArrayList<FeatureRequest> coldEntityRequests = new ArrayList<FeatureRequest>();
            for (Object hotEntity : (List)hotColdEntities.getLeft()) {
                double[] hotEntityValue = cacheMissEntities.get(hotEntity);
                if (hotEntityValue == null) {
                    LOG.error((Message)new ParameterizedMessage("feature value should not be null: [{}]", hotEntity));
                    continue;
                }
                hotEntityRequests.add(new FeatureRequest(System.currentTimeMillis() + config.getInferredFrequencyInMilliseconds(), configId, RequestPriority.MEDIUM, hotEntityValue, request.getStart(), (Entity)hotEntity, request.getTaskId()));
            }
            RequestPriority coldEntityPriority = config.isLongFrequency() ? RequestPriority.MEDIUM : RequestPriority.LOW;
            for (Entity coldEntity : (List)hotColdEntities.getRight()) {
                double[] coldEntityValue = cacheMissEntities.get(coldEntity);
                if (coldEntityValue == null) {
                    LOG.error((Message)new ParameterizedMessage("feature value should not be null: [{}]", (Object)coldEntity));
                    continue;
                }
                coldEntityRequests.add(new FeatureRequest(System.currentTimeMillis() + config.getInferredFrequencyInMilliseconds(), configId, coldEntityPriority, coldEntityValue, request.getStart(), coldEntity, request.getTaskId()));
            }
            ((RateLimitedRequestWorker)this.checkpointReadQueue).putAll(hotEntityRequests);
            ((RateLimitedRequestWorker)this.coldEntityQueue).putAll(coldEntityRequests);
            Exception aggregated = processingException.get();
            if (aggregated != null) {
                listener.onFailure(aggregated);
            } else {
                listener.onResponse((Object)new AcknowledgedResponse(true));
            }
        }
        catch (Exception e) {
            LOG.error("Error processing cache miss entities for " + configId, (Throwable)e);
            listener.onFailure(e);
        }
    }

    private boolean isEntityFromOldNodeMsg(Entity categoricalValues) {
        Map<String, String> attrValues = categoricalValues.getAttributes();
        return attrValues != null && attrValues.containsKey("");
    }
}

