1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.launch.support; |
17 | |
18 | import java.io.BufferedReader; |
19 | import java.io.IOException; |
20 | import java.io.InputStreamReader; |
21 | import java.util.ArrayList; |
22 | import java.util.Arrays; |
23 | import java.util.Collections; |
24 | import java.util.HashMap; |
25 | import java.util.HashSet; |
26 | import java.util.List; |
27 | import java.util.Map; |
28 | import java.util.Properties; |
29 | import java.util.Set; |
30 | |
31 | import org.apache.commons.logging.Log; |
32 | import org.apache.commons.logging.LogFactory; |
33 | import org.springframework.batch.core.BatchStatus; |
34 | import org.springframework.batch.core.ExitStatus; |
35 | import org.springframework.batch.core.Job; |
36 | import org.springframework.batch.core.JobExecution; |
37 | import org.springframework.batch.core.JobInstance; |
38 | import org.springframework.batch.core.JobParameter; |
39 | import org.springframework.batch.core.JobParameters; |
40 | import org.springframework.batch.core.JobParametersIncrementer; |
41 | import org.springframework.batch.core.configuration.JobLocator; |
42 | import org.springframework.batch.core.converter.DefaultJobParametersConverter; |
43 | import org.springframework.batch.core.converter.JobParametersConverter; |
44 | import org.springframework.batch.core.explore.JobExplorer; |
45 | import org.springframework.batch.core.launch.JobExecutionNotFailedException; |
46 | import org.springframework.batch.core.launch.JobExecutionNotRunningException; |
47 | import org.springframework.batch.core.launch.JobExecutionNotStoppedException; |
48 | import org.springframework.batch.core.launch.JobLauncher; |
49 | import org.springframework.batch.core.launch.JobParametersNotFoundException; |
50 | import org.springframework.batch.core.repository.JobRepository; |
51 | import org.springframework.beans.factory.BeanDefinitionStoreException; |
52 | import org.springframework.beans.factory.config.AutowireCapableBeanFactory; |
53 | import org.springframework.context.ConfigurableApplicationContext; |
54 | import org.springframework.context.support.ClassPathXmlApplicationContext; |
55 | import org.springframework.util.Assert; |
56 | import org.springframework.util.StringUtils; |
57 | |
58 | /** |
59 | * <p> |
60 | * Basic launcher for starting jobs from the command line. In general, it is |
61 | * assumed that this launcher will primarily be used to start a job via a script |
62 | * from an Enterprise Scheduler. Therefore, exit codes are mapped to integers so |
63 | * that schedulers can use the returned values to determine the next course of |
64 | * action. The returned values can also be useful to operations teams in |
65 | * determining what should happen upon failure. For example, a returned code of |
66 | * 5 might mean that some resource wasn't available and the job should be |
67 | * restarted. However, a code of 10 might mean that something critical has |
68 | * happened and the issue should be escalated. |
69 | * </p> |
70 | * |
71 | * <p> |
72 | * With any launch of a batch job within Spring Batch, a Spring context |
73 | * containing the {@link Job} and some execution context has to be created. This |
74 | * command line launcher can be used to load the job and its context from a |
75 | * single location. All dependencies of the launcher will then be satisfied by |
76 | * autowiring by type from the combined application context. Default values are |
77 | * provided for all fields except the {@link JobLauncher} and {@link JobLocator} |
78 | * . Therefore, if autowiring fails to set it (it should be noted that |
79 | * dependency checking is disabled because most of the fields have default |
80 | * values and thus don't require dependencies to be fulfilled via autowiring) |
81 | * then an exception will be thrown. It should also be noted that even if an |
82 | * exception is thrown by this class, it will be mapped to an integer and |
83 | * returned. |
84 | * </p> |
85 | * |
86 | * <p> |
87 | * Notice a property is available to set the {@link SystemExiter}. This class is |
88 | * used to exit from the main method, rather than calling System.exit() |
89 | * directly. This is because unit testing a class the calls System.exit() is |
90 | * impossible without kicking off the test within a new JVM, which it is |
91 | * possible to do, however it is a complex solution, much more so than |
92 | * strategizing the exiter. |
93 | * </p> |
94 | * |
95 | * <p> |
96 | * The arguments to this class can be provided on the command line (separated by |
97 | * spaces), or through stdin (separated by new line). They are as follows: |
98 | * </p> |
99 | * |
100 | * <code> |
101 | * jobPath <options> jobIdentifier (jobParameters)* |
102 | * </code> |
103 | * |
104 | * <p> |
105 | * The command line options are as follows |
106 | * <ul> |
107 | * <li>jobPath: the xml application context containing a {@link Job} |
108 | * <li>-restart: (optional) to restart the last failed execution</li> |
109 | * <li>-stop: (optional) to stop a running execution</li> |
110 | * <li>-abandon: (optional) to abandon a stopped execution</li> |
111 | * <li>-next: (optional) to start the next in a sequence according to the |
112 | * {@link JobParametersIncrementer} in the {@link Job}</li> |
113 | * <li>jobIdentifier: the name of the job or the id of a job execution (for |
114 | * -stop, -abandon or -restart). |
115 | * <li>jobParameters: 0 to many parameters that will be used to launch a job |
116 | * specified in the form of <code>key=value</code> pairs. |
117 | * </ul> |
118 | * </p> |
119 | * |
120 | * <p> |
121 | * If the <code>-next</code> option is used the parameters on the command line |
122 | * (if any) are appended to those retrieved from the incrementer, overriding any |
123 | * with the same key. |
124 | * </p> |
125 | * |
126 | * <p> |
127 | * The combined application context must contain only one instance of |
128 | * {@link JobLauncher}. The job parameters passed in to the command line will be |
129 | * converted to {@link Properties} by assuming that each individual element is |
130 | * one parameter that is separated by an equals sign. For example, |
131 | * "vendor.id=290232". The resulting properties instance is converted to |
132 | * {@link JobParameters} using a {@link JobParametersConverter} from the |
133 | * application context (if there is one, or a |
134 | * {@link DefaultJobParametersConverter} otherwise). Below is an example |
135 | * arguments list: " |
136 | * |
137 | * <p> |
138 | * <code> |
139 | * java org.springframework.batch.core.launch.support.CommandLineJobRunner testJob.xml |
140 | * testJob schedule.date=2008/01/24 vendor.id=3902483920 |
141 | * <code> |
142 | * </p> |
143 | * |
144 | * <p> |
145 | * Once arguments have been successfully parsed, autowiring will be used to set |
146 | * various dependencies. The {@JobLauncher} for example, will be |
147 | * loaded this way. If none is contained in the bean factory (it searches by |
148 | * type) then a {@link BeanDefinitionStoreException} will be thrown. The same |
149 | * exception will also be thrown if there is more than one present. Assuming the |
150 | * JobLauncher has been set correctly, the jobIdentifier argument will be used |
151 | * to obtain an actual {@link Job}. If a {@link JobLocator} has been set, then |
152 | * it will be used, if not the beanFactory will be asked, using the |
153 | * jobIdentifier as the bean id. |
154 | * </p> |
155 | * |
156 | * @author Dave Syer |
157 | * @author Lucas Ward |
158 | * @since 1.0 |
159 | */ |
160 | public class CommandLineJobRunner { |
161 | |
162 | protected static final Log logger = LogFactory.getLog(CommandLineJobRunner.class); |
163 | |
164 | private ExitCodeMapper exitCodeMapper = new SimpleJvmExitCodeMapper(); |
165 | |
166 | private JobLauncher launcher; |
167 | |
168 | private JobLocator jobLocator; |
169 | |
170 | // Package private for unit test |
171 | private static SystemExiter systemExiter = new JvmSystemExiter(); |
172 | |
173 | private static String message = ""; |
174 | |
175 | private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter(); |
176 | |
177 | private JobExplorer jobExplorer; |
178 | |
179 | private JobRepository jobRepository; |
180 | |
181 | private final static List<String> VALID_OPTS = Arrays.asList(new String [] {"-restart", "-next", "-stop", "-abandon"}); |
182 | |
183 | /** |
184 | * Injection setter for the {@link JobLauncher}. |
185 | * |
186 | * @param launcher the launcher to set |
187 | */ |
188 | public void setLauncher(JobLauncher launcher) { |
189 | this.launcher = launcher; |
190 | } |
191 | |
192 | /** |
193 | * @param jobRepository the jobRepository to set |
194 | */ |
195 | public void setJobRepository(JobRepository jobRepository) { |
196 | this.jobRepository = jobRepository; |
197 | } |
198 | |
199 | /** |
200 | * Injection setter for {@link JobExplorer}. |
201 | * |
202 | * @param jobExplorer the {@link JobExplorer} to set |
203 | */ |
204 | public void setJobExplorer(JobExplorer jobExplorer) { |
205 | this.jobExplorer = jobExplorer; |
206 | } |
207 | |
208 | /** |
209 | * Injection setter for the {@link ExitCodeMapper}. |
210 | * |
211 | * @param exitCodeMapper the exitCodeMapper to set |
212 | */ |
213 | public void setExitCodeMapper(ExitCodeMapper exitCodeMapper) { |
214 | this.exitCodeMapper = exitCodeMapper; |
215 | } |
216 | |
217 | /** |
218 | * Static setter for the {@link SystemExiter} so it can be adjusted before |
219 | * dependency injection. Typically overridden by |
220 | * {@link #setSystemExiter(SystemExiter)}. |
221 | * |
222 | * @param systemExiter |
223 | */ |
224 | public static void presetSystemExiter(SystemExiter systemExiter) { |
225 | CommandLineJobRunner.systemExiter = systemExiter; |
226 | } |
227 | |
228 | /** |
229 | * Retrieve the error message set by an instance of |
230 | * {@link CommandLineJobRunner} as it exits. Empty if the last job launched |
231 | * was successful. |
232 | * |
233 | * @return the error message |
234 | */ |
235 | public static String getErrorMessage() { |
236 | return message; |
237 | } |
238 | |
239 | /** |
240 | * Injection setter for the {@link SystemExiter}. |
241 | * |
242 | * @param systemExiter |
243 | */ |
244 | public void setSystemExiter(SystemExiter systemExiter) { |
245 | CommandLineJobRunner.systemExiter = systemExiter; |
246 | } |
247 | |
248 | /** |
249 | * Injection setter for {@link JobParametersConverter}. |
250 | * |
251 | * @param jobParametersConverter |
252 | */ |
253 | public void setJobParametersConverter(JobParametersConverter jobParametersConverter) { |
254 | this.jobParametersConverter = jobParametersConverter; |
255 | } |
256 | |
257 | /** |
258 | * Delegate to the exiter to (possibly) exit the VM gracefully. |
259 | * |
260 | * @param status |
261 | */ |
262 | public void exit(int status) { |
263 | systemExiter.exit(status); |
264 | } |
265 | |
266 | /** |
267 | * {@link JobLocator} to find a job to run. |
268 | * @param jobLocator a {@link JobLocator} |
269 | */ |
270 | public void setJobLocator(JobLocator jobLocator) { |
271 | this.jobLocator = jobLocator; |
272 | } |
273 | |
274 | /* |
275 | * Start a job by obtaining a combined classpath using the job launcher and |
276 | * job paths. If a JobLocator has been set, then use it to obtain an actual |
277 | * job, if not ask the context for it. |
278 | */ |
279 | int start(String jobPath, String jobIdentifier, String[] parameters, Set<String> opts) { |
280 | |
281 | ConfigurableApplicationContext context = null; |
282 | |
283 | try { |
284 | context = new ClassPathXmlApplicationContext(jobPath); |
285 | context.getAutowireCapableBeanFactory().autowireBeanProperties(this, |
286 | AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE, false); |
287 | |
288 | Assert.state(launcher != null, "A JobLauncher must be provided. Please add one to the configuration."); |
289 | if (opts.contains("-restart") || opts.contains("-next")) { |
290 | Assert.state(jobExplorer != null, |
291 | "A JobExplorer must be provided for a restart or start next operation. Please add one to the configuration."); |
292 | } |
293 | |
294 | String jobName = jobIdentifier; |
295 | |
296 | JobParameters jobParameters = jobParametersConverter.getJobParameters(StringUtils |
297 | .splitArrayElementsIntoProperties(parameters, "=")); |
298 | Assert.isTrue(parameters == null || parameters.length == 0 || !jobParameters.isEmpty(), |
299 | "Invalid JobParameters " + Arrays.asList(parameters) |
300 | + ". If parameters are provided they should be in the form name=value (no whitespace)."); |
301 | |
302 | if (opts.contains("-stop")) { |
303 | List<JobExecution> jobExecutions = getRunningJobExecutions(jobIdentifier); |
304 | if (jobExecutions == null) { |
305 | throw new JobExecutionNotRunningException("No running execution found for job=" + jobIdentifier); |
306 | } |
307 | for (JobExecution jobExecution : jobExecutions) { |
308 | jobExecution.setStatus(BatchStatus.STOPPING); |
309 | jobRepository.update(jobExecution); |
310 | } |
311 | return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode()); |
312 | } |
313 | |
314 | if (opts.contains("-abandon")) { |
315 | List<JobExecution> jobExecutions = getStoppedJobExecutions(jobIdentifier); |
316 | if (jobExecutions == null) { |
317 | throw new JobExecutionNotStoppedException("No stopped execution found for job=" + jobIdentifier); |
318 | } |
319 | for (JobExecution jobExecution : jobExecutions) { |
320 | jobExecution.setStatus(BatchStatus.ABANDONED); |
321 | jobRepository.update(jobExecution); |
322 | } |
323 | return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode()); |
324 | } |
325 | |
326 | if (opts.contains("-restart")) { |
327 | JobExecution jobExecution = getLastFailedJobExecution(jobIdentifier); |
328 | if (jobExecution == null) { |
329 | throw new JobExecutionNotFailedException("No failed or stopped execution found for job=" |
330 | + jobIdentifier); |
331 | } |
332 | jobParameters = jobExecution.getJobParameters(); |
333 | jobName = jobExecution.getJobInstance().getJobName(); |
334 | } |
335 | |
336 | Job job; |
337 | if (jobLocator != null) { |
338 | job = jobLocator.getJob(jobName); |
339 | } |
340 | else { |
341 | job = (Job) context.getBean(jobName); |
342 | } |
343 | |
344 | if (opts.contains("-next")) { |
345 | JobParameters nextParameters = getNextJobParameters(job); |
346 | Map<String, JobParameter> map = new HashMap<String, JobParameter>(nextParameters.getParameters()); |
347 | map.putAll(jobParameters.getParameters()); |
348 | jobParameters = new JobParameters(map); |
349 | } |
350 | |
351 | JobExecution jobExecution = launcher.run(job, jobParameters); |
352 | return exitCodeMapper.intValue(jobExecution.getExitStatus().getExitCode()); |
353 | |
354 | } |
355 | catch (Throwable e) { |
356 | String message = "Job Terminated in error: " + e.getMessage(); |
357 | logger.error(message, e); |
358 | CommandLineJobRunner.message = message; |
359 | return exitCodeMapper.intValue(ExitStatus.FAILED.getExitCode()); |
360 | } |
361 | finally { |
362 | if (context != null) { |
363 | context.close(); |
364 | } |
365 | } |
366 | } |
367 | |
368 | /** |
369 | * @param jobIdentifier a job execution id or job name |
370 | * @param minStatus the highest status to exclude from the result |
371 | * @return |
372 | */ |
373 | private List<JobExecution> getJobExecutionsWithStatusGreaterThan(String jobIdentifier, BatchStatus minStatus) { |
374 | |
375 | Long executionId = getLongIdentifier(jobIdentifier); |
376 | if (executionId != null) { |
377 | JobExecution jobExecution = jobExplorer.getJobExecution(executionId); |
378 | if (jobExecution.getStatus().isGreaterThan(minStatus)) { |
379 | return Arrays.asList(jobExecution); |
380 | } |
381 | return Collections.emptyList(); |
382 | } |
383 | |
384 | int start = 0; |
385 | int count = 100; |
386 | List<JobExecution> executions = new ArrayList<JobExecution>(); |
387 | List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count); |
388 | |
389 | while (!lastInstances.isEmpty()) { |
390 | |
391 | for (JobInstance jobInstance : lastInstances) { |
392 | List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance); |
393 | if (jobExecutions == null || jobExecutions.isEmpty()) { |
394 | continue; |
395 | } |
396 | for (JobExecution jobExecution : jobExecutions) { |
397 | if (jobExecution.getStatus().isGreaterThan(minStatus)) { |
398 | executions.add(jobExecution); |
399 | } |
400 | } |
401 | } |
402 | |
403 | start += count; |
404 | lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count); |
405 | |
406 | } |
407 | |
408 | return executions; |
409 | |
410 | } |
411 | |
412 | private JobExecution getLastFailedJobExecution(String jobIdentifier) { |
413 | List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STOPPING); |
414 | if (jobExecutions.isEmpty()) { |
415 | return null; |
416 | } |
417 | return jobExecutions.get(0); |
418 | } |
419 | |
420 | private List<JobExecution> getStoppedJobExecutions(String jobIdentifier) { |
421 | List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STARTED); |
422 | if (jobExecutions.isEmpty()) { |
423 | return null; |
424 | } |
425 | List<JobExecution> result = new ArrayList<JobExecution>(); |
426 | for (JobExecution jobExecution : jobExecutions) { |
427 | if (jobExecution.getStatus() != BatchStatus.ABANDONED) { |
428 | result.add(jobExecution); |
429 | } |
430 | } |
431 | return result.isEmpty() ? null : result; |
432 | } |
433 | |
434 | private List<JobExecution> getRunningJobExecutions(String jobIdentifier) { |
435 | List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.COMPLETED); |
436 | if (jobExecutions.isEmpty()) { |
437 | return null; |
438 | } |
439 | List<JobExecution> result = new ArrayList<JobExecution>(); |
440 | for (JobExecution jobExecution : jobExecutions) { |
441 | if (jobExecution.isRunning()) { |
442 | result.add(jobExecution); |
443 | } |
444 | } |
445 | return result.isEmpty() ? null : result; |
446 | } |
447 | |
448 | private Long getLongIdentifier(String jobIdentifier) { |
449 | try { |
450 | return new Long(jobIdentifier); |
451 | } |
452 | catch (NumberFormatException e) { |
453 | // Not an ID - must be a name |
454 | return null; |
455 | } |
456 | } |
457 | |
458 | /** |
459 | * @param job the job that we need to find the next parameters for |
460 | * @return the next job parameters if they can be located |
461 | * @throws JobParametersNotFoundException if there is a problem |
462 | */ |
463 | private JobParameters getNextJobParameters(Job job) throws JobParametersNotFoundException { |
464 | String jobIdentifier = job.getName(); |
465 | JobParameters jobParameters; |
466 | List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, 0, 1); |
467 | |
468 | JobParametersIncrementer incrementer = job.getJobParametersIncrementer(); |
469 | if (incrementer == null) { |
470 | throw new JobParametersNotFoundException("No job parameters incrementer found for job=" + jobIdentifier); |
471 | } |
472 | |
473 | if (lastInstances.isEmpty()) { |
474 | jobParameters = incrementer.getNext(new JobParameters()); |
475 | if (jobParameters == null) { |
476 | throw new JobParametersNotFoundException("No bootstrap parameters found from incrementer for job=" |
477 | + jobIdentifier); |
478 | } |
479 | } |
480 | else { |
481 | List<JobExecution> lastExecutions = jobExplorer.getJobExecutions(lastInstances.get(0)); |
482 | jobParameters = incrementer.getNext(lastExecutions.get(0).getJobParameters()); |
483 | } |
484 | return jobParameters; |
485 | } |
486 | |
487 | /** |
488 | * Launch a batch job using a {@link CommandLineJobRunner}. Creates a new |
489 | * Spring context for the job execution, and uses a common parent for all |
490 | * such contexts. No exception are thrown from this method, rather |
491 | * exceptions are logged and an integer returned through the exit status in |
492 | * a {@link JvmSystemExiter} (which can be overridden by defining one in the |
493 | * Spring context).<br/> |
494 | * Parameters can be provided in the form key=value, and will be converted |
495 | * using the injected {@link JobParametersConverter}. |
496 | * |
497 | * @param args <p> |
498 | * <ul> |
499 | * <li>-restart: (optional) if the job has failed or stopped and the most |
500 | * should be restarted. If specified then the jobIdentifier parameter can be |
501 | * interpreted either as the name of the job or the id of teh job execution |
502 | * that failed.</li> |
503 | * <li>-next: (optional) if the job has a {@link JobParametersIncrementer} |
504 | * that can be used to launch the next in a sequence</li> |
505 | * <li>jobPath: the xml application context containing a {@link Job} |
506 | * <li>jobIdentifier: the bean id of the job or id of the failed execution |
507 | * in the case of a restart. |
508 | * <li>jobParameters: 0 to many parameters that will be used to launch a |
509 | * job. |
510 | * </ul> |
511 | * The options (<code>-restart, -next</code>) can occur anywhere in the |
512 | * command line. |
513 | * </p> |
514 | */ |
515 | public static void main(String[] args) throws Exception { |
516 | |
517 | CommandLineJobRunner command = new CommandLineJobRunner(); |
518 | |
519 | List<String> newargs = new ArrayList<String>(Arrays.asList(args)); |
520 | |
521 | try { |
522 | if (System.in.available() > 0) { |
523 | BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); |
524 | String line = " "; |
525 | while (StringUtils.hasLength(line)) { |
526 | if (!line.startsWith("#") && StringUtils.hasText(line)) { |
527 | logger.debug("Stdin arg: " + line); |
528 | newargs.add(line); |
529 | } |
530 | line = reader.readLine(); |
531 | } |
532 | } |
533 | } |
534 | catch (IOException e) { |
535 | logger.warn("Could not access stdin (maybe a platform limitation)"); |
536 | if (logger.isDebugEnabled()) { |
537 | logger.debug("Exception details", e); |
538 | } |
539 | } |
540 | |
541 | Set<String> opts = new HashSet<String>(); |
542 | List<String> params = new ArrayList<String>(); |
543 | |
544 | int count = 0; |
545 | String jobPath = null; |
546 | String jobIdentifier = null; |
547 | |
548 | for (String arg : newargs) { |
549 | if (VALID_OPTS.contains(arg)) { |
550 | opts.add(arg); |
551 | } |
552 | else { |
553 | switch (count) { |
554 | case 0: |
555 | jobPath = arg; |
556 | break; |
557 | case 1: |
558 | jobIdentifier = arg; |
559 | break; |
560 | default: |
561 | params.add(arg); |
562 | break; |
563 | } |
564 | count++; |
565 | } |
566 | } |
567 | |
568 | if (jobPath == null || jobIdentifier == null) { |
569 | String message = "At least 2 arguments are required: JobPath and jobIdentifier."; |
570 | logger.error(message); |
571 | CommandLineJobRunner.message = message; |
572 | command.exit(1); |
573 | } |
574 | |
575 | String[] parameters = params.toArray(new String[params.size()]); |
576 | |
577 | int result = command.start(jobPath, jobIdentifier, parameters, opts); |
578 | command.exit(result); |
579 | } |
580 | |
581 | } |