1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.osgi.service.importer.support.internal.collection;
18
19 import java.util.Collection;
20 import java.util.Collections;
21 import java.util.Iterator;
22 import java.util.LinkedHashMap;
23 import java.util.List;
24 import java.util.Map;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.osgi.framework.BundleContext;
29 import org.osgi.framework.Constants;
30 import org.osgi.framework.Filter;
31 import org.osgi.framework.ServiceEvent;
32 import org.osgi.framework.ServiceListener;
33 import org.osgi.framework.ServiceReference;
34 import org.springframework.beans.factory.DisposableBean;
35 import org.springframework.beans.factory.InitializingBean;
36 import org.springframework.osgi.service.ServiceUnavailableException;
37 import org.springframework.osgi.service.importer.DefaultOsgiServiceDependency;
38 import org.springframework.osgi.service.importer.ImportedOsgiServiceProxy;
39 import org.springframework.osgi.service.importer.OsgiServiceDependency;
40 import org.springframework.osgi.service.importer.OsgiServiceLifecycleListener;
41 import org.springframework.osgi.service.importer.support.internal.aop.ProxyPlusCallback;
42 import org.springframework.osgi.service.importer.support.internal.aop.ServiceProxyCreator;
43 import org.springframework.osgi.service.importer.support.internal.dependency.ImporterStateListener;
44 import org.springframework.osgi.service.importer.support.internal.util.OsgiServiceBindingUtils;
45 import org.springframework.osgi.util.OsgiListenerUtils;
46 import org.springframework.util.Assert;
47
48 /**
49 * OSGi service dynamic collection - allows iterating while the underlying
50 * storage is being shrunk/expanded. This collection is read-only - its content
51 * is being retrieved dynamically from the OSGi platform.
52 *
53 * <p/> This collection and its iterators are thread-safe. That is, multiple
54 * threads can access the collection. However, since the collection is
55 * read-only, it cannot be modified by the client.
56 *
57 * @see Collection
58 * @author Costin Leau
59 */
60 public class OsgiServiceCollection implements Collection, InitializingBean, CollectionProxy, DisposableBean {
61
62 /**
63 * Listener tracking the OSGi services which form the dynamic collection.
64 *
65 * @author Costin Leau
66 */
67 private class Listener implements ServiceListener {
68
69 public void serviceChanged(ServiceEvent event) {
70 ClassLoader tccl = Thread.currentThread().getContextClassLoader();
71
72 try {
73 Thread.currentThread().setContextClassLoader(classLoader);
74 ServiceReference ref = event.getServiceReference();
75 Long serviceId = (Long) ref.getProperty(Constants.SERVICE_ID);
76 boolean collectionModified = false;
77
78 ProxyPlusCallback ppc = null;
79 Object proxy = null;
80
81
82 boolean shouldInformStateListeners = false;
83
84 switch (event.getType()) {
85
86 case (ServiceEvent.REGISTERED):
87 case (ServiceEvent.MODIFIED):
88
89 synchronized (serviceProxies) {
90 if (!servicesIdMap.containsKey(serviceId)) {
91 ppc = proxyCreator.createServiceProxy(ref);
92 proxy = ppc.proxy;
93
94
95 if (serviceProxies.add(proxy)) {
96 collectionModified = true;
97
98 shouldInformStateListeners = (serviceProxies.size() == 1);
99 servicesIdMap.put(serviceId, ppc);
100 }
101 }
102 }
103
104
105 if (collectionModified) {
106 OsgiServiceBindingUtils.callListenersBind(context, proxy, ref, listeners);
107
108 if (serviceRequiredAtStartup && shouldInformStateListeners)
109 notifySatisfiedStateListeners();
110 }
111
112 break;
113 case (ServiceEvent.UNREGISTERING):
114 synchronized (serviceProxies) {
115
116 ppc = (ProxyPlusCallback) servicesIdMap.remove(serviceId);
117 if (ppc != null) {
118 proxy = ppc.proxy;
119
120 checkDeadProxies(proxy);
121
122 collectionModified = serviceProxies.remove(proxy);
123
124 invalidateProxy(ppc);
125
126
127 shouldInformStateListeners = (serviceProxies.isEmpty());
128 }
129 }
130
131 if (collectionModified) {
132 OsgiServiceBindingUtils.callListenersUnbind(context, proxy, ref, listeners);
133
134 if (serviceRequiredAtStartup && shouldInformStateListeners)
135 notifyUnsatisfiedStateListeners();
136 }
137
138 break;
139
140 default:
141 throw new IllegalArgumentException("unsupported event type:" + event);
142 }
143 }
144
145
146 catch (Throwable re) {
147 if (log.isWarnEnabled()) {
148 log.warn("serviceChanged() processing failed", re);
149 }
150 }
151 finally {
152 Thread.currentThread().setContextClassLoader(tccl);
153 }
154 }
155
156 private void notifySatisfiedStateListeners() {
157 synchronized (stateListeners) {
158 for (Iterator iterator = stateListeners.iterator(); iterator.hasNext();) {
159 ImporterStateListener stateListener = (ImporterStateListener) iterator.next();
160 stateListener.importerSatisfied(eventSource, dependency);
161 }
162 }
163 }
164
165 private void notifyUnsatisfiedStateListeners() {
166 synchronized (stateListeners) {
167 for (Iterator iterator = stateListeners.iterator(); iterator.hasNext();) {
168 ImporterStateListener stateListener = (ImporterStateListener) iterator.next();
169 stateListener.importerUnsatisfied(eventSource, dependency);
170 }
171 }
172 }
173 }
174
175 /**
176 * Read-only iterator wrapper around the dynamic collection iterator.
177 *
178 * @author Costin Leau
179 *
180 */
181 protected class OsgiServiceIterator implements Iterator {
182
183
184 private final Iterator iter = serviceProxies.iterator();
185
186
187 public boolean hasNext() {
188 mandatoryServiceCheck();
189 return iter.hasNext();
190 }
191
192 public Object next() {
193 synchronized (serviceProxies) {
194 mandatoryServiceCheck();
195 Object proxy = iter.next();
196 return (proxy == null ? tailDeadProxy : proxy);
197 }
198 }
199
200 public void remove() {
201
202 throw new UnsupportedOperationException();
203 }
204 }
205
206
207 private static final Log log = LogFactory.getLog(OsgiServiceCollection.class);
208
209
210
211
212
213
214
215 protected final Map servicesIdMap = new LinkedHashMap(8);
216
217 /**
218 * The dynamic collection.
219 */
220 protected DynamicCollection serviceProxies;
221
222 /**
223 * Recall the last proxy for the rare case, where a service goes down
224 * between the #hasNext() and #next() call of an iterator.
225 *
226 * Subclasses should implement their own strategy when it comes to assign a
227 * value to it through
228 */
229 protected volatile Object tailDeadProxy;
230
231 private boolean serviceRequiredAtStartup = true;
232
233 private final Filter filter;
234
235 private final BundleContext context;
236
237 /** TCCL to set between calling listeners */
238 private final ClassLoader classLoader;
239
240 /** Service proxy creator */
241 private final ServiceProxyCreator proxyCreator;
242
243 private OsgiServiceLifecycleListener[] listeners = new OsgiServiceLifecycleListener[0];
244
245 private final ServiceListener listener;
246
247 /** state listener */
248 private List stateListeners = Collections.EMPTY_LIST;
249
250 private final Object lock = new Object();
251
252 /** dependency object */
253 private OsgiServiceDependency dependency;
254
255 /** dependable service importer */
256 private Object eventSource;
257
258 /** event source (importer) name */
259 private String sourceName;
260
261
/**
 * Create a new collection. No listener is registered with the OSGi
 * platform until {@link #afterPropertiesSet()} is called.
 *
 * @param filter filter used when registering the service listener
 * @param context bundle context (required)
 * @param classLoader class loader set as TCCL while processing service
 * events (required)
 * @param proxyCreator creator of the proxies stored in the collection
 */
public OsgiServiceCollection(Filter filter, BundleContext context, ClassLoader classLoader,
        ServiceProxyCreator proxyCreator) {
    Assert.notNull(classLoader, "ClassLoader is required");
    Assert.notNull(context, "context is required");

    this.proxyCreator = proxyCreator;
    this.classLoader = classLoader;
    this.context = context;
    this.filter = filter;

    this.listener = new Listener();
}
274
275 public void afterPropertiesSet() {
276
277 this.serviceProxies = createInternalDynamicStorage();
278
279 boolean trace = log.isTraceEnabled();
280
281 dependency = new DefaultOsgiServiceDependency(sourceName, filter, serviceRequiredAtStartup);
282
283 if (trace)
284 log.trace("Adding osgi listener for services matching [" + filter + "]");
285 OsgiListenerUtils.addServiceListener(context, listener, filter);
286
287 if (serviceRequiredAtStartup) {
288 if (trace)
289 log.trace("1..x cardinality - looking for service [" + filter + "] at startup...");
290 mandatoryServiceCheck();
291 }
292 }
293
294 public void destroy() {
295 OsgiListenerUtils.removeServiceListener(context, listener);
296
297 synchronized (serviceProxies) {
298 for (Iterator iterator = serviceProxies.iterator(); iterator.hasNext();) {
299 ImportedOsgiServiceProxy serviceProxy = (ImportedOsgiServiceProxy) iterator.next();
300 ServiceReference ref = serviceProxy.getServiceReference();
301
302
303 ProxyPlusCallback ppc = (ProxyPlusCallback) servicesIdMap.get((Long) ref.getProperty(Constants.SERVICE_ID));
304 listener.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, ref));
305
306 try {
307 ppc.destructionCallback.destroy();
308 }
309 catch (Exception ex) {
310 log.error("Exception occurred while destroying proxy " + ppc.proxy, ex);
311 }
312 }
313
314 serviceProxies.clear();
315 servicesIdMap.clear();
316 }
317 }
318
319 /**
320 * Check to see whether at least one service is available.
321 */
322 protected void mandatoryServiceCheck() {
323 if (serviceRequiredAtStartup && serviceProxies.isEmpty())
324 throw new ServiceUnavailableException(filter);
325 }
326
327 public boolean isSatisfied() {
328 if (serviceRequiredAtStartup)
329 return (!serviceProxies.isEmpty());
330 else
331 return true;
332 }
333
334 /**
335 * Create the dynamic storage used internally. The storage <strong>has</strong>
336 * to be thread-safe.
337 */
338 protected DynamicCollection createInternalDynamicStorage() {
339 return new DynamicCollection();
340 }
341
342 private void invalidateProxy(ProxyPlusCallback ppc) {
343
344 }
345
346 /**
347 * Hook for tracking the last disappearing service to cope with the rare
348 * case, where the last service in the collection disappears between calls
349 * to hasNext() and next() on an iterator at the end of the collection.
350 *
351 * @param proxy
352 */
353 protected void checkDeadProxies(Object proxy, int proxyCollectionPos) {
354 if (proxyCollectionPos == serviceProxies.size() - 1)
355 tailDeadProxy = proxy;
356 }
357
358 /**
359 * Private method, computing the index and share it with subclasses.
360 *
361 * @param proxy
362 */
363 private void checkDeadProxies(Object proxy) {
364
365 int index = serviceProxies.indexOf(proxy);
366 checkDeadProxies(proxy, index);
367 }
368
369 public void setServiceImporter(Object importer) {
370 this.eventSource = importer;
371 }
372
373 public void setServiceImporterName(String name) {
374 this.sourceName = name;
375 }
376
377 public Iterator iterator() {
378 return new OsgiServiceIterator();
379 }
380
381 public int size() {
382 mandatoryServiceCheck();
383 return serviceProxies.size();
384 }
385
386 public String toString() {
387 mandatoryServiceCheck();
388 synchronized (serviceProxies) {
389 return serviceProxies.toString();
390 }
391 }
392
393
394
395
396 public boolean remove(Object o) {
397 throw new UnsupportedOperationException();
398 }
399
400 public boolean removeAll(Collection c) {
401 throw new UnsupportedOperationException();
402 }
403
404 public boolean add(Object o) {
405 throw new UnsupportedOperationException();
406 }
407
408 public boolean addAll(Collection c) {
409 throw new UnsupportedOperationException();
410 }
411
412 public void clear() {
413 throw new UnsupportedOperationException();
414 }
415
416 public boolean retainAll(Collection c) {
417 throw new UnsupportedOperationException();
418 }
419
420 public boolean contains(Object o) {
421 mandatoryServiceCheck();
422 return serviceProxies.contains(o);
423 }
424
425 public boolean containsAll(Collection c) {
426 mandatoryServiceCheck();
427 return serviceProxies.containsAll(c);
428 }
429
430 public boolean isEmpty() {
431 mandatoryServiceCheck();
432 return size() == 0;
433 }
434
435 public Object[] toArray() {
436 mandatoryServiceCheck();
437 return serviceProxies.toArray();
438 }
439
440 public Object[] toArray(Object[] array) {
441 mandatoryServiceCheck();
442 return serviceProxies.toArray(array);
443 }
444
445 /**
446 * @param listeners The listeners to set.
447 */
448 public void setListeners(OsgiServiceLifecycleListener[] listeners) {
449 Assert.notNull(listeners);
450 this.listeners = listeners;
451 }
452
453 public void setRequiredAtStartup(boolean serviceRequiredAtStartup) {
454 this.serviceRequiredAtStartup = serviceRequiredAtStartup;
455 }
456
457 public void setStateListeners(List stateListeners) {
458 synchronized (lock) {
459 this.stateListeners = stateListeners;
460 }
461 }
462 }