001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.tcp;
018    
019    import java.io.IOException;
020    import java.net.InetAddress;
021    import java.net.InetSocketAddress;
022    import java.net.ServerSocket;
023    import java.net.Socket;
024    import java.net.SocketException;
025    import java.net.SocketTimeoutException;
026    import java.net.URI;
027    import java.net.URISyntaxException;
028    import java.net.UnknownHostException;
029    import java.util.HashMap;
030    import java.util.concurrent.BlockingQueue;
031    import java.util.concurrent.LinkedBlockingQueue;
032    import java.util.concurrent.TimeUnit;
033    
034    import javax.net.ServerSocketFactory;
035    
036    import org.apache.activemq.Service;
037    import org.apache.activemq.ThreadPriorities;
038    import org.apache.activemq.command.BrokerInfo;
039    import org.apache.activemq.openwire.OpenWireFormatFactory;
040    import org.apache.activemq.transport.Transport;
041    import org.apache.activemq.transport.TransportLoggerFactory;
042    import org.apache.activemq.transport.TransportServer;
043    import org.apache.activemq.transport.TransportServerThreadSupport;
044    import org.apache.activemq.util.IOExceptionSupport;
045    import org.apache.activemq.util.InetAddressUtil;
046    import org.apache.activemq.util.IntrospectionSupport;
047    import org.apache.activemq.util.ServiceListener;
048    import org.apache.activemq.util.ServiceStopper;
049    import org.apache.activemq.util.ServiceSupport;
050    import org.apache.activemq.wireformat.WireFormat;
051    import org.apache.activemq.wireformat.WireFormatFactory;
052    import org.slf4j.Logger;
053    import org.slf4j.LoggerFactory;
054    
055    /**
056     * A TCP based implementation of {@link TransportServer}
057     * 
058     * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
059     * 
060     */
061    
062    public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
063    
064        private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
065        protected ServerSocket serverSocket;
066        protected int backlog = 5000;
067        protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
068        protected final TcpTransportFactory transportFactory;
069        protected long maxInactivityDuration = 30000;
070        protected long maxInactivityDurationInitalDelay = 10000;
071        protected int minmumWireFormatVersion;
072        protected boolean useQueueForAccept=true;
073           
074        /**
075         * trace=true -> the Transport stack where this TcpTransport
076         * object will be, will have a TransportLogger layer
077         * trace=false -> the Transport stack where this TcpTransport
078         * object will be, will NOT have a TransportLogger layer, and therefore
079         * will never be able to print logging messages.
080         * This parameter is most probably set in Connection or TransportConnector URIs.
081         */
082        protected boolean trace = false;
083    
084        protected int soTimeout = 0;
085        protected int socketBufferSize = 64 * 1024;
086        protected int connectionTimeout =  30000;
087    
088        /**
089         * Name of the LogWriter implementation to use.
090         * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
091         * This parameter is most probably set in Connection or TransportConnector URIs.
092         */
093        protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
094        /**
095         * Specifies if the TransportLogger will be manageable by JMX or not.
096         * Also, as long as there is at least 1 TransportLogger which is manageable,
097         * a TransportLoggerControl MBean will me created.
098         */
099        protected boolean dynamicManagement = false;
100        /**
101         * startLogging=true -> the TransportLogger object of the Transport stack
102         * will initially write messages to the log.
103         * startLogging=false -> the TransportLogger object of the Transport stack
104         * will initially NOT write messages to the log.
105         * This parameter only has an effect if trace == true.
106         * This parameter is most probably set in Connection or TransportConnector URIs.
107         */
108        protected boolean startLogging = true;
109        protected final ServerSocketFactory serverSocketFactory;
110        protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
111        protected Thread socketHandlerThread;
112        /**
113         * The maximum number of sockets allowed for this server
114         */
115        protected int maximumConnections = Integer.MAX_VALUE;
116        protected int currentTransportCount=0;
117      
118        public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
119            super(location);
120            this.transportFactory = transportFactory;
121            this.serverSocketFactory = serverSocketFactory;
122            
123        }
124    
125        public void bind() throws IOException {
126            URI bind = getBindLocation();
127    
128            String host = bind.getHost();
129            host = (host == null || host.length() == 0) ? "localhost" : host;
130            InetAddress addr = InetAddress.getByName(host);
131    
132            try {
133    
134                this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
135                configureServerSocket(this.serverSocket);
136                
137            } catch (IOException e) {
138                throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
139            }
140            try {
141                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
142                    .getFragment()));
143            } catch (URISyntaxException e) {
144    
145                // it could be that the host name contains invalid characters such
146                // as _ on unix platforms
147                // so lets try use the IP address instead
148                try {
149                    setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
150                } catch (URISyntaxException e2) {
151                    throw IOExceptionSupport.create(e2);
152                }
153            }
154        }
155    
156        private void configureServerSocket(ServerSocket socket) throws SocketException {
157            socket.setSoTimeout(2000);
158            if (transportOptions != null) {
159                IntrospectionSupport.setProperties(socket, transportOptions);
160            }
161        }
162    
163        /**
164         * @return Returns the wireFormatFactory.
165         */
166        public WireFormatFactory getWireFormatFactory() {
167            return wireFormatFactory;
168        }
169    
170        /**
171         * @param wireFormatFactory The wireFormatFactory to set.
172         */
173        public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
174            this.wireFormatFactory = wireFormatFactory;
175        }
176    
177        /**
178         * Associates a broker info with the transport server so that the transport
179         * can do discovery advertisements of the broker.
180         * 
181         * @param brokerInfo
182         */
183        public void setBrokerInfo(BrokerInfo brokerInfo) {
184        }
185    
186        public long getMaxInactivityDuration() {
187            return maxInactivityDuration;
188        }
189    
190        public void setMaxInactivityDuration(long maxInactivityDuration) {
191            this.maxInactivityDuration = maxInactivityDuration;
192        }
193        
194        public long getMaxInactivityDurationInitalDelay() {
195            return this.maxInactivityDurationInitalDelay;
196        }
197    
198        public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
199            this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
200        }
201    
202        public int getMinmumWireFormatVersion() {
203            return minmumWireFormatVersion;
204        }
205    
206        public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
207            this.minmumWireFormatVersion = minmumWireFormatVersion;
208        }
209    
210        public boolean isTrace() {
211            return trace;
212        }
213    
214        public void setTrace(boolean trace) {
215            this.trace = trace;
216        }
217        
218        public String getLogWriterName() {
219            return logWriterName;
220        }
221    
222        public void setLogWriterName(String logFormat) {
223            this.logWriterName = logFormat;
224        }        
225    
226        public boolean isDynamicManagement() {
227            return dynamicManagement;
228        }
229    
230        public void setDynamicManagement(boolean useJmx) {
231            this.dynamicManagement = useJmx;
232        }
233    
234        public boolean isStartLogging() {
235            return startLogging;
236        }
237    
238    
239        public void setStartLogging(boolean startLogging) {
240            this.startLogging = startLogging;
241        }
242        
243        /**
244         * @return the backlog
245         */
246        public int getBacklog() {
247            return backlog;
248        }
249    
250        /**
251         * @param backlog the backlog to set
252         */
253        public void setBacklog(int backlog) {
254            this.backlog = backlog;
255        }
256    
257        /**
258         * @return the useQueueForAccept
259         */
260        public boolean isUseQueueForAccept() {
261            return useQueueForAccept;
262        }
263    
264        /**
265         * @param useQueueForAccept the useQueueForAccept to set
266         */
267        public void setUseQueueForAccept(boolean useQueueForAccept) {
268            this.useQueueForAccept = useQueueForAccept;
269        }
270        
271    
272        /**
273         * pull Sockets from the ServerSocket
274         */
275        public void run() {
276            while (!isStopped()) {
277                Socket socket = null;
278                try {
279                    socket = serverSocket.accept();
280                    if (socket != null) {
281                        if (isStopped() || getAcceptListener() == null) {
282                            socket.close();
283                        } else {
284                            if (useQueueForAccept) {
285                                socketQueue.put(socket);
286                            }else {
287                                handleSocket(socket);
288                            }
289                        }
290                    }
291                } catch (SocketTimeoutException ste) {
292                    // expect this to happen
293                } catch (Exception e) {
294                    if (!isStopping()) {
295                        onAcceptError(e);
296                    } else if (!isStopped()) {
297                        LOG.warn("run()", e);
298                        onAcceptError(e);
299                    }
300                }
301            }
302        }
303    
304        /**
305         * Allow derived classes to override the Transport implementation that this
306         * transport server creates.
307         * 
308         * @param socket
309         * @param format
310         * @return
311         * @throws IOException
312         */
313        protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
314            return new TcpTransport(format, socket);
315        }
316    
317        /**
318         * @return pretty print of this
319         */
320        public String toString() {
321            return "" + getBindLocation();
322        }
323    
324        /**
325         * @param socket 
326         * @param inetAddress
327         * @return real hostName
328         * @throws UnknownHostException
329         */
330        protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
331            String result = null;
332            if (socket.isBound()) {
333                if (socket.getInetAddress().isAnyLocalAddress()) {
334                    // make it more human readable and useful, an alternative to 0.0.0.0
335                    result = InetAddressUtil.getLocalHostName();
336                } else {
337                    result = socket.getInetAddress().getCanonicalHostName();
338                }
339            } else {
340                result = bindAddress.getCanonicalHostName();
341            }
342            return result;
343        }
344        
345        protected void doStart() throws Exception {
346            if(useQueueForAccept) {
347                Runnable run = new Runnable() {
348                    public void run() {
349                        try {
350                            while (!isStopped() && !isStopping()) {
351                                Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
352                                if (sock != null) {
353                                    handleSocket(sock);
354                                }
355                            }
356        
357                        } catch (InterruptedException e) {
358                            LOG.info("socketQueue interuppted - stopping");
359                            if (!isStopping()) {
360                                onAcceptError(e);
361                            }
362                        }
363        
364                    }
365        
366                };
367                socketHandlerThread = new Thread(null, run,
368                        "ActiveMQ Transport Server Thread Handler: " + toString(),
369                        getStackSize());
370                socketHandlerThread.setDaemon(true);
371                socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
372                socketHandlerThread.start();
373            }
374            super.doStart();
375            
376        }
377    
378        protected void doStop(ServiceStopper stopper) throws Exception {
379            super.doStop(stopper);
380            if (serverSocket != null) {
381                serverSocket.close();
382            }
383        }
384    
385        public InetSocketAddress getSocketAddress() {
386            return (InetSocketAddress)serverSocket.getLocalSocketAddress();
387        }
388    
389        protected final void handleSocket(Socket socket) {
390            try {
391                if (this.currentTransportCount >= this.maximumConnections) {
392                    throw new ExceededMaximumConnectionsException("Exceeded the maximum " + 
393                        "number of allowed client connections. See the 'maximumConnections' " + 
394                        "property on the TCP transport configuration URI in the ActiveMQ " + 
395                        "configuration file (e.g., activemq.xml)"); 
396                    
397                } else {
398                    HashMap<String, Object> options = new HashMap<String, Object>();
399                    options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
400                    options.put("maxInactivityDurationInitalDelay", 
401                        Long.valueOf(maxInactivityDurationInitalDelay));
402                    options.put("minmumWireFormatVersion", 
403                        Integer.valueOf(minmumWireFormatVersion));
404                    options.put("trace", Boolean.valueOf(trace));
405                    options.put("soTimeout", Integer.valueOf(soTimeout));
406                    options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
407                    options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
408                    options.put("logWriterName", logWriterName);
409                    options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
410                    options.put("startLogging", Boolean.valueOf(startLogging));
411                    options.putAll(transportOptions);
412    
413                    WireFormat format = wireFormatFactory.createWireFormat();
414                    Transport transport = createTransport(socket, format);
415    
416                    if (transport instanceof ServiceSupport) {
417                        ((ServiceSupport) transport).addServiceListener(this);
418                    }
419    
420                    Transport configuredTransport = 
421                        transportFactory.serverConfigure( transport, format, options);
422    
423                    getAcceptListener().onAccept(configuredTransport);
424                }
425            } catch (SocketTimeoutException ste) {
426                // expect this to happen
427            } catch (Exception e) {
428                if (!isStopping()) {
429                    onAcceptError(e);
430                } else if (!isStopped()) {
431                    LOG.warn("run()", e);
432                    onAcceptError(e);
433                }
434            }
435            
436        }    
437    
438            public int getSoTimeout() {
439                    return soTimeout;
440            }
441    
442            public void setSoTimeout(int soTimeout) {
443                    this.soTimeout = soTimeout;
444            }
445    
446            public int getSocketBufferSize() {
447                    return socketBufferSize;
448            }
449    
450            public void setSocketBufferSize(int socketBufferSize) {
451                    this.socketBufferSize = socketBufferSize;
452            }
453    
454            public int getConnectionTimeout() {
455                    return connectionTimeout;
456            }
457    
458            public void setConnectionTimeout(int connectionTimeout) {
459                    this.connectionTimeout = connectionTimeout;
460            }
461    
462        /**
463         * @return the maximumConnections
464         */
465        public int getMaximumConnections() {
466            return maximumConnections;
467        }
468    
469        /**
470         * @param maximumConnections the maximumConnections to set
471         */
472        public void setMaximumConnections(int maximumConnections) {
473            this.maximumConnections = maximumConnections;
474        }
475    
476        
477        public void started(Service service) {
478           this.currentTransportCount++;
479        }
480    
481        public void stopped(Service service) {
482            this.currentTransportCount--;
483        }
484    }