View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.step.tasklet;
18  
19  import java.io.File;
20  import java.util.concurrent.Callable;
21  import java.util.concurrent.FutureTask;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.springframework.batch.core.ExitStatus;
26  import org.springframework.batch.core.JobInterruptedException;
27  import org.springframework.batch.core.StepContribution;
28  import org.springframework.batch.core.StepExecution;
29  import org.springframework.batch.core.listener.StepExecutionListenerSupport;
30  import org.springframework.batch.core.scope.context.ChunkContext;
31  import org.springframework.batch.repeat.RepeatStatus;
32  import org.springframework.beans.factory.InitializingBean;
33  import org.springframework.core.task.SimpleAsyncTaskExecutor;
34  import org.springframework.core.task.TaskExecutor;
35  import org.springframework.util.Assert;
36  
37  /**
38   * {@link Tasklet} that executes a system command.
39   *
40   * The system command is executed asynchronously using injected
41   * {@link #setTaskExecutor(TaskExecutor)} - timeout value is required to be set,
42   * so that the batch job does not hang forever if the external process hangs.
43   *
44   * Tasklet periodically checks for termination status (i.e.
45   * {@link #setCommand(String)} finished its execution or
46   * {@link #setTimeout(long)} expired or job was interrupted). The check interval
47   * is given by {@link #setTerminationCheckInterval(long)}.
48   *
49   * When job interrupt is detected tasklet's execution is terminated immediately
50   * by throwing {@link JobInterruptedException}.
51   *
52   * {@link #setInterruptOnCancel(boolean)} specifies whether the tasklet should
53   * attempt to interrupt the thread that executes the system command if it is
54   * still running when tasklet exits (abnormally).
55   *
56   * @author Robert Kasanicky
57   */
58  public class SystemCommandTasklet extends StepExecutionListenerSupport implements Tasklet, InitializingBean {
59  
60  	protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class);
61  
62  	private String command;
63  
64  	private String[] environmentParams = null;
65  
66  	private File workingDirectory = null;
67  
68  	private SystemProcessExitCodeMapper systemProcessExitCodeMapper = new SimpleSystemProcessExitCodeMapper();
69  
70  	private long timeout = 0;
71  
72  	private long checkInterval = 1000;
73  
74  	private StepExecution execution = null;
75  
76  	private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
77  
78  	private boolean interruptOnCancel = false;
79  
80  	/**
81  	 * Execute system command and map its exit code to {@link ExitStatus} using
82  	 * {@link SystemProcessExitCodeMapper}.
83  	 */
84  	@Override
85  	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
86  
87  		FutureTask<Integer> systemCommandTask = new FutureTask<Integer>(new Callable<Integer>() {
88  
89  			@Override
90  			public Integer call() throws Exception {
91  				Process process = Runtime.getRuntime().exec(command, environmentParams, workingDirectory);
92  				return process.waitFor();
93  			}
94  
95  		});
96  
97  		long t0 = System.currentTimeMillis();
98  
99  		taskExecutor.execute(systemCommandTask);
100 
101 		while (true) {
102 			Thread.sleep(checkInterval);
103 			if (systemCommandTask.isDone()) {
104 				contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get()));
105 				return RepeatStatus.FINISHED;
106 			}
107 			else if (System.currentTimeMillis() - t0 > timeout) {
108 				systemCommandTask.cancel(interruptOnCancel);
109 				throw new SystemCommandException("Execution of system command did not finish within the timeout");
110 			}
111 			else if (execution.isTerminateOnly()) {
112 				systemCommandTask.cancel(interruptOnCancel);
113 				throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'");
114 			}
115 		}
116 
117 	}
118 
119 	/**
120 	 * @param command command to be executed in a separate system process
121 	 */
122 	public void setCommand(String command) {
123 		this.command = command;
124 	}
125 
126 	/**
127 	 * @param envp environment parameter values, inherited from parent process
128 	 * when not set (or set to null).
129 	 */
130 	public void setEnvironmentParams(String[] envp) {
131 		this.environmentParams = envp;
132 	}
133 
134 	/**
135 	 * @param dir working directory of the spawned process, inherited from
136 	 * parent process when not set (or set to null).
137 	 */
138 	public void setWorkingDirectory(String dir) {
139 		if (dir == null) {
140 			this.workingDirectory = null;
141 			return;
142 		}
143 		this.workingDirectory = new File(dir);
144 		Assert.isTrue(workingDirectory.exists(), "working directory must exist");
145 		Assert.isTrue(workingDirectory.isDirectory(), "working directory value must be a directory");
146 
147 	}
148 
149 	@Override
150 	public void afterPropertiesSet() throws Exception {
151 		Assert.hasLength(command, "'command' property value is required");
152 		Assert.notNull(systemProcessExitCodeMapper, "SystemProcessExitCodeMapper must be set");
153 		Assert.isTrue(timeout > 0, "timeout value must be greater than zero");
154 		Assert.notNull(taskExecutor, "taskExecutor is required");
155 	}
156 
157 	/**
158 	 * @param systemProcessExitCodeMapper maps system process return value to
159 	 * <code>ExitStatus</code> returned by Tasklet.
160 	 * {@link SimpleSystemProcessExitCodeMapper} is used by default.
161 	 */
162 	public void setSystemProcessExitCodeMapper(SystemProcessExitCodeMapper systemProcessExitCodeMapper) {
163 		this.systemProcessExitCodeMapper = systemProcessExitCodeMapper;
164 	}
165 
166 	/**
167 	 * Timeout in milliseconds.
168 	 * @param timeout upper limit for how long the execution of the external
169 	 * program is allowed to last.
170 	 */
171 	public void setTimeout(long timeout) {
172 		this.timeout = timeout;
173 	}
174 
175 	/**
176 	 * The time interval how often the tasklet will check for termination
177 	 * status.
178 	 *
179 	 * @param checkInterval time interval in milliseconds (1 second by default).
180 	 */
181 	public void setTerminationCheckInterval(long checkInterval) {
182 		this.checkInterval = checkInterval;
183 	}
184 
185 	/**
186 	 * Get a reference to {@link StepExecution} for interrupt checks during
187 	 * system command execution.
188 	 */
189 	@Override
190 	public void beforeStep(StepExecution stepExecution) {
191 		this.execution = stepExecution;
192 	}
193 
194 	/**
195 	 * Sets the task executor that will be used to execute the system command
196 	 * NB! Avoid using a synchronous task executor
197 	 */
198 	public void setTaskExecutor(TaskExecutor taskExecutor) {
199 		this.taskExecutor = taskExecutor;
200 	}
201 
202 	/**
203 	 * If <code>true</code> tasklet will attempt to interrupt the thread
204 	 * executing the system command if {@link #setTimeout(long)} has been
205 	 * exceeded or user interrupts the job. <code>false</code> by default
206 	 */
207 	public void setInterruptOnCancel(boolean interruptOnCancel) {
208 		this.interruptOnCancel = interruptOnCancel;
209 	}
210 
211 }