1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core; |
18 | |
19 | import java.io.IOException; |
20 | import java.io.ObjectInputStream; |
21 | import java.util.ArrayList; |
22 | import java.util.Date; |
23 | import java.util.List; |
24 | import java.util.concurrent.CopyOnWriteArrayList; |
25 | |
26 | import org.springframework.batch.item.ExecutionContext; |
27 | import org.springframework.util.Assert; |
28 | |
29 | /** |
30 | * Batch domain object representation the execution of a step. Unlike |
31 | * {@link JobExecution}, there are additional properties related the processing |
32 | * of items such as commit count, etc. |
33 | * |
34 | * @author Lucas Ward |
35 | * @author Dave Syer |
36 | * |
37 | */ |
38 | @SuppressWarnings("serial") |
39 | public class StepExecution extends Entity { |
40 | |
41 | private final JobExecution jobExecution; |
42 | |
43 | private final String stepName; |
44 | |
45 | private volatile BatchStatus status = BatchStatus.STARTING; |
46 | |
47 | private volatile int readCount = 0; |
48 | |
49 | private volatile int writeCount = 0; |
50 | |
51 | private volatile int commitCount = 0; |
52 | |
53 | private volatile int rollbackCount = 0; |
54 | |
55 | private volatile int readSkipCount = 0; |
56 | |
57 | private volatile int processSkipCount = 0; |
58 | |
59 | private volatile int writeSkipCount = 0; |
60 | |
61 | private volatile Date startTime = new Date(System.currentTimeMillis()); |
62 | |
63 | private volatile Date endTime = null; |
64 | |
65 | private volatile Date lastUpdated = null; |
66 | |
67 | private volatile ExecutionContext executionContext = new ExecutionContext(); |
68 | |
69 | private volatile ExitStatus exitStatus = ExitStatus.EXECUTING; |
70 | |
71 | private volatile boolean terminateOnly; |
72 | |
73 | private volatile int filterCount; |
74 | |
75 | private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<Throwable>(); |
76 | |
77 | /** |
78 | * Constructor with mandatory properties. |
79 | * |
80 | * @param stepName the step to which this execution belongs |
81 | * @param jobExecution the current job execution |
82 | * @param id the id of this execution |
83 | */ |
84 | public StepExecution(String stepName, JobExecution jobExecution, Long id) { |
85 | this(stepName, jobExecution); |
86 | Assert.notNull(jobExecution, "JobExecution must be provided to re-hydrate an existing StepExecution"); |
87 | Assert.notNull(id, "The entity Id must be provided to re-hydrate an existing StepExecution"); |
88 | setId(id); |
89 | jobExecution.addStepExecution(this); |
90 | } |
91 | |
92 | /** |
93 | * Constructor that substitutes in null for the execution id |
94 | * |
95 | * @param stepName the step to which this execution belongs |
96 | * @param jobExecution the current job execution |
97 | */ |
98 | public StepExecution(String stepName, JobExecution jobExecution) { |
99 | super(); |
100 | Assert.hasLength(stepName); |
101 | this.stepName = stepName; |
102 | this.jobExecution = jobExecution; |
103 | } |
104 | |
105 | /** |
106 | * Constructor that requires only a stepName. Intended only to be |
107 | * used via serialization libraries to address the circular |
108 | * reference between {@link JobExecution} and StepExecution. |
109 | * |
110 | * @param stepName the name of the executed step |
111 | */ |
112 | @SuppressWarnings("unused") |
113 | private StepExecution(String stepName) { |
114 | super(); |
115 | Assert.hasLength(stepName); |
116 | this.stepName = stepName; |
117 | this.jobExecution = null; |
118 | } |
119 | |
120 | /** |
121 | * Returns the {@link ExecutionContext} for this execution |
122 | * |
123 | * @return the attributes |
124 | */ |
125 | public ExecutionContext getExecutionContext() { |
126 | return executionContext; |
127 | } |
128 | |
129 | /** |
130 | * Sets the {@link ExecutionContext} for this execution |
131 | * |
132 | * @param executionContext the attributes |
133 | */ |
134 | public void setExecutionContext(ExecutionContext executionContext) { |
135 | this.executionContext = executionContext; |
136 | } |
137 | |
138 | /** |
139 | * Returns the current number of commits for this execution |
140 | * |
141 | * @return the current number of commits |
142 | */ |
143 | public int getCommitCount() { |
144 | return commitCount; |
145 | } |
146 | |
147 | /** |
148 | * Sets the current number of commits for this execution |
149 | * |
150 | * @param commitCount the current number of commits |
151 | */ |
152 | public void setCommitCount(int commitCount) { |
153 | this.commitCount = commitCount; |
154 | } |
155 | |
156 | /** |
157 | * Returns the time that this execution ended |
158 | * |
159 | * @return the time that this execution ended |
160 | */ |
161 | public Date getEndTime() { |
162 | return endTime; |
163 | } |
164 | |
165 | /** |
166 | * Sets the time that this execution ended |
167 | * |
168 | * @param endTime the time that this execution ended |
169 | */ |
170 | public void setEndTime(Date endTime) { |
171 | this.endTime = endTime; |
172 | } |
173 | |
174 | /** |
175 | * Returns the current number of items read for this execution |
176 | * |
177 | * @return the current number of items read for this execution |
178 | */ |
179 | public int getReadCount() { |
180 | return readCount; |
181 | } |
182 | |
183 | /** |
184 | * Sets the current number of read items for this execution |
185 | * |
186 | * @param readCount the current number of read items for this execution |
187 | */ |
188 | public void setReadCount(int readCount) { |
189 | this.readCount = readCount; |
190 | } |
191 | |
192 | /** |
193 | * Returns the current number of items written for this execution |
194 | * |
195 | * @return the current number of items written for this execution |
196 | */ |
197 | public int getWriteCount() { |
198 | return writeCount; |
199 | } |
200 | |
201 | /** |
202 | * Sets the current number of written items for this execution |
203 | * |
204 | * @param writeCount the current number of written items for this execution |
205 | */ |
206 | public void setWriteCount(int writeCount) { |
207 | this.writeCount = writeCount; |
208 | } |
209 | |
210 | /** |
211 | * Returns the current number of rollbacks for this execution |
212 | * |
213 | * @return the current number of rollbacks for this execution |
214 | */ |
215 | public int getRollbackCount() { |
216 | return rollbackCount; |
217 | } |
218 | |
219 | /** |
220 | * Returns the current number of items filtered out of this execution |
221 | * |
222 | * @return the current number of items filtered out of this execution |
223 | */ |
224 | public int getFilterCount() { |
225 | return filterCount; |
226 | } |
227 | |
228 | /** |
229 | * Public setter for the number of items filtered out of this execution. |
230 | * @param filterCount the number of items filtered out of this execution to |
231 | * set |
232 | */ |
233 | public void setFilterCount(int filterCount) { |
234 | this.filterCount = filterCount; |
235 | } |
236 | |
237 | /** |
238 | * Setter for number of rollbacks for this execution |
239 | */ |
240 | public void setRollbackCount(int rollbackCount) { |
241 | this.rollbackCount = rollbackCount; |
242 | } |
243 | |
244 | /** |
245 | * Gets the time this execution started |
246 | * |
247 | * @return the time this execution started |
248 | */ |
249 | public Date getStartTime() { |
250 | return startTime; |
251 | } |
252 | |
253 | /** |
254 | * Sets the time this execution started |
255 | * |
256 | * @param startTime the time this execution started |
257 | */ |
258 | public void setStartTime(Date startTime) { |
259 | this.startTime = startTime; |
260 | } |
261 | |
262 | /** |
263 | * Returns the current status of this step |
264 | * |
265 | * @return the current status of this step |
266 | */ |
267 | public BatchStatus getStatus() { |
268 | return status; |
269 | } |
270 | |
271 | /** |
272 | * Sets the current status of this step |
273 | * |
274 | * @param status the current status of this step |
275 | */ |
276 | public void setStatus(BatchStatus status) { |
277 | this.status = status; |
278 | } |
279 | |
280 | /** |
281 | * Upgrade the status field if the provided value is greater than the |
282 | * existing one. Clients using this method to set the status can be sure |
283 | * that they don't overwrite a failed status with an successful one. |
284 | * |
285 | * @param status the new status value |
286 | */ |
287 | public void upgradeStatus(BatchStatus status) { |
288 | this.status = this.status.upgradeTo(status); |
289 | } |
290 | |
291 | /** |
292 | * @return the name of the step |
293 | */ |
294 | public String getStepName() { |
295 | return stepName; |
296 | } |
297 | |
298 | /** |
299 | * Accessor for the job execution id. |
300 | * |
301 | * @return the jobExecutionId |
302 | */ |
303 | public Long getJobExecutionId() { |
304 | if (jobExecution != null) { |
305 | return jobExecution.getId(); |
306 | } |
307 | return null; |
308 | } |
309 | |
310 | /** |
311 | * @param exitStatus |
312 | */ |
313 | public void setExitStatus(ExitStatus exitStatus) { |
314 | this.exitStatus = exitStatus; |
315 | } |
316 | |
317 | /** |
318 | * @return the exitCode |
319 | */ |
320 | public ExitStatus getExitStatus() { |
321 | return exitStatus; |
322 | } |
323 | |
324 | /** |
325 | * Accessor for the execution context information of the enclosing job. |
326 | * |
327 | * @return the {@link JobExecution} that was used to start this step |
328 | * execution. |
329 | */ |
330 | public JobExecution getJobExecution() { |
331 | return jobExecution; |
332 | } |
333 | |
334 | /** |
335 | * Factory method for {@link StepContribution}. |
336 | * |
337 | * @return a new {@link StepContribution} |
338 | */ |
339 | public StepContribution createStepContribution() { |
340 | return new StepContribution(this); |
341 | } |
342 | |
343 | /** |
344 | * On successful execution just before a chunk commit, this method should be |
345 | * called. Synchronizes access to the {@link StepExecution} so that changes |
346 | * are atomic. |
347 | * |
348 | * @param contribution |
349 | */ |
350 | public synchronized void apply(StepContribution contribution) { |
351 | readSkipCount += contribution.getReadSkipCount(); |
352 | writeSkipCount += contribution.getWriteSkipCount(); |
353 | processSkipCount += contribution.getProcessSkipCount(); |
354 | filterCount += contribution.getFilterCount(); |
355 | readCount += contribution.getReadCount(); |
356 | writeCount += contribution.getWriteCount(); |
357 | exitStatus = exitStatus.and(contribution.getExitStatus()); |
358 | } |
359 | |
360 | /** |
361 | * On unsuccessful execution after a chunk has rolled back. |
362 | */ |
363 | public synchronized void incrementRollbackCount() { |
364 | rollbackCount++; |
365 | } |
366 | |
367 | /** |
368 | * @return flag to indicate that an execution should halt |
369 | */ |
370 | public boolean isTerminateOnly() { |
371 | return this.terminateOnly; |
372 | } |
373 | |
374 | /** |
375 | * Set a flag that will signal to an execution environment that this |
376 | * execution (and its surrounding job) wishes to exit. |
377 | */ |
378 | public void setTerminateOnly() { |
379 | this.terminateOnly = true; |
380 | } |
381 | |
382 | /** |
383 | * @return the total number of items skipped. |
384 | */ |
385 | public int getSkipCount() { |
386 | return readSkipCount + processSkipCount + writeSkipCount; |
387 | } |
388 | |
389 | /** |
390 | * Increment the number of commits |
391 | */ |
392 | public void incrementCommitCount() { |
393 | commitCount++; |
394 | } |
395 | |
396 | /** |
397 | * Convenience method to get the current job parameters. |
398 | * |
399 | * @return the {@link JobParameters} from the enclosing job, or empty if |
400 | * that is null |
401 | */ |
402 | public JobParameters getJobParameters() { |
403 | if (jobExecution == null) { |
404 | return new JobParameters(); |
405 | } |
406 | return jobExecution.getJobParameters(); |
407 | } |
408 | |
409 | /** |
410 | * @return the number of records skipped on read |
411 | */ |
412 | public int getReadSkipCount() { |
413 | return readSkipCount; |
414 | } |
415 | |
416 | /** |
417 | * @return the number of records skipped on write |
418 | */ |
419 | public int getWriteSkipCount() { |
420 | return writeSkipCount; |
421 | } |
422 | |
423 | /** |
424 | * Set the number of records skipped on read |
425 | * |
426 | * @param readSkipCount |
427 | */ |
428 | public void setReadSkipCount(int readSkipCount) { |
429 | this.readSkipCount = readSkipCount; |
430 | } |
431 | |
432 | /** |
433 | * Set the number of records skipped on write |
434 | * |
435 | * @param writeSkipCount |
436 | */ |
437 | public void setWriteSkipCount(int writeSkipCount) { |
438 | this.writeSkipCount = writeSkipCount; |
439 | } |
440 | |
441 | /** |
442 | * @return the number of records skipped during processing |
443 | */ |
444 | public int getProcessSkipCount() { |
445 | return processSkipCount; |
446 | } |
447 | |
448 | /** |
449 | * Set the number of records skipped during processing. |
450 | * |
451 | * @param processSkipCount |
452 | */ |
453 | public void setProcessSkipCount(int processSkipCount) { |
454 | this.processSkipCount = processSkipCount; |
455 | } |
456 | |
457 | /** |
458 | * @return the Date representing the last time this execution was persisted. |
459 | */ |
460 | public Date getLastUpdated() { |
461 | return lastUpdated; |
462 | } |
463 | |
464 | /** |
465 | * Set the time when the StepExecution was last updated before persisting |
466 | * |
467 | * @param lastUpdated |
468 | */ |
469 | public void setLastUpdated(Date lastUpdated) { |
470 | this.lastUpdated = lastUpdated; |
471 | } |
472 | |
473 | public List<Throwable> getFailureExceptions() { |
474 | return failureExceptions; |
475 | } |
476 | |
477 | public void addFailureException(Throwable throwable) { |
478 | this.failureExceptions.add(throwable); |
479 | } |
480 | |
481 | /* |
482 | * (non-Javadoc) |
483 | * |
484 | * @see |
485 | * org.springframework.batch.container.common.domain.Entity#equals(java. |
486 | * lang.Object) |
487 | */ |
488 | @Override |
489 | public boolean equals(Object obj) { |
490 | |
491 | Object jobExecutionId = getJobExecutionId(); |
492 | if (jobExecutionId == null || !(obj instanceof StepExecution) || getId() == null) { |
493 | return super.equals(obj); |
494 | } |
495 | StepExecution other = (StepExecution) obj; |
496 | |
497 | return stepName.equals(other.getStepName()) && (jobExecutionId.equals(other.getJobExecutionId())) |
498 | && getId().equals(other.getId()); |
499 | } |
500 | |
501 | /** |
502 | * Deserialise and ensure transient fields are re-instantiated when read |
503 | * back |
504 | */ |
505 | private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { |
506 | stream.defaultReadObject(); |
507 | failureExceptions = new ArrayList<Throwable>(); |
508 | } |
509 | |
510 | /* |
511 | * (non-Javadoc) |
512 | * |
513 | * @see org.springframework.batch.container.common.domain.Entity#hashCode() |
514 | */ |
515 | @Override |
516 | public int hashCode() { |
517 | Object jobExecutionId = getJobExecutionId(); |
518 | Long id = getId(); |
519 | return super.hashCode() + 31 * (stepName != null ? stepName.hashCode() : 0) + 91 |
520 | * (jobExecutionId != null ? jobExecutionId.hashCode() : 0) + 59 * (id != null ? id.hashCode() : 0); |
521 | } |
522 | |
523 | @Override |
524 | public String toString() { |
525 | return String.format(getSummary() + ", exitDescription=%s", exitStatus.getExitDescription()); |
526 | } |
527 | |
528 | public String getSummary() { |
529 | return super.toString() |
530 | + String.format( |
531 | ", name=%s, status=%s, exitStatus=%s, readCount=%d, filterCount=%d, writeCount=%d readSkipCount=%d, writeSkipCount=%d" |
532 | + ", processSkipCount=%d, commitCount=%d, rollbackCount=%d", stepName, status, |
533 | exitStatus.getExitCode(), readCount, filterCount, writeCount, readSkipCount, writeSkipCount, |
534 | processSkipCount, commitCount, rollbackCount); |
535 | } |
536 | |
537 | } |