1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.tasklet; |
18 | |
19 | import java.io.File; |
20 | import java.util.concurrent.Callable; |
21 | import java.util.concurrent.FutureTask; |
22 | |
23 | import org.apache.commons.logging.Log; |
24 | import org.apache.commons.logging.LogFactory; |
25 | import org.springframework.batch.core.ExitStatus; |
26 | import org.springframework.batch.core.JobInterruptedException; |
27 | import org.springframework.batch.core.StepContribution; |
28 | import org.springframework.batch.core.StepExecution; |
29 | import org.springframework.batch.core.listener.StepExecutionListenerSupport; |
30 | import org.springframework.batch.core.scope.context.ChunkContext; |
31 | import org.springframework.batch.repeat.RepeatStatus; |
32 | import org.springframework.beans.factory.InitializingBean; |
33 | import org.springframework.core.task.SimpleAsyncTaskExecutor; |
34 | import org.springframework.core.task.TaskExecutor; |
35 | import org.springframework.util.Assert; |
36 | |
37 | /** |
38 | * {@link Tasklet} that executes a system command. |
39 | * |
40 | * The system command is executed asynchronously using injected |
41 | * {@link #setTaskExecutor(TaskExecutor)} - timeout value is required to be set, |
42 | * so that the batch job does not hang forever if the external process hangs. |
43 | * |
44 | * Tasklet periodically checks for termination status (i.e. |
45 | * {@link #setCommand(String)} finished its execution or |
46 | * {@link #setTimeout(long)} expired or job was interrupted). The check interval |
47 | * is given by {@link #setTerminationCheckInterval(long)}. |
48 | * |
49 | * When job interrupt is detected tasklet's execution is terminated immediately |
50 | * by throwing {@link JobInterruptedException}. |
51 | * |
52 | * {@link #setInterruptOnCancel(boolean)} specifies whether the tasklet should |
53 | * attempt to interrupt the thread that executes the system command if it is |
54 | * still running when tasklet exits (abnormally). |
55 | * |
56 | * @author Robert Kasanicky |
57 | */ |
58 | public class SystemCommandTasklet extends StepExecutionListenerSupport implements Tasklet, InitializingBean { |
59 | |
60 | protected static final Log logger = LogFactory.getLog(SystemCommandTasklet.class); |
61 | |
62 | private String command; |
63 | |
64 | private String[] environmentParams = null; |
65 | |
66 | private File workingDirectory = null; |
67 | |
68 | private SystemProcessExitCodeMapper systemProcessExitCodeMapper = new SimpleSystemProcessExitCodeMapper(); |
69 | |
70 | private long timeout = 0; |
71 | |
72 | private long checkInterval = 1000; |
73 | |
74 | private StepExecution execution = null; |
75 | |
76 | private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(); |
77 | |
78 | private boolean interruptOnCancel = false; |
79 | |
80 | /** |
81 | * Execute system command and map its exit code to {@link ExitStatus} using |
82 | * {@link SystemProcessExitCodeMapper}. |
83 | */ |
84 | @Override |
85 | public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { |
86 | |
87 | FutureTask<Integer> systemCommandTask = new FutureTask<Integer>(new Callable<Integer>() { |
88 | |
89 | @Override |
90 | public Integer call() throws Exception { |
91 | Process process = Runtime.getRuntime().exec(command, environmentParams, workingDirectory); |
92 | return process.waitFor(); |
93 | } |
94 | |
95 | }); |
96 | |
97 | long t0 = System.currentTimeMillis(); |
98 | |
99 | taskExecutor.execute(systemCommandTask); |
100 | |
101 | while (true) { |
102 | Thread.sleep(checkInterval); |
103 | if (systemCommandTask.isDone()) { |
104 | contribution.setExitStatus(systemProcessExitCodeMapper.getExitStatus(systemCommandTask.get())); |
105 | return RepeatStatus.FINISHED; |
106 | } |
107 | else if (System.currentTimeMillis() - t0 > timeout) { |
108 | systemCommandTask.cancel(interruptOnCancel); |
109 | throw new SystemCommandException("Execution of system command did not finish within the timeout"); |
110 | } |
111 | else if (execution.isTerminateOnly()) { |
112 | systemCommandTask.cancel(interruptOnCancel); |
113 | throw new JobInterruptedException("Job interrupted while executing system command '" + command + "'"); |
114 | } |
115 | } |
116 | |
117 | } |
118 | |
119 | /** |
120 | * @param command command to be executed in a separate system process |
121 | */ |
122 | public void setCommand(String command) { |
123 | this.command = command; |
124 | } |
125 | |
126 | /** |
127 | * @param envp environment parameter values, inherited from parent process |
128 | * when not set (or set to null). |
129 | */ |
130 | public void setEnvironmentParams(String[] envp) { |
131 | this.environmentParams = envp; |
132 | } |
133 | |
134 | /** |
135 | * @param dir working directory of the spawned process, inherited from |
136 | * parent process when not set (or set to null). |
137 | */ |
138 | public void setWorkingDirectory(String dir) { |
139 | if (dir == null) { |
140 | this.workingDirectory = null; |
141 | return; |
142 | } |
143 | this.workingDirectory = new File(dir); |
144 | Assert.isTrue(workingDirectory.exists(), "working directory must exist"); |
145 | Assert.isTrue(workingDirectory.isDirectory(), "working directory value must be a directory"); |
146 | |
147 | } |
148 | |
149 | @Override |
150 | public void afterPropertiesSet() throws Exception { |
151 | Assert.hasLength(command, "'command' property value is required"); |
152 | Assert.notNull(systemProcessExitCodeMapper, "SystemProcessExitCodeMapper must be set"); |
153 | Assert.isTrue(timeout > 0, "timeout value must be greater than zero"); |
154 | Assert.notNull(taskExecutor, "taskExecutor is required"); |
155 | } |
156 | |
157 | /** |
158 | * @param systemProcessExitCodeMapper maps system process return value to |
159 | * <code>ExitStatus</code> returned by Tasklet. |
160 | * {@link SimpleSystemProcessExitCodeMapper} is used by default. |
161 | */ |
162 | public void setSystemProcessExitCodeMapper(SystemProcessExitCodeMapper systemProcessExitCodeMapper) { |
163 | this.systemProcessExitCodeMapper = systemProcessExitCodeMapper; |
164 | } |
165 | |
166 | /** |
167 | * Timeout in milliseconds. |
168 | * @param timeout upper limit for how long the execution of the external |
169 | * program is allowed to last. |
170 | */ |
171 | public void setTimeout(long timeout) { |
172 | this.timeout = timeout; |
173 | } |
174 | |
175 | /** |
176 | * The time interval how often the tasklet will check for termination |
177 | * status. |
178 | * |
179 | * @param checkInterval time interval in milliseconds (1 second by default). |
180 | */ |
181 | public void setTerminationCheckInterval(long checkInterval) { |
182 | this.checkInterval = checkInterval; |
183 | } |
184 | |
185 | /** |
186 | * Get a reference to {@link StepExecution} for interrupt checks during |
187 | * system command execution. |
188 | */ |
189 | @Override |
190 | public void beforeStep(StepExecution stepExecution) { |
191 | this.execution = stepExecution; |
192 | } |
193 | |
194 | /** |
195 | * Sets the task executor that will be used to execute the system command |
196 | * NB! Avoid using a synchronous task executor |
197 | */ |
198 | public void setTaskExecutor(TaskExecutor taskExecutor) { |
199 | this.taskExecutor = taskExecutor; |
200 | } |
201 | |
202 | /** |
203 | * If <code>true</code> tasklet will attempt to interrupt the thread |
204 | * executing the system command if {@link #setTimeout(long)} has been |
205 | * exceeded or user interrupts the job. <code>false</code> by default |
206 | */ |
207 | public void setInterruptOnCancel(boolean interruptOnCancel) { |
208 | this.interruptOnCancel = interruptOnCancel; |
209 | } |
210 | |
211 | } |