001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017package org.apache.activemq.util.osgi;
018
019import static org.osgi.framework.wiring.BundleRevision.PACKAGE_NAMESPACE;
020
021import java.io.BufferedReader;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.net.URL;
026import java.util.ArrayList;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Properties;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033
034import org.apache.activemq.Service;
035import org.apache.activemq.store.PersistenceAdapter;
036import org.apache.activemq.transport.Transport;
037import org.apache.activemq.transport.discovery.DiscoveryAgent;
038import org.apache.activemq.util.FactoryFinder;
039import org.apache.activemq.util.FactoryFinder.ObjectFactory;
040import org.osgi.framework.Bundle;
041import org.osgi.framework.BundleActivator;
042import org.osgi.framework.BundleContext;
043import org.osgi.framework.BundleEvent;
044import org.osgi.framework.SynchronousBundleListener;
045import org.osgi.framework.wiring.BundleCapability;
046import org.osgi.framework.wiring.BundleWire;
047import org.osgi.framework.wiring.BundleWiring;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * An OSGi bundle activator for ActiveMQ which adapts the {@link org.apache.activemq.util.FactoryFinder}
053 * to the OSGi environment.
054 *
055 */
056public class Activator implements BundleActivator, SynchronousBundleListener, ObjectFactory {
057
058    private static final Logger LOG = LoggerFactory.getLogger(Activator.class);
059
060    private final ConcurrentMap<String, Class<?>> serviceCache = new ConcurrentHashMap<String, Class<?>>();
061    private final ConcurrentMap<Long, BundleWrapper> bundleWrappers = new ConcurrentHashMap<Long, BundleWrapper>();
062    private BundleContext bundleContext;
063    private Set<BundleCapability> packageCapabilities = new HashSet<BundleCapability>();
064
065    // ================================================================
066    // BundleActivator interface impl
067    // ================================================================
068
069    @Override
070    public synchronized void start(BundleContext bundleContext) throws Exception {
071
072        // This is how we replace the default FactoryFinder strategy
073        // with one that is more compatible in an OSGi env.
074        FactoryFinder.setObjectFactory(this);
075
076        debug("activating");
077        this.bundleContext = bundleContext;
078
079        cachePackageCapabilities(Service.class, Transport.class, DiscoveryAgent.class, PersistenceAdapter.class);
080
081        debug("checking existing bundles");
082        bundleContext.addBundleListener(this);
083        for (Bundle bundle : bundleContext.getBundles()) {
084            if (bundle.getState() == Bundle.RESOLVED || bundle.getState() == Bundle.STARTING ||
085                bundle.getState() == Bundle.ACTIVE || bundle.getState() == Bundle.STOPPING) {
086                register(bundle);
087            }
088        }
089        debug("activated");
090    }
091
092    /**
093     * Caches the package capabilities that are needed for a set of interface classes
094     *
095     * @param classes interfaces we want to track
096     */
097    private void cachePackageCapabilities(Class<?> ... classes) {
098        BundleWiring ourWiring = bundleContext.getBundle().adapt(BundleWiring.class);
099        Set<String> packageNames = new HashSet<String>();
100        for (Class<?> clazz: classes) {
101            packageNames.add(clazz.getPackage().getName());
102        }
103
104        List<BundleCapability> ourExports = ourWiring.getCapabilities(PACKAGE_NAMESPACE);
105        for (BundleCapability ourExport : ourExports) {
106            String ourPkgName = (String) ourExport.getAttributes().get(PACKAGE_NAMESPACE);
107            if (packageNames.contains(ourPkgName)) {
108                packageCapabilities.add(ourExport);
109            }
110        }
111    }
112
113
114    @Override
115    public synchronized void stop(BundleContext bundleContext) throws Exception {
116        debug("deactivating");
117        bundleContext.removeBundleListener(this);
118        while (!bundleWrappers.isEmpty()) {
119            unregister(bundleWrappers.keySet().iterator().next());
120        }
121        debug("deactivated");
122        this.bundleContext = null;
123    }
124
125    // ================================================================
126    // SynchronousBundleListener interface impl
127    // ================================================================
128
129    @Override
130    public void bundleChanged(BundleEvent event) {
131        if (event.getType() == BundleEvent.RESOLVED) {
132            register(event.getBundle());
133        } else if (event.getType() == BundleEvent.UNRESOLVED || event.getType() == BundleEvent.UNINSTALLED) {
134            unregister(event.getBundle().getBundleId());
135        }
136    }
137
138    protected void register(final Bundle bundle) {
139        debug("checking bundle " + bundle.getBundleId());
140        if (isOurBundle(bundle) || isImportingUs(bundle) ) {
141            debug("Registering bundle for extension resolution: "+ bundle.getBundleId());
142            bundleWrappers.put(bundle.getBundleId(), new BundleWrapper(bundle));
143        }
144    }
145
146    private boolean isOurBundle(final Bundle bundle) {
147        return bundle.getBundleId() == bundleContext.getBundle().getBundleId();
148    }
149
150    /**
151     * When bundles unload.. we remove them thier cached Class entries from the
152     * serviceCache.  Future service lookups for the service will fail.
153     *
154     * TODO: consider a way to get the Broker release any references to
155     * instances of the service.
156     *
157     * @param bundleId
158     */
159    protected void unregister(long bundleId) {
160        BundleWrapper bundle = bundleWrappers.remove(bundleId);
161        if (bundle != null) {
162            for (String path : bundle.cachedServices) {
163                debug("unregistering service for key: " +path );
164                serviceCache.remove(path);
165            }
166        }
167    }
168
169    // ================================================================
170    // ObjectFactory interface impl
171    // ================================================================
172
173    @Override
174    public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {
175        Class<?> clazz = serviceCache.get(path);
176        if (clazz == null) {
177            StringBuffer warnings = new StringBuffer();
178            // We need to look for a bundle that has that class.
179            int wrrningCounter=1;
180            for (BundleWrapper wrapper : bundleWrappers.values()) {
181                URL resource = wrapper.bundle.getResource(path);
182                if( resource == null ) {
183                    continue;
184                }
185
186                Properties properties = loadProperties(resource);
187
188                String className = properties.getProperty("class");
189                if (className == null) {
190                    warnings.append("("+(wrrningCounter++)+") Invalid service file in bundle "+wrapper+": 'class' property not defined.");
191                    continue;
192                }
193
194                try {
195                    clazz = wrapper.bundle.loadClass(className);
196                } catch (ClassNotFoundException e) {
197                    warnings.append("("+(wrrningCounter++)+") Bundle "+wrapper+" could not load "+className+": "+e);
198                    continue;
199                }
200
201                // Yay.. the class was found.  Now cache it.
202                serviceCache.put(path, clazz);
203                wrapper.cachedServices.add(path);
204                break;
205            }
206
207            if( clazz == null ) {
208                // Since OSGi is such a tricky environment to work in.. lets give folks the
209                // most information we can in the error message.
210                String msg = "Service not found: '" + path + "'";
211                if (warnings.length()!= 0) {
212                    msg += ", "+warnings;
213                }
214                throw new IOException(msg);
215            }
216        }
217        return clazz.newInstance();
218    }
219
220    // ================================================================
221    // Internal Helper Methods
222    // ================================================================
223
224    private void debug(Object msg) {
225        LOG.debug(msg.toString());
226    }
227
228    private Properties loadProperties(URL resource) throws IOException {
229        InputStream in = resource.openStream();
230        try {
231            BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
232            Properties properties = new Properties();
233            properties.load(in);
234            return properties;
235        } finally {
236            try {
237                in.close();
238            } catch (Exception e) {
239            }
240        }
241    }
242
243    /**
244     * We consider a bundle to be a candidate for objects if it imports at least
245     * one of the packages of our interfaces
246     *
247     * @param bundle
248     * @return true if the bundle is improting.
249     */
250    private boolean isImportingUs(Bundle bundle) {
251        BundleWiring wiring = bundle.adapt(BundleWiring.class);
252        List<BundleWire> imports = wiring.getRequiredWires(PACKAGE_NAMESPACE);
253        for (BundleWire importWire : imports) {
254            if (packageCapabilities.contains(importWire.getCapability())) {
255                return true;
256            }
257        }
258        return false;
259    }
260
261    private static class BundleWrapper {
262        private final Bundle bundle;
263        private final List<String> cachedServices = new ArrayList<String>();
264
265        public BundleWrapper(Bundle bundle) {
266            this.bundle = bundle;
267        }
268    }
269}