001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.mapreduce;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.net.InetSocketAddress;
024import java.security.PrivilegedExceptionAction;
025import java.util.ArrayList;
026import java.util.List;
027import java.util.ServiceConfigurationError;
028import java.util.ServiceLoader;
029
030import com.google.common.annotations.VisibleForTesting;
031import org.apache.commons.logging.Log;
032import org.apache.commons.logging.LogFactory;
033import org.apache.hadoop.classification.InterfaceAudience;
034import org.apache.hadoop.classification.InterfaceStability;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.io.Text;
039import org.apache.hadoop.mapred.JobConf;
040import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
041import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
042import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
043import org.apache.hadoop.mapreduce.util.ConfigUtil;
044import org.apache.hadoop.mapreduce.v2.LogParams;
045import org.apache.hadoop.security.UserGroupInformation;
046import org.apache.hadoop.security.token.SecretManager.InvalidToken;
047import org.apache.hadoop.security.token.Token;
048
049/**
050 * Provides a way to access information about the map/reduce cluster.
051 */
052@InterfaceAudience.Public
053@InterfaceStability.Evolving
054public class Cluster {
055  
056  @InterfaceStability.Evolving
057  public static enum JobTrackerStatus {INITIALIZING, RUNNING};
058
059  private ClientProtocolProvider clientProtocolProvider;
060  private ClientProtocol client;
061  private UserGroupInformation ugi;
062  private Configuration conf;
063  private FileSystem fs = null;
064  private Path sysDir = null;
065  private Path stagingAreaDir = null;
066  private Path jobHistoryDir = null;
067  private static final Log LOG = LogFactory.getLog(Cluster.class);
068
069  @VisibleForTesting
070  static Iterable<ClientProtocolProvider> frameworkLoader =
071      ServiceLoader.load(ClientProtocolProvider.class);
072  private volatile List<ClientProtocolProvider> providerList = null;
073
074  private void initProviderList() {
075    if (providerList == null) {
076      synchronized (frameworkLoader) {
077        if (providerList == null) {
078          List<ClientProtocolProvider> localProviderList =
079              new ArrayList<ClientProtocolProvider>();
080          try {
081            for (ClientProtocolProvider provider : frameworkLoader) {
082              localProviderList.add(provider);
083            }
084          } catch(ServiceConfigurationError e) {
085            LOG.info("Failed to instantiate ClientProtocolProvider, please "
086                         + "check the /META-INF/services/org.apache."
087                         + "hadoop.mapreduce.protocol.ClientProtocolProvider "
088                         + "files on the classpath", e);
089          }
090          providerList = localProviderList;
091        }
092      }
093    }
094  }
095
096  static {
097    ConfigUtil.loadResources();
098  }
099  
100  public Cluster(Configuration conf) throws IOException {
101    this(null, conf);
102  }
103
104  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
105      throws IOException {
106    this.conf = conf;
107    this.ugi = UserGroupInformation.getCurrentUser();
108    initialize(jobTrackAddr, conf);
109  }
110  
111  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
112      throws IOException {
113
114    initProviderList();
115    for (ClientProtocolProvider provider : providerList) {
116      LOG.debug("Trying ClientProtocolProvider : "
117          + provider.getClass().getName());
118      ClientProtocol clientProtocol = null;
119      try {
120        if (jobTrackAddr == null) {
121          clientProtocol = provider.create(conf);
122        } else {
123          clientProtocol = provider.create(jobTrackAddr, conf);
124        }
125
126        if (clientProtocol != null) {
127          clientProtocolProvider = provider;
128          client = clientProtocol;
129          LOG.debug("Picked " + provider.getClass().getName()
130              + " as the ClientProtocolProvider");
131          break;
132        } else {
133          LOG.debug("Cannot pick " + provider.getClass().getName()
134              + " as the ClientProtocolProvider - returned null protocol");
135        }
136      } catch (Exception e) {
137        LOG.info("Failed to use " + provider.getClass().getName()
138            + " due to error: ", e);
139      }
140    }
141
142    if (null == clientProtocolProvider || null == client) {
143      throw new IOException(
144          "Cannot initialize Cluster. Please check your configuration for "
145              + MRConfig.FRAMEWORK_NAME
146              + " and the correspond server addresses.");
147    }
148  }
149
150  ClientProtocol getClient() {
151    return client;
152  }
153  
154  Configuration getConf() {
155    return conf;
156  }
157  
158  /**
159   * Close the <code>Cluster</code>.
160   */
161  public synchronized void close() throws IOException {
162    clientProtocolProvider.close(client);
163  }
164
165  private Job[] getJobs(JobStatus[] stats) throws IOException {
166    List<Job> jobs = new ArrayList<Job>();
167    for (JobStatus stat : stats) {
168      jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
169    }
170    return jobs.toArray(new Job[0]);
171  }
172
173  /**
174   * Get the file system where job-specific files are stored
175   * 
176   * @return object of FileSystem
177   * @throws IOException
178   * @throws InterruptedException
179   */
180  public synchronized FileSystem getFileSystem() 
181      throws IOException, InterruptedException {
182    if (this.fs == null) {
183      try {
184        this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
185          public FileSystem run() throws IOException, InterruptedException {
186            final Path sysDir = new Path(client.getSystemDir());
187            return sysDir.getFileSystem(getConf());
188          }
189        });
190      } catch (InterruptedException e) {
191        throw new RuntimeException(e);
192      }
193    }
194    return fs;
195  }
196
197  /**
198   * Get job corresponding to jobid.
199   * 
200   * @param jobId
201   * @return object of {@link Job}
202   * @throws IOException
203   * @throws InterruptedException
204   */
205  public Job getJob(JobID jobId) throws IOException, InterruptedException {
206    JobStatus status = client.getJobStatus(jobId);
207    if (status != null) {
208      JobConf conf;
209      try {
210        conf = new JobConf(status.getJobFile());
211      } catch (RuntimeException ex) {
212        // If job file doesn't exist it means we can't find the job
213        if (ex.getCause() instanceof FileNotFoundException) {
214          return null;
215        } else {
216          throw ex;
217        }
218      }
219      return Job.getInstance(this, status, conf);
220    }
221    return null;
222  }
223  
224  /**
225   * Get all the queues in cluster.
226   * 
227   * @return array of {@link QueueInfo}
228   * @throws IOException
229   * @throws InterruptedException
230   */
231  public QueueInfo[] getQueues() throws IOException, InterruptedException {
232    return client.getQueues();
233  }
234  
235  /**
236   * Get queue information for the specified name.
237   * 
238   * @param name queuename
239   * @return object of {@link QueueInfo}
240   * @throws IOException
241   * @throws InterruptedException
242   */
243  public QueueInfo getQueue(String name) 
244      throws IOException, InterruptedException {
245    return client.getQueue(name);
246  }
247
248  /**
249   * Get log parameters for the specified jobID or taskAttemptID
250   * @param jobID the job id.
251   * @param taskAttemptID the task attempt id. Optional.
252   * @return the LogParams
253   * @throws IOException
254   * @throws InterruptedException
255   */
256  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
257      throws IOException, InterruptedException {
258    return client.getLogFileParams(jobID, taskAttemptID);
259  }
260
261  /**
262   * Get current cluster status.
263   * 
264   * @return object of {@link ClusterMetrics}
265   * @throws IOException
266   * @throws InterruptedException
267   */
268  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
269    return client.getClusterMetrics();
270  }
271  
272  /**
273   * Get all active trackers in the cluster.
274   * 
275   * @return array of {@link TaskTrackerInfo}
276   * @throws IOException
277   * @throws InterruptedException
278   */
279  public TaskTrackerInfo[] getActiveTaskTrackers() 
280      throws IOException, InterruptedException  {
281    return client.getActiveTrackers();
282  }
283  
284  /**
285   * Get blacklisted trackers.
286   * 
287   * @return array of {@link TaskTrackerInfo}
288   * @throws IOException
289   * @throws InterruptedException
290   */
291  public TaskTrackerInfo[] getBlackListedTaskTrackers() 
292      throws IOException, InterruptedException  {
293    return client.getBlacklistedTrackers();
294  }
295  
296  /**
297   * Get all the jobs in cluster.
298   * 
299   * @return array of {@link Job}
300   * @throws IOException
301   * @throws InterruptedException
302   * @deprecated Use {@link #getAllJobStatuses()} instead.
303   */
304  @Deprecated
305  public Job[] getAllJobs() throws IOException, InterruptedException {
306    return getJobs(client.getAllJobs());
307  }
308
309  /**
310   * Get job status for all jobs in the cluster.
311   * @return job status for all jobs in cluster
312   * @throws IOException
313   * @throws InterruptedException
314   */
315  public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
316    return client.getAllJobs();
317  }
318
319  /**
320   * Grab the jobtracker system directory path where 
321   * job-specific files will  be placed.
322   * 
323   * @return the system directory where job-specific files are to be placed.
324   */
325  public Path getSystemDir() throws IOException, InterruptedException {
326    if (sysDir == null) {
327      sysDir = new Path(client.getSystemDir());
328    }
329    return sysDir;
330  }
331  
332  /**
333   * Grab the jobtracker's view of the staging directory path where 
334   * job-specific files will  be placed.
335   * 
336   * @return the staging directory where job-specific files are to be placed.
337   */
338  public Path getStagingAreaDir() throws IOException, InterruptedException {
339    if (stagingAreaDir == null) {
340      stagingAreaDir = new Path(client.getStagingAreaDir());
341    }
342    return stagingAreaDir;
343  }
344
345  /**
346   * Get the job history file path for a given job id. The job history file at 
347   * this path may or may not be existing depending on the job completion state.
348   * The file is present only for the completed jobs.
349   * @param jobId the JobID of the job submitted by the current user.
350   * @return the file path of the job history file
351   * @throws IOException
352   * @throws InterruptedException
353   */
354  public String getJobHistoryUrl(JobID jobId) throws IOException, 
355    InterruptedException {
356    if (jobHistoryDir == null) {
357      jobHistoryDir = new Path(client.getJobHistoryDir());
358    }
359    return new Path(jobHistoryDir, jobId.toString() + "_"
360                    + ugi.getShortUserName()).toString();
361  }
362
363  /**
364   * Gets the Queue ACLs for current user
365   * @return array of QueueAclsInfo object for current user.
366   * @throws IOException
367   */
368  public QueueAclsInfo[] getQueueAclsForCurrentUser() 
369      throws IOException, InterruptedException  {
370    return client.getQueueAclsForCurrentUser();
371  }
372
373  /**
374   * Gets the root level queues.
375   * @return array of JobQueueInfo object.
376   * @throws IOException
377   */
378  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
379    return client.getRootQueues();
380  }
381  
382  /**
383   * Returns immediate children of queueName.
384   * @param queueName
385   * @return array of JobQueueInfo which are children of queueName
386   * @throws IOException
387   */
388  public QueueInfo[] getChildQueues(String queueName) 
389      throws IOException, InterruptedException {
390    return client.getChildQueues(queueName);
391  }
392  
393  /**
394   * Get the JobTracker's status.
395   * 
396   * @return {@link JobTrackerStatus} of the JobTracker
397   * @throws IOException
398   * @throws InterruptedException
399   */
400  public JobTrackerStatus getJobTrackerStatus() throws IOException,
401      InterruptedException {
402    return client.getJobTrackerStatus();
403  }
404  
405  /**
406   * Get the tasktracker expiry interval for the cluster
407   * @return the expiry interval in msec
408   */
409  public long getTaskTrackerExpiryInterval() throws IOException,
410      InterruptedException {
411    return client.getTaskTrackerExpiryInterval();
412  }
413
414  /**
415   * Get a delegation token for the user from the JobTracker.
416   * @param renewer the user who can renew the token
417   * @return the new token
418   * @throws IOException
419   */
420  public Token<DelegationTokenIdentifier> 
421      getDelegationToken(Text renewer) throws IOException, InterruptedException{
422    // client has already set the service
423    return client.getDelegationToken(renewer);
424  }
425
426  /**
427   * Renew a delegation token
428   * @param token the token to renew
429   * @return the new expiration time
430   * @throws InvalidToken
431   * @throws IOException
432   * @deprecated Use {@link Token#renew} instead
433   */
434  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
435                                   ) throws InvalidToken, IOException,
436                                            InterruptedException {
437    return token.renew(getConf());
438  }
439
440  /**
441   * Cancel a delegation token from the JobTracker
442   * @param token the token to cancel
443   * @throws IOException
444   * @deprecated Use {@link Token#cancel} instead
445   */
446  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
447                                    ) throws IOException,
448                                             InterruptedException {
449    token.cancel(getConf());
450  }
451
452}