View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core;
18  
19  import java.io.IOException;
20  import java.io.ObjectInputStream;
21  import java.util.ArrayList;
22  import java.util.Date;
23  import java.util.List;
24  import java.util.concurrent.CopyOnWriteArrayList;
25  
26  import org.springframework.batch.item.ExecutionContext;
27  import org.springframework.util.Assert;
28  
29  /**
30   * Batch domain object representation the execution of a step. Unlike
31   * {@link JobExecution}, there are additional properties related the processing
32   * of items such as commit count, etc.
33   *
34   * @author Lucas Ward
35   * @author Dave Syer
36   *
37   */
38  @SuppressWarnings("serial")
39  public class StepExecution extends Entity {
40  
41  	private final JobExecution jobExecution;
42  
43  	private final String stepName;
44  
45  	private volatile BatchStatus status = BatchStatus.STARTING;
46  
47  	private volatile int readCount = 0;
48  
49  	private volatile int writeCount = 0;
50  
51  	private volatile int commitCount = 0;
52  
53  	private volatile int rollbackCount = 0;
54  
55  	private volatile int readSkipCount = 0;
56  
57  	private volatile int processSkipCount = 0;
58  
59  	private volatile int writeSkipCount = 0;
60  
61  	private volatile Date startTime = new Date(System.currentTimeMillis());
62  
63  	private volatile Date endTime = null;
64  
65  	private volatile Date lastUpdated = null;
66  
67  	private volatile ExecutionContext executionContext = new ExecutionContext();
68  
69  	private volatile ExitStatus exitStatus = ExitStatus.EXECUTING;
70  
71  	private volatile boolean terminateOnly;
72  
73  	private volatile int filterCount;
74  
75  	private transient volatile List<Throwable> failureExceptions = new CopyOnWriteArrayList<Throwable>();
76  
77  	/**
78  	 * Constructor with mandatory properties.
79  	 *
80  	 * @param stepName the step to which this execution belongs
81  	 * @param jobExecution the current job execution
82  	 * @param id the id of this execution
83  	 */
84  	public StepExecution(String stepName, JobExecution jobExecution, Long id) {
85  		this(stepName, jobExecution);
86  		Assert.notNull(jobExecution, "JobExecution must be provided to re-hydrate an existing StepExecution");
87  		Assert.notNull(id, "The entity Id must be provided to re-hydrate an existing StepExecution");
88  		setId(id);
89  		jobExecution.addStepExecution(this);
90  	}
91  
92  	/**
93  	 * Constructor that substitutes in null for the execution id
94  	 *
95  	 * @param stepName the step to which this execution belongs
96  	 * @param jobExecution the current job execution
97  	 */
98  	public StepExecution(String stepName, JobExecution jobExecution) {
99  		super();
100 		Assert.hasLength(stepName);
101 		this.stepName = stepName;
102 		this.jobExecution = jobExecution;
103 	}
104 
105 	/**
106 	 * Returns the {@link ExecutionContext} for this execution
107 	 *
108 	 * @return the attributes
109 	 */
110 	public ExecutionContext getExecutionContext() {
111 		return executionContext;
112 	}
113 
114 	/**
115 	 * Sets the {@link ExecutionContext} for this execution
116 	 *
117 	 * @param executionContext the attributes
118 	 */
119 	public void setExecutionContext(ExecutionContext executionContext) {
120 		this.executionContext = executionContext;
121 	}
122 
123 	/**
124 	 * Returns the current number of commits for this execution
125 	 *
126 	 * @return the current number of commits
127 	 */
128 	public int getCommitCount() {
129 		return commitCount;
130 	}
131 
132 	/**
133 	 * Sets the current number of commits for this execution
134 	 *
135 	 * @param commitCount the current number of commits
136 	 */
137 	public void setCommitCount(int commitCount) {
138 		this.commitCount = commitCount;
139 	}
140 
141 	/**
142 	 * Returns the time that this execution ended
143 	 *
144 	 * @return the time that this execution ended
145 	 */
146 	public Date getEndTime() {
147 		return endTime;
148 	}
149 
150 	/**
151 	 * Sets the time that this execution ended
152 	 *
153 	 * @param endTime the time that this execution ended
154 	 */
155 	public void setEndTime(Date endTime) {
156 		this.endTime = endTime;
157 	}
158 
159 	/**
160 	 * Returns the current number of items read for this execution
161 	 *
162 	 * @return the current number of items read for this execution
163 	 */
164 	public int getReadCount() {
165 		return readCount;
166 	}
167 
168 	/**
169 	 * Sets the current number of read items for this execution
170 	 *
171 	 * @param readCount the current number of read items for this execution
172 	 */
173 	public void setReadCount(int readCount) {
174 		this.readCount = readCount;
175 	}
176 
177 	/**
178 	 * Returns the current number of items written for this execution
179 	 *
180 	 * @return the current number of items written for this execution
181 	 */
182 	public int getWriteCount() {
183 		return writeCount;
184 	}
185 
186 	/**
187 	 * Sets the current number of written items for this execution
188 	 *
189 	 * @param writeCount the current number of written items for this execution
190 	 */
191 	public void setWriteCount(int writeCount) {
192 		this.writeCount = writeCount;
193 	}
194 
195 	/**
196 	 * Returns the current number of rollbacks for this execution
197 	 *
198 	 * @return the current number of rollbacks for this execution
199 	 */
200 	public int getRollbackCount() {
201 		return rollbackCount;
202 	}
203 
204 	/**
205 	 * Returns the current number of items filtered out of this execution
206 	 *
207 	 * @return the current number of items filtered out of this execution
208 	 */
209 	public int getFilterCount() {
210 		return filterCount;
211 	}
212 
213 	/**
214 	 * Public setter for the number of items filtered out of this execution.
215 	 * @param filterCount the number of items filtered out of this execution to
216 	 * set
217 	 */
218 	public void setFilterCount(int filterCount) {
219 		this.filterCount = filterCount;
220 	}
221 
222 	/**
223 	 * Setter for number of rollbacks for this execution
224 	 */
225 	public void setRollbackCount(int rollbackCount) {
226 		this.rollbackCount = rollbackCount;
227 	}
228 
229 	/**
230 	 * Gets the time this execution started
231 	 *
232 	 * @return the time this execution started
233 	 */
234 	public Date getStartTime() {
235 		return startTime;
236 	}
237 
238 	/**
239 	 * Sets the time this execution started
240 	 *
241 	 * @param startTime the time this execution started
242 	 */
243 	public void setStartTime(Date startTime) {
244 		this.startTime = startTime;
245 	}
246 
247 	/**
248 	 * Returns the current status of this step
249 	 *
250 	 * @return the current status of this step
251 	 */
252 	public BatchStatus getStatus() {
253 		return status;
254 	}
255 
256 	/**
257 	 * Sets the current status of this step
258 	 *
259 	 * @param status the current status of this step
260 	 */
261 	public void setStatus(BatchStatus status) {
262 		this.status = status;
263 	}
264 
265 	/**
266 	 * Upgrade the status field if the provided value is greater than the
267 	 * existing one. Clients using this method to set the status can be sure
268 	 * that they don't overwrite a failed status with an successful one.
269 	 *
270 	 * @param status the new status value
271 	 */
272 	public void upgradeStatus(BatchStatus status) {
273 		this.status = this.status.upgradeTo(status);
274 	}
275 
276 	/**
277 	 * @return the name of the step
278 	 */
279 	public String getStepName() {
280 		return stepName;
281 	}
282 
283 	/**
284 	 * Accessor for the job execution id.
285 	 *
286 	 * @return the jobExecutionId
287 	 */
288 	public Long getJobExecutionId() {
289 		if (jobExecution != null) {
290 			return jobExecution.getId();
291 		}
292 		return null;
293 	}
294 
295 	/**
296 	 * @param exitStatus
297 	 */
298 	public void setExitStatus(ExitStatus exitStatus) {
299 		this.exitStatus = exitStatus;
300 	}
301 
302 	/**
303 	 * @return the exitCode
304 	 */
305 	public ExitStatus getExitStatus() {
306 		return exitStatus;
307 	}
308 
309 	/**
310 	 * Accessor for the execution context information of the enclosing job.
311 	 *
312 	 * @return the {@link JobExecution} that was used to start this step
313 	 * execution.
314 	 */
315 	public JobExecution getJobExecution() {
316 		return jobExecution;
317 	}
318 
319 	/**
320 	 * Factory method for {@link StepContribution}.
321 	 *
322 	 * @return a new {@link StepContribution}
323 	 */
324 	public StepContribution createStepContribution() {
325 		return new StepContribution(this);
326 	}
327 
328 	/**
329 	 * On successful execution just before a chunk commit, this method should be
330 	 * called. Synchronizes access to the {@link StepExecution} so that changes
331 	 * are atomic.
332 	 *
333 	 * @param contribution
334 	 */
335 	public synchronized void apply(StepContribution contribution) {
336 		readSkipCount += contribution.getReadSkipCount();
337 		writeSkipCount += contribution.getWriteSkipCount();
338 		processSkipCount += contribution.getProcessSkipCount();
339 		filterCount += contribution.getFilterCount();
340 		readCount += contribution.getReadCount();
341 		writeCount += contribution.getWriteCount();
342 		exitStatus = exitStatus.and(contribution.getExitStatus());
343 	}
344 
345 	/**
346 	 * On unsuccessful execution after a chunk has rolled back.
347 	 */
348 	public synchronized void incrementRollbackCount() {
349 		rollbackCount++;
350 	}
351 
352 	/**
353 	 * @return flag to indicate that an execution should halt
354 	 */
355 	public boolean isTerminateOnly() {
356 		return this.terminateOnly;
357 	}
358 
359 	/**
360 	 * Set a flag that will signal to an execution environment that this
361 	 * execution (and its surrounding job) wishes to exit.
362 	 */
363 	public void setTerminateOnly() {
364 		this.terminateOnly = true;
365 	}
366 
367 	/**
368 	 * @return the total number of items skipped.
369 	 */
370 	public int getSkipCount() {
371 		return readSkipCount + processSkipCount + writeSkipCount;
372 	}
373 
374 	/**
375 	 * Increment the number of commits
376 	 */
377 	public void incrementCommitCount() {
378 		commitCount++;
379 	}
380 
381 	/**
382 	 * Convenience method to get the current job parameters.
383 	 *
384 	 * @return the {@link JobParameters} from the enclosing job, or empty if
385 	 * that is null
386 	 */
387 	public JobParameters getJobParameters() {
388 		if (jobExecution == null || jobExecution.getJobInstance() == null) {
389 			return new JobParameters();
390 		}
391 		return jobExecution.getJobInstance().getJobParameters();
392 	}
393 
394 	/**
395 	 * @return the number of records skipped on read
396 	 */
397 	public int getReadSkipCount() {
398 		return readSkipCount;
399 	}
400 
401 	/**
402 	 * @return the number of records skipped on write
403 	 */
404 	public int getWriteSkipCount() {
405 		return writeSkipCount;
406 	}
407 
408 	/**
409 	 * Set the number of records skipped on read
410 	 *
411 	 * @param readSkipCount
412 	 */
413 	public void setReadSkipCount(int readSkipCount) {
414 		this.readSkipCount = readSkipCount;
415 	}
416 
417 	/**
418 	 * Set the number of records skipped on write
419 	 *
420 	 * @param writeSkipCount
421 	 */
422 	public void setWriteSkipCount(int writeSkipCount) {
423 		this.writeSkipCount = writeSkipCount;
424 	}
425 
426 	/**
427 	 * @return the number of records skipped during processing
428 	 */
429 	public int getProcessSkipCount() {
430 		return processSkipCount;
431 	}
432 
433 	/**
434 	 * Set the number of records skipped during processing.
435 	 *
436 	 * @param processSkipCount
437 	 */
438 	public void setProcessSkipCount(int processSkipCount) {
439 		this.processSkipCount = processSkipCount;
440 	}
441 
442 	/**
443 	 * @return the Date representing the last time this execution was persisted.
444 	 */
445 	public Date getLastUpdated() {
446 		return lastUpdated;
447 	}
448 
449 	/**
450 	 * Set the time when the StepExecution was last updated before persisting
451 	 *
452 	 * @param lastUpdated
453 	 */
454 	public void setLastUpdated(Date lastUpdated) {
455 		this.lastUpdated = lastUpdated;
456 	}
457 
458 	public List<Throwable> getFailureExceptions() {
459 		return failureExceptions;
460 	}
461 
462 	public void addFailureException(Throwable throwable) {
463 		this.failureExceptions.add(throwable);
464 	}
465 
466 	/*
467 	 * (non-Javadoc)
468 	 *
469 	 * @see
470 	 * org.springframework.batch.container.common.domain.Entity#equals(java.
471 	 * lang.Object)
472 	 */
473 	@Override
474 	public boolean equals(Object obj) {
475 
476 		Object jobExecutionId = getJobExecutionId();
477 		if (jobExecutionId == null || !(obj instanceof StepExecution) || getId() == null) {
478 			return super.equals(obj);
479 		}
480 		StepExecution other = (StepExecution) obj;
481 
482 		return stepName.equals(other.getStepName()) && (jobExecutionId.equals(other.getJobExecutionId()))
483 				&& getId().equals(other.getId());
484 	}
485 
486 	/**
487 	 * Deserialise and ensure transient fields are re-instantiated when read
488 	 * back
489 	 */
490 	private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
491 		stream.defaultReadObject();
492 		failureExceptions = new ArrayList<Throwable>();
493 	}
494 
495 	/*
496 	 * (non-Javadoc)
497 	 *
498 	 * @see org.springframework.batch.container.common.domain.Entity#hashCode()
499 	 */
500 	@Override
501 	public int hashCode() {
502 		Object jobExecutionId = getJobExecutionId();
503 		Long id = getId();
504 		return super.hashCode() + 31 * (stepName != null ? stepName.hashCode() : 0) + 91
505 				* (jobExecutionId != null ? jobExecutionId.hashCode() : 0) + 59 * (id != null ? id.hashCode() : 0);
506 	}
507 
508 	@Override
509 	public String toString() {
510 		return String.format(getSummary() + ", exitDescription=%s", exitStatus.getExitDescription());
511 	}
512 
513 	public String getSummary() {
514 		return super.toString()
515 				+ String.format(
516 						", name=%s, status=%s, exitStatus=%s, readCount=%d, filterCount=%d, writeCount=%d readSkipCount=%d, writeSkipCount=%d"
517 								+ ", processSkipCount=%d, commitCount=%d, rollbackCount=%d", stepName, status,
518 								exitStatus.getExitCode(), readCount, filterCount, writeCount, readSkipCount, writeSkipCount,
519 								processSkipCount, commitCount, rollbackCount);
520 	}
521 
522 }