001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.util.concurrent.Callable;
020    import java.util.concurrent.ExecutionException;
021    import java.util.concurrent.ExecutorService;
022    import java.util.concurrent.Future;
023    import java.util.concurrent.TimeUnit;
024    
025    import javax.management.MBeanException;
026    import javax.management.NotCompliantMBeanException;
027    import javax.management.ObjectName;
028    import javax.management.ReflectionException;
029    
030    /**
031     * MBean that invokes the requested operation using an async operation and waits for the result
032     * if the operation times out then an exception is thrown.
033     */
034    public class AsyncAnnotatedMBean extends AnnotatedMBean {
035    
036        private ExecutorService executor;
037        private long timeout = 0;
038    
039        public <T> AsyncAnnotatedMBean(ExecutorService executor, long timeout, T impl, Class<T> mbeanInterface) throws NotCompliantMBeanException {
040            super(impl, mbeanInterface);
041    
042            this.executor = executor;
043            this.timeout = timeout;
044        }
045    
046        protected AsyncAnnotatedMBean(Class<?> mbeanInterface) throws NotCompliantMBeanException {
047            super(mbeanInterface);
048        }
049    
050        protected Object asyncInvole(String s, Object[] objects, String[] strings) throws MBeanException, ReflectionException {
051            return super.invoke(s, objects, strings);
052        }
053    
054        @SuppressWarnings({ "unchecked", "rawtypes" })
055        public static void registerMBean(ExecutorService executor, long timeout, ManagementContext context, Object object, ObjectName objectName) throws Exception {
056    
057            if (timeout < 0 && executor != null) {
058                throw new IllegalArgumentException("async timeout cannot be negative.");
059            }
060    
061            if (timeout > 0 && executor == null) {
062                throw new NullPointerException("timeout given but no ExecutorService instance given.");
063            }
064    
065            String mbeanName = object.getClass().getName() + "MBean";
066    
067            for (Class c : object.getClass().getInterfaces()) {
068                if (mbeanName.equals(c.getName())) {
069                    if (timeout == 0) {
070                        context.registerMBean(new AnnotatedMBean(object, c), objectName);
071                    } else {
072                        context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object, c), objectName);
073                    }
074                    return;
075                }
076            }
077    
078            context.registerMBean(object, objectName);
079        }
080    
081        @Override
082        public Object invoke(String s, Object[] objects, String[] strings) throws MBeanException, ReflectionException {
083    
084            final String action = s;
085            final Object[] params = objects;
086            final String[] signature = strings;
087    
088            Future<Object> task = executor.submit(new Callable<Object>() {
089    
090                @Override
091                public Object call() throws Exception {
092                    return asyncInvole(action, params, signature);
093                }
094            });
095    
096            try {
097                return task.get(timeout, TimeUnit.MILLISECONDS);
098            } catch (ExecutionException e) {
099                if (e.getCause() instanceof MBeanException) {
100                    throw (MBeanException) e.getCause();
101                }
102    
103                throw new MBeanException(e);
104            } catch (Exception e) {
105                throw new MBeanException(e);
106            } finally {
107                if (!task.isDone()) {
108                    task.cancel(true);
109                }
110            }
111        }
112    }