View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core;
18  
19  import java.io.IOException;
20  import java.io.ObjectInputStream;
21  import java.util.ArrayList;
22  import java.util.Date;
23  import java.util.List;
24  import java.util.concurrent.CopyOnWriteArrayList;
25  
26  import org.springframework.batch.item.ExecutionContext;
27  import org.springframework.util.Assert;
28  
29  /**
30   * Batch domain object representation the execution of a step. Unlike
31   * {@link JobExecution}, there are additional properties related the processing
32   * of items such as commit count, etc.
33   * 
34   * @author Lucas Ward
35   * @author Dave Syer
36   * 
37   */
38  public class StepExecution extends Entity {
39  
40  	private final JobExecution jobExecution;
41  
42  	private final String stepName;
43  
44  	private volatile BatchStatus status = BatchStatus.STARTING;
45  
46  	private volatile int readCount = 0;
47  
48  	private volatile int writeCount = 0;
49  
50  	private volatile int commitCount = 0;
51  
52  	private volatile int rollbackCount = 0;
53  
54  	private volatile int readSkipCount = 0;
55  
56  	private volatile int processSkipCount = 0;
57  
58  	private volatile int writeSkipCount = 0;
59  
60  	private volatile Date startTime = new Date(System.currentTimeMillis());
61  
62  	private volatile Date endTime = null;
63  
64  	private volatile Date lastUpdated = null;
65  
66  	private volatile ExecutionContext executionContext = new ExecutionContext();
67  
68  	private volatile ExitStatus exitStatus = ExitStatus.EXECUTING;
69  
70  	private volatile boolean terminateOnly;
71  
72  	private volatile int filterCount;
73  
74  	private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<Throwable>();
75  
76  	/**
77  	 * Constructor with mandatory properties.
78  	 * 
79  	 * @param stepName the step to which this execution belongs
80  	 * @param jobExecution the current job execution
81  	 * @param id the id of this execution
82  	 */
83  	public StepExecution(String stepName, JobExecution jobExecution, Long id) {
84  		this(stepName, jobExecution);
85  		Assert.notNull(jobExecution, "JobExecution must be provided to re-hydrate an existing StepExecution");
86  		Assert.notNull(id, "The entity Id must be provided to re-hydrate an existing StepExecution");
87  		setId(id);
88  		jobExecution.addStepExecution(this);
89  	}
90  
91  	/**
92  	 * Constructor that substitutes in null for the execution id
93  	 * 
94  	 * @param stepName the step to which this execution belongs
95  	 * @param jobExecution the current job execution
96  	 */
97  	public StepExecution(String stepName, JobExecution jobExecution) {
98  		super();
99  		Assert.hasLength(stepName);
100 		this.stepName = stepName;
101 		this.jobExecution = jobExecution;
102 	}
103 
104 	/**
105 	 * Returns the {@link ExecutionContext} for this execution
106 	 * 
107 	 * @return the attributes
108 	 */
109 	public ExecutionContext getExecutionContext() {
110 		return executionContext;
111 	}
112 
113 	/**
114 	 * Sets the {@link ExecutionContext} for this execution
115 	 * 
116 	 * @param executionContext the attributes
117 	 */
118 	public void setExecutionContext(ExecutionContext executionContext) {
119 		this.executionContext = executionContext;
120 	}
121 
122 	/**
123 	 * Returns the current number of commits for this execution
124 	 * 
125 	 * @return the current number of commits
126 	 */
127 	public int getCommitCount() {
128 		return commitCount;
129 	}
130 
131 	/**
132 	 * Sets the current number of commits for this execution
133 	 * 
134 	 * @param commitCount the current number of commits
135 	 */
136 	public void setCommitCount(int commitCount) {
137 		this.commitCount = commitCount;
138 	}
139 
140 	/**
141 	 * Returns the time that this execution ended
142 	 * 
143 	 * @return the time that this execution ended
144 	 */
145 	public Date getEndTime() {
146 		return endTime;
147 	}
148 
149 	/**
150 	 * Sets the time that this execution ended
151 	 * 
152 	 * @param endTime the time that this execution ended
153 	 */
154 	public void setEndTime(Date endTime) {
155 		this.endTime = endTime;
156 	}
157 
158 	/**
159 	 * Returns the current number of items read for this execution
160 	 * 
161 	 * @return the current number of items read for this execution
162 	 */
163 	public int getReadCount() {
164 		return readCount;
165 	}
166 
167 	/**
168 	 * Sets the current number of read items for this execution
169 	 * 
170 	 * @param readCount the current number of read items for this execution
171 	 */
172 	public void setReadCount(int readCount) {
173 		this.readCount = readCount;
174 	}
175 
176 	/**
177 	 * Returns the current number of items written for this execution
178 	 * 
179 	 * @return the current number of items written for this execution
180 	 */
181 	public int getWriteCount() {
182 		return writeCount;
183 	}
184 
185 	/**
186 	 * Sets the current number of written items for this execution
187 	 * 
188 	 * @param writeCount the current number of written items for this execution
189 	 */
190 	public void setWriteCount(int writeCount) {
191 		this.writeCount = writeCount;
192 	}
193 
194 	/**
195 	 * Returns the current number of rollbacks for this execution
196 	 * 
197 	 * @return the current number of rollbacks for this execution
198 	 */
199 	public int getRollbackCount() {
200 		return rollbackCount;
201 	}
202 
203 	/**
204 	 * Returns the current number of items filtered out of this execution
205 	 * 
206 	 * @return the current number of items filtered out of this execution
207 	 */
208 	public int getFilterCount() {
209 		return filterCount;
210 	}
211 
212 	/**
213 	 * Public setter for the number of items filtered out of this execution.
214 	 * @param filterCount the number of items filtered out of this execution to
215 	 * set
216 	 */
217 	public void setFilterCount(int filterCount) {
218 		this.filterCount = filterCount;
219 	}
220 
221 	/**
222 	 * Setter for number of rollbacks for this execution
223 	 */
224 	public void setRollbackCount(int rollbackCount) {
225 		this.rollbackCount = rollbackCount;
226 	}
227 
228 	/**
229 	 * Gets the time this execution started
230 	 * 
231 	 * @return the time this execution started
232 	 */
233 	public Date getStartTime() {
234 		return startTime;
235 	}
236 
237 	/**
238 	 * Sets the time this execution started
239 	 * 
240 	 * @param startTime the time this execution started
241 	 */
242 	public void setStartTime(Date startTime) {
243 		this.startTime = startTime;
244 	}
245 
246 	/**
247 	 * Returns the current status of this step
248 	 * 
249 	 * @return the current status of this step
250 	 */
251 	public BatchStatus getStatus() {
252 		return status;
253 	}
254 
255 	/**
256 	 * Sets the current status of this step
257 	 * 
258 	 * @param status the current status of this step
259 	 */
260 	public void setStatus(BatchStatus status) {
261 		this.status = status;
262 	}
263 
264 	/**
265 	 * Upgrade the status field if the provided value is greater than the
266 	 * existing one. Clients using this method to set the status can be sure
267 	 * that they don't overwrite a failed status with an successful one.
268 	 * 
269 	 * @param status the new status value
270 	 */
271 	public void upgradeStatus(BatchStatus status) {
272 		this.status = this.status.upgradeTo(status);
273 	}
274 
275 	/**
276 	 * @return the name of the step
277 	 */
278 	public String getStepName() {
279 		return stepName;
280 	}
281 
282 	/**
283 	 * Accessor for the job execution id.
284 	 * 
285 	 * @return the jobExecutionId
286 	 */
287 	public Long getJobExecutionId() {
288 		if (jobExecution != null) {
289 			return jobExecution.getId();
290 		}
291 		return null;
292 	}
293 
294 	/**
295 	 * @param exitStatus
296 	 */
297 	public void setExitStatus(ExitStatus exitStatus) {
298 		this.exitStatus = exitStatus;
299 	}
300 
301 	/**
302 	 * @return the exitCode
303 	 */
304 	public ExitStatus getExitStatus() {
305 		return exitStatus;
306 	}
307 
308 	/**
309 	 * Accessor for the execution context information of the enclosing job.
310 	 * 
311 	 * @return the {@link JobExecution} that was used to start this step
312 	 * execution.
313 	 */
314 	public JobExecution getJobExecution() {
315 		return jobExecution;
316 	}
317 
318 	/**
319 	 * Factory method for {@link StepContribution}.
320 	 * 
321 	 * @return a new {@link StepContribution}
322 	 */
323 	public StepContribution createStepContribution() {
324 		return new StepContribution(this);
325 	}
326 
327 	/**
328 	 * On successful execution just before a chunk commit, this method should be
329 	 * called. Synchronizes access to the {@link StepExecution} so that changes
330 	 * are atomic.
331 	 * 
332 	 * @param contribution
333 	 */
334 	public synchronized void apply(StepContribution contribution) {
335 		readSkipCount += contribution.getReadSkipCount();
336 		writeSkipCount += contribution.getWriteSkipCount();
337 		processSkipCount += contribution.getProcessSkipCount();
338 		filterCount += contribution.getFilterCount();
339 		readCount += contribution.getReadCount();
340 		writeCount += contribution.getWriteCount();
341 		exitStatus = exitStatus.and(contribution.getExitStatus());
342 	}
343 
344 	/**
345 	 * On unsuccessful execution after a chunk has rolled back.
346 	 */
347 	public synchronized void incrementRollbackCount() {
348 		rollbackCount++;
349 	}
350 
351 	/**
352 	 * @return flag to indicate that an execution should halt
353 	 */
354 	public boolean isTerminateOnly() {
355 		return this.terminateOnly;
356 	}
357 
358 	/**
359 	 * Set a flag that will signal to an execution environment that this
360 	 * execution (and its surrounding job) wishes to exit.
361 	 */
362 	public void setTerminateOnly() {
363 		this.terminateOnly = true;
364 	}
365 
366 	/**
367 	 * @return the total number of items skipped.
368 	 */
369 	public int getSkipCount() {
370 		return readSkipCount + processSkipCount + writeSkipCount;
371 	}
372 
373 	/**
374 	 * Increment the number of commits
375 	 */
376 	public void incrementCommitCount() {
377 		commitCount++;
378 	}
379 
380 	/**
381 	 * Convenience method to get the current job parameters.
382 	 * 
383 	 * @return the {@link JobParameters} from the enclosing job, or empty if
384 	 * that is null
385 	 */
386 	public JobParameters getJobParameters() {
387 		if (jobExecution == null || jobExecution.getJobInstance() == null) {
388 			return new JobParameters();
389 		}
390 		return jobExecution.getJobInstance().getJobParameters();
391 	}
392 
393 	/**
394 	 * @return the number of records skipped on read
395 	 */
396 	public int getReadSkipCount() {
397 		return readSkipCount;
398 	}
399 
400 	/**
401 	 * @return the number of records skipped on write
402 	 */
403 	public int getWriteSkipCount() {
404 		return writeSkipCount;
405 	}
406 
407 	/**
408 	 * Set the number of records skipped on read
409 	 * 
410 	 * @param readSkipCount
411 	 */
412 	public void setReadSkipCount(int readSkipCount) {
413 		this.readSkipCount = readSkipCount;
414 	}
415 
416 	/**
417 	 * Set the number of records skipped on write
418 	 * 
419 	 * @param writeSkipCount
420 	 */
421 	public void setWriteSkipCount(int writeSkipCount) {
422 		this.writeSkipCount = writeSkipCount;
423 	}
424 
425 	/**
426 	 * @return the number of records skipped during processing
427 	 */
428 	public int getProcessSkipCount() {
429 		return processSkipCount;
430 	}
431 
432 	/**
433 	 * Set the number of records skipped during processing.
434 	 * 
435 	 * @param processSkipCount
436 	 */
437 	public void setProcessSkipCount(int processSkipCount) {
438 		this.processSkipCount = processSkipCount;
439 	}
440 
441 	/**
442 	 * @return the Date representing the last time this execution was persisted.
443 	 */
444 	public Date getLastUpdated() {
445 		return lastUpdated;
446 	}
447 
448 	/**
449 	 * Set the time when the StepExecution was last updated before persisting
450 	 * 
451 	 * @param lastUpdated
452 	 */
453 	public void setLastUpdated(Date lastUpdated) {
454 		this.lastUpdated = lastUpdated;
455 	}
456 
457 	public List<Throwable> getFailureExceptions() {
458 		return failureExceptions;
459 	}
460 
461 	public void addFailureException(Throwable throwable) {
462 		this.failureExceptions.add(throwable);
463 	}
464 
465 	/*
466 	 * (non-Javadoc)
467 	 * 
468 	 * @see
469 	 * org.springframework.batch.container.common.domain.Entity#equals(java.
470 	 * lang.Object)
471 	 */
472 	public boolean equals(Object obj) {
473 
474 		Object jobExecutionId = getJobExecutionId();
475 		if (jobExecutionId == null || !(obj instanceof StepExecution) || getId() == null) {
476 			return super.equals(obj);
477 		}
478 		StepExecution other = (StepExecution) obj;
479 
480 		return stepName.equals(other.getStepName()) && (jobExecutionId.equals(other.getJobExecutionId()))
481 				&& getId().equals(other.getId());
482 	}
483 
484 	/**
485 	 * Deserialise and ensure transient fields are re-instantiated when read
486 	 * back
487 	 */
488 	private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
489 		stream.defaultReadObject();
490 		failureExceptions = new ArrayList<Throwable>();
491 	}
492 
493 	/*
494 	 * (non-Javadoc)
495 	 * 
496 	 * @see org.springframework.batch.container.common.domain.Entity#hashCode()
497 	 */
498 	public int hashCode() {
499 		Object jobExecutionId = getJobExecutionId();
500 		Long id = getId();
501 		return super.hashCode() + 31 * (stepName != null ? stepName.hashCode() : 0) + 91
502 				* (jobExecutionId != null ? jobExecutionId.hashCode() : 0) + 59 * (id != null ? id.hashCode() : 0);
503 	}
504 
505 	public String toString() {
506 		return String.format(getSummary() + ", exitDescription=%s", exitStatus.getExitDescription());
507 	}
508 
509 	public String getSummary() {
510 		return super.toString()
511 				+ String.format(
512 						", name=%s, status=%s, exitStatus=%s, readCount=%d, filterCount=%d, writeCount=%d readSkipCount=%d, writeSkipCount=%d"
513 								+ ", processSkipCount=%d, commitCount=%d, rollbackCount=%d", stepName, status,
514 						exitStatus.getExitCode(), readCount, filterCount, writeCount, readSkipCount, writeSkipCount,
515 						processSkipCount, commitCount, rollbackCount);
516 	}
517 
518 }