/*
 * Decompiled with CFR 0.152.
 */
package com.bigdata.service;

import com.bigdata.bfs.BigdataFileSystem;
import com.bigdata.bfs.GlobalFileSystemHelper;
import com.bigdata.btree.IndexMetadata;
import com.bigdata.counters.AbstractStatisticsCollector;
import com.bigdata.counters.CounterSet;
import com.bigdata.counters.ICounterSetAccess;
import com.bigdata.counters.OneShotInstrument;
import com.bigdata.counters.ganglia.BigdataGangliaService;
import com.bigdata.counters.ganglia.BigdataMetadataFactory;
import com.bigdata.counters.ganglia.HostMetricsCollector;
import com.bigdata.counters.ganglia.QueryEngineMetricsCollector;
import com.bigdata.counters.query.QueryUtil;
import com.bigdata.ganglia.DefaultMetadataFactory;
import com.bigdata.ganglia.GangliaMetadataFactory;
import com.bigdata.ganglia.GangliaSlopeEnum;
import com.bigdata.ganglia.IGangliaDefaults;
import com.bigdata.ganglia.util.GangliaUtil;
import com.bigdata.journal.NoSuchIndexException;
import com.bigdata.journal.TemporaryStore;
import com.bigdata.journal.TemporaryStoreFactory;
import com.bigdata.relation.locator.DefaultResourceLocator;
import com.bigdata.service.AbstractClient;
import com.bigdata.service.AbstractIndexCache;
import com.bigdata.service.DefaultClientDelegate;
import com.bigdata.service.Event;
import com.bigdata.service.IBigdataClient;
import com.bigdata.service.IBigdataFederation;
import com.bigdata.service.IDataService;
import com.bigdata.service.IFederationDelegate;
import com.bigdata.service.ILoadBalancerService;
import com.bigdata.service.IMetadataService;
import com.bigdata.service.IService;
import com.bigdata.service.ListIndicesTask;
import com.bigdata.service.ndx.IClientIndex;
import com.bigdata.service.ndx.ScaleOutIndexCounters;
import com.bigdata.sparse.GlobalRowStoreHelper;
import com.bigdata.sparse.SparseRowStore;
import com.bigdata.util.DaemonThreadFactory;
import com.bigdata.util.InnerCause;
import com.bigdata.util.concurrent.ShutdownHelper;
import com.bigdata.util.concurrent.TaskCounters;
import com.bigdata.util.concurrent.ThreadPoolExecutorStatisticsTask;
import com.bigdata.util.httpd.AbstractHTTPD;
import com.bigdata.util.httpd.NanoHTTPD;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URLEncoder;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;

public abstract class AbstractFederation<T>
implements IBigdataFederation<T> {
    protected static final Logger log = Logger.getLogger(IBigdataFederation.class);
    private final AtomicReference<AbstractClient<T>> client = new AtomicReference();
    private final boolean collectPlatformStatistics;
    private final boolean collectQueueStatistics;
    private final int httpdPort;
    private final AtomicBoolean open = new AtomicBoolean(false);
    private final ThreadPoolExecutor threadPool;
    private final ScheduledExecutorService scheduledExecutorService;
    private final AtomicReference<AbstractHTTPD> httpd = new AtomicReference();
    private final AtomicReference<String> httpdURL = new AtomicReference();
    private final DefaultResourceLocator<?> resourceLocator;
    private final TaskCounters taskCounters = new TaskCounters();
    private final Map<String, ScaleOutIndexCounters> scaleOutIndexCounters = new HashMap<String, ScaleOutIndexCounters>();
    private final AtomicReference<AbstractStatisticsCollector> statisticsCollector = new AtomicReference();
    private final AtomicReference<FutureTask<Void>> gangliaFuture = new AtomicReference();
    private final AtomicReference<BigdataGangliaService> gangliaService = new AtomicReference();
    private final Lock countersLock = new ReentrantLock(false);
    private CounterSet countersRoot;
    private CounterSet serviceRoot;
    private final GlobalRowStoreHelper globalRowStoreHelper = new GlobalRowStoreHelper(this);
    private final GlobalFileSystemHelper globalFileSystemHelper = new GlobalFileSystemHelper(this);
    private final TemporaryStoreFactory tempStoreFactory;
    private static String ERR_NO_SERVICE_UUID = "Service UUID is not assigned yet.";
    private static String ERR_SERVICE_NOT_READY = "Service is not ready yet.";
    private final BlockingQueue<Event> events = new LinkedBlockingQueue<Event>();

    @Override
    public AbstractClient<T> getClient() {
        AbstractClient<T> t = this.client.get();
        if (t == null) {
            throw new IllegalStateException();
        }
        return t;
    }

    public final boolean isOpen() {
        return this.open.get();
    }

    public synchronized void shutdown() {
        Object t;
        if (!this.open.compareAndSet(true, false)) {
            return;
        }
        long begin = System.currentTimeMillis();
        if (log.isInfoEnabled()) {
            log.info("begin");
        }
        try {
            FutureTask ft = this.gangliaFuture.getAndSet(null);
            if (ft != null) {
                ft.cancel(true);
            }
            this.gangliaService.set(null);
            new ShutdownHelper(this.threadPool, 10L, TimeUnit.SECONDS){

                @Override
                public void logTimeout() {
                    log.warn("Awaiting thread pool termination: elapsed=" + TimeUnit.NANOSECONDS.toMillis(this.elapsed()) + "ms");
                }
            };
            t = this.statisticsCollector.getAndSet(null);
            if (t != null) {
                ((AbstractStatisticsCollector)t).stop();
            }
            new ShutdownHelper(this.scheduledExecutorService, 10L, TimeUnit.SECONDS){

                @Override
                public void logTimeout() {
                    log.warn("Awaiting sample service termination: elapsed=" + TimeUnit.NANOSECONDS.toMillis(this.elapsed()) + "ms");
                }
            };
        }
        catch (InterruptedException e) {
            log.warn("Interrupted awaiting thread pool termination.", e);
        }
        new SendEventsTask().run();
        t = this.httpd.getAndSet(null);
        if (t != null) {
            ((NanoHTTPD)t).shutdown();
        }
        this.httpdURL.set(null);
        if (log.isInfoEnabled()) {
            log.info("done: elapsed=" + (System.currentTimeMillis() - begin));
        }
        if ((t = (AbstractClient)this.client.getAndSet(null)) != null) {
            t.disconnect(false);
        }
        this.tempStoreFactory.closeAll();
    }

    public synchronized void shutdownNow() {
        FutureTask ft;
        if (!this.open.compareAndSet(true, false)) {
            return;
        }
        long begin = System.currentTimeMillis();
        if (log.isInfoEnabled()) {
            log.info("begin");
        }
        this.threadPool.shutdownNow();
        AbstractClient<T> t = this.statisticsCollector.getAndSet(null);
        if (t != null) {
            ((AbstractStatisticsCollector)((Object)t)).stop();
        }
        if ((ft = (FutureTask)this.gangliaFuture.getAndSet(null)) != null) {
            ft.cancel(true);
        }
        this.gangliaService.set(null);
        this.scheduledExecutorService.shutdownNow();
        this.events.clear();
        t = this.httpd.getAndSet(null);
        if (t != null) {
            ((NanoHTTPD)((Object)t)).shutdownNow();
        }
        this.httpdURL.set(null);
        if (log.isInfoEnabled()) {
            log.info("done: elapsed=" + (System.currentTimeMillis() - begin));
        }
        if ((t = this.client.get()) != null) {
            t.disconnect(true);
        }
        this.tempStoreFactory.closeAll();
    }

    @Override
    public synchronized void destroy() {
        if (this.isOpen()) {
            this.shutdownNow();
        }
        this.tempStoreFactory.closeAll();
    }

    protected final void assertOpen() {
        if (this.client == null) {
            throw new IllegalStateException();
        }
    }

    public ScheduledExecutorService getScheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    @Override
    public boolean getCollectPlatformStatistics() {
        return this.collectPlatformStatistics;
    }

    @Override
    public boolean getCollectQueueStatistics() {
        return this.collectQueueStatistics;
    }

    @Override
    public int getHttpdPort() {
        return this.httpdPort;
    }

    @Override
    public final String getHttpdURL() {
        return this.httpdURL.get();
    }

    @Override
    public DefaultResourceLocator<?> getResourceLocator() {
        this.assertOpen();
        return this.resourceLocator;
    }

    public TaskCounters getTaskCounters() {
        return this.taskCounters;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ScaleOutIndexCounters getIndexCounters(String name) {
        if (name == null) {
            throw new IllegalArgumentException();
        }
        Map<String, ScaleOutIndexCounters> map = this.scaleOutIndexCounters;
        synchronized (map) {
            ScaleOutIndexCounters t = this.scaleOutIndexCounters.get(name);
            if (t == null) {
                t = new ScaleOutIndexCounters(this);
                this.scaleOutIndexCounters.put(name, t);
                this.getServiceCounterSet().makePath("Indices").makePath(name).attach(t.getCounters(), true);
            }
            return t;
        }
    }

    @Override
    public ScheduledFuture<?> addScheduledTask(Runnable task, long initialDelay, long delay, TimeUnit unit) {
        if (task == null) {
            throw new IllegalArgumentException();
        }
        if (log.isInfoEnabled()) {
            log.info("Scheduling task: task=" + task.getClass() + ", initialDelay=" + initialDelay + ", delay=" + delay + ", unit=" + (Object)((Object)unit));
        }
        return this.scheduledExecutorService.scheduleWithFixedDelay(task, initialDelay, delay, unit);
    }

    public final BigdataGangliaService getGangliaService() {
        return this.gangliaService.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public final CounterSet getCounters() {
        this.countersLock.lock();
        try {
            if (this.countersRoot == null) {
                this.countersRoot = new CounterSet();
                AbstractStatisticsCollector tmp = this.statisticsCollector.get();
                if (tmp != null) {
                    this.countersRoot.attach(tmp.getCounters());
                }
                this.serviceRoot = this.countersRoot.makePath(this.getServiceCounterPathPrefix());
                String s = this.httpdURL.get();
                if (s != null) {
                    this.serviceRoot.addCounter("Local httpd", new OneShotInstrument<String>(s));
                }
                AbstractStatisticsCollector.addBasicServiceOrClientCounters(this.serviceRoot, this.getServiceName(), this.getServiceIface(), ((AbstractClient)this.getClient()).getProperties());
            }
            CounterSet counterSet = this.countersRoot;
            return counterSet;
        }
        finally {
            this.countersLock.unlock();
        }
    }

    @Override
    public CounterSet getHostCounterSet() {
        String pathPrefix = "/" + AbstractStatisticsCollector.fullyQualifiedHostName;
        return (CounterSet)this.getCounters().getPath(pathPrefix);
    }

    @Override
    public CounterSet getServiceCounterSet() {
        this.getCounters();
        return this.serviceRoot;
    }

    @Override
    public String getServiceCounterPathPrefix() {
        String hostname = AbstractStatisticsCollector.fullyQualifiedHostName;
        return AbstractFederation.getServiceCounterPathPrefix(this.getServiceUUID(), this.getServiceIface(), hostname);
    }

    public static String getServiceCounterPathPrefix(UUID serviceUUID, Class serviceIface, String hostname) {
        if (serviceUUID == null) {
            throw new IllegalArgumentException();
        }
        if (serviceIface == null) {
            throw new IllegalArgumentException();
        }
        if (hostname == null) {
            throw new IllegalArgumentException();
        }
        String ps = "/";
        String pathPrefix = "/" + hostname + "/" + "service" + "/" + serviceIface.getName() + "/" + serviceUUID + "/";
        return pathPrefix;
    }

    @Override
    public ExecutorService getExecutorService() {
        this.assertOpen();
        return this.threadPool;
    }

    protected AbstractFederation(IBigdataClient<T> client) {
        int threadPoolSize;
        if (client == null) {
            throw new IllegalArgumentException();
        }
        this.open.set(true);
        AbstractClient client2 = (AbstractClient)client;
        this.client.set(client2);
        if (client2.getDelegate() == null) {
            client2.setDelegate(new DefaultClientDelegate<Object>(client, null));
        }
        this.threadPool = (threadPoolSize = client.getThreadPoolSize()) == 0 ? (ThreadPoolExecutor)Executors.newCachedThreadPool(new DaemonThreadFactory(this.getClass().getName() + ".executorService")) : (ThreadPoolExecutor)Executors.newFixedThreadPool(threadPoolSize, new DaemonThreadFactory(this.getClass().getName() + ".executorService"));
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory(this.getClass().getName() + ".scheduledService"));
        this.tempStoreFactory = new TemporaryStoreFactory(client.getProperties());
        Properties properties = client.getProperties();
        this.collectPlatformStatistics = Boolean.parseBoolean(properties.getProperty(IBigdataClient.Options.COLLECT_PLATFORM_STATISTICS, "true"));
        if (log.isInfoEnabled()) {
            log.info(IBigdataClient.Options.COLLECT_PLATFORM_STATISTICS + "=" + this.collectPlatformStatistics);
        }
        this.collectQueueStatistics = Boolean.parseBoolean(properties.getProperty(IBigdataClient.Options.COLLECT_QUEUE_STATISTICS, "true"));
        if (log.isInfoEnabled()) {
            log.info(IBigdataClient.Options.COLLECT_QUEUE_STATISTICS + "=" + this.collectQueueStatistics);
        }
        this.httpdPort = Integer.parseInt(properties.getProperty(IBigdataClient.Options.HTTPD_PORT, "0"));
        if (log.isInfoEnabled()) {
            log.info(IBigdataClient.Options.HTTPD_PORT + "=" + this.httpdPort);
        }
        if (this.httpdPort < 0 && this.httpdPort != -1) {
            throw new RuntimeException(IBigdataClient.Options.HTTPD_PORT + " must be -1 (disabled), 0 (random port), or positive");
        }
        this.addScheduledTask(new SendEventsTask(), 100L, 2000L, TimeUnit.MILLISECONDS);
        this.getExecutorService().execute(new StartDeferredTasksTask());
        this.resourceLocator = new DefaultResourceLocator(this, null, ((AbstractClient)client).getLocatorCacheCapacity(), ((AbstractClient)client).getLocatorCacheTimeout());
    }

    @Override
    public boolean isGroupCommit() {
        return true;
    }

    @Override
    public void registerIndex(IndexMetadata metadata) {
        this.assertOpen();
        this.registerIndex(metadata, null);
    }

    @Override
    public UUID registerIndex(IndexMetadata metadata, UUID dataServiceUUID) {
        this.assertOpen();
        if (dataServiceUUID == null && (dataServiceUUID = metadata.getInitialDataServiceUUID()) == null) {
            ILoadBalancerService loadBalancerService = this.getLoadBalancerService();
            if (loadBalancerService == null) {
                try {
                    dataServiceUUID = this.getAnyDataService().getServiceUUID();
                }
                catch (Exception ex) {
                    log.error(ex);
                    throw new RuntimeException(ex);
                }
            }
            try {
                dataServiceUUID = loadBalancerService.getUnderUtilizedDataService();
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
        return this.registerIndex(metadata, new byte[][]{new byte[0]}, new UUID[]{dataServiceUUID});
    }

    @Override
    public UUID registerIndex(IndexMetadata metadata, byte[][] separatorKeys, UUID[] dataServiceUUIDs) {
        this.assertOpen();
        try {
            UUID indexUUID = this.getMetadataService().registerScaleOutIndex(metadata, separatorKeys, dataServiceUUIDs);
            return indexUUID;
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    protected abstract AbstractIndexCache<? extends IClientIndex> getIndexCache();

    @Override
    public IClientIndex getIndex(String name, long timestamp) {
        if (log.isInfoEnabled()) {
            log.info("name=" + name + " @ " + timestamp);
        }
        this.assertOpen();
        return this.getIndexCache().getIndex(name, timestamp);
    }

    @Override
    public void dropIndex(String name) {
        if (log.isInfoEnabled()) {
            log.info("name=" + name);
        }
        this.assertOpen();
        try {
            this.getMetadataService().dropScaleOutIndex(name);
            if (log.isInfoEnabled()) {
                log.info("dropped scale-out index.");
            }
            this.getIndexCache().dropIndexFromCache(name);
        }
        catch (Exception e) {
            if (InnerCause.isInnerCause(e, NoSuchIndexException.class)) {
                NoSuchIndexException tmp = new NoSuchIndexException(name);
                tmp.initCause(e);
                throw tmp;
            }
            throw new RuntimeException(e);
        }
    }

    @Override
    public Iterator<String> indexNameScan(String prefix, long timestamp) {
        if (log.isInfoEnabled()) {
            log.info("prefix=" + prefix + " @ " + timestamp);
        }
        this.assertOpen();
        try {
            String namespace = "metadata-" + (prefix == null ? "" : prefix);
            IMetadataService mds = this.getMetadataService();
            if (mds == null) {
                throw new RuntimeException("Could not discover the metadata service");
            }
            String[] names = (String[])mds.submit(new ListIndicesTask(timestamp, namespace)).get();
            return Arrays.asList(names).iterator();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public SparseRowStore getGlobalRowStore() {
        return this.globalRowStoreHelper.getGlobalRowStore();
    }

    @Override
    public SparseRowStore getGlobalRowStore(long timestamp) {
        return this.globalRowStoreHelper.get(timestamp);
    }

    @Override
    public BigdataFileSystem getGlobalFileSystem() {
        return this.globalFileSystemHelper.getGlobalFileSystem();
    }

    @Override
    public TemporaryStore getTempStore() {
        return this.tempStoreFactory.getTempStore();
    }

    public void reportCounters() {
        new ReportTask(this).run();
    }

    @Override
    public T getService() {
        return ((AbstractClient)this.getClient()).getDelegate().getService();
    }

    @Override
    public String getServiceName() {
        return ((AbstractClient)this.getClient()).getDelegate().getServiceName();
    }

    @Override
    public Class<?> getServiceIface() {
        return ((AbstractClient)this.getClient()).getDelegate().getServiceIface();
    }

    @Override
    public UUID getServiceUUID() {
        return ((AbstractClient)this.getClient()).getDelegate().getServiceUUID();
    }

    @Override
    public boolean isServiceReady() {
        AbstractClient<T> thisClient = this.client.get();
        if (thisClient == null) {
            return false;
        }
        IFederationDelegate<T> delegate = thisClient.getDelegate();
        if (delegate == null) {
            return false;
        }
        return delegate.isServiceReady();
    }

    @Override
    public void reattachDynamicCounters() {
        ((AbstractClient)this.getClient()).getDelegate().reattachDynamicCounters();
    }

    @Override
    public void didStart() {
        ((AbstractClient)this.getClient()).getDelegate().didStart();
    }

    @Override
    public AbstractHTTPD newHttpd(int httpdPort, ICounterSetAccess accessor) throws IOException {
        return ((AbstractClient)this.getClient()).getDelegate().newHttpd(httpdPort, accessor);
    }

    @Override
    public void serviceJoin(IService service, UUID serviceUUID) {
        if (!this.isOpen()) {
            return;
        }
        if (log.isInfoEnabled()) {
            log.info("service=" + service + ", serviceUUID" + serviceUUID);
        }
        ((AbstractClient)this.getClient()).getDelegate().serviceJoin(service, serviceUUID);
    }

    @Override
    public void serviceLeave(UUID serviceUUID) {
        AbstractClient<T> thisClient;
        if (!this.isOpen()) {
            return;
        }
        if (log.isInfoEnabled()) {
            log.info("serviceUUID=" + serviceUUID);
        }
        if ((thisClient = this.client.get()) != null && thisClient.isConnected()) {
            thisClient.getDelegate().serviceLeave(serviceUUID);
        }
    }

    @Override
    public IDataService[] getDataServices(UUID[] uuids) {
        IDataService[] services = new IDataService[uuids.length];
        AbstractFederation fed = this;
        int i = 0;
        UUID mdsUUID = null;
        for (UUID uuid : uuids) {
            IDataService service = fed.getDataService(uuid);
            if (service == null) {
                if (mdsUUID == null) {
                    try {
                        mdsUUID = fed.getMetadataService().getServiceUUID();
                    }
                    catch (IOException ex) {
                        throw new RuntimeException(ex);
                    }
                }
                if (uuid == mdsUUID) {
                    service = fed.getMetadataService();
                }
            }
            if (service == null) {
                throw new RuntimeException("Could not discover service: uuid=" + uuid);
            }
            services[i++] = service;
        }
        return services;
    }

    protected void sendEvent(Event e) {
        if (this.isOpen()) {
            this.events.add(e);
        }
    }

    private class SendEventsTask
    implements Runnable {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                int nevents;
                ILoadBalancerService lbs = AbstractFederation.this.getLoadBalancerService();
                if (lbs == null) {
                    return;
                }
                long begin = System.currentTimeMillis();
                LinkedList c = new LinkedList();
                AbstractFederation.this.events.drainTo(c);
                Iterator i$ = c.iterator();
                while (i$.hasNext()) {
                    Event e;
                    Event event = e = (Event)i$.next();
                    synchronized (event) {
                        lbs.notifyEvent(e);
                    }
                }
                if (log.isInfoEnabled() && (nevents = c.size()) > 0) {
                    log.info("Sent " + c.size() + " events in " + (System.currentTimeMillis() - begin) + "ms");
                }
            }
            catch (Throwable t) {
                log.warn(AbstractFederation.this.getServiceName(), t);
            }
        }
    }

    public static class ReportTask
    implements Runnable {
        protected final Logger log = Logger.getLogger(ReportTask.class);
        private final AbstractFederation<?> fed;

        public ReportTask(AbstractFederation<?> fed) {
            if (fed == null) {
                throw new IllegalArgumentException();
            }
            this.fed = fed;
        }

        @Override
        public void run() {
            try {
                this.fed.reattachDynamicCounters();
            }
            catch (Throwable t) {
                this.log.error("Could not update performance counter view : " + t, t);
            }
            try {
                this.reportPerformanceCounters();
            }
            catch (Throwable t) {
                this.log.error("Could not report performance counters : " + t, t);
            }
        }

        protected void reportPerformanceCounters() throws IOException {
            UUID serviceUUID = this.fed.getServiceUUID();
            if (serviceUUID == null) {
                if (this.log.isInfoEnabled()) {
                    this.log.info("Service UUID not assigned yet.");
                }
                return;
            }
            ILoadBalancerService loadBalancerService = this.fed.getLoadBalancerService();
            if (loadBalancerService == null) {
                this.log.warn("Could not discover load balancer service.");
                return;
            }
            if (serviceUUID.equals(loadBalancerService.getServiceUUID())) {
                return;
            }
            ByteArrayOutputStream baos = new ByteArrayOutputStream(2048);
            Properties p = ((AbstractClient)this.fed.getClient()).getProperties();
            boolean reportAll = Boolean.valueOf(p.getProperty(IBigdataClient.Options.REPORT_ALL, "false"));
            this.fed.getCounters().asXML(baos, "UTF-8", reportAll ? null : QueryUtil.getRequiredPerformanceCountersFilter());
            if (this.log.isInfoEnabled()) {
                this.log.info("reportAll=" + reportAll + ", service=" + this.fed.getServiceName() + ", #bytesReported=" + baos.size());
            }
            loadBalancerService.notify(serviceUUID, baos.toByteArray());
            if (this.log.isInfoEnabled()) {
                this.log.info("Notified the load balancer.");
            }
        }
    }

    protected class StartDeferredTasksTask
    implements Runnable {
        private final Logger log = Logger.getLogger(StartDeferredTasksTask.class);
        final long begin = System.currentTimeMillis();

        private StartDeferredTasksTask() {
        }

        @Override
        public void run() {
            try {
                this.startDeferredTasks();
            }
            catch (RejectedExecutionException t) {
                if (AbstractFederation.this.isOpen()) {
                    this.log.error(t, t);
                }
            }
            catch (Throwable t) {
                this.log.error(t, t);
                return;
            }
        }

        protected void startDeferredTasks() throws IOException {
            try {
                long elapsed = System.currentTimeMillis() - this.begin;
                while (AbstractFederation.this.getServiceUUID() == null) {
                    if (elapsed > 10000L) {
                        this.log.warn(ERR_NO_SERVICE_UUID + " : iface=" + AbstractFederation.this.getServiceIface() + ", name=" + AbstractFederation.this.getServiceName() + ", elapsed=" + elapsed);
                    } else if (this.log.isInfoEnabled()) {
                        this.log.info(ERR_NO_SERVICE_UUID);
                    }
                    try {
                        Thread.sleep(2000L);
                    }
                    catch (InterruptedException e) {}
                }
                while (!AbstractFederation.this.isServiceReady()) {
                    if (elapsed > 10000L) {
                        this.log.warn(ERR_SERVICE_NOT_READY + " : iface=" + AbstractFederation.this.getServiceIface() + ", name=" + AbstractFederation.this.getServiceName() + ", elapsed=" + elapsed);
                    } else if (this.log.isInfoEnabled()) {
                        this.log.info(ERR_SERVICE_NOT_READY + " : " + elapsed);
                    }
                    try {
                        Thread.sleep(2000L);
                    }
                    catch (InterruptedException e) {}
                }
                this.startPlatformStatisticsCollection();
                this.startQueueStatisticsCollection();
                Properties properties = ((AbstractClient)AbstractFederation.this.getClient()).getProperties();
                boolean listen = Boolean.valueOf(properties.getProperty(IBigdataClient.Options.GANGLIA_LISTEN, "true"));
                boolean report = Boolean.valueOf(properties.getProperty(IBigdataClient.Options.GANGLIA_REPORT, "true"));
                if (listen || report) {
                    this.startGangliaService((AbstractStatisticsCollector)AbstractFederation.this.statisticsCollector.get());
                }
                this.startReportTask();
                this.startHttpdService();
                AbstractFederation.this.didStart();
            }
            catch (IllegalStateException ex) {
                if (!AbstractFederation.this.isOpen()) {
                    this.log.warn("Shutdown: deferred tasks will not start.");
                    return;
                }
                throw ex;
            }
        }

        protected void startQueueStatisticsCollection() {
            if (!AbstractFederation.this.getCollectQueueStatistics()) {
                if (this.log.isInfoEnabled()) {
                    this.log.info("Queue statistics collection disabled: " + AbstractFederation.this.getServiceIface());
                }
                return;
            }
            long initialDelay = 0L;
            long delay = 1000L;
            TimeUnit unit = TimeUnit.MILLISECONDS;
            String relpath = "Thread Pool";
            ThreadPoolExecutorStatisticsTask threadPoolExecutorStatisticsTask = new ThreadPoolExecutorStatisticsTask("Thread Pool", AbstractFederation.this.threadPool, AbstractFederation.this.taskCounters);
            AbstractFederation.this.getServiceCounterSet().makePath("Thread Pool").attach(threadPoolExecutorStatisticsTask.getCounters());
            AbstractFederation.this.addScheduledTask(threadPoolExecutorStatisticsTask, 0L, 1000L, unit);
        }

        protected void startPlatformStatisticsCollection() {
            UUID serviceUUID = AbstractFederation.this.getServiceUUID();
            Properties p = ((AbstractClient)AbstractFederation.this.getClient()).getProperties();
            if (!AbstractFederation.this.getCollectPlatformStatistics()) {
                return;
            }
            p.setProperty(AbstractStatisticsCollector.Options.PROCESS_NAME, "service/" + AbstractFederation.this.getServiceIface().getName() + "/" + serviceUUID.toString());
            AbstractStatisticsCollector tmp = AbstractStatisticsCollector.newInstance(p);
            tmp.start();
            AbstractFederation.this.statisticsCollector.set(tmp);
            if (this.log.isInfoEnabled()) {
                this.log.info("Collecting platform statistics: uuid=" + serviceUUID);
            }
        }

        protected void startGangliaService(AbstractStatisticsCollector statisticsCollector) {
            if (statisticsCollector == null) {
                return;
            }
            try {
                Properties properties = ((AbstractClient)AbstractFederation.this.getClient()).getProperties();
                String hostName = AbstractStatisticsCollector.fullyQualifiedHostName;
                String serviceName = statisticsCollector.getProcessName();
                InetAddress listenGroup = InetAddress.getByName(properties.getProperty(IBigdataClient.Options.GANGLIA_LISTEN_GROUP, "239.2.11.71"));
                int listenPort = Integer.valueOf(properties.getProperty(IBigdataClient.Options.GANGLIA_LISTEN_PORT, IBigdataClient.Options.DEFAULT_GANGLIA_LISTEN_PORT));
                boolean listen = Boolean.valueOf(properties.getProperty(IBigdataClient.Options.GANGLIA_LISTEN, "true"));
                boolean report = Boolean.valueOf(properties.getProperty(IBigdataClient.Options.GANGLIA_REPORT, "true"));
                InetSocketAddress[] metricsServers = GangliaUtil.parse(properties.getProperty(IBigdataClient.Options.GANGLIA_SERVERS, "239.2.11.71"), listenGroup.getHostName(), listenPort);
                int quietPeriod = 600;
                int initialDelay = 20;
                boolean heartbeatInterval = false;
                int monitoringInterval = (int)TimeUnit.MILLISECONDS.toSeconds(Long.parseLong(properties.getProperty(IBigdataClient.Options.REPORT_DELAY, "60000")));
                String defaultUnits = "";
                GangliaSlopeEnum defaultSlope = IGangliaDefaults.DEFAULT_SLOPE;
                int defaultTMax = 180;
                int defaultDMax = 3600;
                GangliaMetadataFactory metadataFactory = new GangliaMetadataFactory(new DefaultMetadataFactory("", defaultSlope, 180, 3600));
                metadataFactory.add(new BigdataMetadataFactory(hostName, serviceName, defaultSlope, 180, 3600, 0));
                BigdataGangliaService gangliaService = new BigdataGangliaService(hostName, serviceName, metricsServers, listenGroup, listenPort, listen, report, false, 600, 20, 0, monitoringInterval, 3600, metadataFactory);
                gangliaService.addMetricCollector(new HostMetricsCollector(statisticsCollector));
                gangliaService.addMetricCollector(new QueryEngineMetricsCollector(AbstractFederation.this, statisticsCollector));
                FutureTask<Void> ft = new FutureTask<Void>(gangliaService, null);
                AbstractFederation.this.gangliaFuture.set(ft);
                AbstractFederation.this.gangliaService.set(gangliaService);
                AbstractFederation.this.getExecutorService().submit(ft);
            }
            catch (RejectedExecutionException t) {
            }
            catch (Throwable t) {
                this.log.error(t, t);
            }
        }

        protected void startReportTask() {
            Properties p = ((AbstractClient)AbstractFederation.this.getClient()).getProperties();
            long delay = Long.parseLong(p.getProperty(IBigdataClient.Options.REPORT_DELAY, "60000"));
            if (this.log.isInfoEnabled()) {
                this.log.info(IBigdataClient.Options.REPORT_DELAY + "=" + delay);
            }
            if (delay > 0L) {
                TimeUnit unit = TimeUnit.MILLISECONDS;
                long initialDelay = delay;
                AbstractFederation.this.addScheduledTask(new ReportTask(AbstractFederation.this), initialDelay, delay, unit);
                if (this.log.isInfoEnabled()) {
                    this.log.info("Started ReportTask.");
                }
            }
        }

        protected void startHttpdService() throws UnsupportedEncodingException {
            AbstractHTTPD httpd;
            String path = AbstractFederation.this.getServiceCounterPathPrefix();
            int httpdPort = AbstractFederation.this.getHttpdPort();
            if (httpdPort == -1) {
                if (this.log.isInfoEnabled()) {
                    this.log.info("httpd disabled: " + path);
                }
                return;
            }
            try {
                httpd = AbstractFederation.this.newHttpd(httpdPort, AbstractFederation.this);
            }
            catch (IOException e) {
                this.log.error("Could not start httpd: port=" + httpdPort + ", path=" + path, e);
                return;
            }
            if (httpd != null) {
                AbstractFederation.this.httpd.set(httpd);
                String s = "http://" + AbstractStatisticsCollector.fullyQualifiedHostName + ":" + httpd.getPort() + "/?path=" + URLEncoder.encode(path, "UTF-8");
                AbstractFederation.this.httpdURL.set(s);
                if (this.log.isInfoEnabled()) {
                    this.log.info("start:\n" + s);
                }
            }
        }
    }
}

