001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.tcp;
018    
019    import java.io.IOException;
020    import java.net.InetAddress;
021    import java.net.InetSocketAddress;
022    import java.net.ServerSocket;
023    import java.net.Socket;
024    import java.net.SocketException;
025    import java.net.SocketTimeoutException;
026    import java.net.URI;
027    import java.net.URISyntaxException;
028    import java.net.UnknownHostException;
029    import java.util.HashMap;
030    import java.util.concurrent.BlockingQueue;
031    import java.util.concurrent.LinkedBlockingQueue;
032    import java.util.concurrent.TimeUnit;
033    
034    import javax.net.ServerSocketFactory;
035    
036    import org.apache.activemq.Service;
037    import org.apache.activemq.ThreadPriorities;
038    import org.apache.activemq.TransportLoggerSupport;
039    import org.apache.activemq.command.BrokerInfo;
040    import org.apache.activemq.openwire.OpenWireFormatFactory;
041    import org.apache.activemq.transport.Transport;
042    import org.apache.activemq.transport.TransportServer;
043    import org.apache.activemq.transport.TransportServerThreadSupport;
044    import org.apache.activemq.util.IOExceptionSupport;
045    import org.apache.activemq.util.InetAddressUtil;
046    import org.apache.activemq.util.IntrospectionSupport;
047    import org.apache.activemq.util.ServiceListener;
048    import org.apache.activemq.util.ServiceStopper;
049    import org.apache.activemq.util.ServiceSupport;
050    import org.apache.activemq.wireformat.WireFormat;
051    import org.apache.activemq.wireformat.WireFormatFactory;
052    import org.slf4j.Logger;
053    import org.slf4j.LoggerFactory;
054    
055    /**
056     * A TCP based implementation of {@link TransportServer}
057     *
058     * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
059     *
060     */
061    
062    public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
063    
064        private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
065        protected ServerSocket serverSocket;
066        protected int backlog = 5000;
067        protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
068        protected final TcpTransportFactory transportFactory;
069        protected long maxInactivityDuration = 30000;
070        protected long maxInactivityDurationInitalDelay = 10000;
071        protected int minmumWireFormatVersion;
072        protected boolean useQueueForAccept=true;
073    
074        /**
075         * trace=true -> the Transport stack where this TcpTransport
076         * object will be, will have a TransportLogger layer
077         * trace=false -> the Transport stack where this TcpTransport
078         * object will be, will NOT have a TransportLogger layer, and therefore
079         * will never be able to print logging messages.
080         * This parameter is most probably set in Connection or TransportConnector URIs.
081         */
082        protected boolean trace = false;
083    
084        protected int soTimeout = 0;
085        protected int socketBufferSize = 64 * 1024;
086        protected int connectionTimeout =  30000;
087    
088        /**
089         * Name of the LogWriter implementation to use.
090         * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
091         * This parameter is most probably set in Connection or TransportConnector URIs.
092         */
093        protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
094        /**
095         * Specifies if the TransportLogger will be manageable by JMX or not.
096         * Also, as long as there is at least 1 TransportLogger which is manageable,
097         * a TransportLoggerControl MBean will be created.
098         */
099        protected boolean dynamicManagement = false;
100        /**
101         * startLogging=true -> the TransportLogger object of the Transport stack
102         * will initially write messages to the log.
103         * startLogging=false -> the TransportLogger object of the Transport stack
104         * will initially NOT write messages to the log.
105         * This parameter only has an effect if trace == true.
106         * This parameter is most probably set in Connection or TransportConnector URIs.
107         */
108        protected boolean startLogging = true;
109        protected final ServerSocketFactory serverSocketFactory;
110        protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
111        protected Thread socketHandlerThread;
112        /**
113         * The maximum number of sockets allowed for this server
114         */
115        protected int maximumConnections = Integer.MAX_VALUE;
116        protected int currentTransportCount=0;
117    
118        public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
119            super(location);
120            this.transportFactory = transportFactory;
121            this.serverSocketFactory = serverSocketFactory;
122        }
123    
124        public void bind() throws IOException {
125            URI bind = getBindLocation();
126    
127            String host = bind.getHost();
128            host = (host == null || host.length() == 0) ? "localhost" : host;
129            InetAddress addr = InetAddress.getByName(host);
130    
131            try {
132                this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
133                configureServerSocket(this.serverSocket);
134            } catch (IOException e) {
135                throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
136            }
137            try {
138                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
139                    .getFragment()));
140            } catch (URISyntaxException e) {
141    
142                // it could be that the host name contains invalid characters such
143                // as _ on unix platforms
144                // so lets try use the IP address instead
145                try {
146                    setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
147                } catch (URISyntaxException e2) {
148                    throw IOExceptionSupport.create(e2);
149                }
150            }
151        }
152    
153        private void configureServerSocket(ServerSocket socket) throws SocketException {
154            socket.setSoTimeout(2000);
155            if (transportOptions != null) {
156                IntrospectionSupport.setProperties(socket, transportOptions);
157            }
158        }
159    
160        /**
161         * @return Returns the wireFormatFactory.
162         */
163        public WireFormatFactory getWireFormatFactory() {
164            return wireFormatFactory;
165        }
166    
167        /**
168         * @param wireFormatFactory The wireFormatFactory to set.
169         */
170        public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
171            this.wireFormatFactory = wireFormatFactory;
172        }
173    
174        /**
175         * Associates a broker info with the transport server so that the transport
176         * can do discovery advertisements of the broker.
177         *
178         * @param brokerInfo
179         */
180        public void setBrokerInfo(BrokerInfo brokerInfo) {
181        }
182    
183        public long getMaxInactivityDuration() {
184            return maxInactivityDuration;
185        }
186    
187        public void setMaxInactivityDuration(long maxInactivityDuration) {
188            this.maxInactivityDuration = maxInactivityDuration;
189        }
190    
191        public long getMaxInactivityDurationInitalDelay() {
192            return this.maxInactivityDurationInitalDelay;
193        }
194    
195        public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
196            this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
197        }
198    
199        public int getMinmumWireFormatVersion() {
200            return minmumWireFormatVersion;
201        }
202    
203        public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
204            this.minmumWireFormatVersion = minmumWireFormatVersion;
205        }
206    
207        public boolean isTrace() {
208            return trace;
209        }
210    
211        public void setTrace(boolean trace) {
212            this.trace = trace;
213        }
214    
215        public String getLogWriterName() {
216            return logWriterName;
217        }
218    
219        public void setLogWriterName(String logFormat) {
220            this.logWriterName = logFormat;
221        }
222    
223        public boolean isDynamicManagement() {
224            return dynamicManagement;
225        }
226    
227        public void setDynamicManagement(boolean useJmx) {
228            this.dynamicManagement = useJmx;
229        }
230    
231        public boolean isStartLogging() {
232            return startLogging;
233        }
234    
235        public void setStartLogging(boolean startLogging) {
236            this.startLogging = startLogging;
237        }
238    
239        /**
240         * @return the backlog
241         */
242        public int getBacklog() {
243            return backlog;
244        }
245    
246        /**
247         * @param backlog the backlog to set
248         */
249        public void setBacklog(int backlog) {
250            this.backlog = backlog;
251        }
252    
253        /**
254         * @return the useQueueForAccept
255         */
256        public boolean isUseQueueForAccept() {
257            return useQueueForAccept;
258        }
259    
260        /**
261         * @param useQueueForAccept the useQueueForAccept to set
262         */
263        public void setUseQueueForAccept(boolean useQueueForAccept) {
264            this.useQueueForAccept = useQueueForAccept;
265        }
266    
267        /**
268         * pull Sockets from the ServerSocket
269         */
270        public void run() {
271            while (!isStopped()) {
272                Socket socket = null;
273                try {
274                    socket = serverSocket.accept();
275                    if (socket != null) {
276                        if (isStopped() || getAcceptListener() == null) {
277                            socket.close();
278                        } else {
279                            if (useQueueForAccept) {
280                                socketQueue.put(socket);
281                            }else {
282                                handleSocket(socket);
283                            }
284                        }
285                    }
286                } catch (SocketTimeoutException ste) {
287                    // expect this to happen
288                } catch (Exception e) {
289                    if (!isStopping()) {
290                        onAcceptError(e);
291                    } else if (!isStopped()) {
292                        LOG.warn("run()", e);
293                        onAcceptError(e);
294                    }
295                }
296            }
297        }
298    
299        /**
300         * Allow derived classes to override the Transport implementation that this
301         * transport server creates.
302         *
303         * @param socket
304         * @param format
305         * @return
306         * @throws IOException
307         */
308        protected  Transport createTransport(Socket socket, WireFormat format) throws IOException {
309            return new TcpTransport(format, socket);
310        }
311    
312        /**
313         * @return pretty print of this
314         */
315        public String toString() {
316            return "" + getBindLocation();
317        }
318    
319        /**
320         * @param socket
321         * @param inetAddress
322         * @return real hostName
323         * @throws UnknownHostException
324         */
325        protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
326            String result = null;
327            if (socket.isBound()) {
328                if (socket.getInetAddress().isAnyLocalAddress()) {
329                    // make it more human readable and useful, an alternative to 0.0.0.0
330                    result = InetAddressUtil.getLocalHostName();
331                } else {
332                    result = socket.getInetAddress().getCanonicalHostName();
333                }
334            } else {
335                result = bindAddress.getCanonicalHostName();
336            }
337            return result;
338        }
339    
340        protected void doStart() throws Exception {
341            if(useQueueForAccept) {
342                Runnable run = new Runnable() {
343                    public void run() {
344                        try {
345                            while (!isStopped() && !isStopping()) {
346                                Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
347                                if (sock != null) {
348                                    handleSocket(sock);
349                                }
350                            }
351    
352                        } catch (InterruptedException e) {
353                            LOG.info("socketQueue interuppted - stopping");
354                            if (!isStopping()) {
355                                onAcceptError(e);
356                            }
357                        }
358    
359                    }
360    
361                };
362                socketHandlerThread = new Thread(null, run,
363                        "ActiveMQ Transport Server Thread Handler: " + toString(),
364                        getStackSize());
365                socketHandlerThread.setDaemon(true);
366                socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
367                socketHandlerThread.start();
368            }
369            super.doStart();
370    
371        }
372    
373        protected void doStop(ServiceStopper stopper) throws Exception {
374            super.doStop(stopper);
375            if (serverSocket != null) {
376                serverSocket.close();
377            }
378        }
379    
380        public InetSocketAddress getSocketAddress() {
381            return (InetSocketAddress)serverSocket.getLocalSocketAddress();
382        }
383    
384        protected final void handleSocket(Socket socket) {
385            try {
386                if (this.currentTransportCount >= this.maximumConnections) {
387                    throw new ExceededMaximumConnectionsException("Exceeded the maximum " +
388                        "number of allowed client connections. See the 'maximumConnections' " +
389                        "property on the TCP transport configuration URI in the ActiveMQ " +
390                        "configuration file (e.g., activemq.xml)");
391    
392                } else {
393                    HashMap<String, Object> options = new HashMap<String, Object>();
394                    options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
395                    options.put("maxInactivityDurationInitalDelay",
396                        Long.valueOf(maxInactivityDurationInitalDelay));
397                    options.put("minmumWireFormatVersion",
398                        Integer.valueOf(minmumWireFormatVersion));
399                    options.put("trace", Boolean.valueOf(trace));
400                    options.put("soTimeout", Integer.valueOf(soTimeout));
401                    options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
402                    options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
403                    options.put("logWriterName", logWriterName);
404                    options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
405                    options.put("startLogging", Boolean.valueOf(startLogging));
406                    options.putAll(transportOptions);
407    
408                    WireFormat format = wireFormatFactory.createWireFormat();
409                    Transport transport = createTransport(socket, format);
410    
411                    if (transport instanceof ServiceSupport) {
412                        ((ServiceSupport) transport).addServiceListener(this);
413                    }
414    
415                    Transport configuredTransport =
416                        transportFactory.serverConfigure( transport, format, options);
417    
418                    getAcceptListener().onAccept(configuredTransport);
419                }
420            } catch (SocketTimeoutException ste) {
421                // expect this to happen
422            } catch (Exception e) {
423                if (!isStopping()) {
424                    onAcceptError(e);
425                } else if (!isStopped()) {
426                    LOG.warn("run()", e);
427                    onAcceptError(e);
428                }
429            }
430    
431        }
432    
433        public int getSoTimeout() {
434            return soTimeout;
435        }
436    
437        public void setSoTimeout(int soTimeout) {
438            this.soTimeout = soTimeout;
439        }
440    
441        public int getSocketBufferSize() {
442            return socketBufferSize;
443        }
444    
445        public void setSocketBufferSize(int socketBufferSize) {
446            this.socketBufferSize = socketBufferSize;
447        }
448    
449        public int getConnectionTimeout() {
450            return connectionTimeout;
451        }
452    
453        public void setConnectionTimeout(int connectionTimeout) {
454            this.connectionTimeout = connectionTimeout;
455        }
456    
457        /**
458         * @return the maximumConnections
459         */
460        public int getMaximumConnections() {
461            return maximumConnections;
462        }
463    
464        /**
465         * @param maximumConnections the maximumConnections to set
466         */
467        public void setMaximumConnections(int maximumConnections) {
468            this.maximumConnections = maximumConnections;
469        }
470    
471        public void started(Service service) {
472           this.currentTransportCount++;
473        }
474    
475        public void stopped(Service service) {
476            this.currentTransportCount--;
477        }
478    
479        @Override
480        public boolean isSslServer() {
481            return false;
482        }
483    }