1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core; |
18 | |
19 | import java.io.IOException; |
20 | import java.io.ObjectInputStream; |
21 | import java.util.ArrayList; |
22 | import java.util.Date; |
23 | import java.util.List; |
24 | import java.util.concurrent.CopyOnWriteArrayList; |
25 | |
26 | import org.springframework.batch.item.ExecutionContext; |
27 | import org.springframework.util.Assert; |
28 | |
29 | /** |
30 | * Batch domain object representation the execution of a step. Unlike |
31 | * {@link JobExecution}, there are additional properties related the processing |
32 | * of items such as commit count, etc. |
33 | * |
34 | * @author Lucas Ward |
35 | * @author Dave Syer |
36 | * |
37 | */ |
38 | public class StepExecution extends Entity { |
39 | |
40 | private final JobExecution jobExecution; |
41 | |
42 | private final String stepName; |
43 | |
44 | private volatile BatchStatus status = BatchStatus.STARTING; |
45 | |
46 | private volatile int readCount = 0; |
47 | |
48 | private volatile int writeCount = 0; |
49 | |
50 | private volatile int commitCount = 0; |
51 | |
52 | private volatile int rollbackCount = 0; |
53 | |
54 | private volatile int readSkipCount = 0; |
55 | |
56 | private volatile int processSkipCount = 0; |
57 | |
58 | private volatile int writeSkipCount = 0; |
59 | |
60 | private volatile Date startTime = new Date(System.currentTimeMillis()); |
61 | |
62 | private volatile Date endTime = null; |
63 | |
64 | private volatile Date lastUpdated = null; |
65 | |
66 | private volatile ExecutionContext executionContext = new ExecutionContext(); |
67 | |
68 | private volatile ExitStatus exitStatus = ExitStatus.EXECUTING; |
69 | |
70 | private volatile boolean terminateOnly; |
71 | |
72 | private volatile int filterCount; |
73 | |
74 | private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<Throwable>(); |
75 | |
76 | /** |
77 | * Constructor with mandatory properties. |
78 | * |
79 | * @param stepName the step to which this execution belongs |
80 | * @param jobExecution the current job execution |
81 | * @param id the id of this execution |
82 | */ |
83 | public StepExecution(String stepName, JobExecution jobExecution, Long id) { |
84 | this(stepName, jobExecution); |
85 | Assert.notNull(jobExecution, "JobExecution must be provided to re-hydrate an existing StepExecution"); |
86 | Assert.notNull(id, "The entity Id must be provided to re-hydrate an existing StepExecution"); |
87 | setId(id); |
88 | jobExecution.addStepExecution(this); |
89 | } |
90 | |
91 | /** |
92 | * Constructor that substitutes in null for the execution id |
93 | * |
94 | * @param stepName the step to which this execution belongs |
95 | * @param jobExecution the current job execution |
96 | */ |
97 | public StepExecution(String stepName, JobExecution jobExecution) { |
98 | super(); |
99 | Assert.hasLength(stepName); |
100 | this.stepName = stepName; |
101 | this.jobExecution = jobExecution; |
102 | } |
103 | |
104 | /** |
105 | * Returns the {@link ExecutionContext} for this execution |
106 | * |
107 | * @return the attributes |
108 | */ |
109 | public ExecutionContext getExecutionContext() { |
110 | return executionContext; |
111 | } |
112 | |
113 | /** |
114 | * Sets the {@link ExecutionContext} for this execution |
115 | * |
116 | * @param executionContext the attributes |
117 | */ |
118 | public void setExecutionContext(ExecutionContext executionContext) { |
119 | this.executionContext = executionContext; |
120 | } |
121 | |
122 | /** |
123 | * Returns the current number of commits for this execution |
124 | * |
125 | * @return the current number of commits |
126 | */ |
127 | public int getCommitCount() { |
128 | return commitCount; |
129 | } |
130 | |
131 | /** |
132 | * Sets the current number of commits for this execution |
133 | * |
134 | * @param commitCount the current number of commits |
135 | */ |
136 | public void setCommitCount(int commitCount) { |
137 | this.commitCount = commitCount; |
138 | } |
139 | |
140 | /** |
141 | * Returns the time that this execution ended |
142 | * |
143 | * @return the time that this execution ended |
144 | */ |
145 | public Date getEndTime() { |
146 | return endTime; |
147 | } |
148 | |
149 | /** |
150 | * Sets the time that this execution ended |
151 | * |
152 | * @param endTime the time that this execution ended |
153 | */ |
154 | public void setEndTime(Date endTime) { |
155 | this.endTime = endTime; |
156 | } |
157 | |
158 | /** |
159 | * Returns the current number of items read for this execution |
160 | * |
161 | * @return the current number of items read for this execution |
162 | */ |
163 | public int getReadCount() { |
164 | return readCount; |
165 | } |
166 | |
167 | /** |
168 | * Sets the current number of read items for this execution |
169 | * |
170 | * @param readCount the current number of read items for this execution |
171 | */ |
172 | public void setReadCount(int readCount) { |
173 | this.readCount = readCount; |
174 | } |
175 | |
176 | /** |
177 | * Returns the current number of items written for this execution |
178 | * |
179 | * @return the current number of items written for this execution |
180 | */ |
181 | public int getWriteCount() { |
182 | return writeCount; |
183 | } |
184 | |
185 | /** |
186 | * Sets the current number of written items for this execution |
187 | * |
188 | * @param writeCount the current number of written items for this execution |
189 | */ |
190 | public void setWriteCount(int writeCount) { |
191 | this.writeCount = writeCount; |
192 | } |
193 | |
194 | /** |
195 | * Returns the current number of rollbacks for this execution |
196 | * |
197 | * @return the current number of rollbacks for this execution |
198 | */ |
199 | public int getRollbackCount() { |
200 | return rollbackCount; |
201 | } |
202 | |
203 | /** |
204 | * Returns the current number of items filtered out of this execution |
205 | * |
206 | * @return the current number of items filtered out of this execution |
207 | */ |
208 | public int getFilterCount() { |
209 | return filterCount; |
210 | } |
211 | |
212 | /** |
213 | * Public setter for the number of items filtered out of this execution. |
214 | * @param filterCount the number of items filtered out of this execution to |
215 | * set |
216 | */ |
217 | public void setFilterCount(int filterCount) { |
218 | this.filterCount = filterCount; |
219 | } |
220 | |
221 | /** |
222 | * Setter for number of rollbacks for this execution |
223 | */ |
224 | public void setRollbackCount(int rollbackCount) { |
225 | this.rollbackCount = rollbackCount; |
226 | } |
227 | |
228 | /** |
229 | * Gets the time this execution started |
230 | * |
231 | * @return the time this execution started |
232 | */ |
233 | public Date getStartTime() { |
234 | return startTime; |
235 | } |
236 | |
237 | /** |
238 | * Sets the time this execution started |
239 | * |
240 | * @param startTime the time this execution started |
241 | */ |
242 | public void setStartTime(Date startTime) { |
243 | this.startTime = startTime; |
244 | } |
245 | |
246 | /** |
247 | * Returns the current status of this step |
248 | * |
249 | * @return the current status of this step |
250 | */ |
251 | public BatchStatus getStatus() { |
252 | return status; |
253 | } |
254 | |
255 | /** |
256 | * Sets the current status of this step |
257 | * |
258 | * @param status the current status of this step |
259 | */ |
260 | public void setStatus(BatchStatus status) { |
261 | this.status = status; |
262 | } |
263 | |
264 | /** |
265 | * Upgrade the status field if the provided value is greater than the |
266 | * existing one. Clients using this method to set the status can be sure |
267 | * that they don't overwrite a failed status with an successful one. |
268 | * |
269 | * @param status the new status value |
270 | */ |
271 | public void upgradeStatus(BatchStatus status) { |
272 | this.status = this.status.upgradeTo(status); |
273 | } |
274 | |
275 | /** |
276 | * @return the name of the step |
277 | */ |
278 | public String getStepName() { |
279 | return stepName; |
280 | } |
281 | |
282 | /** |
283 | * Accessor for the job execution id. |
284 | * |
285 | * @return the jobExecutionId |
286 | */ |
287 | public Long getJobExecutionId() { |
288 | if (jobExecution != null) { |
289 | return jobExecution.getId(); |
290 | } |
291 | return null; |
292 | } |
293 | |
294 | /** |
295 | * @param exitStatus |
296 | */ |
297 | public void setExitStatus(ExitStatus exitStatus) { |
298 | this.exitStatus = exitStatus; |
299 | } |
300 | |
301 | /** |
302 | * @return the exitCode |
303 | */ |
304 | public ExitStatus getExitStatus() { |
305 | return exitStatus; |
306 | } |
307 | |
308 | /** |
309 | * Accessor for the execution context information of the enclosing job. |
310 | * |
311 | * @return the {@link JobExecution} that was used to start this step |
312 | * execution. |
313 | */ |
314 | public JobExecution getJobExecution() { |
315 | return jobExecution; |
316 | } |
317 | |
318 | /** |
319 | * Factory method for {@link StepContribution}. |
320 | * |
321 | * @return a new {@link StepContribution} |
322 | */ |
323 | public StepContribution createStepContribution() { |
324 | return new StepContribution(this); |
325 | } |
326 | |
327 | /** |
328 | * On successful execution just before a chunk commit, this method should be |
329 | * called. Synchronizes access to the {@link StepExecution} so that changes |
330 | * are atomic. |
331 | * |
332 | * @param contribution |
333 | */ |
334 | public synchronized void apply(StepContribution contribution) { |
335 | readSkipCount += contribution.getReadSkipCount(); |
336 | writeSkipCount += contribution.getWriteSkipCount(); |
337 | processSkipCount += contribution.getProcessSkipCount(); |
338 | filterCount += contribution.getFilterCount(); |
339 | readCount += contribution.getReadCount(); |
340 | writeCount += contribution.getWriteCount(); |
341 | exitStatus = exitStatus.and(contribution.getExitStatus()); |
342 | } |
343 | |
344 | /** |
345 | * On unsuccessful execution after a chunk has rolled back. |
346 | */ |
347 | public synchronized void incrementRollbackCount() { |
348 | rollbackCount++; |
349 | } |
350 | |
351 | /** |
352 | * @return flag to indicate that an execution should halt |
353 | */ |
354 | public boolean isTerminateOnly() { |
355 | return this.terminateOnly; |
356 | } |
357 | |
358 | /** |
359 | * Set a flag that will signal to an execution environment that this |
360 | * execution (and its surrounding job) wishes to exit. |
361 | */ |
362 | public void setTerminateOnly() { |
363 | this.terminateOnly = true; |
364 | } |
365 | |
366 | /** |
367 | * @return the total number of items skipped. |
368 | */ |
369 | public int getSkipCount() { |
370 | return readSkipCount + processSkipCount + writeSkipCount; |
371 | } |
372 | |
373 | /** |
374 | * Increment the number of commits |
375 | */ |
376 | public void incrementCommitCount() { |
377 | commitCount++; |
378 | } |
379 | |
380 | /** |
381 | * Convenience method to get the current job parameters. |
382 | * |
383 | * @return the {@link JobParameters} from the enclosing job, or empty if |
384 | * that is null |
385 | */ |
386 | public JobParameters getJobParameters() { |
387 | if (jobExecution == null || jobExecution.getJobInstance() == null) { |
388 | return new JobParameters(); |
389 | } |
390 | return jobExecution.getJobInstance().getJobParameters(); |
391 | } |
392 | |
393 | /** |
394 | * @return the number of records skipped on read |
395 | */ |
396 | public int getReadSkipCount() { |
397 | return readSkipCount; |
398 | } |
399 | |
400 | /** |
401 | * @return the number of records skipped on write |
402 | */ |
403 | public int getWriteSkipCount() { |
404 | return writeSkipCount; |
405 | } |
406 | |
407 | /** |
408 | * Set the number of records skipped on read |
409 | * |
410 | * @param readSkipCount |
411 | */ |
412 | public void setReadSkipCount(int readSkipCount) { |
413 | this.readSkipCount = readSkipCount; |
414 | } |
415 | |
416 | /** |
417 | * Set the number of records skipped on write |
418 | * |
419 | * @param writeSkipCount |
420 | */ |
421 | public void setWriteSkipCount(int writeSkipCount) { |
422 | this.writeSkipCount = writeSkipCount; |
423 | } |
424 | |
425 | /** |
426 | * @return the number of records skipped during processing |
427 | */ |
428 | public int getProcessSkipCount() { |
429 | return processSkipCount; |
430 | } |
431 | |
432 | /** |
433 | * Set the number of records skipped during processing. |
434 | * |
435 | * @param processSkipCount |
436 | */ |
437 | public void setProcessSkipCount(int processSkipCount) { |
438 | this.processSkipCount = processSkipCount; |
439 | } |
440 | |
441 | /** |
442 | * @return the Date representing the last time this execution was persisted. |
443 | */ |
444 | public Date getLastUpdated() { |
445 | return lastUpdated; |
446 | } |
447 | |
448 | /** |
449 | * Set the time when the StepExecution was last updated before persisting |
450 | * |
451 | * @param lastUpdated |
452 | */ |
453 | public void setLastUpdated(Date lastUpdated) { |
454 | this.lastUpdated = lastUpdated; |
455 | } |
456 | |
457 | public List<Throwable> getFailureExceptions() { |
458 | return failureExceptions; |
459 | } |
460 | |
461 | public void addFailureException(Throwable throwable) { |
462 | this.failureExceptions.add(throwable); |
463 | } |
464 | |
465 | /* |
466 | * (non-Javadoc) |
467 | * |
468 | * @see |
469 | * org.springframework.batch.container.common.domain.Entity#equals(java. |
470 | * lang.Object) |
471 | */ |
472 | public boolean equals(Object obj) { |
473 | |
474 | Object jobExecutionId = getJobExecutionId(); |
475 | if (jobExecutionId == null || !(obj instanceof StepExecution) || getId() == null) { |
476 | return super.equals(obj); |
477 | } |
478 | StepExecution other = (StepExecution) obj; |
479 | |
480 | return stepName.equals(other.getStepName()) && (jobExecutionId.equals(other.getJobExecutionId())) |
481 | && getId().equals(other.getId()); |
482 | } |
483 | |
484 | /** |
485 | * Deserialise and ensure transient fields are re-instantiated when read |
486 | * back |
487 | */ |
488 | private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { |
489 | stream.defaultReadObject(); |
490 | failureExceptions = new ArrayList<Throwable>(); |
491 | } |
492 | |
493 | /* |
494 | * (non-Javadoc) |
495 | * |
496 | * @see org.springframework.batch.container.common.domain.Entity#hashCode() |
497 | */ |
498 | public int hashCode() { |
499 | Object jobExecutionId = getJobExecutionId(); |
500 | Long id = getId(); |
501 | return super.hashCode() + 31 * (stepName != null ? stepName.hashCode() : 0) + 91 |
502 | * (jobExecutionId != null ? jobExecutionId.hashCode() : 0) + 59 * (id != null ? id.hashCode() : 0); |
503 | } |
504 | |
505 | public String toString() { |
506 | return String.format(getSummary() + ", exitDescription=%s", exitStatus.getExitDescription()); |
507 | } |
508 | |
509 | public String getSummary() { |
510 | return super.toString() |
511 | + String.format( |
512 | ", name=%s, status=%s, exitStatus=%s, readCount=%d, filterCount=%d, writeCount=%d readSkipCount=%d, writeSkipCount=%d" |
513 | + ", processSkipCount=%d, commitCount=%d, rollbackCount=%d", stepName, status, |
514 | exitStatus.getExitCode(), readCount, filterCount, writeCount, readSkipCount, writeSkipCount, |
515 | processSkipCount, commitCount, rollbackCount); |
516 | } |
517 | |
518 | } |