1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.springframework.osgi.service.importer.support.internal.aop;
18
19 import java.util.Collections;
20 import java.util.Iterator;
21 import java.util.List;
22
23 import org.osgi.framework.BundleContext;
24 import org.osgi.framework.Constants;
25 import org.osgi.framework.Filter;
26 import org.osgi.framework.ServiceEvent;
27 import org.osgi.framework.ServiceListener;
28 import org.osgi.framework.ServiceReference;
29 import org.springframework.beans.factory.InitializingBean;
30 import org.springframework.context.ApplicationEvent;
31 import org.springframework.context.ApplicationEventPublisher;
32 import org.springframework.context.ApplicationEventPublisherAware;
33 import org.springframework.osgi.service.ServiceUnavailableException;
34 import org.springframework.osgi.service.importer.DefaultOsgiServiceDependency;
35 import org.springframework.osgi.service.importer.OsgiServiceDependency;
36 import org.springframework.osgi.service.importer.OsgiServiceLifecycleListener;
37 import org.springframework.osgi.service.importer.ServiceProxyDestroyedException;
38 import org.springframework.osgi.service.importer.event.OsgiServiceDependencyWaitEndedEvent;
39 import org.springframework.osgi.service.importer.event.OsgiServiceDependencyWaitTimedOutEvent;
40 import org.springframework.osgi.service.importer.event.OsgiServiceDependencyWaitStartingEvent;
41 import org.springframework.osgi.service.importer.support.internal.dependency.ImporterStateListener;
42 import org.springframework.osgi.service.importer.support.internal.support.DefaultRetryCallback;
43 import org.springframework.osgi.service.importer.support.internal.support.RetryCallback;
44 import org.springframework.osgi.service.importer.support.internal.support.RetryTemplate;
45 import org.springframework.osgi.service.importer.support.internal.support.ServiceWrapper;
46 import org.springframework.osgi.service.importer.support.internal.util.OsgiServiceBindingUtils;
47 import org.springframework.osgi.util.OsgiListenerUtils;
48 import org.springframework.osgi.util.OsgiServiceReferenceUtils;
49 import org.springframework.util.Assert;
50 import org.springframework.util.ObjectUtils;
51
/**
 * Interceptor adding dynamic behaviour for unary service (..1 cardinality). It
 * will look for a service using the given filter, retrying if the service is
 * down or unavailable. It will dynamically rebind to a new service if one
 * becomes available with a higher service ranking.
 *
 * <p/> In case no service is available, it will throw an exception.
 *
 * <p/> <strong>Note</strong>: this is a stateful interceptor and should not be
 * shared.
 *
 * @author Costin Leau
 */
64 public class ServiceDynamicInterceptor extends ServiceInvoker implements InitializingBean,
65 ApplicationEventPublisherAware {
66
67 /**
68 * Override the default implementation to plug in event notification.
69 *
70 * @author Costin Leau
71 *
72 */
73 private class EventSenderRetryTemplate extends RetryTemplate {
74
75 public EventSenderRetryTemplate(RetryTemplate retryTemplate) {
76 super(retryTemplate);
77 }
78
79 public Object execute(RetryCallback callback, Object notificationLock) {
80
81 publishEvent(new OsgiServiceDependencyWaitStartingEvent(eventSource, dependency, this.getWaitTime()
82 * this.getRetryNumbers()));
83
84 Object result = null;
85
86 long start = System.currentTimeMillis();
87 long stop;
88
89 try {
90 result = super.execute(callback, notificationLock);
91 stop = System.currentTimeMillis() - start;
92 }
93 catch (RuntimeException exception) {
94 stop = System.currentTimeMillis() - start;
95 publishEvent(new OsgiServiceDependencyWaitTimedOutEvent(eventSource, dependency, stop));
96 throw exception;
97 }
98
99
100 if (callback.isComplete(result)) {
101 publishEvent(new OsgiServiceDependencyWaitEndedEvent(eventSource, dependency, stop));
102 }
103 else {
104 publishEvent(new OsgiServiceDependencyWaitTimedOutEvent(eventSource, dependency, stop));
105 }
106
107 return result;
108 }
109 }
110
111 /**
112 * Listener tracking the OSGi services which form the dynamic reference.
113 */
114
115
116
117
118
119
120 private class Listener implements ServiceListener {
121
122 public void serviceChanged(ServiceEvent event) {
123 ClassLoader tccl = Thread.currentThread().getContextClassLoader();
124 try {
125 Thread.currentThread().setContextClassLoader(classLoader);
126 ServiceReference ref = event.getServiceReference();
127
128
129 long serviceId = ((Long) ref.getProperty(Constants.SERVICE_ID)).longValue();
130
131 Integer rank = (Integer) ref.getProperty(Constants.SERVICE_RANKING);
132 int ranking = (rank == null ? 0 : rank.intValue());
133
134 boolean debug = log.isDebugEnabled();
135
136 switch (event.getType()) {
137
138 case (ServiceEvent.REGISTERED):
139
140 case (ServiceEvent.MODIFIED): {
141
142 boolean servicePresent = false;
143
144 synchronized (lock) {
145 servicePresent = (wrapper != null && wrapper.isServiceAlive());
146 }
147
148 if (updateWrapperIfNecessary(ref, serviceId, ranking)) {
149
150 OsgiServiceBindingUtils.callListenersBind(bundleContext, proxy, ref, listeners);
151
152 if (!servicePresent) {
153 notifySatisfiedStateListeners();
154 }
155 }
156
157 break;
158 }
159 case (ServiceEvent.UNREGISTERING): {
160
161 boolean serviceRemoved = false;
162 /**
163 * used if the service goes down and there is no
164 * replacement
165 */
166 /**
167 * since the listeners will require a valid proxy, the
168 * invalidation has to happen *after* calling the
169 * listeners
170 */
171 ServiceWrapper oldWrapper = wrapper;
172
173 synchronized (lock) {
174
175 if (wrapper != null) {
176 if (serviceId == wrapper.getServiceId()) {
177 serviceRemoved = true;
178 wrapper = null;
179
180 }
181 }
182 }
183
184 ServiceReference newReference = null;
185
186 boolean isDestroyed = false;
187
188 synchronized (lock) {
189 isDestroyed = destroyed;
190 }
191
192
193 if (!isDestroyed) {
194 newReference = OsgiServiceReferenceUtils.getServiceReference(bundleContext,
195 (filter == null ? null : filter.toString()));
196
197
198
199
200
201 if (newReference != null) {
202
203 serviceChanged(new ServiceEvent(ServiceEvent.MODIFIED, newReference));
204 }
205 }
206
207
208
209 if (newReference == null && serviceRemoved) {
210
211
212 synchronized (lock) {
213 wrapper = oldWrapper;
214 }
215
216
217 OsgiServiceBindingUtils.callListenersUnbind(bundleContext, proxy, ref, listeners);
218
219
220 synchronized (lock) {
221 wrapper = null;
222 }
223
224 if (debug) {
225 String message = "Service reference [" + ref + "] was unregistered";
226 if (serviceRemoved) {
227 message += " and unbound from the service proxy";
228 }
229 else {
230 message += " but did not affect the service proxy";
231 }
232 log.debug(message);
233 }
234
235
236 notifyUnsatisfiedStateListeners();
237 }
238
239 break;
240 }
241 default:
242 throw new IllegalArgumentException("unsupported event type");
243 }
244 }
245 catch (Throwable e) {
246
247
248 log.fatal("Exception during service event handling", e);
249 }
250 finally {
251 Thread.currentThread().setContextClassLoader(tccl);
252 }
253 }
254
255 private void notifySatisfiedStateListeners() {
256 synchronized (stateListeners) {
257 for (Iterator iterator = stateListeners.iterator(); iterator.hasNext();) {
258 ImporterStateListener stateListener = (ImporterStateListener) iterator.next();
259 stateListener.importerSatisfied(eventSource, dependency);
260 }
261 }
262 }
263
264 private void notifyUnsatisfiedStateListeners() {
265 synchronized (stateListeners) {
266 for (Iterator iterator = stateListeners.iterator(); iterator.hasNext();) {
267 ImporterStateListener stateListener = (ImporterStateListener) iterator.next();
268 stateListener.importerUnsatisfied(eventSource, dependency);
269 }
270 }
271 }
272
273 private boolean updateWrapperIfNecessary(ServiceReference ref, long serviceId, int serviceRanking) {
274 boolean updated = false;
275 try {
276 synchronized (lock) {
277 if (wrapper != null && wrapper.isServiceAlive()) {
278
279 if (serviceRanking > wrapper.getServiceRanking()) {
280 updated = true;
281 updateReferenceHolders(ref);
282 }
283
284 if (serviceRanking == wrapper.getServiceRanking()) {
285 if (serviceId < wrapper.getServiceId()) {
286 updated = true;
287 updateReferenceHolders(ref);
288 }
289 }
290 }
291
292
293
294 else {
295 updated = true;
296 updateReferenceHolders(ref);
297 }
298 lock.notifyAll();
299 return updated;
300 }
301 }
302 finally {
303 if (log.isDebugEnabled()) {
304 String message = "Service reference [" + ref + "]";
305 if (updated)
306 message += " bound to proxy";
307 else
308 message += " not bound to proxy";
309 log.debug(message);
310 }
311 }
312 }
313
314 /**
315 * Update internal holders for the backing ServiceReference.
316 *
317 * @param ref
318 */
319 private void updateReferenceHolders(ServiceReference ref) {
320
321 wrapper = new ServiceWrapper(ref, bundleContext);
322 referenceDelegate.swapDelegates(ref);
323 }
324 }
325
326
327 private static final int hashCode = ServiceDynamicInterceptor.class.hashCode() * 13;
328
329 private final BundleContext bundleContext;
330
331 private final Filter filter;
332
333 /** TCCL to set when calling listeners */
334 private final ClassLoader classLoader;
335
336 private final SwappingServiceReferenceProxy referenceDelegate;
337
338 /** event listener */
339 private final ServiceListener listener;
340
341 /** mandatory flag */
342 private boolean serviceRequiredAtStartup = true;
343
344 /** flag indicating whether the destruction has started or not */
345 private boolean isDuringDestruction = false;
346
347 /** flag indicating whether the proxy is already destroyed or not */
348 private boolean destroyed = false;
349
350 /** private lock */
351 private final Object lock = new Object();
352
353 /** utility service wrapper */
354 private ServiceWrapper wrapper;
355
356 /** retry template */
357 private RetryTemplate retryTemplate;
358
359 /** dependable service importer */
360 private Object eventSource;
361
362 /** event source (importer) name */
363 private String sourceName;
364
365 /** listener that need to be informed of bind/rebind/unbind */
366 private OsgiServiceLifecycleListener[] listeners = new OsgiServiceLifecycleListener[0];
367
368 /** reference to the created proxy passed to the listeners */
369 private Object proxy;
370
371 /** event publisher */
372 private ApplicationEventPublisher applicationEventPublisher;
373
374 /** dependency object */
375 private OsgiServiceDependency dependency;
376
377 /** internal state listeners */
378 private List stateListeners = Collections.EMPTY_LIST;
379
380
/**
 * Creates an interceptor for the services matching the given filter. The
 * service listener is only created here; it is registered with the bundle
 * context in {@link #afterPropertiesSet()}.
 *
 * @param context bundle context used for lookups and listener registration
 * @param filter filter selecting the candidate services
 * @param classLoader class loader set as TCCL while service events are handled
 */
public ServiceDynamicInterceptor(BundleContext context, Filter filter, ClassLoader classLoader) {
	this.bundleContext = context;
	this.filter = filter;
	this.classLoader = classLoader;

	this.referenceDelegate = new SwappingServiceReferenceProxy();
	this.listener = new Listener();
}
389
390 public Object getTarget() {
391 Object target = lookupService();
392
393
394 if (target == null) {
395 throw new ServiceUnavailableException(filter);
396 }
397 return target;
398 }
399
400 /**
401 * Look the service by waiting the service to appear. Note this method
402 * should use the same lock as the listener handling the service reference.
403 */
404 private Object lookupService() {
405 synchronized (lock) {
406 if (destroyed && !isDuringDestruction)
407 throw new ServiceProxyDestroyedException();
408
409 return (Object) retryTemplate.execute(new DefaultRetryCallback() {
410
411 public Object doWithRetry() {
412 return (wrapper != null) ? wrapper.getService() : null;
413 }
414 }, lock);
415 }
416 }
417
418 private void publishEvent(ApplicationEvent event) {
419 if (applicationEventPublisher != null) {
420 if (log.isTraceEnabled())
421 log.trace("Publishing event through publisher " + applicationEventPublisher);
422 try {
423 applicationEventPublisher.publishEvent(event);
424 }
425 catch (IllegalStateException ise) {
426 log.debug(
427 "Event "
428 + event
429 + " not published as the publisher is not initialized - usually this is caused by eager initialization of the importers by post processing",
430 ise);
431 }
432
433 }
434 else if (log.isTraceEnabled())
435 log.trace("No application event publisher set; no events will be published");
436 }
437
438 public void afterPropertiesSet() {
439 Assert.notNull(proxy);
440 Assert.notNull(eventSource);
441
442 boolean debug = log.isDebugEnabled();
443
444 retryTemplate = new EventSenderRetryTemplate(retryTemplate);
445
446 dependency = new DefaultOsgiServiceDependency(sourceName, filter, serviceRequiredAtStartup);
447
448 if (debug)
449 log.debug("Adding OSGi mandatoryListeners for services matching [" + filter + "]");
450 OsgiListenerUtils.addSingleServiceListener(bundleContext, listener, filter);
451
452 if (serviceRequiredAtStartup) {
453 if (debug)
454 log.debug("1..x cardinality - looking for service [" + filter + "] at startup...");
455 Object target = getTarget();
456 if (debug)
457 log.debug("Service retrieved " + target);
458 }
459 }
460
461 public void destroy() {
462 OsgiListenerUtils.removeServiceListener(bundleContext, listener);
463 synchronized (lock) {
464
465 destroyed = true;
466 isDuringDestruction = true;
467 if (wrapper != null) {
468 ServiceReference ref = wrapper.getReference();
469 if (ref != null) {
470
471 listener.serviceChanged(new ServiceEvent(ServiceEvent.UNREGISTERING, ref));
472 }
473 }
474 /** destruction process has ended */
475 isDuringDestruction = false;
476 }
477 }
478
479 /**
480 * {@inheritDoc}
481 *
482 * This particular interceptor returns a delegated service reference so that
483 * callers can keep the reference even if the underlying target service
484 * reference changes in time.
485 */
486 public ServiceReference getServiceReference() {
487 return referenceDelegate;
488 }
489
490 public void setRetryTemplate(RetryTemplate retryTemplate) {
491 this.retryTemplate = retryTemplate;
492 }
493
494 public RetryTemplate getRetryTemplate() {
495 return retryTemplate;
496 }
497
498 public OsgiServiceLifecycleListener[] getListeners() {
499 return listeners;
500 }
501
502 public void setListeners(OsgiServiceLifecycleListener[] listeners) {
503 this.listeners = listeners;
504 }
505
506 public void setServiceImporter(Object importer) {
507 this.eventSource = importer;
508 }
509
510 public void setServiceImporterName(String name) {
511 this.sourceName = name;
512 }
513
514 public void setRequiredAtStartup(boolean requiredAtStartup) {
515 this.serviceRequiredAtStartup = requiredAtStartup;
516 }
517
518 public void setProxy(Object proxy) {
519 this.proxy = proxy;
520 }
521
522 public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
523 this.applicationEventPublisher = applicationEventPublisher;
524 }
525
526 /** Internal state listeners */
527 public void setStateListeners(List stateListeners) {
528 synchronized (lock) {
529 this.stateListeners = stateListeners;
530 }
531 }
532
533 public boolean equals(Object other) {
534 if (this == other)
535 return true;
536 if (other instanceof ServiceDynamicInterceptor) {
537 ServiceDynamicInterceptor oth = (ServiceDynamicInterceptor) other;
538 return (serviceRequiredAtStartup == oth.serviceRequiredAtStartup
539 && ObjectUtils.nullSafeEquals(wrapper, oth.wrapper)
540 && ObjectUtils.nullSafeEquals(filter, oth.filter) && ObjectUtils.nullSafeEquals(retryTemplate,
541 oth.retryTemplate));
542 }
543 else
544 return false;
545 }
546
547 public int hashCode() {
548 return hashCode;
549 }
550 }