001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
028import org.apache.activemq.broker.region.DestinationInterceptor;
029import org.apache.activemq.broker.region.RegionBroker;
030import org.apache.activemq.broker.region.virtual.VirtualDestination;
031import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035public abstract class UpdateVirtualDestinationsTask implements Runnable {
036
037    public static final Logger LOG = LoggerFactory.getLogger(UpdateVirtualDestinationsTask.class);
038    private final AbstractRuntimeConfigurationBroker plugin;
039
040    public UpdateVirtualDestinationsTask(
041            AbstractRuntimeConfigurationBroker plugin) {
042        super();
043        this.plugin = plugin;
044    }
045
046    @Override
047    public void run() {
048
049        boolean updatedExistingInterceptor = false;
050        RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService()
051                .getRegionBroker();
052
053        for (DestinationInterceptor destinationInterceptor : plugin
054                .getBrokerService().getDestinationInterceptors()) {
055            if (destinationInterceptor instanceof VirtualDestinationInterceptor) {
056                // update existing interceptor
057                final VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) destinationInterceptor;
058
059                Set<VirtualDestination> existingVirtualDests = new HashSet<>();
060                Collections.addAll(existingVirtualDests, virtualDestinationInterceptor.getVirtualDestinations());
061
062                Set<VirtualDestination> newVirtualDests = new HashSet<>();
063                Collections.addAll(newVirtualDests, getVirtualDestinations());
064
065                Set<VirtualDestination> addedVirtualDests = new HashSet<>();
066                Set<VirtualDestination> removedVirtualDests = new HashSet<>();
067                //detect new virtual destinations
068                for (VirtualDestination newVirtualDest : newVirtualDests) {
069                    if (!existingVirtualDests.contains(newVirtualDest)) {
070                        addedVirtualDests.add(newVirtualDest);
071                    }
072                }
073                //detect removed virtual destinations
074                for (VirtualDestination existingVirtualDest : existingVirtualDests) {
075                    if (!newVirtualDests.contains(existingVirtualDest)) {
076                        removedVirtualDests.add(existingVirtualDest);
077                    }
078                }
079
080                virtualDestinationInterceptor
081                        .setVirtualDestinations(getVirtualDestinations());
082                plugin.info("applied updates to: "
083                        + virtualDestinationInterceptor);
084                updatedExistingInterceptor = true;
085
086                ConnectionContext connectionContext;
087                try {
088                    connectionContext = plugin.getBrokerService().getAdminConnectionContext();
089                    //signal updates
090                    if (plugin.getBrokerService().isUseVirtualDestSubs()) {
091                        for (VirtualDestination removedVirtualDest : removedVirtualDests) {
092                            plugin.virtualDestinationRemoved(connectionContext, removedVirtualDest);
093                            LOG.info("Removing virtual destination: {}", removedVirtualDest);
094                        }
095
096                        for (VirtualDestination addedVirtualDest : addedVirtualDests) {
097                            plugin.virtualDestinationAdded(connectionContext, addedVirtualDest);
098                            LOG.info("Adding virtual destination: {}", addedVirtualDest);
099                        }
100                    }
101
102                } catch (Exception e) {
103                    LOG.warn("Could not process virtual destination advisories", e);
104                }
105            }
106        }
107
108        if (!updatedExistingInterceptor) {
109            // add
110            VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
111            virtualDestinationInterceptor.setVirtualDestinations(getVirtualDestinations());
112
113            List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
114            interceptorsList.addAll(Arrays.asList(plugin.getBrokerService()
115                    .getDestinationInterceptors()));
116            interceptorsList.add(virtualDestinationInterceptor);
117
118            DestinationInterceptor[] destinationInterceptors = interceptorsList
119                    .toArray(new DestinationInterceptor[] {});
120            plugin.getBrokerService().setDestinationInterceptors(
121                    destinationInterceptors);
122
123            ((CompositeDestinationInterceptor) regionBroker
124                    .getDestinationInterceptor())
125                    .setInterceptors(destinationInterceptors);
126            plugin.info("applied new: " + interceptorsList);
127        }
128        regionBroker.reapplyInterceptor();
129    }
130
131    protected abstract VirtualDestination[] getVirtualDestinations();
132}