1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.tasklet; |
18 | |
19 | import java.io.File; |
20 | import java.util.concurrent.Callable; |
21 | import java.util.concurrent.FutureTask; |
22 | |
23 | import org.apache.commons.logging.Log; |
24 | import org.apache.commons.logging.LogFactory; |
25 | import org.springframework.batch.core.ExitStatus; |
26 | import org.springframework.batch.core.JobInterruptedException; |
27 | import org.springframework.batch.core.StepContribution; |
28 | import org.springframework.batch.core.StepExecution; |
29 | import org.springframework.batch.core.listener.StepExecutionListenerSupport; |
30 | import org.springframework.batch.core.scope.context.ChunkContext; |
31 | import org.springframework.batch.repeat.RepeatStatus; |
32 | import org.springframework.beans.factory.InitializingBean; |
33 | import org.springframework.core.task.SimpleAsyncTaskExecutor; |
34 | import org.springframework.core.task.TaskExecutor; |
35 | import org.springframework.util.Assert; |
36 | |
37 | /** |
38 | * {@link Tasklet} that executes a system command. |
39 | * |
40 | * The system command is executed asynchronously using injected |
41 | * {@link #setTaskExecutor(TaskExecutor)} - timeout value is required to be set, |
42 | * so that the batch job does not hang forever if the external process hangs. |
43 | * |
44 | * Tasklet periodically checks for termination status (i.e. |
45 | * {@link #setCommand(String)} finished its execution or |
46 | * {@link #setTimeout(long)} expired or job was interrupted). The check interval |
47 | * is given by {@link #setTerminationCheckInterval(long)}. |
48 | * |
49 | * When job interrupt is detected tasklet's execution is terminated immediately |
50 | * by throwing {@link JobInterruptedException}. |
51 | * |
52 | * {@link #setInterruptOnCancel(boolean)} specifies whether the tasklet should |
53 | * attempt to interrupt the thread that executes the system command if it is |
54 | * still running when tasklet exits (abnormally). |
55 | * |
56 | * @author Robert Kasanicky |
57 | */ |
58 | public class SystemCommandTasklet extends StepExecutionListenerSupport implements Tasklet, InitializingBean { |
59 | |
60 | protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class); |
61 | |
62 | private String command; |
63 | |
64 | private String[] environmentParams = null; |
65 | |
66 | private File workingDirectory = null; |
67 | |
68 | private SystemProcessExitCodeMapper systemProcessExitCodeMapper = new SimpleSystemProcessExitCodeMapper(); |
69 | |
70 | private long timeout = 0; |
71 | |
72 | private long checkInterval = 1000; |
73 | |
74 | private StepExecution execution = null; |
75 | |
76 | private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); |
77 | |
78 | private boolean interruptOnCancel = false; |
79 | |
80 | /** |
81 | * Execute system command and map its exit code to {@link ExitStatus} using |
82 | * {@link SystemProcessExitCodeMapper}. |
83 | */ |
84 | public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { |
85 | |
86 | FutureTask<Integer> systemCommandTask = new FutureTask<Integer>(new Callable<Integer>() { |
87 | |
88 | public Integer call() throws Exception { |
89 | Process process = Runtime.getRuntime().exec(command, environmentParams, workingDirectory); |
90 | return process.waitFor(); |
91 | } |
92 | |
93 | }); |
94 | |
95 | long t0 = System.currentTimeMillis(); |
96 | |
97 | taskExecutor.execute(systemCommandTask); |
98 | |
99 | while (true) { |
100 | Thread.sleep(checkInterval); |
101 | if (systemCommandTask.isDone()) { |
102 | contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get())); |
103 | return RepeatStatus.FINISHED; |
104 | } |
105 | else if (System.currentTimeMillis() - t0 > timeout) { |
106 | systemCommandTask.cancel(interruptOnCancel); |
107 | throw new SystemCommandException("Execution of system command did not finish within the timeout"); |
108 | } |
109 | else if (execution.isTerminateOnly()) { |
110 | systemCommandTask.cancel(interruptOnCancel); |
111 | throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'"); |
112 | } |
113 | } |
114 | |
115 | } |
116 | |
117 | /** |
118 | * @param command command to be executed in a separate system process |
119 | */ |
120 | public void setCommand(String command) { |
121 | this.command = command; |
122 | } |
123 | |
124 | /** |
125 | * @param envp environment parameter values, inherited from parent process |
126 | * when not set (or set to null). |
127 | */ |
128 | public void setEnvironmentParams(String[] envp) { |
129 | this.environmentParams = envp; |
130 | } |
131 | |
132 | /** |
133 | * @param dir working directory of the spawned process, inherited from |
134 | * parent process when not set (or set to null). |
135 | */ |
136 | public void setWorkingDirectory(String dir) { |
137 | if (dir == null) { |
138 | this.workingDirectory = null; |
139 | return; |
140 | } |
141 | this.workingDirectory = new File(dir); |
142 | Assert.isTrue(workingDirectory.exists(), "working directory must exist"); |
143 | Assert.isTrue(workingDirectory.isDirectory(), "working directory value must be a directory"); |
144 | |
145 | } |
146 | |
147 | public void afterPropertiesSet() throws Exception { |
148 | Assert.hasLength(command, "'command' property value is required"); |
149 | Assert.notNull(systemProcessExitCodeMapper, "SystemProcessExitCodeMapper must be set"); |
150 | Assert.isTrue(timeout > 0, "timeout value must be greater than zero"); |
151 | Assert.notNull(taskExecutor, "taskExecutor is required"); |
152 | } |
153 | |
154 | /** |
155 | * @param systemProcessExitCodeMapper maps system process return value to |
156 | * <code>ExitStatus</code> returned by Tasklet. |
157 | * {@link SimpleSystemProcessExitCodeMapper} is used by default. |
158 | */ |
159 | public void setSystemProcessExitCodeMapper(SystemProcessExitCodeMapper systemProcessExitCodeMapper) { |
160 | this.systemProcessExitCodeMapper = systemProcessExitCodeMapper; |
161 | } |
162 | |
163 | /** |
164 | * Timeout in milliseconds. |
165 | * @param timeout upper limit for how long the execution of the external |
166 | * program is allowed to last. |
167 | */ |
168 | public void setTimeout(long timeout) { |
169 | this.timeout = timeout; |
170 | } |
171 | |
172 | /** |
173 | * The time interval how often the tasklet will check for termination |
174 | * status. |
175 | * |
176 | * @param checkInterval time interval in milliseconds (1 second by default). |
177 | */ |
178 | public void setTerminationCheckInterval(long checkInterval) { |
179 | this.checkInterval = checkInterval; |
180 | } |
181 | |
182 | /** |
183 | * Get a reference to {@link StepExecution} for interrupt checks during |
184 | * system command execution. |
185 | */ |
186 | @Override |
187 | public void beforeStep(StepExecution stepExecution) { |
188 | this.execution = stepExecution; |
189 | } |
190 | |
191 | /** |
192 | * Sets the task executor that will be used to execute the system command |
193 | * NB! Avoid using a synchronous task executor |
194 | */ |
195 | public void setTaskExecutor(TaskExecutor taskExecutor) { |
196 | this.taskExecutor = taskExecutor; |
197 | } |
198 | |
199 | /** |
200 | * If <code>true</code> tasklet will attempt to interrupt the thread |
201 | * executing the system command if {@link #setTimeout(long)} has been |
202 | * exceeded or user interrupts the job. <code>false</code> by default |
203 | */ |
204 | public void setInterruptOnCancel(boolean interruptOnCancel) { |
205 | this.interruptOnCancel = interruptOnCancel; |
206 | } |
207 | |
208 | } |