package org.opentripplanner.analyst.broker;

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.amazonaws.services.ec2.model.IamInstanceProfileSpecification;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.ShutdownBehavior;
import com.amazonaws.services.ec2.model.Tag;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;
import gnu.trove.map.TIntIntMap;
import gnu.trove.map.TIntObjectMap;
import gnu.trove.map.TObjectLongMap;
import gnu.trove.map.hash.TIntIntHashMap;
import gnu.trove.map.hash.TIntObjectHashMap;
import gnu.trove.map.hash.TObjectLongHashMap;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import org.glassfish.grizzly.http.server.Request;
import org.glassfish.grizzly.http.server.Response;
import org.glassfish.grizzly.http.util.HttpStatus;
import org.opentripplanner.analyst.cluster.AnalystClusterRequest;
import org.opentripplanner.api.model.AgencyAndIdSerializer;
import org.opentripplanner.api.model.JodaLocalDateSerializer;
import org.opentripplanner.api.model.QualifiedModeSetSerializer;
import org.opentripplanner.api.model.TraverseModeSetSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opentripplanner/analyst/broker/Broker.class */
public class Broker implements Runnable {
    private static final int REDELIVERY_INTERVAL_SEC = 10;
    public static final long WORKER_STARTUP_TIME = 3600000;
    private int maxWorkers;
    private final Properties config;
    private boolean workOffline;
    private AmazonEC2 ec2;
    private String workerName;
    private String project;
    private static final Logger LOG = LoggerFactory.getLogger(Broker.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    public final CircularList<Job> jobs = new CircularList<>();
    public final int MAX_TASKS_PER_WORKER = 8;
    private int nUndeliveredTasks = 0;
    private int nWaitingConsumers = 0;
    private int nextTaskId = 0;
    private long nextRedeliveryCheckTime = System.currentTimeMillis();
    private WorkerCatalog workerCatalog = new WorkerCatalog();
    TIntIntMap deliveryTimes = new TIntIntHashMap();
    private ArrayListMultimap<String, AnalystClusterRequest> stalledHighPriorityTasks = ArrayListMultimap.create();
    private Multimap<String, AnalystClusterRequest> newHighPriorityTasks = ArrayListMultimap.create();
    private TIntObjectMap<Response> highPriorityResponses = new TIntObjectHashMap();
    Map<String, Deque<Response>> consumersByGraph = new HashMap();
    private Multimap<String, WrappedResponse> singlePointChannels = TreeMultimap.create();
    private Timer timer = new Timer();
    private TObjectLongMap<String> recentlyRequestedWorkers = new TObjectLongHashMap();
    private Multimap<String, String> activeJobsPerGraph = HashMultimap.create();
    private final Properties workerConfig = new Properties();

    /* loaded from: input_file:org/opentripplanner/analyst/broker/Broker$WrappedResponse.class */
    public static class WrappedResponse implements Comparable<WrappedResponse> {
        public final Response response;
        public final String machineId;

        public WrappedResponse(Request request, Response response) {
            this.response = response;
            this.machineId = request.getHeader("X-Worker-Id");
        }

        @Override // java.lang.Comparable
        public int compareTo(WrappedResponse wrappedResponse) {
            return this.machineId.compareTo(wrappedResponse.machineId);
        }
    }

    public Broker(Properties properties, String str, int i) {
        this.config = properties;
        if (this.config.getProperty("worker-config") != null) {
            try {
                FileInputStream fileInputStream = new FileInputStream(new File(this.config.getProperty("worker-config")));
                this.workerConfig.load(fileInputStream);
                fileInputStream.close();
            } catch (IOException e) {
                LOG.error("Error loading base worker configuration", e);
            }
        }
        this.workerConfig.setProperty("broker-address", str);
        this.workerConfig.setProperty("broker-port", "" + i);
        if (properties.getProperty("statistics-queue") != null) {
            this.workerConfig.setProperty("statistics-queue", properties.getProperty("statistics-queue"));
        }
        this.workerConfig.setProperty("graphs-bucket", properties.getProperty("graphs-bucket"));
        this.workerConfig.setProperty("pointsets-bucket", properties.getProperty("pointsets-bucket"));
        this.workerConfig.setProperty("auto-shutdown", "true");
        Boolean valueOf = Boolean.valueOf(Boolean.parseBoolean(properties.getProperty("work-offline")));
        this.workOffline = (valueOf == null ? true : valueOf).booleanValue();
        this.workerName = properties.getProperty("worker-name") != null ? properties.getProperty("worker-name") : "analyst-worker";
        this.project = properties.getProperty("project") != null ? properties.getProperty("project") : "analyst";
        this.maxWorkers = properties.getProperty("max-workers") != null ? Integer.parseInt(properties.getProperty("max-workers")) : 4;
        this.ec2 = new AmazonEC2Client();
        Region currentRegion = Regions.getCurrentRegion();
        if (currentRegion != null) {
            this.ec2.setRegion(currentRegion);
        }
    }

    public synchronized void enqueuePriorityTask(final AnalystClusterRequest analystClusterRequest, Response response) {
        boolean workersAvailableForGraph = workersAvailableForGraph(analystClusterRequest.graphId);
        if (!workersAvailableForGraph) {
            createWorkersForGraph(analystClusterRequest.graphId);
            try {
                response.setHeader("Retry-After", "30");
                response.sendError(503, "No workers available with this graph affinity, please retry shortly.");
            } catch (IOException e) {
                LOG.error("Could not finish high-priority 503 response", e);
            }
        }
        if (workersAvailableForGraph || this.workOffline) {
            int i = this.nextTaskId;
            this.nextTaskId = i + 1;
            analystClusterRequest.taskId = i;
            this.newHighPriorityTasks.put(analystClusterRequest.graphId, analystClusterRequest);
            this.highPriorityResponses.put(analystClusterRequest.taskId, response);
            this.timer.schedule(new TimerTask() { // from class: org.opentripplanner.analyst.broker.Broker.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    Broker.this.deliverHighPriorityTasks(analystClusterRequest.graphId);
                }
            }, 100L);
        }
    }

    public synchronized void deliverHighPriorityTasks(String str) {
        Collection collection = this.newHighPriorityTasks.get(str);
        if (collection.isEmpty()) {
            return;
        }
        Collection collection2 = this.singlePointChannels.get(str);
        if (!collection2.isEmpty()) {
            WrappedResponse wrappedResponse = (WrappedResponse) collection2.iterator().next();
            try {
                try {
                    wrappedResponse.response.setContentType("application/json");
                    OutputStream outputStream = wrappedResponse.response.getOutputStream();
                    mapper.writeValue(outputStream, collection);
                    outputStream.close();
                    wrappedResponse.response.resume();
                    this.newHighPriorityTasks.removeAll(str);
                    removeSinglePointChannel(str, wrappedResponse);
                    return;
                } catch (Exception e) {
                    LOG.info("Failed to deliver single point job via side channel, reverting to normal channel", e);
                    removeSinglePointChannel(str, wrappedResponse);
                }
            } catch (Throwable th) {
                removeSinglePointChannel(str, wrappedResponse);
                throw th;
            }
        }
        collection.forEach(analystClusterRequest -> {
            this.stalledHighPriorityTasks.put(str, analystClusterRequest);
        });
        LOG.info("No side channel available for graph {}, delivering {} tasks via normal channel", str, Integer.valueOf(collection.size()));
        this.nUndeliveredTasks += collection.size();
        this.newHighPriorityTasks.removeAll(str);
        notify();
    }

    public synchronized void enqueueTasks(List<AnalystClusterRequest> list) {
        Job findJob = findJob(list.get(0));
        if (!workersAvailableForGraph(findJob.graphId)) {
            createWorkersForGraph(findJob.graphId);
        }
        for (AnalystClusterRequest analystClusterRequest : list) {
            int i = this.nextTaskId;
            this.nextTaskId = i + 1;
            analystClusterRequest.taskId = i;
            findJob.addTask(analystClusterRequest);
            this.nUndeliveredTasks++;
            LOG.debug("Enqueued task id {} in job {}", Integer.valueOf(analystClusterRequest.taskId), findJob.jobId);
            if (!analystClusterRequest.graphId.equals(findJob.graphId)) {
                LOG.warn("Task graph ID {} does not match job graph ID {}.", analystClusterRequest.graphId, findJob.graphId);
            }
        }
        notify();
    }

    public boolean workersAvailableForGraph(String str) {
        this.workerCatalog.purgeDeadWorkers();
        return !this.workerCatalog.workersByGraph.get(str).isEmpty();
    }

    public void createWorkersForGraph(String str) {
        String replaceAll = UUID.randomUUID().toString().replaceAll("-", "");
        if (this.workOffline) {
            LOG.info("Work offline enabled, not creating workers for graph {}", str);
            return;
        }
        if (this.workerCatalog.observationsByWorkerId.size() >= this.maxWorkers) {
            LOG.warn("{} workers already started, not starting more; jobs on graph {} will not complete", Integer.valueOf(this.maxWorkers), str);
            return;
        }
        if (this.recentlyRequestedWorkers.containsKey(str) && this.recentlyRequestedWorkers.get(str) >= System.currentTimeMillis() - WORKER_STARTUP_TIME) {
            LOG.info("workers still starting on graph {}, not starting more", str);
            return;
        }
        LOG.info("Starting {} workers as there are none on graph {}", 1, str);
        RunInstancesRequest runInstancesRequest = new RunInstancesRequest();
        runInstancesRequest.setImageId(this.config.getProperty("ami-id"));
        runInstancesRequest.setInstanceType(InstanceType.valueOf(this.config.getProperty("worker-type")));
        runInstancesRequest.setSubnetId(this.config.getProperty("subnet-id"));
        runInstancesRequest.setMinCount(1);
        runInstancesRequest.setMaxCount(1);
        this.workerConfig.setProperty("initial-graph-id", str);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            this.workerConfig.store(byteArrayOutputStream, "Worker config");
            byteArrayOutputStream.close();
            runInstancesRequest.setUserData(new String(Base64.getEncoder().encode(byteArrayOutputStream.toByteArray())));
            if (this.config.getProperty("worker-iam-role") != null) {
                runInstancesRequest.setIamInstanceProfile(new IamInstanceProfileSpecification().withArn(this.config.getProperty("worker-iam-role")));
            }
            if (this.config.getProperty("subnet") != null) {
                runInstancesRequest.setSubnetId(this.config.getProperty("subnet"));
            }
            runInstancesRequest.setClientToken(replaceAll);
            runInstancesRequest.setInstanceInitiatedShutdownBehavior(ShutdownBehavior.Terminate);
            this.ec2.runInstances(runInstancesRequest).getReservation().getInstances().forEach(instance -> {
                instance.setTags(Arrays.asList(new Tag("name", this.workerName), new Tag("project", this.project)));
            });
            this.recentlyRequestedWorkers.put(str, System.currentTimeMillis());
            LOG.info("Requesting {} workers", 1);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized void registerSuspendedResponse(String str, Response response) {
        String header = response.getRequest().getHeader("X-Worker-Id");
        if (header == null || header.isEmpty()) {
            LOG.error("Worker did not supply a unique ID for itself . Ignoring it.");
            return;
        }
        this.workerCatalog.catalog(header, str);
        Deque<Response> deque = this.consumersByGraph.get(str);
        if (deque == null) {
            deque = new ArrayDeque();
            this.consumersByGraph.put(str, deque);
        }
        deque.addLast(response);
        this.nWaitingConsumers++;
        notify();
    }

    public synchronized boolean removeSuspendedResponse(String str, Response response) {
        Deque<Response> deque = this.consumersByGraph.get(str);
        if (deque == null || !deque.remove(response)) {
            return false;
        }
        this.nWaitingConsumers--;
        LOG.debug("Removed closed connection from queue.");
        logQueueStatus();
        return true;
    }

    public synchronized void registerSinglePointChannel(String str, WrappedResponse wrappedResponse) {
        this.singlePointChannels.put(str, wrappedResponse);
    }

    public synchronized boolean removeSinglePointChannel(String str, WrappedResponse wrappedResponse) {
        return this.singlePointChannels.remove(str, wrappedResponse);
    }

    private void logQueueStatus() {
        LOG.info("{} undelivered, of which {} high-priority", Integer.valueOf(this.nUndeliveredTasks), Integer.valueOf(this.stalledHighPriorityTasks.size()));
        LOG.info("{} producers waiting, {} consumers waiting", Integer.valueOf(this.highPriorityResponses.size()), Integer.valueOf(this.nWaitingConsumers));
        LOG.info("{} total workers", Integer.valueOf(this.workerCatalog.size()));
    }

    private void redeliver() {
        if (System.currentTimeMillis() > this.nextRedeliveryCheckTime) {
            this.nextRedeliveryCheckTime += 10000;
            LOG.info("Scanning for redelivery...");
            int i = 0;
            int i2 = 0;
            Iterator<Job> it = this.jobs.iterator();
            while (it.hasNext()) {
                Job next = it.next();
                i2 += next.invisibleUntil.size();
                i += next.redeliver();
            }
            LOG.info("{} tasks enqueued for redelivery out of {} invisible tasks.", Integer.valueOf(i), Integer.valueOf(i2));
            this.nUndeliveredTasks += i;
        }
    }

    public synchronized void deliverTasks() throws InterruptedException {
        Deque<Response> deque;
        Deque<Response> deque2;
        while (this.nUndeliveredTasks == 0) {
            LOG.debug("Task delivery thread is going to sleep, there are no tasks waiting for delivery.");
            logQueueStatus();
            wait();
            redeliver();
        }
        LOG.debug("Task delivery thread is awake and there are some undelivered tasks.");
        logQueueStatus();
        while (this.nWaitingConsumers == 0) {
            LOG.debug("Task delivery thread is going to sleep, there are no consumers waiting.");
            wait();
        }
        LOG.debug("Task delivery thread awake; consumers are waiting and tasks are available");
        for (Map.Entry entry : this.stalledHighPriorityTasks.asMap().entrySet()) {
            String str = (String) entry.getKey();
            Collection collection = (Collection) entry.getValue();
            if (this.workOffline) {
                Optional<Deque<Response>> findFirst = this.consumersByGraph.values().stream().filter(deque3 -> {
                    return !deque3.isEmpty();
                }).findFirst();
                deque2 = findFirst.isPresent() ? findFirst.get() : null;
            } else {
                deque2 = this.consumersByGraph.get(str);
            }
            if (deque2 == null || deque2.isEmpty()) {
                LOG.warn("No consumer found for graph {}, needed for {} high-priority tasks", str, Integer.valueOf(collection.size()));
            } else {
                Iterator it = collection.iterator();
                while (it.hasNext() && !deque2.isEmpty()) {
                    Response pop = deque2.pop();
                    Job job = new Job("HIGH PRIORITY");
                    job.graphId = str;
                    for (int i = 0; i < 8 && it.hasNext(); i++) {
                        job.addTask((AnalystClusterRequest) it.next());
                        it.remove();
                    }
                    deliver(job, pop);
                    this.nWaitingConsumers--;
                }
            }
        }
        while (this.nWaitingConsumers > 0) {
            this.jobs.advance();
            Job advanceToElement = !this.workOffline ? this.jobs.advanceToElement(job2 -> {
                return (job2.tasksAwaitingDelivery.isEmpty() || !this.consumersByGraph.containsKey(job2.graphId) || this.consumersByGraph.get(job2.graphId).isEmpty()) ? false : true;
            }) : this.jobs.advanceToElement(job3 -> {
                return !job3.tasksAwaitingDelivery.isEmpty();
            });
            if (advanceToElement == null) {
                break;
            }
            if (this.workOffline) {
                Optional<Deque<Response>> findFirst2 = this.consumersByGraph.values().stream().filter(deque4 -> {
                    return !deque4.isEmpty();
                }).findFirst();
                deque = findFirst2.isPresent() ? findFirst2.get() : null;
            } else {
                deque = this.consumersByGraph.get(advanceToElement.graphId);
            }
            deliver(advanceToElement, deque.pop());
            this.nWaitingConsumers--;
        }
        wait();
    }

    public Job getJobForTask(int i) {
        Iterator<Job> it = this.jobs.iterator();
        while (it.hasNext()) {
            Job next = it.next();
            if (next.containsTask(i)) {
                return next;
            }
        }
        return null;
    }

    public synchronized boolean deliver(Job job, Response response) {
        if (!response.getRequest().getRequest().getConnection().isOpen()) {
            LOG.debug("Consumer connection was closed. It will be removed.");
            return false;
        }
        List<AnalystClusterRequest> arrayList = new ArrayList<>();
        while (arrayList.size() < 8 && !job.tasksAwaitingDelivery.isEmpty()) {
            arrayList.add(job.tasksAwaitingDelivery.poll());
        }
        try {
            response.setStatus(HttpStatus.OK_200);
            mapper.writeValue(response.getOutputStream(), arrayList);
            response.resume();
            LOG.debug("Delivery of {} tasks succeeded.", Integer.valueOf(arrayList.size()));
            this.nUndeliveredTasks -= arrayList.size();
            job.markTasksDelivered(arrayList);
            return true;
        } catch (IOException e) {
            LOG.debug("Consumer connection caused IO error, it will be removed.");
            response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
            response.resume();
            job.tasksAwaitingDelivery.addAll(arrayList);
            return false;
        }
    }

    public synchronized boolean markTaskCompleted(int i) {
        Job jobForTask = getJobForTask(i);
        if (jobForTask == null) {
            LOG.error("Could not find a job containing task {}, and therefore could not mark the task as completed.");
            return false;
        }
        jobForTask.markTaskCompleted(i);
        return true;
    }

    public synchronized Response deletePriorityTask(int i) {
        return (Response) this.highPriorityResponses.remove(i);
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                deliverTasks();
            } catch (InterruptedException e) {
                LOG.info("Task pump thread was interrupted.");
                return;
            }
        }
    }

    public Job findJob(AnalystClusterRequest analystClusterRequest) {
        Job findJob = findJob(analystClusterRequest.jobId);
        if (findJob != null) {
            return findJob;
        }
        Job job = new Job(analystClusterRequest.jobId);
        job.graphId = analystClusterRequest.graphId;
        this.jobs.insertAtTail(job);
        return job;
    }

    public Job findJob(String str) {
        Iterator<Job> it = this.jobs.iterator();
        while (it.hasNext()) {
            Job next = it.next();
            if (next.jobId.equals(str)) {
                return next;
            }
        }
        return null;
    }

    public synchronized boolean deleteJob(String str) {
        Job findJob = findJob(str);
        if (findJob == null) {
            return false;
        }
        this.nUndeliveredTasks -= findJob.tasksAwaitingDelivery.size();
        return this.jobs.remove(findJob);
    }

    public synchronized boolean anyJobsActive() {
        Iterator<Job> it = this.jobs.iterator();
        while (it.hasNext()) {
            if (!it.next().isComplete()) {
                return true;
            }
        }
        return false;
    }

    void activateJob(Job job) {
        this.activeJobsPerGraph.put(job.graphId, job.jobId);
    }

    void deactivateJob(Job job) {
        this.activeJobsPerGraph.remove(job.graphId, job.jobId);
    }

    static {
        mapper.registerModule(AgencyAndIdSerializer.makeModule());
        mapper.registerModule(QualifiedModeSetSerializer.makeModule());
        mapper.registerModule(JodaLocalDateSerializer.makeModule());
        mapper.registerModule(TraverseModeSetSerializer.makeModule());
    }
}
