/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.util.concurrent;

import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.log4j.Logger;

public class MappedTaskExecutor {
    protected static final transient Logger log = Logger.getLogger(MappedTaskExecutor.class);
    private final ExecutorService service;

    public MappedTaskExecutor(ExecutorService service) {
        if (service == null) {
            throw new IllegalArgumentException();
        }
        this.service = service;
    }

    protected ExecutorService getThreadPool() {
        return this.service;
    }

    public <T> List<T> runTasks(List<Callable<T>> tasks, long timeout, TimeUnit unit, int maxParallel) throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null) {
            throw new IllegalArgumentException();
        }
        if (unit == null) {
            throw new IllegalArgumentException();
        }
        if (maxParallel <= 0) {
            throw new IllegalArgumentException();
        }
        if (tasks.isEmpty()) {
            return Collections.emptyList();
        }
        if (tasks.size() == 1) {
            return this.runOne(tasks.get(0), timeout, unit);
        }
        if (maxParallel == Integer.MAX_VALUE) {
            return this.runMaxParallel(tasks, timeout, unit);
        }
        if (maxParallel > 1) {
            return this.runLimitedParallel(tasks, timeout, unit, maxParallel);
        }
        return this.runSequence(tasks, timeout, unit);
    }

    private <T> List<T> runOne(Callable<T> task, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        Future<T> f = this.getThreadPool().submit(task);
        T t = f.get(timeout, unit);
        return Collections.singletonList(t);
    }

    private <T> List<T> runSequence(List<Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        long nanos = timeout = unit.toNanos(timeout);
        long begin = System.nanoTime();
        Iterator<Callable<T>> itr = tasks.iterator();
        LinkedList<T> list = new LinkedList<T>();
        while (itr.hasNext()) {
            if (nanos < 0L) {
                throw new TimeoutException();
            }
            Callable<T> task = itr.next();
            Future<T> f = this.getThreadPool().submit(task);
            list.add(f.get(nanos, TimeUnit.NANOSECONDS));
            nanos = timeout - (System.nanoTime() - begin);
        }
        return list;
    }

    private <T> List<T> runMaxParallel(List<Callable<T>> tasks, long timeout, TimeUnit unit) throws ExecutionException, InterruptedException {
        long begin = System.nanoTime();
        List<Future<T>> futures = this.getThreadPool().invokeAll(tasks, timeout, unit);
        Iterator<Future<T>> itr = futures.iterator();
        LinkedList<T> list = new LinkedList<T>();
        while (itr.hasNext()) {
            Future<T> f = itr.next();
            list.add(f.get());
        }
        if (log.isInfoEnabled()) {
            log.info("Ran " + tasks.size() + " tasks in parallel: elapsed=" + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin));
        }
        return list;
    }

    private <T> List<T> runLimitedParallel(List<Callable<T>> tasks, long timeout, TimeUnit unit, int maxParallel) throws ExecutionException, InterruptedException, TimeoutException {
        long nanos = timeout = unit.toNanos(timeout);
        long begin = System.nanoTime();
        Iterator<Callable<T>> titr = tasks.iterator();
        LinkedList list = new LinkedList();
        while (titr.hasNext()) {
            LinkedList<Callable<T>> taskChunk = new LinkedList<Callable<T>>();
            while (taskChunk.size() < maxParallel && titr.hasNext()) {
                taskChunk.add(titr.next());
            }
            if (nanos <= 0L) {
                throw new TimeoutException();
            }
            List futures = this.getThreadPool().invokeAll(taskChunk, nanos, TimeUnit.NANOSECONDS);
            for (Future f : futures) {
                list.add(f.get());
            }
            nanos = timeout - (System.nanoTime() - begin);
        }
        if (log.isInfoEnabled()) {
            log.info("Ran " + tasks.size() + " tasks in parallel: elapsed=" + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin));
        }
        return list;
    }
}

