001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.lib.service.scheduler;
020
021import org.apache.hadoop.classification.InterfaceAudience;
022import org.apache.hadoop.lib.lang.RunnableCallable;
023import org.apache.hadoop.lib.server.BaseService;
024import org.apache.hadoop.lib.server.Server;
025import org.apache.hadoop.lib.server.ServiceException;
026import org.apache.hadoop.lib.service.Instrumentation;
027import org.apache.hadoop.lib.service.Scheduler;
028import org.apache.hadoop.lib.util.Check;
029import org.apache.hadoop.util.Time;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import java.text.MessageFormat;
034import java.util.concurrent.Callable;
035import java.util.concurrent.ScheduledExecutorService;
036import java.util.concurrent.ScheduledThreadPoolExecutor;
037import java.util.concurrent.TimeUnit;
038
039@InterfaceAudience.Private
040public class SchedulerService extends BaseService implements Scheduler {
041  private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class);
042
043  private static final String INST_GROUP = "scheduler";
044
045  public static final String PREFIX = "scheduler";
046
047  public static final String CONF_THREADS = "threads";
048
049  private ScheduledExecutorService scheduler;
050
051  public SchedulerService() {
052    super(PREFIX);
053  }
054
055  @Override
056  public void init() throws ServiceException {
057    int threads = getServiceConfig().getInt(CONF_THREADS, 5);
058    scheduler = new ScheduledThreadPoolExecutor(threads);
059    LOG.debug("Scheduler started");
060  }
061
062  @Override
063  public void destroy() {
064    try {
065      long limit = Time.now() + 30 * 1000;
066      scheduler.shutdownNow();
067      while (!scheduler.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
068        LOG.debug("Waiting for scheduler to shutdown");
069        if (Time.now() > limit) {
070          LOG.warn("Gave up waiting for scheduler to shutdown");
071          break;
072        }
073      }
074      if (scheduler.isTerminated()) {
075        LOG.debug("Scheduler shutdown");
076      }
077    } catch (InterruptedException ex) {
078      LOG.warn(ex.getMessage(), ex);
079    }
080  }
081
082  @Override
083  public Class[] getServiceDependencies() {
084    return new Class[]{Instrumentation.class};
085  }
086
087  @Override
088  public Class getInterface() {
089    return Scheduler.class;
090  }
091
092  @Override
093  public void schedule(final Callable<?> callable, long delay, long interval, TimeUnit unit) {
094    Check.notNull(callable, "callable");
095    if (!scheduler.isShutdown()) {
096      LOG.debug("Scheduling callable [{}], interval [{}] seconds, delay [{}] in [{}]",
097                new Object[]{callable, delay, interval, unit});
098      Runnable r = new Runnable() {
099        @Override
100        public void run() {
101          String instrName = callable.getClass().getSimpleName();
102          Instrumentation instr = getServer().get(Instrumentation.class);
103          if (getServer().getStatus() == Server.Status.HALTED) {
104            LOG.debug("Skipping [{}], server status [{}]", callable, getServer().getStatus());
105            instr.incr(INST_GROUP, instrName + ".skips", 1);
106          } else {
107            LOG.debug("Executing [{}]", callable);
108            instr.incr(INST_GROUP, instrName + ".execs", 1);
109            Instrumentation.Cron cron = instr.createCron().start();
110            try {
111              callable.call();
112            } catch (Exception ex) {
113              instr.incr(INST_GROUP, instrName + ".fails", 1);
114              LOG.error("Error executing [{}], {}", new Object[]{callable, ex.getMessage(), ex});
115            } finally {
116              instr.addCron(INST_GROUP, instrName, cron.stop());
117            }
118          }
119        }
120      };
121      scheduler.scheduleWithFixedDelay(r, delay, interval, unit);
122    } else {
123      throw new IllegalStateException(
124        MessageFormat.format("Scheduler shutting down, ignoring scheduling of [{}]", callable));
125    }
126  }
127
128  @Override
129  public void schedule(Runnable runnable, long delay, long interval, TimeUnit unit) {
130    schedule((Callable<?>) new RunnableCallable(runnable), delay, interval, unit);
131  }
132
133}