1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core; |
18 | |
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.util.Assert;
27 | |
28 | /** |
29 | * Batch domain object representation the execution of a step. Unlike |
30 | * {@link JobExecution}, there are additional properties related the |
31 | * processing of items such as commit count, etc. |
32 | * |
33 | * @author Lucas Ward |
34 | * @author Dave Syer |
35 | * |
36 | */ |
37 | public class StepExecution extends Entity { |
38 | |
39 | private final JobExecution jobExecution; |
40 | |
41 | private final String stepName; |
42 | |
43 | private volatile BatchStatus status = BatchStatus.STARTING; |
44 | |
45 | private volatile int readCount = 0; |
46 | |
47 | private volatile int writeCount = 0; |
48 | |
49 | private volatile int commitCount = 0; |
50 | |
51 | private volatile int rollbackCount = 0; |
52 | |
53 | private volatile int readSkipCount = 0; |
54 | |
55 | private volatile int processSkipCount = 0; |
56 | |
57 | private volatile int writeSkipCount = 0; |
58 | |
59 | private volatile Date startTime = new Date(System.currentTimeMillis()); |
60 | |
61 | private volatile Date endTime = null; |
62 | |
63 | private volatile Date lastUpdated = null; |
64 | |
65 | private volatile ExecutionContext executionContext = new ExecutionContext(); |
66 | |
67 | private volatile ExitStatus exitStatus = ExitStatus.EXECUTING; |
68 | |
69 | private volatile boolean terminateOnly; |
70 | |
71 | private volatile int filterCount; |
72 | |
73 | private transient volatile List<Throwable> failureExceptions = new ArrayList<Throwable>(); |
74 | |
75 | /** |
76 | * Constructor with mandatory properties. |
77 | * |
78 | * @param stepName the step to which this execution belongs |
79 | * @param jobExecution the current job execution |
80 | * @param id the id of this execution |
81 | */ |
82 | public StepExecution(String stepName, JobExecution jobExecution, Long id) { |
83 | this(stepName, jobExecution); |
84 | Assert.notNull(jobExecution, "JobExecution must be provided to re-hydrate an existing StepExecution"); |
85 | Assert.notNull(id, "The entity Id must be provided to re-hydrate an existing StepExecution"); |
86 | setId(id); |
87 | jobExecution.addStepExecution(this); |
88 | } |
89 | |
90 | /** |
91 | * Constructor that substitutes in null for the execution id |
92 | * |
93 | * @param stepName the step to which this execution belongs |
94 | * @param jobExecution the current job execution |
95 | */ |
96 | public StepExecution(String stepName, JobExecution jobExecution) { |
97 | super(); |
98 | Assert.hasLength(stepName); |
99 | this.stepName = stepName; |
100 | this.jobExecution = jobExecution; |
101 | } |
102 | |
103 | /** |
104 | * Returns the {@link ExecutionContext} for this execution |
105 | * |
106 | * @return the attributes |
107 | */ |
108 | public ExecutionContext getExecutionContext() { |
109 | return executionContext; |
110 | } |
111 | |
112 | /** |
113 | * Sets the {@link ExecutionContext} for this execution |
114 | * |
115 | * @param executionContext the attributes |
116 | */ |
117 | public void setExecutionContext(ExecutionContext executionContext) { |
118 | this.executionContext = executionContext; |
119 | } |
120 | |
121 | /** |
122 | * Returns the current number of commits for this execution |
123 | * |
124 | * @return the current number of commits |
125 | */ |
126 | public int getCommitCount() { |
127 | return commitCount; |
128 | } |
129 | |
130 | /** |
131 | * Sets the current number of commits for this execution |
132 | * |
133 | * @param commitCount the current number of commits |
134 | */ |
135 | public void setCommitCount(int commitCount) { |
136 | this.commitCount = commitCount; |
137 | } |
138 | |
139 | /** |
140 | * Returns the time that this execution ended |
141 | * |
142 | * @return the time that this execution ended |
143 | */ |
144 | public Date getEndTime() { |
145 | return endTime; |
146 | } |
147 | |
148 | /** |
149 | * Sets the time that this execution ended |
150 | * |
151 | * @param endTime the time that this execution ended |
152 | */ |
153 | public void setEndTime(Date endTime) { |
154 | this.endTime = endTime; |
155 | } |
156 | |
157 | /** |
158 | * Returns the current number of items read for this execution |
159 | * |
160 | * @return the current number of items read for this execution |
161 | */ |
162 | public int getReadCount() { |
163 | return readCount; |
164 | } |
165 | |
166 | /** |
167 | * Sets the current number of read items for this execution |
168 | * |
169 | * @param readCount the current number of read items for this execution |
170 | */ |
171 | public void setReadCount(int readCount) { |
172 | this.readCount = readCount; |
173 | } |
174 | |
175 | /** |
176 | * Returns the current number of items written for this execution |
177 | * |
178 | * @return the current number of items written for this execution |
179 | */ |
180 | public int getWriteCount() { |
181 | return writeCount; |
182 | } |
183 | |
184 | /** |
185 | * Sets the current number of written items for this execution |
186 | * |
187 | * @param writeCount the current number of written items for this execution |
188 | */ |
189 | public void setWriteCount(int writeCount) { |
190 | this.writeCount = writeCount; |
191 | } |
192 | |
193 | /** |
194 | * Returns the current number of rollbacks for this execution |
195 | * |
196 | * @return the current number of rollbacks for this execution |
197 | */ |
198 | public int getRollbackCount() { |
199 | return rollbackCount; |
200 | } |
201 | |
202 | /** |
203 | * Returns the current number of items filtered out of this execution |
204 | * |
205 | * @return the current number of items filtered out of this execution |
206 | */ |
207 | public int getFilterCount() { |
208 | return filterCount; |
209 | } |
210 | |
211 | /** |
212 | * Public setter for the number of items filtered out of this execution. |
213 | * @param filterCount the number of items filtered out of this execution to |
214 | * set |
215 | */ |
216 | public void setFilterCount(int filterCount) { |
217 | this.filterCount = filterCount; |
218 | } |
219 | |
220 | /** |
221 | * Setter for number of rollbacks for this execution |
222 | */ |
223 | public void setRollbackCount(int rollbackCount) { |
224 | this.rollbackCount = rollbackCount; |
225 | } |
226 | |
227 | /** |
228 | * Gets the time this execution started |
229 | * |
230 | * @return the time this execution started |
231 | */ |
232 | public Date getStartTime() { |
233 | return startTime; |
234 | } |
235 | |
236 | /** |
237 | * Sets the time this execution started |
238 | * |
239 | * @param startTime the time this execution started |
240 | */ |
241 | public void setStartTime(Date startTime) { |
242 | this.startTime = startTime; |
243 | } |
244 | |
245 | /** |
246 | * Returns the current status of this step |
247 | * |
248 | * @return the current status of this step |
249 | */ |
250 | public BatchStatus getStatus() { |
251 | return status; |
252 | } |
253 | |
254 | /** |
255 | * Sets the current status of this step |
256 | * |
257 | * @param status the current status of this step |
258 | */ |
259 | public void setStatus(BatchStatus status) { |
260 | this.status = status; |
261 | } |
262 | |
263 | /** |
264 | * Upgrade the status field if the provided value is greater than the |
265 | * existing one. Clients using this method to set the status can be sure |
266 | * that they don't overwrite a failed status with an successful one. |
267 | * |
268 | * @param status the new status value |
269 | */ |
270 | public void upgradeStatus(BatchStatus status) { |
271 | this.status = this.status.upgradeTo(status); |
272 | } |
273 | |
274 | /** |
275 | * @return the name of the step |
276 | */ |
277 | public String getStepName() { |
278 | return stepName; |
279 | } |
280 | |
281 | /** |
282 | * Accessor for the job execution id. |
283 | * |
284 | * @return the jobExecutionId |
285 | */ |
286 | public Long getJobExecutionId() { |
287 | if (jobExecution != null) { |
288 | return jobExecution.getId(); |
289 | } |
290 | return null; |
291 | } |
292 | |
293 | /** |
294 | * @param exitStatus |
295 | */ |
296 | public void setExitStatus(ExitStatus exitStatus) { |
297 | this.exitStatus = exitStatus; |
298 | } |
299 | |
300 | /** |
301 | * @return the exitCode |
302 | */ |
303 | public ExitStatus getExitStatus() { |
304 | return exitStatus; |
305 | } |
306 | |
307 | /** |
308 | * Accessor for the execution context information of the enclosing job. |
309 | * |
310 | * @return the {@link JobExecution} that was used to start this step |
311 | * execution. |
312 | */ |
313 | public JobExecution getJobExecution() { |
314 | return jobExecution; |
315 | } |
316 | |
317 | /** |
318 | * Factory method for {@link StepContribution}. |
319 | * |
320 | * @return a new {@link StepContribution} |
321 | */ |
322 | public StepContribution createStepContribution() { |
323 | return new StepContribution(this); |
324 | } |
325 | |
326 | /** |
327 | * On successful execution just before a chunk commit, this method should be |
328 | * called. Synchronizes access to the {@link StepExecution} so that changes |
329 | * are atomic. |
330 | * |
331 | * @param contribution |
332 | */ |
333 | public synchronized void apply(StepContribution contribution) { |
334 | readSkipCount += contribution.getReadSkipCount(); |
335 | writeSkipCount += contribution.getWriteSkipCount(); |
336 | processSkipCount += contribution.getProcessSkipCount(); |
337 | filterCount += contribution.getFilterCount(); |
338 | readCount += contribution.getReadCount(); |
339 | writeCount += contribution.getWriteCount(); |
340 | exitStatus = exitStatus.and(contribution.getExitStatus()); |
341 | } |
342 | |
343 | /** |
344 | * On unsuccessful execution after a chunk has rolled back. |
345 | */ |
346 | public synchronized void incrementRollbackCount() { |
347 | rollbackCount++; |
348 | } |
349 | |
350 | /** |
351 | * @return flag to indicate that an execution should halt |
352 | */ |
353 | public boolean isTerminateOnly() { |
354 | return this.terminateOnly; |
355 | } |
356 | |
357 | /** |
358 | * Set a flag that will signal to an execution environment that this |
359 | * execution (and its surrounding job) wishes to exit. |
360 | */ |
361 | public void setTerminateOnly() { |
362 | this.terminateOnly = true; |
363 | } |
364 | |
365 | /** |
366 | * @return the total number of items skipped. |
367 | */ |
368 | public int getSkipCount() { |
369 | return readSkipCount + processSkipCount + writeSkipCount; |
370 | } |
371 | |
372 | /** |
373 | * Increment the number of commits |
374 | */ |
375 | public void incrementCommitCount() { |
376 | commitCount++; |
377 | } |
378 | |
379 | /** |
380 | * Convenience method to get the current job parameters. |
381 | * |
382 | * @return the {@link JobParameters} from the enclosing job, or empty if |
383 | * that is null |
384 | */ |
385 | public JobParameters getJobParameters() { |
386 | if (jobExecution == null || jobExecution.getJobInstance() == null) { |
387 | return new JobParameters(); |
388 | } |
389 | return jobExecution.getJobInstance().getJobParameters(); |
390 | } |
391 | |
392 | /** |
393 | * @return the number of records skipped on read |
394 | */ |
395 | public int getReadSkipCount() { |
396 | return readSkipCount; |
397 | } |
398 | |
399 | /** |
400 | * @return the number of records skipped on write |
401 | */ |
402 | public int getWriteSkipCount() { |
403 | return writeSkipCount; |
404 | } |
405 | |
406 | /** |
407 | * Set the number of records skipped on read |
408 | * |
409 | * @param readSkipCount |
410 | */ |
411 | public void setReadSkipCount(int readSkipCount) { |
412 | this.readSkipCount = readSkipCount; |
413 | } |
414 | |
415 | /** |
416 | * Set the number of records skipped on write |
417 | * |
418 | * @param writeSkipCount |
419 | */ |
420 | public void setWriteSkipCount(int writeSkipCount) { |
421 | this.writeSkipCount = writeSkipCount; |
422 | } |
423 | |
424 | /** |
425 | * @return the number of records skipped during processing |
426 | */ |
427 | public int getProcessSkipCount() { |
428 | return processSkipCount; |
429 | } |
430 | |
431 | /** |
432 | * Set the number of records skipped during processing. |
433 | * |
434 | * @param processSkipCount |
435 | */ |
436 | public void setProcessSkipCount(int processSkipCount) { |
437 | this.processSkipCount = processSkipCount; |
438 | } |
439 | |
440 | /** |
441 | * @return the Date representing the last time this execution was persisted. |
442 | */ |
443 | public Date getLastUpdated() { |
444 | return lastUpdated; |
445 | } |
446 | |
447 | /** |
448 | * Set the time when the StepExecution was last updated before persisting |
449 | * |
450 | * @param lastUpdated |
451 | */ |
452 | public void setLastUpdated(Date lastUpdated) { |
453 | this.lastUpdated = lastUpdated; |
454 | } |
455 | |
456 | public List<Throwable> getFailureExceptions() { |
457 | return failureExceptions; |
458 | } |
459 | |
460 | public void addFailureException(Throwable throwable) { |
461 | this.failureExceptions.add(throwable); |
462 | } |
463 | |
464 | /* |
465 | * (non-Javadoc) |
466 | * |
467 | * @see |
468 | * org.springframework.batch.container.common.domain.Entity#equals(java. |
469 | * lang.Object) |
470 | */ |
471 | public boolean equals(Object obj) { |
472 | |
473 | Object jobExecutionId = getJobExecutionId(); |
474 | if (jobExecutionId == null || !(obj instanceof StepExecution) || getId() == null) { |
475 | return super.equals(obj); |
476 | } |
477 | StepExecution other = (StepExecution) obj; |
478 | |
479 | return stepName.equals(other.getStepName()) && (jobExecutionId.equals(other.getJobExecutionId())); |
480 | } |
481 | |
482 | /** |
483 | * Deserialise and ensure transient fields are re-instantiated when read |
484 | * back |
485 | */ |
486 | private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { |
487 | stream.defaultReadObject(); |
488 | failureExceptions = new ArrayList<Throwable>(); |
489 | } |
490 | |
491 | /* |
492 | * (non-Javadoc) |
493 | * |
494 | * @see org.springframework.batch.container.common.domain.Entity#hashCode() |
495 | */ |
496 | public int hashCode() { |
497 | Object jobExecutionId = getJobExecutionId(); |
498 | return super.hashCode() + 31 * (stepName != null ? stepName.hashCode() : 0) + 91 |
499 | * (jobExecutionId != null ? jobExecutionId.hashCode() : 0); |
500 | } |
501 | |
502 | public String toString() { |
503 | return String.format(getSummary() + ", exitDescription=%s", exitStatus.getExitDescription()); |
504 | } |
505 | |
506 | public String getSummary() { |
507 | return super.toString() |
508 | + String.format( |
509 | ", name=%s, status=%s, exitStatus=%s, readCount=%d, filterCount=%d, writeCount=%d readSkipCount=%d, writeSkipCount=%d" |
510 | + ", processSkipCount=%d, commitCount=%d, rollbackCount=%d", stepName, status, exitStatus.getExitCode(), |
511 | readCount, filterCount, writeCount, readSkipCount, writeSkipCount, processSkipCount, commitCount, rollbackCount); |
512 | } |
513 | |
514 | } |