1 | /* |
2 | * Copyright 2006-2008 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.launch.support; |
17 | |
18 | import java.io.BufferedReader; |
19 | import java.io.IOException; |
20 | import java.io.InputStreamReader; |
21 | import java.util.ArrayList; |
22 | import java.util.Arrays; |
23 | import java.util.Collections; |
24 | import java.util.HashMap; |
25 | import java.util.HashSet; |
26 | import java.util.List; |
27 | import java.util.Map; |
28 | import java.util.Properties; |
29 | import java.util.Set; |
30 | |
31 | import org.apache.commons.logging.Log; |
32 | import org.apache.commons.logging.LogFactory; |
33 | import org.springframework.batch.core.BatchStatus; |
34 | import org.springframework.batch.core.ExitStatus; |
35 | import org.springframework.batch.core.Job; |
36 | import org.springframework.batch.core.JobExecution; |
37 | import org.springframework.batch.core.JobInstance; |
38 | import org.springframework.batch.core.JobParameter; |
39 | import org.springframework.batch.core.JobParameters; |
40 | import org.springframework.batch.core.JobParametersIncrementer; |
41 | import org.springframework.batch.core.configuration.JobLocator; |
42 | import org.springframework.batch.core.converter.DefaultJobParametersConverter; |
43 | import org.springframework.batch.core.converter.JobParametersConverter; |
44 | import org.springframework.batch.core.explore.JobExplorer; |
45 | import org.springframework.batch.core.launch.JobExecutionNotFailedException; |
46 | import org.springframework.batch.core.launch.JobExecutionNotRunningException; |
47 | import org.springframework.batch.core.launch.JobExecutionNotStoppedException; |
48 | import org.springframework.batch.core.launch.JobLauncher; |
49 | import org.springframework.batch.core.launch.JobParametersNotFoundException; |
50 | import org.springframework.batch.core.repository.JobRepository; |
51 | import org.springframework.beans.factory.BeanDefinitionStoreException; |
52 | import org.springframework.beans.factory.config.AutowireCapableBeanFactory; |
53 | import org.springframework.context.ConfigurableApplicationContext; |
54 | import org.springframework.context.support.ClassPathXmlApplicationContext; |
55 | import org.springframework.util.Assert; |
56 | import org.springframework.util.StringUtils; |
57 | |
58 | /** |
59 | * <p> |
60 | * Basic launcher for starting jobs from the command line. In general, it is |
61 | * assumed that this launcher will primarily be used to start a job via a script |
62 | * from an Enterprise Scheduler. Therefore, exit codes are mapped to integers so |
63 | * that schedulers can use the returned values to determine the next course of |
64 | * action. The returned values can also be useful to operations teams in |
65 | * determining what should happen upon failure. For example, a returned code of |
66 | * 5 might mean that some resource wasn't available and the job should be |
67 | * restarted. However, a code of 10 might mean that something critical has |
68 | * happened and the issue should be escalated. |
69 | * </p> |
70 | * |
71 | * <p> |
72 | * With any launch of a batch job within Spring Batch, a Spring context |
73 | * containing the {@link Job} and some execution context has to be created. This |
74 | * command line launcher can be used to load the job and its context from a |
75 | * single location. All dependencies of the launcher will then be satisfied by |
76 | * autowiring by type from the combined application context. Default values are |
 * provided for all fields except the {@link JobLauncher} and
 * {@link JobLocator}. Therefore, if autowiring fails to set the launcher (it should be noted that
79 | * dependency checking is disabled because most of the fields have default |
80 | * values and thus don't require dependencies to be fulfilled via autowiring) |
81 | * then an exception will be thrown. It should also be noted that even if an |
82 | * exception is thrown by this class, it will be mapped to an integer and |
83 | * returned. |
84 | * </p> |
85 | * |
86 | * <p> |
87 | * Notice a property is available to set the {@link SystemExiter}. This class is |
88 | * used to exit from the main method, rather than calling System.exit() |
 * directly. This is because unit testing a class that calls System.exit() is
90 | * impossible without kicking off the test within a new JVM, which it is |
91 | * possible to do, however it is a complex solution, much more so than |
92 | * strategizing the exiter. |
93 | * </p> |
94 | * |
95 | * <p> |
96 | * The arguments to this class can be provided on the command line (separated by |
97 | * spaces), or through stdin (separated by new line). They are as follows: |
98 | * </p> |
99 | * |
100 | * <code> |
 * jobPath &lt;options&gt; jobIdentifier (jobParameters)*
102 | * </code> |
103 | * |
104 | * <p> |
105 | * The command line options are as follows |
106 | * <ul> |
 * <li>jobPath: the xml application context containing a {@link Job}</li>
108 | * <li>-restart: (optional) to restart the last failed execution</li> |
109 | * <li>-stop: (optional) to stop a running execution</li> |
110 | * <li>-abandon: (optional) to abandon a stopped execution</li> |
111 | * <li>-next: (optional) to start the next in a sequence according to the |
112 | * {@link JobParametersIncrementer} in the {@link Job}</li> |
 * <li>jobIdentifier: the name of the job or the id of a job execution (for
 * -stop, -abandon or -restart).</li>
 * <li>jobParameters: 0 to many parameters that will be used to launch a job
 * specified in the form of <code>key=value</code> pairs.</li>
117 | * </ul> |
118 | * </p> |
119 | * |
120 | * <p> |
121 | * If the <code>-next</code> option is used the parameters on the command line |
122 | * (if any) are appended to those retrieved from the incrementer, overriding any |
123 | * with the same key. |
124 | * </p> |
125 | * |
126 | * <p> |
127 | * The combined application context must contain only one instance of |
128 | * {@link JobLauncher}. The job parameters passed in to the command line will be |
129 | * converted to {@link Properties} by assuming that each individual element is |
130 | * one parameter that is separated by an equals sign. For example, |
131 | * "vendor.id=290232". The resulting properties instance is converted to |
132 | * {@link JobParameters} using a {@link JobParametersConverter} from the |
133 | * application context (if there is one, or a |
134 | * {@link DefaultJobParametersConverter} otherwise). Below is an example |
 * arguments list:
136 | * |
137 | * <p> |
138 | * <code> |
139 | * java org.springframework.batch.core.launch.support.CommandLineJobRunner testJob.xml |
140 | * testJob schedule.date=2008/01/24 vendor.id=3902483920 |
 * </code>
142 | * </p> |
143 | * |
144 | * <p> |
145 | * Once arguments have been successfully parsed, autowiring will be used to set |
 * various dependencies. The {@link JobLauncher} for example, will be
147 | * loaded this way. If none is contained in the bean factory (it searches by |
148 | * type) then a {@link BeanDefinitionStoreException} will be thrown. The same |
149 | * exception will also be thrown if there is more than one present. Assuming the |
150 | * JobLauncher has been set correctly, the jobIdentifier argument will be used |
151 | * to obtain an actual {@link Job}. If a {@link JobLocator} has been set, then |
152 | * it will be used, if not the beanFactory will be asked, using the |
153 | * jobIdentifier as the bean id. |
154 | * </p> |
155 | * |
156 | * @author Dave Syer |
157 | * @author Lucas Ward |
158 | * @since 1.0 |
159 | */ |
160 | public class CommandLineJobRunner { |
161 | |
162 | protected static final Log logger = LogFactory.getLog(CommandLineJobRunner.class); |
163 | |
164 | private ExitCodeMapper exitCodeMapper = new SimpleJvmExitCodeMapper(); |
165 | |
166 | private JobLauncher launcher; |
167 | |
168 | private JobLocator jobLocator; |
169 | |
170 | // Package private for unit test |
171 | private static SystemExiter systemExiter = new JvmSystemExiter(); |
172 | |
173 | private static String message = ""; |
174 | |
175 | private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter(); |
176 | |
177 | private JobExplorer jobExplorer; |
178 | |
179 | private JobRepository jobRepository; |
180 | |
181 | /** |
182 | * Injection setter for the {@link JobLauncher}. |
183 | * |
184 | * @param launcher the launcher to set |
185 | */ |
186 | public void setLauncher(JobLauncher launcher) { |
187 | this.launcher = launcher; |
188 | } |
189 | |
190 | /** |
191 | * @param jobRepository the jobRepository to set |
192 | */ |
193 | public void setJobRepository(JobRepository jobRepository) { |
194 | this.jobRepository = jobRepository; |
195 | } |
196 | |
197 | /** |
198 | * Injection setter for {@link JobExplorer}. |
199 | * |
200 | * @param jobExplorer the {@link JobExplorer} to set |
201 | */ |
202 | public void setJobExplorer(JobExplorer jobExplorer) { |
203 | this.jobExplorer = jobExplorer; |
204 | } |
205 | |
206 | /** |
207 | * Injection setter for the {@link ExitCodeMapper}. |
208 | * |
209 | * @param exitCodeMapper the exitCodeMapper to set |
210 | */ |
211 | public void setExitCodeMapper(ExitCodeMapper exitCodeMapper) { |
212 | this.exitCodeMapper = exitCodeMapper; |
213 | } |
214 | |
215 | /** |
216 | * Static setter for the {@link SystemExiter} so it can be adjusted before |
217 | * dependency injection. Typically overridden by |
218 | * {@link #setSystemExiter(SystemExiter)}. |
219 | * |
220 | * @param systemExitor |
221 | */ |
222 | public static void presetSystemExiter(SystemExiter systemExiter) { |
223 | CommandLineJobRunner.systemExiter = systemExiter; |
224 | } |
225 | |
226 | /** |
227 | * Retrieve the error message set by an instance of |
228 | * {@link CommandLineJobRunner} as it exits. Empty if the last job launched |
229 | * was successful. |
230 | * |
231 | * @return the error message |
232 | */ |
233 | public static String getErrorMessage() { |
234 | return message; |
235 | } |
236 | |
237 | /** |
238 | * Injection setter for the {@link SystemExiter}. |
239 | * |
240 | * @param systemExitor |
241 | */ |
242 | public void setSystemExiter(SystemExiter systemExiter) { |
243 | CommandLineJobRunner.systemExiter = systemExiter; |
244 | } |
245 | |
246 | /** |
247 | * Injection setter for {@link JobParametersConverter}. |
248 | * |
249 | * @param jobParametersConverter |
250 | */ |
251 | public void setJobParametersConverter(JobParametersConverter jobParametersConverter) { |
252 | this.jobParametersConverter = jobParametersConverter; |
253 | } |
254 | |
255 | /** |
256 | * Delegate to the exiter to (possibly) exit the VM gracefully. |
257 | * |
258 | * @param status |
259 | */ |
260 | public void exit(int status) { |
261 | systemExiter.exit(status); |
262 | } |
263 | |
264 | /** |
265 | * {@link JobLocator} to find a job to run. |
266 | * @param jobLocator a {@link JobLocator} |
267 | */ |
268 | public void setJobLocator(JobLocator jobLocator) { |
269 | this.jobLocator = jobLocator; |
270 | } |
271 | |
272 | /* |
273 | * Start a job by obtaining a combined classpath using the job launcher and |
274 | * job paths. If a JobLocator has been set, then use it to obtain an actual |
275 | * job, if not ask the context for it. |
276 | */ |
277 | int start(String jobPath, String jobIdentifier, String[] parameters, Set<String> opts) { |
278 | |
279 | ConfigurableApplicationContext context = null; |
280 | |
281 | try { |
282 | context = new ClassPathXmlApplicationContext(jobPath); |
283 | context.getAutowireCapableBeanFactory().autowireBeanProperties(this, |
284 | AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE, false); |
285 | |
286 | Assert.state(launcher != null, "A JobLauncher must be provided. Please add one to the configuration."); |
287 | if (opts.contains("-restart") || opts.contains("-next")) { |
288 | Assert.state(jobExplorer != null, |
289 | "A JobExplorer must be provided for a restart or start next operation. Please add one to the configuration."); |
290 | } |
291 | |
292 | String jobName = jobIdentifier; |
293 | |
294 | JobParameters jobParameters = jobParametersConverter.getJobParameters(StringUtils |
295 | .splitArrayElementsIntoProperties(parameters, "=")); |
296 | Assert.isTrue(parameters == null || parameters.length == 0 || !jobParameters.isEmpty(), |
297 | "Invalid JobParameters " + Arrays.asList(parameters) |
298 | + ". If parameters are provided they should be in the form name=value (no whitespace)."); |
299 | |
300 | if (opts.contains("-stop")) { |
301 | List<JobExecution> jobExecutions = getRunningJobExecutions(jobIdentifier); |
302 | if (jobExecutions == null) { |
303 | throw new JobExecutionNotRunningException("No running execution found for job=" + jobIdentifier); |
304 | } |
305 | for (JobExecution jobExecution : jobExecutions) { |
306 | jobExecution.setStatus(BatchStatus.STOPPING); |
307 | jobRepository.update(jobExecution); |
308 | } |
309 | return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode()); |
310 | } |
311 | |
312 | if (opts.contains("-abandon")) { |
313 | List<JobExecution> jobExecutions = getStoppedJobExecutions(jobIdentifier); |
314 | if (jobExecutions == null) { |
315 | throw new JobExecutionNotStoppedException("No stopped execution found for job=" + jobIdentifier); |
316 | } |
317 | for (JobExecution jobExecution : jobExecutions) { |
318 | jobExecution.setStatus(BatchStatus.ABANDONED); |
319 | jobRepository.update(jobExecution); |
320 | } |
321 | return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode()); |
322 | } |
323 | |
324 | if (opts.contains("-restart")) { |
325 | JobExecution jobExecution = getLastFailedJobExecution(jobIdentifier); |
326 | if (jobExecution == null) { |
327 | throw new JobExecutionNotFailedException("No failed or stopped execution found for job=" |
328 | + jobIdentifier); |
329 | } |
330 | jobParameters = jobExecution.getJobInstance().getJobParameters(); |
331 | jobName = jobExecution.getJobInstance().getJobName(); |
332 | } |
333 | |
334 | Job job; |
335 | if (jobLocator != null) { |
336 | job = jobLocator.getJob(jobName); |
337 | } |
338 | else { |
339 | job = (Job) context.getBean(jobName); |
340 | } |
341 | |
342 | if (opts.contains("-next")) { |
343 | JobParameters nextParameters = getNextJobParameters(job); |
344 | Map<String, JobParameter> map = new HashMap<String, JobParameter>(nextParameters.getParameters()); |
345 | map.putAll(jobParameters.getParameters()); |
346 | jobParameters = new JobParameters(map); |
347 | } |
348 | |
349 | JobExecution jobExecution = launcher.run(job, jobParameters); |
350 | return exitCodeMapper.intValue(jobExecution.getExitStatus().getExitCode()); |
351 | |
352 | } |
353 | catch (Throwable e) { |
354 | String message = "Job Terminated in error: " + e.getMessage(); |
355 | logger.error(message, e); |
356 | CommandLineJobRunner.message = message; |
357 | return exitCodeMapper.intValue(ExitStatus.FAILED.getExitCode()); |
358 | } |
359 | finally { |
360 | if (context != null) { |
361 | context.close(); |
362 | } |
363 | } |
364 | } |
365 | |
366 | /** |
367 | * @param jobIdentifier a job execution id or job name |
368 | * @param minStatus the highest status to exclude from the result |
369 | * @return |
370 | */ |
371 | private List<JobExecution> getJobExecutionsWithStatusGreaterThan(String jobIdentifier, BatchStatus minStatus) { |
372 | |
373 | Long executionId = getLongIdentifier(jobIdentifier); |
374 | if (executionId != null) { |
375 | JobExecution jobExecution = jobExplorer.getJobExecution(executionId); |
376 | if (jobExecution.getStatus().isGreaterThan(minStatus)) { |
377 | return Arrays.asList(jobExecution); |
378 | } |
379 | return Collections.emptyList(); |
380 | } |
381 | |
382 | int start = 0; |
383 | int count = 100; |
384 | List<JobExecution> executions = new ArrayList<JobExecution>(); |
385 | List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count); |
386 | |
387 | while (!lastInstances.isEmpty()) { |
388 | |
389 | for (JobInstance jobInstance : lastInstances) { |
390 | List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance); |
391 | if (jobExecutions == null || jobExecutions.isEmpty()) { |
392 | continue; |
393 | } |
394 | for (JobExecution jobExecution : jobExecutions) { |
395 | if (jobExecution.getStatus().isGreaterThan(minStatus)) { |
396 | executions.add(jobExecution); |
397 | } |
398 | } |
399 | } |
400 | |
401 | start += count; |
402 | lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count); |
403 | |
404 | } |
405 | |
406 | return executions; |
407 | |
408 | } |
409 | |
410 | private JobExecution getLastFailedJobExecution(String jobIdentifier) { |
411 | List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STOPPING); |
412 | if (jobExecutions.isEmpty()) { |
413 | return null; |
414 | } |
415 | return jobExecutions.get(0); |
416 | } |
417 | |
418 | private List<JobExecution> getStoppedJobExecutions(String jobIdentifier) { |
419 | List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STARTED); |
420 | if (jobExecutions.isEmpty()) { |
421 | return null; |
422 | } |
423 | List<JobExecution> result = new ArrayList<JobExecution>(); |
424 | for (JobExecution jobExecution : jobExecutions) { |
425 | if (jobExecution.getStatus() != BatchStatus.ABANDONED) { |
426 | result.add(jobExecution); |
427 | } |
428 | } |
429 | return result.isEmpty() ? null : result; |
430 | } |
431 | |
432 | private List<JobExecution> getRunningJobExecutions(String jobIdentifier) { |
433 | List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.COMPLETED); |
434 | if (jobExecutions.isEmpty()) { |
435 | return null; |
436 | } |
437 | List<JobExecution> result = new ArrayList<JobExecution>(); |
438 | for (JobExecution jobExecution : jobExecutions) { |
439 | if (jobExecution.isRunning()) { |
440 | result.add(jobExecution); |
441 | } |
442 | } |
443 | return result.isEmpty() ? null : result; |
444 | } |
445 | |
446 | private Long getLongIdentifier(String jobIdentifier) { |
447 | try { |
448 | return new Long(jobIdentifier); |
449 | } |
450 | catch (NumberFormatException e) { |
451 | // Not an ID - must be a name |
452 | return null; |
453 | } |
454 | } |
455 | |
456 | /** |
457 | * @param job the job that we need to find the next parameters for |
458 | * @return the next job parameters if they can be located |
459 | * @throws JobParametersNotFoundException if there is a problem |
460 | */ |
461 | private JobParameters getNextJobParameters(Job job) throws JobParametersNotFoundException { |
462 | String jobIdentifier = job.getName(); |
463 | JobParameters jobParameters; |
464 | List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, 0, 1); |
465 | |
466 | JobParametersIncrementer incrementer = job.getJobParametersIncrementer(); |
467 | if (incrementer == null) { |
468 | throw new JobParametersNotFoundException("No job parameters incrementer found for job=" + jobIdentifier); |
469 | } |
470 | |
471 | if (lastInstances.isEmpty()) { |
472 | jobParameters = incrementer.getNext(new JobParameters()); |
473 | if (jobParameters == null) { |
474 | throw new JobParametersNotFoundException("No bootstrap parameters found from incrementer for job=" |
475 | + jobIdentifier); |
476 | } |
477 | } |
478 | else { |
479 | jobParameters = incrementer.getNext(lastInstances.get(0).getJobParameters()); |
480 | } |
481 | return jobParameters; |
482 | } |
483 | |
484 | /** |
485 | * Launch a batch job using a {@link CommandLineJobRunner}. Creates a new |
486 | * Spring context for the job execution, and uses a common parent for all |
487 | * such contexts. No exception are thrown from this method, rather |
488 | * exceptions are logged and an integer returned through the exit status in |
489 | * a {@link JvmSystemExiter} (which can be overridden by defining one in the |
490 | * Spring context).<br/> |
491 | * Parameters can be provided in the form key=value, and will be converted |
492 | * using the injected {@link JobParametersConverter}. |
493 | * |
494 | * @param args <p> |
495 | * <ul> |
496 | * <li>-restart: (optional) if the job has failed or stopped and the most |
497 | * should be restarted. If specified then the jobIdentifier parameter can be |
498 | * interpreted either as the name of the job or the id of teh job execution |
499 | * that failed.</li> |
500 | * <li>-next: (optional) if the job has a {@link JobParametersIncrementer} |
501 | * that can be used to launch the next in a sequence</li> |
502 | * <li>jobPath: the xml application context containing a {@link Job} |
503 | * <li>jobIdentifier: the bean id of the job or id of the failed execution |
504 | * in the case of a restart. |
505 | * <li>jobParameters: 0 to many parameters that will be used to launch a |
506 | * job. |
507 | * </ul> |
508 | * The options (<code>-restart, -next</code>) can occur anywhere in the |
509 | * command line. |
510 | * </p> |
511 | */ |
512 | public static void main(String[] args) throws Exception { |
513 | |
514 | CommandLineJobRunner command = new CommandLineJobRunner(); |
515 | |
516 | List<String> newargs = new ArrayList<String>(Arrays.asList(args)); |
517 | |
518 | try { |
519 | if (System.in.available() > 0) { |
520 | BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); |
521 | String line = " "; |
522 | while (StringUtils.hasLength(line)) { |
523 | if (!line.startsWith("#") && StringUtils.hasText(line)) { |
524 | logger.debug("Stdin arg: " + line); |
525 | newargs.add(line); |
526 | } |
527 | line = reader.readLine(); |
528 | } |
529 | } |
530 | } |
531 | catch (IOException e) { |
532 | logger.warn("Could not access stdin (maybe a platform limitation)"); |
533 | if (logger.isDebugEnabled()) { |
534 | logger.debug("Exception details", e); |
535 | } |
536 | } |
537 | |
538 | Set<String> opts = new HashSet<String>(); |
539 | List<String> params = new ArrayList<String>(); |
540 | |
541 | int count = 0; |
542 | String jobPath = null; |
543 | String jobIdentifier = null; |
544 | |
545 | for (String arg : newargs) { |
546 | if (arg.startsWith("-")) { |
547 | opts.add(arg); |
548 | } |
549 | else { |
550 | switch (count) { |
551 | case 0: |
552 | jobPath = arg; |
553 | break; |
554 | case 1: |
555 | jobIdentifier = arg; |
556 | break; |
557 | default: |
558 | params.add(arg); |
559 | break; |
560 | } |
561 | count++; |
562 | } |
563 | } |
564 | |
565 | if (jobPath == null || jobIdentifier == null) { |
566 | String message = "At least 2 arguments are required: JobPath and jobIdentifier."; |
567 | logger.error(message); |
568 | CommandLineJobRunner.message = message; |
569 | command.exit(1); |
570 | } |
571 | |
572 | String[] parameters = params.toArray(new String[params.size()]); |
573 | |
574 | int result = command.start(jobPath, jobIdentifier, parameters, opts); |
575 | command.exit(result); |
576 | } |
577 | |
578 | } |