View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.repository.dao;
18  
19  import java.util.ArrayList;
20  import java.util.Collections;
21  import java.util.Comparator;
22  import java.util.HashSet;
23  import java.util.List;
24  import java.util.Set;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.ConcurrentMap;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.springframework.batch.core.JobExecution;
30  import org.springframework.batch.core.JobInstance;
31  import org.springframework.batch.support.SerializationUtils;
32  import org.springframework.dao.OptimisticLockingFailureException;
33  import org.springframework.util.Assert;
34  
35  /**
36   * In-memory implementation of {@link JobExecutionDao}.
37   */
38  public class MapJobExecutionDao implements JobExecutionDao {
39  
40  	// JDK6 Make this into a ConcurrentSkipListMap: adds and removes tend to be very near the front or back
41  	private final ConcurrentMap<Long, JobExecution> executionsById = new ConcurrentHashMap<Long, JobExecution>();
42  
43  	private final AtomicLong currentId = new AtomicLong(0L);
44  
45  	public void clear() {
46  		executionsById.clear();
47  	}
48  
49  	private static JobExecution copy(JobExecution original) {
50  		JobExecution copy = (JobExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original));
51  		return copy;
52  	}
53  
54  	@Override
55  	public void saveJobExecution(JobExecution jobExecution) {
56  		Assert.isTrue(jobExecution.getId() == null);
57  		Long newId = currentId.getAndIncrement();
58  		jobExecution.setId(newId);
59  		jobExecution.incrementVersion();
60  		executionsById.put(newId, copy(jobExecution));
61  	}
62  
63  	@Override
64  	public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
65  		List<JobExecution> executions = new ArrayList<JobExecution>();
66  		for (JobExecution exec : executionsById.values()) {
67  			if (exec.getJobInstance().equals(jobInstance)) {
68  				executions.add(copy(exec));
69  			}
70  		}
71  		Collections.sort(executions, new Comparator<JobExecution>() {
72  
73  			@Override
74  			public int compare(JobExecution e1, JobExecution e2) {
75  				long result = (e1.getId() - e2.getId());
76  				if (result > 0) {
77  					return -1;
78  				}
79  				else if (result < 0) {
80  					return 1;
81  				}
82  				else {
83  					return 0;
84  				}
85  			}
86  		});
87  		return executions;
88  	}
89  
90  	@Override
91  	public void updateJobExecution(JobExecution jobExecution) {
92  		Long id = jobExecution.getId();
93  		Assert.notNull(id, "JobExecution is expected to have an id (should be saved already)");
94  		JobExecution persistedExecution = executionsById.get(id);
95  		Assert.notNull(persistedExecution, "JobExecution must already be saved");
96  
97  		synchronized (jobExecution) {
98  			if (!persistedExecution.getVersion().equals(jobExecution.getVersion())) {
99  				throw new OptimisticLockingFailureException("Attempt to update step execution id=" + id
100 						+ " with wrong version (" + jobExecution.getVersion() + "), where current version is "
101 						+ persistedExecution.getVersion());
102 			}
103 			jobExecution.incrementVersion();
104 			executionsById.put(id, copy(jobExecution));
105 		}
106 	}
107 
108 	@Override
109 	public JobExecution getLastJobExecution(JobInstance jobInstance) {
110 		JobExecution lastExec = null;
111 		for (JobExecution exec : executionsById.values()) {
112 			if (!exec.getJobInstance().equals(jobInstance)) {
113 				continue;
114 			}
115 			if (lastExec == null) {
116 				lastExec = exec;
117 			}
118 			if (lastExec.getCreateTime().before(exec.getCreateTime())) {
119 				lastExec = exec;
120 			}
121 		}
122 		return copy(lastExec);
123 	}
124 
125 	/*
126 	 * (non-Javadoc)
127 	 *
128 	 * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao#
129 	 * findRunningJobExecutions(java.lang.String)
130 	 */
131 	@Override
132 	public Set<JobExecution> findRunningJobExecutions(String jobName) {
133 		Set<JobExecution> result = new HashSet<JobExecution>();
134 		for (JobExecution exec : executionsById.values()) {
135 			if (!exec.getJobInstance().getJobName().equals(jobName) || !exec.isRunning()) {
136 				continue;
137 			}
138 			result.add(copy(exec));
139 		}
140 		return result;
141 	}
142 
143 	/*
144 	 * (non-Javadoc)
145 	 *
146 	 * @see
147 	 * org.springframework.batch.core.repository.dao.JobExecutionDao#getJobExecution
148 	 * (java.lang.Long)
149 	 */
150 	@Override
151 	public JobExecution getJobExecution(Long executionId) {
152 		return copy(executionsById.get(executionId));
153 	}
154 
155 	@Override
156 	public void synchronizeStatus(JobExecution jobExecution) {
157 		JobExecution saved = getJobExecution(jobExecution.getId());
158 		if (saved.getVersion().intValue() != jobExecution.getVersion().intValue()) {
159 			jobExecution.upgradeStatus(saved.getStatus());
160 			jobExecution.setVersion(saved.getVersion());
161 		}
162 	}
163 }