View Javadoc

1   //========================================================================
2   //Copyright 2004-2008 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at 
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.io.nio;
16  
17  import java.io.IOException;
18  import java.nio.channels.CancelledKeyException;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.Selector;
21  import java.nio.channels.ServerSocketChannel;
22  import java.nio.channels.SocketChannel;
23  import java.util.ArrayList;
24  import java.util.Iterator;
25  import java.util.List;
26  
27  import org.mortbay.component.AbstractLifeCycle;
28  import org.mortbay.component.LifeCycle;
29  import org.mortbay.io.Connection;
30  import org.mortbay.io.EndPoint;
31  import org.mortbay.log.Log;
32  import org.mortbay.thread.Timeout;
33  
34  
35  /* ------------------------------------------------------------ */
36  /**
37   * The Selector Manager manages a number of SelectSets to allow
38   * NIO scheduling to scale to large numbers of connections.
39   * 
40   * @author gregw
41   *
42   */
43  public abstract class SelectorManager extends AbstractLifeCycle
44  {
45      private boolean _delaySelectKeyUpdate=true;
46      private long _maxIdleTime;
47      private long _lowResourcesConnections;
48      private long _lowResourcesMaxIdleTime;
49      private transient SelectSet[] _selectSet;
50      private int _selectSets=1;
51      private volatile int _set;
52      
53  
54      /* ------------------------------------------------------------ */
55      /**
56       * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
57       * @see {@link #setLowResourcesMaxIdleTime(long)}
58       */
59      public void setMaxIdleTime(long maxIdleTime)
60      {
61          _maxIdleTime=maxIdleTime;
62      }
63      
64      /* ------------------------------------------------------------ */
65      /**
66       * @param selectSets
67       */
68      public void setSelectSets(int selectSets)
69      {
70          long lrc = _lowResourcesConnections * _selectSets; 
71          _selectSets=selectSets;
72          _lowResourcesConnections=lrc/_selectSets;
73      }
74      
75      /* ------------------------------------------------------------ */
76      /**
77       * @return
78       */
79      public long getMaxIdleTime()
80      {
81          return _maxIdleTime;
82      }
83      
84      /* ------------------------------------------------------------ */
85      /**
86       * @return
87       */
88      public int getSelectSets()
89      {
90          return _selectSets;
91      }
92      
93      /* ------------------------------------------------------------ */
94      /**
95       * @return
96       */
97      public boolean isDelaySelectKeyUpdate()
98      {
99          return _delaySelectKeyUpdate;
100     }
101 
102     /* ------------------------------------------------------------ */
103     /** Register a channel
104      * @param channel
105      * @param att Attached Object
106      * @throws IOException
107      */
108     public void register(SocketChannel channel, Object att) throws IOException
109     {
110         int s=_set++; 
111         s=s%_selectSets;
112         SelectSet[] sets=_selectSet;
113         if (sets!=null)
114         {
115             SelectSet set=sets[s];
116             set.addChange(channel,att);
117             set.wakeup();
118         }
119     }
120     
121     /* ------------------------------------------------------------ */
122     /** Register a serverchannel
123      * @param acceptChannel
124      * @return
125      * @throws IOException
126      */
127     public void register(ServerSocketChannel acceptChannel) throws IOException
128     {
129         int s=_set++; 
130         s=s%_selectSets;
131         SelectSet set=_selectSet[s];
132         set.addChange(acceptChannel);
133         set.wakeup();
134     }
135 
136     /* ------------------------------------------------------------ */
137     /**
138      * @return the lowResourcesConnections
139      */
140     public long getLowResourcesConnections()
141     {
142         return _lowResourcesConnections*_selectSets;
143     }
144 
145     /* ------------------------------------------------------------ */
146     /**
147      * Set the number of connections, which if exceeded places this manager in low resources state.
148      * This is not an exact measure as the connection count is averaged over the select sets.
149      * @param lowResourcesConnections the number of connections
150      * @see {@link #setLowResourcesMaxIdleTime(long)}
151      */
152     public void setLowResourcesConnections(long lowResourcesConnections)
153     {
154         _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets;
155     }
156 
157     /* ------------------------------------------------------------ */
158     /**
159      * @return the lowResourcesMaxIdleTime
160      */
161     public long getLowResourcesMaxIdleTime()
162     {
163         return _lowResourcesMaxIdleTime;
164     }
165 
166     /* ------------------------------------------------------------ */
167     /**
168      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when this SelectSet has more connections than {@link #getLowResourcesConnections()}
169      * @see {@link #setMaxIdleTime(long)}
170      */
171     public void setLowResourcesMaxIdleTime(long lowResourcesMaxIdleTime)
172     {
173         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
174     }
175     
176     /* ------------------------------------------------------------ */
177     /**
178      * @param acceptorID
179      * @throws IOException
180      */
181     public void doSelect(int acceptorID) throws IOException
182     {
183         SelectSet[] sets= _selectSet;
184         if (sets!=null && sets.length>acceptorID && sets[acceptorID]!=null)
185             sets[acceptorID].doSelect();
186     }
187 
188 
189     /* ------------------------------------------------------------ */
190     /**
191      * @param delaySelectKeyUpdate
192      */
193     public void setDelaySelectKeyUpdate(boolean delaySelectKeyUpdate)
194     {
195         _delaySelectKeyUpdate=delaySelectKeyUpdate;
196     }
197 
198     /* ------------------------------------------------------------ */
    /**
     * Accepts a connection for a selected key. {@link SelectSet#doSelect()} calls
     * this when a selected key is acceptable and its attachment is not a
     * {@link SelectChannelEndPoint}. A <code>null</code> return makes the select
     * pass skip the key; a non-null channel is switched to non-blocking mode and
     * handed to a select set.
     *
     * @param key the acceptable selection key
     * @return the accepted channel, or <code>null</code> if there is none
     * @throws IOException if the accept fails
     */
    protected abstract SocketChannel acceptChannel(SelectionKey key) throws IOException;
205 
206     /* ------------------------------------------------------------------------------- */
    /**
     * Hands a task to the implementation for execution.
     * {@link SelectSet#doSelect()} calls this for every queued change that is a
     * {@link Runnable}; the return value is not examined there.
     *
     * @param task the task to run
     * @return implementation defined; presumably whether the task was accepted - confirm against implementations
     * @throws IOException if the task cannot be dispatched
     */
    public abstract boolean dispatch(Runnable task) throws IOException;
208 
209     /* ------------------------------------------------------------ */
210     /* (non-Javadoc)
211      * @see org.mortbay.component.AbstractLifeCycle#doStart()
212      */
213     protected void doStart() throws Exception
214     {
215         _selectSet = new SelectSet[_selectSets];
216         for (int i=0;i<_selectSet.length;i++)
217             _selectSet[i]= new SelectSet(i);
218 
219         super.doStart();
220     }
221 
222 
223     /* ------------------------------------------------------------------------------- */
224     protected void doStop() throws Exception
225     {
226         SelectSet[] sets= _selectSet;
227         _selectSet=null;
228         if (sets!=null)
229             for (int i=0;i<sets.length;i++)
230             {
231                 SelectSet set = sets[i];
232                 if (set!=null)
233                     set.stop();
234             }
235         super.doStop();
236     }
237 
238     /* ------------------------------------------------------------ */
    /**
     * Callback for an end point that has closed. It is not invoked from within
     * this class; presumably {@link SelectChannelEndPoint} calls it - confirm
     * against that class.
     *
     * @param endpoint the end point concerned
     */
    protected abstract void endPointClosed(SelectChannelEndPoint endpoint);
243 
244     /* ------------------------------------------------------------ */
    /**
     * Callback for an end point that has opened. It is not invoked from within
     * this class; presumably {@link SelectChannelEndPoint} calls it - confirm
     * against that class.
     *
     * @param endpoint the end point concerned
     */
    protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
249 
250     /* ------------------------------------------------------------------------------- */
    /**
     * Factory for the {@link Connection} associated with a channel and its end
     * point. It is not invoked from within this class; presumably end point
     * implementations call it - confirm against callers.
     *
     * @param channel the socket channel
     * @param endpoint the end point wrapping the channel
     * @return the connection for the end point
     */
    protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
252 
253     /* ------------------------------------------------------------ */
    /**
     * Factory for the end point that wraps a registered channel.
     * {@link SelectSet#doSelect()} calls this after registering or connecting a
     * channel, attaches the result to the selection key and dispatches it.
     *
     * @param channel the socket channel to wrap
     * @param selectSet the select set whose selector the channel is registered with
     * @param sKey the selection key of that registration
     * @return the new end point
     * @throws IOException if the end point cannot be created
     */
    protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException;
262 
263     /* ------------------------------------------------------------------------------- */
264     protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment)
265     {
266         Log.warn(ex);
267     }
268     
269     /* ------------------------------------------------------------------------------- */
270     /* ------------------------------------------------------------------------------- */
271     /* ------------------------------------------------------------------------------- */
272     public class SelectSet 
273     {
274         private transient int _change;
275         private transient List[] _changes;
276         private transient Timeout _idleTimeout;
277         private transient int _nextSet;
278         private transient Timeout _retryTimeout;
279         private transient Selector _selector;
280         private transient int _setID;
281         private transient int _jvmBug;
282         private volatile boolean _selecting;
283         
284         /* ------------------------------------------------------------ */
285         SelectSet(int acceptorID) throws Exception
286         {
287             _setID=acceptorID;
288 
289             _idleTimeout = new Timeout(this);
290             _idleTimeout.setDuration(getMaxIdleTime());
291             _retryTimeout = new Timeout(this);
292             _retryTimeout.setDuration(0L);
293 
294             // create a selector;
295             _selector = Selector.open();
296             _changes = new ArrayList[] {new ArrayList(),new ArrayList()};
297             _change=0;
298         }
299         
300         /* ------------------------------------------------------------ */
301         public void addChange(Object point)
302         {
303             synchronized (_changes)
304             {
305                 _changes[_change].add(point);
306                 if (point instanceof SocketChannel)
307                     _changes[_change].add(null);
308             }
309         }
310         
311         /* ------------------------------------------------------------ */
312         public void addChange(SocketChannel channel, Object att)
313         {   
314             synchronized (_changes)
315             {
316                 _changes[_change].add(channel);
317                 _changes[_change].add(att);
318             }
319         }
320         
321         /* ------------------------------------------------------------ */
322         public void cancelIdle(Timeout.Task task)
323         {
324             synchronized (this)
325             {
326                 task.cancel();
327             }
328         }
329 
330         /* ------------------------------------------------------------ */
331         /**
332          * Select and dispatch tasks found from changes and the selector.
333          * 
334          * @throws IOException
335          */
336         public void doSelect() throws IOException
337         {
338             SelectionKey key=null;
339             
340             try
341             {
342                 List changes;
343                 final Selector selector;
344                 synchronized (_changes)
345                 {
346                     changes=_changes[_change];
347                     _change=_change==0?1:0;
348                     _selecting=true;
349                     selector=_selector;
350                 }
351                 
352 
353                 // Make any key changes required
354                 for (int i = 0; i < changes.size(); i++)
355                 {
356                     try
357                     {
358                         Object o = changes.get(i);
359                         if (o instanceof EndPoint)
360                         {
361                             // Update the operations for a key.
362                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)o;
363                             endpoint.doUpdateKey();
364                         }
365                         else if (o instanceof Runnable)
366                         {
367                             dispatch((Runnable)o);
368                         }
369                         else if (o instanceof SocketChannel)
370                         {
371                             // finish accepting/connecting this connection
372                             SocketChannel channel=(SocketChannel)o;
373                             Object att = changes.get(++i);
374 
375                             if (channel.isConnected())
376                             {
377                                 key = channel.register(selector,SelectionKey.OP_READ,att);
378                                 SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
379                                 key.attach(endpoint);
380                                 endpoint.dispatch();
381                             }
382                             else
383                             {
384                                 channel.register(selector,SelectionKey.OP_CONNECT,att);
385                             }
386 
387                         }
388                         else if (o instanceof ServerSocketChannel)
389                         {
390                             ServerSocketChannel channel = (ServerSocketChannel)o;
391                             channel.register(getSelector(),SelectionKey.OP_ACCEPT);
392                         }
393                         else
394                             throw new IllegalArgumentException(o.toString());
395                     }
396                     catch (CancelledKeyException e)
397                     {
398                         if (isRunning())
399                             Log.warn(e);
400                         else
401                             Log.debug(e);
402                     }
403                 }
404                 changes.clear();
405 
406                 long idle_next = 0;
407                 long retry_next = 0;
408                 long now=System.currentTimeMillis();
409                 synchronized (this)
410                 {
411                     _idleTimeout.setNow(now);
412                     _retryTimeout.setNow(now);
413                     if (_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)
414                         _idleTimeout.setDuration(_lowResourcesMaxIdleTime);
415                     else 
416                         _idleTimeout.setDuration(_maxIdleTime);
417                     idle_next=_idleTimeout.getTimeToNext();
418                     retry_next=_retryTimeout.getTimeToNext();
419                 }
420 
421                 // workout how low to wait in select
422                 long wait = 1000L;  // not getMaxIdleTime() as the now value of the idle timers needs to be updated.
423                 if (idle_next >= 0 && wait > idle_next)
424                     wait = idle_next;
425                 if (wait > 0 && retry_next >= 0 && wait > retry_next)
426                     wait = retry_next;
427     
428                 // Do the select.
429                 if (wait > 10) // TODO tune or configure this
430                 {
431                     long before=now;
432                     int selected=selector.select(wait);
433                     now = System.currentTimeMillis();
434                     _idleTimeout.setNow(now);
435                     _retryTimeout.setNow(now);
436 
437                     // Look for JVM bug 
438                     if (selected==0 && wait>0 && (now-before)<wait/2 && selector.selectedKeys().size()==0)
439                     {
440                         if (_jvmBug++>5)  // TODO tune or configure this
441                         {
442                             // Probably JVM BUG!
443                             
444                             Iterator iter = selector.keys().iterator();
445                             while(iter.hasNext())
446                             {
447                                 key = (SelectionKey) iter.next();
448                                 if (key.isValid()&&key.interestOps()==0)
449                                 {
450                                     key.cancel();
451                                 }
452                             }
453                             selector.selectNow();
454                         } 
455                     }
456                     else
457                         _jvmBug=0;
458                 }
459                 else 
460                 {
461                     selector.selectNow();
462                     _jvmBug=0;
463                 }
464 
465                 // have we been destroyed while sleeping
466                 if (_selector==null || !selector.isOpen())
467                     return;
468 
469                 // Look for things to do
470                 Iterator iter = selector.selectedKeys().iterator();
471                 while (iter.hasNext())
472                 {
473                     key = (SelectionKey) iter.next();
474                                         
475                     try
476                     {
477                         if (!key.isValid())
478                         {
479                             key.cancel();
480                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
481                             if (endpoint != null)
482                                 endpoint.doUpdateKey();
483                             continue;
484                         }
485 
486                         Object att = key.attachment();
487                         if (att instanceof SelectChannelEndPoint)
488                         {
489                             SelectChannelEndPoint endpoint = (SelectChannelEndPoint)att;
490                             endpoint.dispatch();
491                         }
492                         else if (key.isAcceptable())
493                         {
494                             SocketChannel channel = acceptChannel(key);
495                             if (channel==null)
496                                 continue;
497 
498                             channel.configureBlocking(false);
499 
500                             // TODO make it reluctant to leave 0
501                             _nextSet=++_nextSet%_selectSet.length;
502 
503                             // Is this for this selectset
504                             if (_nextSet==_setID)
505                             {
506                                 // bind connections to this select set.
507                                 SelectionKey cKey = channel.register(_selectSet[_nextSet].getSelector(), SelectionKey.OP_READ);
508                                 SelectChannelEndPoint endpoint=newEndPoint(channel,_selectSet[_nextSet],cKey);
509                                 cKey.attach(endpoint);
510                                 if (endpoint != null)
511                                     endpoint.dispatch();
512                             }
513                             else
514                             {
515                                 // nope - give it to another.
516                                 _selectSet[_nextSet].addChange(channel);
517                                 _selectSet[_nextSet].wakeup();
518                             }
519                         }
520                         else if (key.isConnectable())
521                         {
522                             // Complete a connection of a registered channel
523                             SocketChannel channel = (SocketChannel)key.channel();
524                             boolean connected=false;
525                             try
526                             {
527                                 connected=channel.finishConnect();
528                             }
529                             catch(Exception e)
530                             {
531                                 connectionFailed(channel,e,att);
532                             }
533                             finally
534                             {
535                                 if (connected)
536                                 {
537                                     key.interestOps(SelectionKey.OP_READ);
538                                     SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
539                                     key.attach(endpoint);
540                                     endpoint.dispatch();
541                                 }
542                                 else
543                                 {
544                                     key.cancel();
545                                 }
546                             }
547                         }
548                         else
549                         {
550                             // Wrap readable registered channel in an endpoint
551                             SocketChannel channel = (SocketChannel)key.channel();
552                             SelectChannelEndPoint endpoint = newEndPoint(channel,this,key);
553                             key.attach(endpoint);
554                             if (key.isReadable())
555                                 endpoint.dispatch();                           
556                         }
557                         key = null;
558                     }
559                     catch (CancelledKeyException e)
560                     {
561                         Log.ignore(e);
562                     }
563                     catch (Exception e)
564                     {
565                         if (isRunning())
566                             Log.warn(e);
567                         else
568                             Log.ignore(e);
569 
570                         if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid())
571                         {
572                             key.interestOps(0);
573 
574                             key.cancel();
575                         } 
576                     }
577                 }
578                 
579                 // Everything always handled
580                 selector.selectedKeys().clear();
581 
582                 // tick over the timers
583                 _idleTimeout.tick(now);
584                 _retryTimeout.tick(now);
585                 
586             }
587             catch (CancelledKeyException e)
588             {
589                 Log.ignore(e);
590             }
591             finally
592             {
593                 _selecting=false;
594             }
595         }
596 
597         /* ------------------------------------------------------------ */
598         public SelectorManager getManager()
599         {
600             return SelectorManager.this;
601         }
602 
603         /* ------------------------------------------------------------ */
604         public long getNow()
605         {
606             return _idleTimeout.getNow();
607         }
608         
609         /* ------------------------------------------------------------ */
610         public void scheduleIdle(Timeout.Task task)
611         {
612             synchronized (this)
613             {
614                 if (_idleTimeout.getDuration() <= 0)
615                     return;
616                 
617                 task.schedule(_idleTimeout);
618             }
619         }
620 
621         /* ------------------------------------------------------------ */
622         public void scheduleTimeout(Timeout.Task task, long timeout)
623         {
624             synchronized (this)
625             {
626                 _retryTimeout.schedule(task, timeout);
627             }
628         }
629 
630         /* ------------------------------------------------------------ */
631         public void wakeup()
632         {
633             Selector selector = _selector;
634             if (selector!=null)
635                 selector.wakeup();
636         }
637 
638         /* ------------------------------------------------------------ */
639         Selector getSelector()
640         {
641             return _selector;
642         }
643 
644         /* ------------------------------------------------------------ */
645         void stop() throws Exception
646         {
647             boolean selecting=true;
648             while(selecting)
649             {
650                 wakeup();
651                 selecting=_selecting;
652             }
653             
654             ArrayList keys=new ArrayList(_selector.keys());
655             Iterator iter =keys.iterator();
656 
657             while (iter.hasNext())
658             {
659                 SelectionKey key = (SelectionKey)iter.next();
660                 if (key==null)
661                     continue;
662                 EndPoint endpoint = (EndPoint)key.attachment();
663                 if (endpoint!=null)
664                 {
665                     try
666                     {
667                         endpoint.close();
668                     }
669                     catch(IOException e)
670                     {
671                         Log.ignore(e);
672                     }
673                 }
674             }
675             
676             synchronized (this)
677             {
678                 selecting=_selecting;
679                 while(selecting)
680                 {
681                     wakeup();
682                     selecting=_selecting;
683                 }
684                 
685                 _idleTimeout.cancelAll();
686                 _retryTimeout.cancelAll();
687                 try
688                 {
689                     if (_selector != null)
690                         _selector.close();
691                 }
692                 catch (IOException e)
693                 {
694                     Log.ignore(e);
695                 } 
696                 _selector=null;
697             }
698         }
699     }
700 }