001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.yarn.event; 020 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.BlockingQueue; 026import java.util.concurrent.LinkedBlockingQueue; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.apache.hadoop.classification.InterfaceAudience.Public; 031import org.apache.hadoop.classification.InterfaceStability.Evolving; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.service.AbstractService; 034import org.apache.hadoop.util.ShutdownHookManager; 035import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 036 037import com.google.common.annotations.VisibleForTesting; 038 039/** 040 * Dispatches {@link Event}s in a separate thread. Currently only single thread 041 * does that. Potentially there could be multiple channels for each event type 042 * class and a thread pool can be used to dispatch the events. 043 */ 044@SuppressWarnings("rawtypes") 045@Public 046@Evolving 047public class AsyncDispatcher extends AbstractService implements Dispatcher { 048 049 private static final Log LOG = LogFactory.getLog(AsyncDispatcher.class); 050 051 private final BlockingQueue<Event> eventQueue; 052 private volatile int lastEventQueueSizeLogged = 0; 053 private volatile boolean stopped = false; 054 055 // Configuration flag for enabling/disabling draining dispatcher's events on 056 // stop functionality. 057 private volatile boolean drainEventsOnStop = false; 058 059 // Indicates all the remaining dispatcher's events on stop have been drained 060 // and processed. 061 private volatile boolean drained = true; 062 private Object waitForDrained = new Object(); 063 064 // For drainEventsOnStop enabled only, block newly coming events into the 065 // queue while stopping. 066 private volatile boolean blockNewEvents = false; 067 private final EventHandler handlerInstance = new GenericEventHandler(); 068 069 private Thread eventHandlingThread; 070 protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers; 071 private boolean exitOnDispatchException; 072 073 public AsyncDispatcher() { 074 this(new LinkedBlockingQueue<Event>()); 075 } 076 077 public AsyncDispatcher(BlockingQueue<Event> eventQueue) { 078 super("Dispatcher"); 079 this.eventQueue = eventQueue; 080 this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>(); 081 } 082 083 Runnable createThread() { 084 return new Runnable() { 085 @Override 086 public void run() { 087 while (!stopped && !Thread.currentThread().isInterrupted()) { 088 drained = eventQueue.isEmpty(); 089 // blockNewEvents is only set when dispatcher is draining to stop, 090 // adding this check is to avoid the overhead of acquiring the lock 091 // and calling notify every time in the normal run of the loop. 092 if (blockNewEvents) { 093 synchronized (waitForDrained) { 094 if (drained) { 095 waitForDrained.notify(); 096 } 097 } 098 } 099 Event event; 100 try { 101 event = eventQueue.take(); 102 } catch(InterruptedException ie) { 103 if (!stopped) { 104 LOG.warn("AsyncDispatcher thread interrupted", ie); 105 } 106 return; 107 } 108 if (event != null) { 109 dispatch(event); 110 } 111 } 112 } 113 }; 114 } 115 116 @Override 117 protected void serviceInit(Configuration conf) throws Exception { 118 this.exitOnDispatchException = 119 conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, 120 Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); 121 super.serviceInit(conf); 122 } 123 124 @VisibleForTesting 125 public void setExitOnDispatchException(boolean exitOnDispatchException) { 126 this.exitOnDispatchException = exitOnDispatchException; 127 } 128 129 @Override 130 protected void serviceStart() throws Exception { 131 //start all the components 132 super.serviceStart(); 133 eventHandlingThread = new Thread(createThread()); 134 eventHandlingThread.setName("AsyncDispatcher event handler"); 135 eventHandlingThread.start(); 136 } 137 138 public void setDrainEventsOnStop() { 139 drainEventsOnStop = true; 140 } 141 142 @Override 143 protected void serviceStop() throws Exception { 144 if (drainEventsOnStop) { 145 blockNewEvents = true; 146 LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); 147 synchronized (waitForDrained) { 148 while (!drained && eventHandlingThread.isAlive()) { 149 waitForDrained.wait(1000); 150 LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + 151 eventHandlingThread.getState()); 152 } 153 } 154 } 155 stopped = true; 156 if (eventHandlingThread != null) { 157 eventHandlingThread.interrupt(); 158 try { 159 eventHandlingThread.join(); 160 } catch (InterruptedException ie) { 161 LOG.warn("Interrupted Exception while stopping", ie); 162 } 163 } 164 165 // stop all the components 166 super.serviceStop(); 167 } 168 169 @SuppressWarnings("unchecked") 170 protected void dispatch(Event event) { 171 //all events go thru this loop 172 if (LOG.isDebugEnabled()) { 173 LOG.debug("Dispatching the event " + event.getClass().getName() + "." 174 + event.toString()); 175 } 176 177 Class<? extends Enum> type = event.getType().getDeclaringClass(); 178 179 try{ 180 EventHandler handler = eventDispatchers.get(type); 181 if(handler != null) { 182 handler.handle(event); 183 } else { 184 throw new Exception("No handler for registered for " + type); 185 } 186 } catch (Throwable t) { 187 //TODO Maybe log the state of the queue 188 LOG.fatal("Error in dispatcher thread", t); 189 // If serviceStop is called, we should exit this thread gracefully. 190 if (exitOnDispatchException 191 && (ShutdownHookManager.get().isShutdownInProgress()) == false 192 && stopped == false) { 193 Thread shutDownThread = new Thread(createShutDownThread()); 194 shutDownThread.setName("AsyncDispatcher ShutDown handler"); 195 shutDownThread.start(); 196 } 197 } 198 } 199 200 @SuppressWarnings("unchecked") 201 @Override 202 public void register(Class<? extends Enum> eventType, 203 EventHandler handler) { 204 /* check to see if we have a listener registered */ 205 EventHandler<Event> registeredHandler = (EventHandler<Event>) 206 eventDispatchers.get(eventType); 207 LOG.info("Registering " + eventType + " for " + handler.getClass()); 208 if (registeredHandler == null) { 209 eventDispatchers.put(eventType, handler); 210 } else if (!(registeredHandler instanceof MultiListenerHandler)){ 211 /* for multiple listeners of an event add the multiple listener handler */ 212 MultiListenerHandler multiHandler = new MultiListenerHandler(); 213 multiHandler.addHandler(registeredHandler); 214 multiHandler.addHandler(handler); 215 eventDispatchers.put(eventType, multiHandler); 216 } else { 217 /* already a multilistener, just add to it */ 218 MultiListenerHandler multiHandler 219 = (MultiListenerHandler) registeredHandler; 220 multiHandler.addHandler(handler); 221 } 222 } 223 224 @Override 225 public EventHandler getEventHandler() { 226 return handlerInstance; 227 } 228 229 class GenericEventHandler implements EventHandler<Event> { 230 public void handle(Event event) { 231 if (blockNewEvents) { 232 return; 233 } 234 drained = false; 235 236 /* all this method does is enqueue all the events onto the queue */ 237 int qSize = eventQueue.size(); 238 if (qSize != 0 && qSize % 1000 == 0 239 && lastEventQueueSizeLogged != qSize) { 240 lastEventQueueSizeLogged = qSize; 241 LOG.info("Size of event-queue is " + qSize); 242 } 243 int remCapacity = eventQueue.remainingCapacity(); 244 if (remCapacity < 1000) { 245 LOG.warn("Very low remaining capacity in the event-queue: " 246 + remCapacity); 247 } 248 try { 249 eventQueue.put(event); 250 } catch (InterruptedException e) { 251 if (!stopped) { 252 LOG.warn("AsyncDispatcher thread interrupted", e); 253 } 254 // Need to reset drained flag to true if event queue is empty, 255 // otherwise dispatcher will hang on stop. 256 drained = eventQueue.isEmpty(); 257 throw new YarnRuntimeException(e); 258 } 259 }; 260 } 261 262 /** 263 * Multiplexing an event. Sending it to different handlers that 264 * are interested in the event. 265 * @param <T> the type of event these multiple handlers are interested in. 266 */ 267 static class MultiListenerHandler implements EventHandler<Event> { 268 List<EventHandler<Event>> listofHandlers; 269 270 public MultiListenerHandler() { 271 listofHandlers = new ArrayList<EventHandler<Event>>(); 272 } 273 274 @Override 275 public void handle(Event event) { 276 for (EventHandler<Event> handler: listofHandlers) { 277 handler.handle(event); 278 } 279 } 280 281 void addHandler(EventHandler<Event> handler) { 282 listofHandlers.add(handler); 283 } 284 285 } 286 287 Runnable createShutDownThread() { 288 return new Runnable() { 289 @Override 290 public void run() { 291 LOG.info("Exiting, bbye.."); 292 System.exit(-1); 293 } 294 }; 295 } 296 297 @VisibleForTesting 298 protected boolean isEventThreadWaiting() { 299 return eventHandlingThread.getState() == Thread.State.WAITING; 300 } 301 302 @VisibleForTesting 303 protected boolean isDrained() { 304 return this.drained; 305 } 306}