1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.repository.dao; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Collections; |
21 | import java.util.Comparator; |
22 | import java.util.HashSet; |
23 | import java.util.List; |
24 | import java.util.Map; |
25 | import java.util.Set; |
26 | |
27 | import org.apache.commons.lang.SerializationUtils; |
28 | import org.springframework.batch.core.JobExecution; |
29 | import org.springframework.batch.core.JobInstance; |
30 | import org.springframework.batch.support.transaction.TransactionAwareProxyFactory; |
31 | import org.springframework.dao.OptimisticLockingFailureException; |
32 | import org.springframework.util.Assert; |
33 | |
34 | /** |
35 | * In-memory implementation of {@link JobExecutionDao}. |
36 | */ |
37 | public class MapJobExecutionDao implements JobExecutionDao { |
38 | |
39 | private static Map<Long, JobExecution> executionsById = TransactionAwareProxyFactory.createTransactionalMap(); |
40 | |
41 | private static long currentId = 0; |
42 | |
43 | public static void clear() { |
44 | executionsById.clear(); |
45 | } |
46 | |
47 | private static JobExecution copy(JobExecution original) { |
48 | JobExecution copy = (JobExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original)); |
49 | return copy; |
50 | } |
51 | |
52 | public void saveJobExecution(JobExecution jobExecution) { |
53 | Assert.isTrue(jobExecution.getId() == null); |
54 | Long newId = currentId++; |
55 | jobExecution.setId(newId); |
56 | jobExecution.incrementVersion(); |
57 | executionsById.put(newId, copy(jobExecution)); |
58 | } |
59 | |
60 | public List<JobExecution> findJobExecutions(JobInstance jobInstance) { |
61 | List<JobExecution> executions = new ArrayList<JobExecution>(); |
62 | for (JobExecution exec : executionsById.values()) { |
63 | if (exec.getJobInstance().equals(jobInstance)) { |
64 | executions.add(copy(exec)); |
65 | } |
66 | } |
67 | Collections.sort(executions, new Comparator<JobExecution>() { |
68 | |
69 | public int compare(JobExecution e1, JobExecution e2) { |
70 | long result = (e1.getId() - e2.getId()); |
71 | if (result > 0) { |
72 | return -1; |
73 | } |
74 | else if (result < 0) { |
75 | return 1; |
76 | } |
77 | else { |
78 | return 0; |
79 | } |
80 | } |
81 | }); |
82 | return executions; |
83 | } |
84 | |
85 | public void updateJobExecution(JobExecution jobExecution) { |
86 | Long id = jobExecution.getId(); |
87 | Assert.notNull(id, "JobExecution is expected to have an id (should be saved already)"); |
88 | JobExecution persistedExecution = executionsById.get(id); |
89 | Assert.notNull(persistedExecution, "JobExecution must already be saved"); |
90 | |
91 | synchronized (jobExecution) { |
92 | if (!persistedExecution.getVersion().equals(jobExecution.getVersion())) { |
93 | throw new OptimisticLockingFailureException("Attempt to update step execution id=" + id |
94 | + " with wrong version (" + jobExecution.getVersion() + "), where current version is " |
95 | + persistedExecution.getVersion()); |
96 | } |
97 | jobExecution.incrementVersion(); |
98 | executionsById.put(id, copy(jobExecution)); |
99 | } |
100 | } |
101 | |
102 | public JobExecution getLastJobExecution(JobInstance jobInstance) { |
103 | JobExecution lastExec = null; |
104 | for (JobExecution exec : executionsById.values()) { |
105 | if (!exec.getJobInstance().equals(jobInstance)) { |
106 | continue; |
107 | } |
108 | if (lastExec == null) { |
109 | lastExec = exec; |
110 | } |
111 | if (lastExec.getCreateTime().before(exec.getCreateTime())) { |
112 | lastExec = exec; |
113 | } |
114 | } |
115 | return copy(lastExec); |
116 | } |
117 | |
118 | /* |
119 | * (non-Javadoc) |
120 | * |
121 | * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao# |
122 | * findRunningJobExecutions(java.lang.String) |
123 | */ |
124 | public Set<JobExecution> findRunningJobExecutions(String jobName) { |
125 | Set<JobExecution> result = new HashSet<JobExecution>(); |
126 | for (JobExecution exec : executionsById.values()) { |
127 | if (!exec.getJobInstance().getJobName().equals(jobName) || !exec.isRunning()) { |
128 | continue; |
129 | } |
130 | result.add(copy(exec)); |
131 | } |
132 | return result; |
133 | } |
134 | |
135 | /* |
136 | * (non-Javadoc) |
137 | * |
138 | * @see |
139 | * org.springframework.batch.core.repository.dao.JobExecutionDao#getJobExecution |
140 | * (java.lang.Long) |
141 | */ |
142 | public JobExecution getJobExecution(Long executionId) { |
143 | return copy(executionsById.get(executionId)); |
144 | } |
145 | |
146 | public void synchronizeStatus(JobExecution jobExecution) { |
147 | JobExecution saved = getJobExecution(jobExecution.getId()); |
148 | if (saved.getVersion().intValue() != jobExecution.getVersion().intValue()) { |
149 | jobExecution.upgradeStatus(saved.getStatus()); |
150 | jobExecution.setVersion(saved.getVersion()); |
151 | } |
152 | } |
153 | } |