View Javadoc

1   /*
2    * Copyright 2006-2008 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.launch.support;
17  
18  import java.io.BufferedReader;
19  import java.io.IOException;
20  import java.io.InputStreamReader;
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Properties;
29  import java.util.Set;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.springframework.batch.core.BatchStatus;
34  import org.springframework.batch.core.ExitStatus;
35  import org.springframework.batch.core.Job;
36  import org.springframework.batch.core.JobExecution;
37  import org.springframework.batch.core.JobInstance;
38  import org.springframework.batch.core.JobParameter;
39  import org.springframework.batch.core.JobParameters;
40  import org.springframework.batch.core.JobParametersIncrementer;
41  import org.springframework.batch.core.configuration.JobLocator;
42  import org.springframework.batch.core.converter.DefaultJobParametersConverter;
43  import org.springframework.batch.core.converter.JobParametersConverter;
44  import org.springframework.batch.core.explore.JobExplorer;
45  import org.springframework.batch.core.launch.JobExecutionNotFailedException;
46  import org.springframework.batch.core.launch.JobExecutionNotRunningException;
47  import org.springframework.batch.core.launch.JobExecutionNotStoppedException;
48  import org.springframework.batch.core.launch.JobLauncher;
49  import org.springframework.batch.core.launch.JobParametersNotFoundException;
50  import org.springframework.batch.core.repository.JobRepository;
51  import org.springframework.beans.factory.BeanDefinitionStoreException;
52  import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
53  import org.springframework.context.ConfigurableApplicationContext;
54  import org.springframework.context.support.ClassPathXmlApplicationContext;
55  import org.springframework.util.Assert;
56  import org.springframework.util.StringUtils;
57  
58  /**
59   * <p>
60   * Basic launcher for starting jobs from the command line. In general, it is
61   * assumed that this launcher will primarily be used to start a job via a script
62   * from an Enterprise Scheduler. Therefore, exit codes are mapped to integers so
63   * that schedulers can use the returned values to determine the next course of
64   * action. The returned values can also be useful to operations teams in
65   * determining what should happen upon failure. For example, a returned code of
66   * 5 might mean that some resource wasn't available and the job should be
67   * restarted. However, a code of 10 might mean that something critical has
68   * happened and the issue should be escalated.
69   * </p>
70   * 
71   * <p>
72   * With any launch of a batch job within Spring Batch, a Spring context
73   * containing the {@link Job} and some execution context has to be created. This
74   * command line launcher can be used to load the job and its context from a
75   * single location. All dependencies of the launcher will then be satisfied by
76   * autowiring by type from the combined application context. Default values are
77   * provided for all fields except the {@link JobLauncher} and {@link JobLocator}
78   * . Therefore, if autowiring fails to set it (it should be noted that
79   * dependency checking is disabled because most of the fields have default
80   * values and thus don't require dependencies to be fulfilled via autowiring)
81   * then an exception will be thrown. It should also be noted that even if an
82   * exception is thrown by this class, it will be mapped to an integer and
83   * returned.
84   * </p>
85   * 
86   * <p>
87   * Notice a property is available to set the {@link SystemExiter}. This class is
88   * used to exit from the main method, rather than calling System.exit()
89   * directly. This is because unit testing a class that calls System.exit() is
90   * impossible without kicking off the test within a new JVM, which it is
91   * possible to do, however it is a complex solution, much more so than
92   * strategizing the exiter.
93   * </p>
94   * 
95   * <p>
96   * The arguments to this class can be provided on the command line (separated by
97   * spaces), or through stdin (separated by new line). They are as follows:
98   * </p>
99   * 
100  * <code>
101  * jobPath &lt;options&gt; jobIdentifier (jobParameters)*
102  * </code>
103  * 
104  * <p>
105  * The command line options are as follows
106  * <ul>
107  * <li>jobPath: the xml application context containing a {@link Job}</li>
108  * <li>-restart: (optional) to restart the last failed execution</li>
109  * <li>-stop: (optional) to stop a running execution</li>
110  * <li>-abandon: (optional) to abandon a stopped execution</li>
111  * <li>-next: (optional) to start the next in a sequence according to the
112  * {@link JobParametersIncrementer} in the {@link Job}</li>
113  * <li>jobIdentifier: the name of the job or the id of a job execution (for
114  * -stop, -abandon or -restart).</li>
115  * <li>jobParameters: 0 to many parameters that will be used to launch a job
116  * specified in the form of <code>key=value</code> pairs.</li>
117  * </ul>
118  * </p>
119  * 
120  * <p>
121  * If the <code>-next</code> option is used the parameters on the command line
122  * (if any) are appended to those retrieved from the incrementer, overriding any
123  * with the same key.
124  * </p>
125  * 
126  * <p>
127  * The combined application context must contain only one instance of
128  * {@link JobLauncher}. The job parameters passed in to the command line will be
129  * converted to {@link Properties} by assuming that each individual element is
130  * one parameter that is separated by an equals sign. For example,
131  * "vendor.id=290232". The resulting properties instance is converted to
132  * {@link JobParameters} using a {@link JobParametersConverter} from the
133  * application context (if there is one, or a
134  * {@link DefaultJobParametersConverter} otherwise). Below is an example
135  * arguments list:
136  * 
137  * <p>
138  * <code>
139  * java org.springframework.batch.core.launch.support.CommandLineJobRunner testJob.xml 
140  * testJob schedule.date=2008/01/24 vendor.id=3902483920 
141  * </code>
142  * </p>
143  * 
144  * <p>
145  * Once arguments have been successfully parsed, autowiring will be used to set
146  * various dependencies. The {@link JobLauncher} for example, will be
147  * loaded this way. If none is contained in the bean factory (it searches by
148  * type) then a {@link BeanDefinitionStoreException} will be thrown. The same
149  * exception will also be thrown if there is more than one present. Assuming the
150  * JobLauncher has been set correctly, the jobIdentifier argument will be used
151  * to obtain an actual {@link Job}. If a {@link JobLocator} has been set, then
152  * it will be used, if not the beanFactory will be asked, using the
153  * jobIdentifier as the bean id.
154  * </p>
155  * 
156  * @author Dave Syer
157  * @author Lucas Ward
158  * @since 1.0
159  */
160 public class CommandLineJobRunner {
161 
162 	protected static final Log logger = LogFactory.getLog(CommandLineJobRunner.class);
163 
164 	private ExitCodeMapper exitCodeMapper = new SimpleJvmExitCodeMapper();
165 
166 	private JobLauncher launcher;
167 
168 	private JobLocator jobLocator;
169 
170 	// Package private for unit test
171 	private static SystemExiter systemExiter = new JvmSystemExiter();
172 
173 	private static String message = "";
174 
175 	private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter();
176 
177 	private JobExplorer jobExplorer;
178 
179 	private JobRepository jobRepository;
180 
181 	/**
182 	 * Injection setter for the {@link JobLauncher}.
183 	 * 
184 	 * @param launcher the launcher to set
185 	 */
186 	public void setLauncher(JobLauncher launcher) {
187 		this.launcher = launcher;
188 	}
189 
190 	/**
191 	 * @param jobRepository the jobRepository to set
192 	 */
193 	public void setJobRepository(JobRepository jobRepository) {
194 		this.jobRepository = jobRepository;
195 	}
196 
197 	/**
198 	 * Injection setter for {@link JobExplorer}.
199 	 * 
200 	 * @param jobExplorer the {@link JobExplorer} to set
201 	 */
202 	public void setJobExplorer(JobExplorer jobExplorer) {
203 		this.jobExplorer = jobExplorer;
204 	}
205 
206 	/**
207 	 * Injection setter for the {@link ExitCodeMapper}.
208 	 * 
209 	 * @param exitCodeMapper the exitCodeMapper to set
210 	 */
211 	public void setExitCodeMapper(ExitCodeMapper exitCodeMapper) {
212 		this.exitCodeMapper = exitCodeMapper;
213 	}
214 
215 	/**
216 	 * Static setter for the {@link SystemExiter} so it can be adjusted before
217 	 * dependency injection. Typically overridden by
218 	 * {@link #setSystemExiter(SystemExiter)}.
219 	 * 
220 	 * @param systemExitor
221 	 */
222 	public static void presetSystemExiter(SystemExiter systemExiter) {
223 		CommandLineJobRunner.systemExiter = systemExiter;
224 	}
225 
226 	/**
227 	 * Retrieve the error message set by an instance of
228 	 * {@link CommandLineJobRunner} as it exits. Empty if the last job launched
229 	 * was successful.
230 	 * 
231 	 * @return the error message
232 	 */
233 	public static String getErrorMessage() {
234 		return message;
235 	}
236 
237 	/**
238 	 * Injection setter for the {@link SystemExiter}.
239 	 * 
240 	 * @param systemExitor
241 	 */
242 	public void setSystemExiter(SystemExiter systemExiter) {
243 		CommandLineJobRunner.systemExiter = systemExiter;
244 	}
245 
246 	/**
247 	 * Injection setter for {@link JobParametersConverter}.
248 	 * 
249 	 * @param jobParametersConverter
250 	 */
251 	public void setJobParametersConverter(JobParametersConverter jobParametersConverter) {
252 		this.jobParametersConverter = jobParametersConverter;
253 	}
254 
255 	/**
256 	 * Delegate to the exiter to (possibly) exit the VM gracefully.
257 	 * 
258 	 * @param status
259 	 */
260 	public void exit(int status) {
261 		systemExiter.exit(status);
262 	}
263 
264 	/**
265 	 * {@link JobLocator} to find a job to run.
266 	 * @param jobLocator a {@link JobLocator}
267 	 */
268 	public void setJobLocator(JobLocator jobLocator) {
269 		this.jobLocator = jobLocator;
270 	}
271 
272 	/*
273 	 * Start a job by obtaining a combined classpath using the job launcher and
274 	 * job paths. If a JobLocator has been set, then use it to obtain an actual
275 	 * job, if not ask the context for it.
276 	 */
277 	int start(String jobPath, String jobIdentifier, String[] parameters, Set<String> opts) {
278 
279 		ConfigurableApplicationContext context = null;
280 
281 		try {
282 			context = new ClassPathXmlApplicationContext(jobPath);
283 			context.getAutowireCapableBeanFactory().autowireBeanProperties(this,
284 					AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE, false);
285 
286 			Assert.state(launcher != null, "A JobLauncher must be provided.  Please add one to the configuration.");
287 			if (opts.contains("-restart") || opts.contains("-next")) {
288 				Assert.state(jobExplorer != null,
289 						"A JobExplorer must be provided for a restart or start next operation.  Please add one to the configuration.");
290 			}
291 
292 			String jobName = jobIdentifier;
293 
294 			JobParameters jobParameters = jobParametersConverter.getJobParameters(StringUtils
295 					.splitArrayElementsIntoProperties(parameters, "="));
296 			Assert.isTrue(parameters == null || parameters.length == 0 || !jobParameters.isEmpty(),
297 					"Invalid JobParameters " + Arrays.asList(parameters)
298 							+ ". If parameters are provided they should be in the form name=value (no whitespace).");
299 
300 			if (opts.contains("-stop")) {
301 				List<JobExecution> jobExecutions = getRunningJobExecutions(jobIdentifier);
302 				if (jobExecutions == null) {
303 					throw new JobExecutionNotRunningException("No running execution found for job=" + jobIdentifier);
304 				}
305 				for (JobExecution jobExecution : jobExecutions) {
306 					jobExecution.setStatus(BatchStatus.STOPPING);
307 					jobRepository.update(jobExecution);
308 				}
309 				return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode());
310 			}
311 
312 			if (opts.contains("-abandon")) {
313 				List<JobExecution> jobExecutions = getStoppedJobExecutions(jobIdentifier);
314 				if (jobExecutions == null) {
315 					throw new JobExecutionNotStoppedException("No stopped execution found for job=" + jobIdentifier);
316 				}
317 				for (JobExecution jobExecution : jobExecutions) {
318 					jobExecution.setStatus(BatchStatus.ABANDONED);
319 					jobRepository.update(jobExecution);
320 				}
321 				return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode());
322 			}
323 
324 			if (opts.contains("-restart")) {
325 				JobExecution jobExecution = getLastFailedJobExecution(jobIdentifier);
326 				if (jobExecution == null) {
327 					throw new JobExecutionNotFailedException("No failed or stopped execution found for job="
328 							+ jobIdentifier);
329 				}
330 				jobParameters = jobExecution.getJobInstance().getJobParameters();
331 				jobName = jobExecution.getJobInstance().getJobName();
332 			}
333 
334 			Job job;
335 			if (jobLocator != null) {
336 				job = jobLocator.getJob(jobName);
337 			}
338 			else {
339 				job = (Job) context.getBean(jobName);
340 			}
341 
342 			if (opts.contains("-next")) {
343 				JobParameters nextParameters = getNextJobParameters(job);
344 				Map<String, JobParameter> map = new HashMap<String, JobParameter>(nextParameters.getParameters());
345 				map.putAll(jobParameters.getParameters());
346 				jobParameters = new JobParameters(map);
347 			}
348 
349 			JobExecution jobExecution = launcher.run(job, jobParameters);
350 			return exitCodeMapper.intValue(jobExecution.getExitStatus().getExitCode());
351 
352 		}
353 		catch (Throwable e) {
354 			String message = "Job Terminated in error: " + e.getMessage();
355 			logger.error(message, e);
356 			CommandLineJobRunner.message = message;
357 			return exitCodeMapper.intValue(ExitStatus.FAILED.getExitCode());
358 		}
359 		finally {
360 			if (context != null) {
361 				context.close();
362 			}
363 		}
364 	}
365 
366 	/**
367 	 * @param jobIdentifier a job execution id or job name
368 	 * @param minStatus the highest status to exclude from the result
369 	 * @return
370 	 */
371 	private List<JobExecution> getJobExecutionsWithStatusGreaterThan(String jobIdentifier, BatchStatus minStatus) {
372 
373 		Long executionId = getLongIdentifier(jobIdentifier);
374 		if (executionId != null) {
375 			JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
376 			if (jobExecution.getStatus().isGreaterThan(minStatus)) {
377 				return Arrays.asList(jobExecution);
378 			}
379 			return Collections.emptyList();
380 		}
381 
382 		int start = 0;
383 		int count = 100;
384 		List<JobExecution> executions = new ArrayList<JobExecution>();
385 		List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count);
386 
387 		while (!lastInstances.isEmpty()) {
388 
389 			for (JobInstance jobInstance : lastInstances) {
390 				List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
391 				if (jobExecutions == null || jobExecutions.isEmpty()) {
392 					continue;
393 				}
394 				for (JobExecution jobExecution : jobExecutions) {
395 					if (jobExecution.getStatus().isGreaterThan(minStatus)) {
396 						executions.add(jobExecution);
397 					}
398 				}
399 			}
400 
401 			start += count;
402 			lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count);
403 
404 		}
405 
406 		return executions;
407 
408 	}
409 
410 	private JobExecution getLastFailedJobExecution(String jobIdentifier) {
411 		List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STOPPING);
412 		if (jobExecutions.isEmpty()) {
413 			return null;
414 		}
415 		return jobExecutions.get(0);
416 	}
417 
418 	private List<JobExecution> getStoppedJobExecutions(String jobIdentifier) {
419 		List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STARTED);
420 		if (jobExecutions.isEmpty()) {
421 			return null;
422 		}
423 		List<JobExecution> result = new ArrayList<JobExecution>();
424 		for (JobExecution jobExecution : jobExecutions) {
425 			if (jobExecution.getStatus() != BatchStatus.ABANDONED) {
426 				result.add(jobExecution);
427 			}
428 		}
429 		return result.isEmpty() ? null : result;
430 	}
431 
432 	private List<JobExecution> getRunningJobExecutions(String jobIdentifier) {
433 		List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.COMPLETED);
434 		if (jobExecutions.isEmpty()) {
435 			return null;
436 		}
437 		List<JobExecution> result = new ArrayList<JobExecution>();
438 		for (JobExecution jobExecution : jobExecutions) {
439 			if (jobExecution.isRunning()) {
440 				result.add(jobExecution);
441 			}
442 		}
443 		return result.isEmpty() ? null : result;
444 	}
445 
446 	private Long getLongIdentifier(String jobIdentifier) {
447 		try {
448 			return new Long(jobIdentifier);
449 		}
450 		catch (NumberFormatException e) {
451 			// Not an ID - must be a name
452 			return null;
453 		}
454 	}
455 
456 	/**
457 	 * @param job the job that we need to find the next parameters for
458 	 * @return the next job parameters if they can be located
459 	 * @throws JobParametersNotFoundException if there is a problem
460 	 */
461 	private JobParameters getNextJobParameters(Job job) throws JobParametersNotFoundException {
462 		String jobIdentifier = job.getName();
463 		JobParameters jobParameters;
464 		List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, 0, 1);
465 
466 		JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
467 		if (incrementer == null) {
468 			throw new JobParametersNotFoundException("No job parameters incrementer found for job=" + jobIdentifier);
469 		}
470 
471 		if (lastInstances.isEmpty()) {
472 			jobParameters = incrementer.getNext(new JobParameters());
473 			if (jobParameters == null) {
474 				throw new JobParametersNotFoundException("No bootstrap parameters found from incrementer for job="
475 						+ jobIdentifier);
476 			}
477 		}
478 		else {
479 			jobParameters = incrementer.getNext(lastInstances.get(0).getJobParameters());
480 		}
481 		return jobParameters;
482 	}
483 
484 	/**
485 	 * Launch a batch job using a {@link CommandLineJobRunner}. Creates a new
486 	 * Spring context for the job execution, and uses a common parent for all
487 	 * such contexts. No exception are thrown from this method, rather
488 	 * exceptions are logged and an integer returned through the exit status in
489 	 * a {@link JvmSystemExiter} (which can be overridden by defining one in the
490 	 * Spring context).<br/>
491 	 * Parameters can be provided in the form key=value, and will be converted
492 	 * using the injected {@link JobParametersConverter}.
493 	 * 
494 	 * @param args <p>
495 	 * <ul>
496 	 * <li>-restart: (optional) if the job has failed or stopped and the most
497 	 * should be restarted. If specified then the jobIdentifier parameter can be
498 	 * interpreted either as the name of the job or the id of teh job execution
499 	 * that failed.</li>
500 	 * <li>-next: (optional) if the job has a {@link JobParametersIncrementer}
501 	 * that can be used to launch the next in a sequence</li>
502 	 * <li>jobPath: the xml application context containing a {@link Job}
503 	 * <li>jobIdentifier: the bean id of the job or id of the failed execution
504 	 * in the case of a restart.
505 	 * <li>jobParameters: 0 to many parameters that will be used to launch a
506 	 * job.
507 	 * </ul>
508 	 * The options (<code>-restart, -next</code>) can occur anywhere in the
509 	 * command line.
510 	 * </p>
511 	 */
512 	public static void main(String[] args) throws Exception {
513 
514 		CommandLineJobRunner command = new CommandLineJobRunner();
515 
516 		List<String> newargs = new ArrayList<String>(Arrays.asList(args));
517 
518 		try {
519 			if (System.in.available() > 0) {
520 				BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
521 				String line = " ";
522 				while (StringUtils.hasLength(line)) {
523 					if (!line.startsWith("#") && StringUtils.hasText(line)) {
524 						logger.debug("Stdin arg: " + line);
525 						newargs.add(line);
526 					}
527 					line = reader.readLine();
528 				}
529 			}
530 		}
531 		catch (IOException e) {
532 			logger.warn("Could not access stdin (maybe a platform limitation)");
533 			if (logger.isDebugEnabled()) {
534 				logger.debug("Exception details", e);
535 			}
536 		}
537 
538 		Set<String> opts = new HashSet<String>();
539 		List<String> params = new ArrayList<String>();
540 
541 		int count = 0;
542 		String jobPath = null;
543 		String jobIdentifier = null;
544 
545 		for (String arg : newargs) {
546 			if (arg.startsWith("-")) {
547 				opts.add(arg);
548 			}
549 			else {
550 				switch (count) {
551 				case 0:
552 					jobPath = arg;
553 					break;
554 				case 1:
555 					jobIdentifier = arg;
556 					break;
557 				default:
558 					params.add(arg);
559 					break;
560 				}
561 				count++;
562 			}
563 		}
564 
565 		if (jobPath == null || jobIdentifier == null) {
566 			String message = "At least 2 arguments are required: JobPath and jobIdentifier.";
567 			logger.error(message);
568 			CommandLineJobRunner.message = message;
569 			command.exit(1);
570 		}
571 
572 		String[] parameters = params.toArray(new String[params.size()]);
573 
574 		int result = command.start(jobPath, jobIdentifier, parameters, opts);
575 		command.exit(result);
576 	}
577 
578 }