View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.core.launch.support;
17  
18  import org.apache.commons.logging.Log;
19  import org.apache.commons.logging.LogFactory;
20  import org.springframework.batch.core.BatchStatus;
21  import org.springframework.batch.core.ExitStatus;
22  import org.springframework.batch.core.Job;
23  import org.springframework.batch.core.JobExecution;
24  import org.springframework.batch.core.JobInstance;
25  import org.springframework.batch.core.JobParameters;
26  import org.springframework.batch.core.JobParametersInvalidException;
27  import org.springframework.batch.core.launch.JobLauncher;
28  import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
29  import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
30  import org.springframework.batch.core.repository.JobRepository;
31  import org.springframework.batch.core.repository.JobRestartException;
32  import org.springframework.beans.factory.InitializingBean;
33  import org.springframework.core.task.SyncTaskExecutor;
34  import org.springframework.core.task.TaskExecutor;
35  import org.springframework.core.task.TaskRejectedException;
36  import org.springframework.util.Assert;
37  
38  /**
39   * Simple implementation of the {@link JobLauncher} interface. The Spring Core
40   * {@link TaskExecutor} interface is used to launch a {@link Job}. This means
41   * that the type of executor set is very important. If a
42   * {@link SyncTaskExecutor} is used, then the job will be processed
43   * <strong>within the same thread that called the launcher.</strong> Care should
44   * be taken to ensure any users of this class understand fully whether or not
45   * the implementation of TaskExecutor used will start tasks synchronously or
46   * asynchronously. The default setting uses a synchronous task executor.
47   *
48   * There is only one required dependency of this Launcher, a
49   * {@link JobRepository}. The JobRepository is used to obtain a valid
50   * JobExecution. The Repository must be used because the provided {@link Job}
51   * could be a restart of an existing {@link JobInstance}, and only the
52   * Repository can reliably recreate it.
53   *
54   * @author Lucas Ward
55   * @Author Dave Syer
56   *
57   * @since 1.0
58   *
59   * @see JobRepository
60   * @see TaskExecutor
61   */
62  public class SimpleJobLauncher implements JobLauncher, InitializingBean {
63  
64  	protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);
65  
66  	private JobRepository jobRepository;
67  
68  	private TaskExecutor taskExecutor;
69  
70  	/**
71  	 * Run the provided job with the given {@link JobParameters}. The
72  	 * {@link JobParameters} will be used to determine if this is an execution
73  	 * of an existing job instance, or if a new one should be created.
74  	 *
75  	 * @param job the job to be run.
76  	 * @param jobParameters the {@link JobParameters} for this particular
77  	 * execution.
78  	 * @return JobExecutionAlreadyRunningException if the JobInstance already
79  	 * exists and has an execution already running.
80  	 * @throws JobRestartException if the execution would be a re-start, but a
81  	 * re-start is either not allowed or not needed.
82  	 * @throws JobInstanceAlreadyCompleteException if this instance has already
83  	 * completed successfully
84  	 * @throws JobParametersInvalidException
85  	 */
86  	@Override
87  	public JobExecution run(final Job job, final JobParameters jobParameters)
88  			throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
89  			JobParametersInvalidException {
90  
91  		Assert.notNull(job, "The Job must not be null.");
92  		Assert.notNull(jobParameters, "The JobParameters must not be null.");
93  
94  		final JobExecution jobExecution;
95  		JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
96  		if (lastExecution != null) {
97  			if (!job.isRestartable()) {
98  				throw new JobRestartException("JobInstance already exists and is not restartable");
99  			}
100 		}
101 
102 		// Check the validity of the parameters before doing creating anything
103 		// in the repository...
104 		job.getJobParametersValidator().validate(jobParameters);
105 
106 		/*
107 		 * There is a very small probability that a non-restartable job can be
108 		 * restarted, but only if another process or thread manages to launch
109 		 * <i>and</i> fail a job execution for this instance between the last
110 		 * assertion and the next method returning successfully.
111 		 */
112 		jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
113 
114 		try {
115 			taskExecutor.execute(new Runnable() {
116 
117 				@Override
118 				public void run() {
119 					try {
120 						logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
121 								+ "]");
122 						job.execute(jobExecution);
123 						logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
124 								+ "] and the following status: [" + jobExecution.getStatus() + "]");
125 					}
126 					catch (Throwable t) {
127 						logger.info("Job: [" + job
128 								+ "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
129 								+ "]", t);
130 						rethrow(t);
131 					}
132 				}
133 
134 				private void rethrow(Throwable t) {
135 					if (t instanceof RuntimeException) {
136 						throw (RuntimeException) t;
137 					}
138 					else if (t instanceof Error) {
139 						throw (Error) t;
140 					}
141 					throw new IllegalStateException(t);
142 				}
143 			});
144 		}
145 		catch (TaskRejectedException e) {
146 			jobExecution.upgradeStatus(BatchStatus.FAILED);
147 			if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
148 				jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
149 			}
150 			jobRepository.update(jobExecution);
151 		}
152 
153 		return jobExecution;
154 	}
155 
156 	/**
157 	 * Set the JobRepsitory.
158 	 *
159 	 * @param jobRepository
160 	 */
161 	public void setJobRepository(JobRepository jobRepository) {
162 		this.jobRepository = jobRepository;
163 	}
164 
165 	/**
166 	 * Set the TaskExecutor. (Optional)
167 	 *
168 	 * @param taskExecutor
169 	 */
170 	public void setTaskExecutor(TaskExecutor taskExecutor) {
171 		this.taskExecutor = taskExecutor;
172 	}
173 
174 	/**
175 	 * Ensure the required dependencies of a {@link JobRepository} have been
176 	 * set.
177 	 */
178 	@Override
179 	public void afterPropertiesSet() throws Exception {
180 		Assert.state(jobRepository != null, "A JobRepository has not been set.");
181 		if (taskExecutor == null) {
182 			logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
183 			taskExecutor = new SyncTaskExecutor();
184 		}
185 	}
186 
187 }