1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.repository.dao; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Collections; |
21 | import java.util.Comparator; |
22 | import java.util.HashSet; |
23 | import java.util.List; |
24 | import java.util.Set; |
25 | import java.util.concurrent.ConcurrentHashMap; |
26 | import java.util.concurrent.ConcurrentMap; |
27 | import java.util.concurrent.atomic.AtomicLong; |
28 | |
29 | import org.springframework.batch.core.JobExecution; |
30 | import org.springframework.batch.core.JobInstance; |
31 | import org.springframework.batch.support.SerializationUtils; |
32 | import org.springframework.dao.OptimisticLockingFailureException; |
33 | import org.springframework.util.Assert; |
34 | |
35 | /** |
36 | * In-memory implementation of {@link JobExecutionDao}. |
37 | */ |
38 | public class MapJobExecutionDao implements JobExecutionDao { |
39 | |
40 | // JDK6 Make this into a ConcurrentSkipListMap: adds and removes tend to be very near the front or back |
41 | private final ConcurrentMap<Long, JobExecution> executionsById = new ConcurrentHashMap<Long, JobExecution>(); |
42 | |
43 | private final AtomicLong currentId = new AtomicLong(0L); |
44 | |
45 | public void clear() { |
46 | executionsById.clear(); |
47 | } |
48 | |
49 | private static JobExecution copy(JobExecution original) { |
50 | JobExecution copy = (JobExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original)); |
51 | return copy; |
52 | } |
53 | |
54 | @Override |
55 | public void saveJobExecution(JobExecution jobExecution) { |
56 | Assert.isTrue(jobExecution.getId() == null); |
57 | Long newId = currentId.getAndIncrement(); |
58 | jobExecution.setId(newId); |
59 | jobExecution.incrementVersion(); |
60 | executionsById.put(newId, copy(jobExecution)); |
61 | } |
62 | |
63 | @Override |
64 | public List<JobExecution> findJobExecutions(JobInstance jobInstance) { |
65 | List<JobExecution> executions = new ArrayList<JobExecution>(); |
66 | for (JobExecution exec : executionsById.values()) { |
67 | if (exec.getJobInstance().equals(jobInstance)) { |
68 | executions.add(copy(exec)); |
69 | } |
70 | } |
71 | Collections.sort(executions, new Comparator<JobExecution>() { |
72 | |
73 | @Override |
74 | public int compare(JobExecution e1, JobExecution e2) { |
75 | long result = (e1.getId() - e2.getId()); |
76 | if (result > 0) { |
77 | return -1; |
78 | } |
79 | else if (result < 0) { |
80 | return 1; |
81 | } |
82 | else { |
83 | return 0; |
84 | } |
85 | } |
86 | }); |
87 | return executions; |
88 | } |
89 | |
90 | @Override |
91 | public void updateJobExecution(JobExecution jobExecution) { |
92 | Long id = jobExecution.getId(); |
93 | Assert.notNull(id, "JobExecution is expected to have an id (should be saved already)"); |
94 | JobExecution persistedExecution = executionsById.get(id); |
95 | Assert.notNull(persistedExecution, "JobExecution must already be saved"); |
96 | |
97 | synchronized (jobExecution) { |
98 | if (!persistedExecution.getVersion().equals(jobExecution.getVersion())) { |
99 | throw new OptimisticLockingFailureException("Attempt to update step execution id=" + id |
100 | + " with wrong version (" + jobExecution.getVersion() + "), where current version is " |
101 | + persistedExecution.getVersion()); |
102 | } |
103 | jobExecution.incrementVersion(); |
104 | executionsById.put(id, copy(jobExecution)); |
105 | } |
106 | } |
107 | |
108 | @Override |
109 | public JobExecution getLastJobExecution(JobInstance jobInstance) { |
110 | JobExecution lastExec = null; |
111 | for (JobExecution exec : executionsById.values()) { |
112 | if (!exec.getJobInstance().equals(jobInstance)) { |
113 | continue; |
114 | } |
115 | if (lastExec == null) { |
116 | lastExec = exec; |
117 | } |
118 | if (lastExec.getCreateTime().before(exec.getCreateTime())) { |
119 | lastExec = exec; |
120 | } |
121 | } |
122 | return copy(lastExec); |
123 | } |
124 | |
125 | /* |
126 | * (non-Javadoc) |
127 | * |
128 | * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao# |
129 | * findRunningJobExecutions(java.lang.String) |
130 | */ |
131 | @Override |
132 | public Set<JobExecution> findRunningJobExecutions(String jobName) { |
133 | Set<JobExecution> result = new HashSet<JobExecution>(); |
134 | for (JobExecution exec : executionsById.values()) { |
135 | if (!exec.getJobInstance().getJobName().equals(jobName) || !exec.isRunning()) { |
136 | continue; |
137 | } |
138 | result.add(copy(exec)); |
139 | } |
140 | return result; |
141 | } |
142 | |
143 | /* |
144 | * (non-Javadoc) |
145 | * |
146 | * @see |
147 | * org.springframework.batch.core.repository.dao.JobExecutionDao#getJobExecution |
148 | * (java.lang.Long) |
149 | */ |
150 | @Override |
151 | public JobExecution getJobExecution(Long executionId) { |
152 | return copy(executionsById.get(executionId)); |
153 | } |
154 | |
155 | @Override |
156 | public void synchronizeStatus(JobExecution jobExecution) { |
157 | JobExecution saved = getJobExecution(jobExecution.getId()); |
158 | if (saved.getVersion().intValue() != jobExecution.getVersion().intValue()) { |
159 | jobExecution.upgradeStatus(saved.getStatus()); |
160 | jobExecution.setVersion(saved.getVersion()); |
161 | } |
162 | } |
163 | } |