View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  //========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.io.IOException;
18  import java.security.SecureRandom;
19  import java.util.ArrayList;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Random;
26  import java.util.Set;
27  import java.util.concurrent.ConcurrentHashMap;
28  import java.util.concurrent.CopyOnWriteArrayList;
29  
30  import javax.servlet.ServletContext;
31  import javax.servlet.http.HttpServletRequest;
32  
33  import org.cometd.Bayeux;
34  import org.cometd.BayeuxListener;
35  import org.cometd.Channel;
36  import org.cometd.ChannelBayeuxListener;
37  import org.cometd.Client;
38  import org.cometd.ClientBayeuxListener;
39  import org.cometd.Extension;
40  import org.cometd.Message;
41  import org.cometd.SecurityPolicy;
42  import org.mortbay.util.LazyList;
43  import org.mortbay.util.ajax.JSON;
44  
45  
46  /* ------------------------------------------------------------ */
47  /**
48   * @author gregw
49   * @author aabeling: added JSONP transport
50   *
51   */
52  public abstract class AbstractBayeux extends MessagePool implements Bayeux
53  {
    // Pre-built ChannelId instances for the Bayeux meta channel name constants.
    public static final ChannelId META_ID=new ChannelId(META);
    public static final ChannelId META_CONNECT_ID=new ChannelId(META_CONNECT);
    public static final ChannelId META_CLIENT_ID=new ChannelId(META_CLIENT);
    public static final ChannelId META_DISCONNECT_ID=new ChannelId(META_DISCONNECT);
    public static final ChannelId META_HANDSHAKE_ID=new ChannelId(META_HANDSHAKE);
    public static final ChannelId META_PING_ID=new ChannelId(META_PING);
    public static final ChannelId META_STATUS_ID=new ChannelId(META_STATUS);
    public static final ChannelId META_SUBSCRIBE_ID=new ChannelId(META_SUBSCRIBE);
    public static final ChannelId META_UNSUBSCRIBE_ID=new ChannelId(META_UNSUBSCRIBE);


    // Handlers keyed by meta channel name; filled in the constructor, consulted by handle().
    private HashMap<String,Handler> _handlers=new HashMap<String,Handler>();

    // Root ("/") channel through which channels are looked up, added and removed.
    private ChannelImpl _root = new ChannelImpl("/",this);
    // Registered clients keyed by client id.
    private ConcurrentHashMap<String,ClientImpl> _clients=new ConcurrentHashMap<String,ClientImpl>();
    // Policy returned by getSecurityPolicy(); DefaultPolicy unless replaced.
    protected SecurityPolicy _securityPolicy=new DefaultPolicy();
    // Current advice literal; replaced by setAdvice().
    protected JSON.Literal _advice;
    // Copy of the advice with "multiple-clients" added; rebuilt by setAdvice().
    protected JSON.Literal _multiFrameAdvice;
    // Incremented on every setAdvice() call.
    protected int _adviceVersion=0;
    // Advice attached to "unknown client" replies: tells the client to handshake again.
    protected Object _handshakeAdvice=new JSON.Literal("{\"reconnect\":\"handshake\",\"interval\":500}");
    // 0=none, 1=info, 2=debug
    protected int _logLevel;
    // Value written as "timeout" into the generated advice.
    protected long _timeout=240000;
    // Value written as "interval" into the generated advice.
    protected long _interval=0;
    // Maximum time in ms to wait between polls before timing out a client.
    protected long _maxInterval=30000;
    // Set to true by initialize().
    protected boolean _initialized;
    // Client ids grouped by browser id; maintained by clientOnBrowser()/clientOffBrowser().
    protected ConcurrentHashMap<String, List<String>> _browser2client=new ConcurrentHashMap<String, List<String>>();
    // Interval placed in the multi-frame advice; values <=0 produce "reconnect":"none" instead.
    protected int _multiFrameInterval=-1;

    // Flag reported by isRequestAvailable().
    protected boolean _requestAvailable;
    // Per-thread request holder behind getCurrentRequest()/setCurrentRequest().
    protected ThreadLocal<HttpServletRequest> _request = new ThreadLocal<HttpServletRequest>();

    // The next three fields are assigned in initialize() and are null until it has run.
    transient ServletContext _context;
    transient Random _random;
    transient ConcurrentHashMap<String, ChannelId> _channelIdCache;
    // Handler used by handle() for channels that are not meta channels.
    protected Handler _publishHandler;
    // Handler that handle() invokes for messages on meta channels.
    protected Handler _metaPublishHandler;
    // Exposed through getMaxClientQueue()/setMaxClientQueue().
    // NOTE(review): -1 presumably means "unbounded" - confirm against the client implementation.
    protected int _maxClientQueue=-1;

    // Server-wide extensions; null until addExtension() is first called.
    protected Extension[] _extensions;
    // JSON array literal naming the long-poll and callback-poll transports.
    protected JSON.Literal _transports=new JSON.Literal("[\""+Bayeux.TRANSPORT_LONG_POLL+ "\",\""+Bayeux.TRANSPORT_CALLBACK_POLL+"\"]");
    // Literal {"ack":"true"}; its use is not visible in this part of the file.
    protected JSON.Literal _replyExt = new JSON.Literal("{\"ack\":\"true\"}");
    // Listener lists filled by addListener().
    protected List<ClientBayeuxListener> _clientListeners=new CopyOnWriteArrayList<ClientBayeuxListener>();
    protected List<ChannelBayeuxListener> _channelListeners=new CopyOnWriteArrayList<ChannelBayeuxListener>();
97  
98      /* ------------------------------------------------------------ */
    /**
     * Creates the publish and meta-publish handlers, registers one handler
     * for each of the handshake, connect, disconnect, subscribe, unsubscribe
     * and ping meta channels, and builds the initial advice.
     * The constructor takes no arguments; the servlet context is supplied
     * later through {@link #initialize(ServletContext)}.
     */
    protected AbstractBayeux()
    {
        _publishHandler=new PublishHandler();
        _metaPublishHandler=new MetaPublishHandler();
        _handlers.put(META_HANDSHAKE,new HandshakeHandler());
        _handlers.put(META_CONNECT,new ConnectHandler());
        _handlers.put(META_DISCONNECT,new DisconnectHandler());
        _handlers.put(META_SUBSCRIBE,new SubscribeHandler());
        _handlers.put(META_UNSUBSCRIBE,new UnsubscribeHandler());
        _handlers.put(META_PING,new PingHandler());

        // Re-applying the current timeout runs generateAdvice(), which populates _advice.
        setTimeout(getTimeout());
    }
117 
118     /* ------------------------------------------------------------ */
119     public void addExtension(Extension ext)
120     {
121         _extensions = (Extension[])LazyList.addToArray(_extensions,ext,Extension.class);
122     }
123 
124     /* ------------------------------------------------------------ */
125     /**
126      * @param id
127      * @return
128      */
129     public ChannelImpl getChannel(ChannelId id)
130     {
131         return _root.getChild(id);
132     }
133 
134     /* ------------------------------------------------------------ */
135     public ChannelImpl getChannel(String id)
136     {
137         ChannelId cid=getChannelId(id);
138         if (cid.depth()==0)
139             return null;
140         return _root.getChild(cid);
141     }
142 
143     /* ------------------------------------------------------------ */
144     public Channel getChannel(String id, boolean create)
145     {
146         synchronized(this)
147         {
148             ChannelImpl channel=getChannel(id);
149 
150             if (channel==null && create)
151             {
152                 channel=new ChannelImpl(id,this);
153                 _root.addChild(channel);
154 
155                 if (isLogInfo())
156                     logInfo("newChannel: "+channel);
157             }
158             return channel;
159         }
160     }
161 
162     /* ------------------------------------------------------------ */
163     public ChannelId getChannelId(String id)
164     {
165         ChannelId cid = _channelIdCache.get(id);
166         if (cid==null)
167         {
168             // TODO shrink cache!
169             cid=new ChannelId(id);
170             _channelIdCache.put(id,cid);
171         }
172         return cid;
173     }
174 
175     /* ------------------------------------------------------------ */
176     /* (non-Javadoc)
177      * @see org.mortbay.cometd.Bx#getClient(java.lang.String)
178      */
179     public Client getClient(String client_id)
180     {
181         synchronized(this)
182         {
183             if (client_id==null)
184                 return null;
185             Client client = _clients.get(client_id);
186             return client;
187         }
188     }
189 
190     /* ------------------------------------------------------------ */
191     public Set<String> getClientIDs()
192     {
193         return _clients.keySet();
194     }
195 
196     /* ------------------------------------------------------------ */
197     /**
198      * @return The maximum time in ms to wait between polls before timing out a client
199      */
200     public long getMaxInterval()
201     {
202         return _maxInterval;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /**
207      * @return the logLevel. 0=none, 1=info, 2=debug
208      */
209     public int getLogLevel()
210     {
211         return _logLevel;
212     }
213 
214     /* ------------------------------------------------------------ */
215     /* (non-Javadoc)
216      * @see org.mortbay.cometd.Bx#getSecurityPolicy()
217      */
218     public SecurityPolicy getSecurityPolicy()
219     {
220         return _securityPolicy;
221     }
222 
223     /* ------------------------------------------------------------ */
224     public long getTimeout()
225     {
226         return _timeout;
227     }
228 
229     /* ------------------------------------------------------------ */
230     public long getInterval()
231     {
232         return _interval;
233     }
234 
235     /* ------------------------------------------------------------ */
236     /**
237      * @return true if published messages are directly delivered to subscribers. False if
238      * a new message is to be created that holds only supported fields.
239      */
240     public boolean isDirectDeliver()
241     {
242         return false;
243     }
244 
245     /* ------------------------------------------------------------ */
246     /**
247      * @deprecated
248      * @param directDeliver true if published messages are directly delivered to subscribers. False if
249      * a new message is to be created that holds only supported fields.
250      */
251     public void setDirectDeliver(boolean directDeliver)
252     {
253         _context.log("directDeliver is deprecated");
254     }
255 
256     /* ------------------------------------------------------------ */
257     /** Handle a Bayeux message.
258      * This is normally only called by the bayeux servlet or a test harness.
259      * @param client The client if known
260      * @param transport The transport to use for the message
261      * @param message The bayeux message.
262      */
263     public String handle(ClientImpl client, Transport transport, Message message) throws IOException
264     {
265         String channel_id=message.getChannel();
266 
267         Handler handler=(Handler)_handlers.get(channel_id);
268         if (handler!=null)
269         {
270             message=extendRcvMeta(client,message);
271             handler.handle(client,transport,message);
272             _metaPublishHandler.handle(client,transport,message);
273         }
274         else if (channel_id.startsWith(META_SLASH))
275         {
276             message=extendRcvMeta(client,message);
277             _metaPublishHandler.handle(client,transport,message);
278         }
279         else
280         {
281             // non meta channel
282             handler=_publishHandler;
283             message=extendRcv(client,message);
284             handler.handle(client,transport,message);
285         }
286 
287         return channel_id;
288     }
289 
290     /* ------------------------------------------------------------ */
291     public boolean hasChannel(String id)
292     {
293         ChannelId cid=getChannelId(id);
294         return _root.getChild(cid)!=null;
295     }
296 
297     /* ------------------------------------------------------------ */
298     public boolean isInitialized()
299     {
300         return _initialized;
301     }
302 
303     /* ------------------------------------------------------------ */
304     /**
305      * @return the commented
306      * @deprecated
307      */
308     public boolean isJSONCommented()
309     {
310         return false;
311     }
312 
313     /* ------------------------------------------------------------ */
314     public boolean isLogDebug()
315     {
316         return _logLevel>1;
317     }
318 
319     /* ------------------------------------------------------------ */
320     public boolean isLogInfo()
321     {
322         return _logLevel>0;
323     }
324 
325     /* ------------------------------------------------------------ */
326     public void logDebug(String message)
327     {
328         if (_logLevel>1)
329             _context.log(message);
330     }
331 
332     /* ------------------------------------------------------------ */
333     public void logDebug(String message, Throwable th)
334     {
335         if (_logLevel>1)
336             _context.log(message,th);
337     }
338 
339     /* ------------------------------------------------------------ */
340     public void logWarn(String message, Throwable th)
341     {
342         _context.log(message+": "+th.toString());
343     }
344 
345     /* ------------------------------------------------------------ */
346     public void logWarn(String message)
347     {
348         _context.log(message);
349     }
350 
351     /* ------------------------------------------------------------ */
352     public void logInfo(String message)
353     {
354         if (_logLevel>0)
355             _context.log(message);
356     }
357 
358     /* ------------------------------------------------------------ */
359     public Client newClient(String idPrefix)
360     {
361         ClientImpl client = new ClientImpl(this,idPrefix);
362         return client;
363     }
364 
365     /* ------------------------------------------------------------ */
    /**
     * Factory method implemented by concrete bayeux classes.
     * NOTE(review): presumably creates the client object that represents a
     * remote (browser) peer - confirm against the implementing subclass.
     * @return a new client instance
     */
    public abstract ClientImpl newRemoteClient();
367 
368     /* ------------------------------------------------------------ */
369     /** Create new transport object for a bayeux message
370      * @param client The client
371      * @param message the bayeux message
372      * @return the negotiated transport.
373      */
374     public Transport newTransport(ClientImpl client, Map<?,?> message)
375     {
376         if (isLogDebug())
377             logDebug("newTransport: client="+client+",message="+message);
378 
379         Transport result=null;
380 
381         try
382         {
383             String type=client==null?null:client.getConnectionType();
384             if (type==null)
385                 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
386 
387             if (Bayeux.TRANSPORT_CALLBACK_POLL.equals(type) || type==null)
388             {
389                 String jsonp=(String)message.get(Bayeux.JSONP_PARAMETER);
390                 if(jsonp!=null)
391                     result=new JSONPTransport(jsonp);
392                 else
393                     result=new JSONTransport();
394             }
395             else
396                 result=new JSONTransport();
397 
398         }
399         catch (Exception e)
400         {
401             throw new RuntimeException(e);
402         }
403 
404         if (isLogDebug())
405             logDebug("newTransport: result="+result);
406         return result;
407     }
408 
409     /* ------------------------------------------------------------ */
410     /** Publish data to a channel.
411      * Creates a message and delivers it to the root channel.
412      * @param to
413      * @param from
414      * @param data
415      * @param msgId
416      */
417     protected void doPublish(ChannelId to, Client from, Object data, String msgId, boolean lazy)
418     {
419         final MessageImpl message = newMessage();
420         message.put(CHANNEL_FIELD,to.toString());
421 
422         if (msgId==null)
423         {
424             long id=message.hashCode()
425             ^(to==null?0:to.hashCode())
426             ^(from==null?0:from.hashCode());
427             id=id<0?-id:id;
428             message.put(ID_FIELD,Long.toString(id,36));
429         }
430         else
431             message.put(ID_FIELD,msgId);
432         message.put(DATA_FIELD,data);
433         
434         message.setLazy(lazy);
435 
436         final Message m=extendSendBayeux(from,message);
437 
438         if (m!=null)
439             _root.doDelivery(to,from,m);
440         if (m instanceof MessageImpl)
441             ((MessageImpl)m).decRef();
442     }
443 
444     /* ------------------------------------------------------------ */
445     public boolean removeChannel(ChannelImpl channel)
446     {
447         boolean removed = _root.doRemove(channel);
448         if (removed)
449             for (ChannelBayeuxListener l : _channelListeners)
450                 l.channelRemoved(channel);
451         return removed;
452     }
453 
454     /* ------------------------------------------------------------ */
455     public void addChannel(ChannelImpl channel)
456     {
457         for (ChannelBayeuxListener l : _channelListeners)
458             l.channelAdded(channel);
459     }
460 
461     /* ------------------------------------------------------------ */
462     protected String newClientId(long variation, String idPrefix)
463     {
464         if (idPrefix==null)
465             return Long.toString(getRandom(),36)+Long.toString(variation,36);
466         else
467             return idPrefix+"_"+Long.toString(getRandom(),36);
468     }
469 
470     /* ------------------------------------------------------------ */
471     protected void addClient(ClientImpl client,String idPrefix)
472     {
473         while(true)
474         {
475             String id = newClientId(client.hashCode(),idPrefix);
476             client.setId(id);
477 
478             ClientImpl other = _clients.putIfAbsent(id,client);
479             if (other==null)
480             {
481                 for (ClientBayeuxListener l : _clientListeners)
482                     l.clientAdded((Client)client);
483 
484                 return;
485             }
486         }
487     }
488 
489     /* ------------------------------------------------------------ */
490     /* (non-Javadoc)
491      * @see org.mortbay.cometd.Bx#removeClient(java.lang.String)
492      */
493     public Client removeClient(String client_id)
494     {
495         ClientImpl client;
496         synchronized(this)
497         {
498             if (client_id==null)
499                 return null;
500             client = _clients.remove(client_id);
501         }
502         if (client!=null)
503         {
504             client.unsubscribeAll();
505             for (ClientBayeuxListener l : _clientListeners)
506                 l.clientRemoved((Client)client);
507         }
508         return client;
509     }
510 
511     /* ------------------------------------------------------------ */
512     /**
513      * @param ms The maximum time in ms to wait between polls before timing out a client
514      */
515     public void setMaxInterval(long ms)
516     {
517         _maxInterval=ms;
518     }
519 
520     /* ------------------------------------------------------------ */
521     /**
522      * @param commented the commented to set
523      */
524     public void setJSONCommented(boolean commented)
525     {
526         if (commented)
527             _context.log("JSONCommented is deprecated");
528     }
529 
530     /* ------------------------------------------------------------ */
531     /**
532      * @param logLevel
533      *            the logLevel: 0=none, 1=info, 2=debug
534      */
535     public void setLogLevel(int logLevel)
536     {
537         _logLevel=logLevel;
538     }
539 
540     /* ------------------------------------------------------------ */
541     /* (non-Javadoc)
542      * @see org.mortbay.cometd.Bx#setSecurityPolicy(org.mortbay.cometd.SecurityPolicy)
543      */
544     public void setSecurityPolicy(SecurityPolicy securityPolicy)
545     {
546         _securityPolicy=securityPolicy;
547     }
548 
549 
550     /* ------------------------------------------------------------ */
551     public void setTimeout(long ms)
552     {
553         _timeout = ms;
554         generateAdvice();
555     }
556 
557 
558     /* ------------------------------------------------------------ */
559     public void setInterval(long ms)
560     {
561         _interval = ms;
562         generateAdvice();
563     }
564 
565     /* ------------------------------------------------------------ */
566     /**
567      * The time a client should delay between reconnects when multiple
568      * connections from the same browser are detected. This effectively
569      * produces traditional polling.
570      * @param multiFrameInterval the multiFrameInterval to set
571      */
572     public void setMultiFrameInterval(int multiFrameInterval)
573     {
574         _multiFrameInterval=multiFrameInterval;
575         generateAdvice();
576     }
577 
578     /* ------------------------------------------------------------ */
579     /**
580      * @return the multiFrameInterval in milliseconds
581      */
582     public int getMultiFrameInterval()
583     {
584         return _multiFrameInterval;
585     }
586 
587     /* ------------------------------------------------------------ */
588     void generateAdvice()
589     {
590         setAdvice(new JSON.Literal("{\"reconnect\":\"retry\",\"interval\":"+getInterval()+",\"timeout\":"+getTimeout()+"}"));
591     }
592 
593     /* ------------------------------------------------------------ */
594     public void setAdvice(JSON.Literal advice)
595     {
596         synchronized(this)
597         {
598             _adviceVersion++;
599             _advice=advice;
600             _multiFrameAdvice=new JSON.Literal(JSON.toString(multiFrameAdvice(advice)));
601         }
602     }
603 
604     /* ------------------------------------------------------------ */
605     private Map<String,Object> multiFrameAdvice(JSON.Literal advice)
606     {
607         Map<String,Object> a = (Map<String,Object>)JSON.parse(_advice.toString());
608         a.put("multiple-clients",Boolean.TRUE);
609         if (_multiFrameInterval>0)
610         {
611             a.put("reconnect","retry");
612             a.put("interval",_multiFrameInterval);
613         }
614         else
615             a.put("reconnect","none");
616         return a;
617     }
618 
619     /* ------------------------------------------------------------ */
620     public JSON.Literal getAdvice()
621     {
622         return _advice;
623     }
624 
625     /* ------------------------------------------------------------ */
626     /**
627      * @return TRUE if {@link #getCurrentRequest()} will return the current request
628      */
629     public boolean isRequestAvailable()
630     {
631         return _requestAvailable;
632     }
633 
634     /* ------------------------------------------------------------ */
635     /**
636      * @param requestAvailable TRUE if {@link #getCurrentRequest()} will return the current request
637      */
638     public void setRequestAvailable(boolean requestAvailable)
639     {
640         _requestAvailable=requestAvailable;
641     }
642 
643     /* ------------------------------------------------------------ */
644     /**
645      * @return the current request if {@link #isRequestAvailable()} is true, else null
646      */
647     public HttpServletRequest getCurrentRequest()
648     {
649         return _request.get();
650     }
651 
652     /* ------------------------------------------------------------ */
653     /**
654      * @return the current request if {@link #isRequestAvailable()} is true, else null
655      */
656     void setCurrentRequest(HttpServletRequest request)
657     {
658         _request.set(request);
659     }
660 
661 
662 
663     /* ------------------------------------------------------------ */
664     public Collection<Channel> getChannels()
665     {
666         List<Channel> channels = new ArrayList<Channel>();
667         _root.getChannels(channels);
668         return channels;
669     }
670 
671     /* ------------------------------------------------------------ */
672     /**
673      * @return
674      */
675     public int getChannelCount()
676     {
677         return _root.getChannelCount();
678     }
679 
680     /* ------------------------------------------------------------ */
681     public Collection<Client> getClients()
682     {
683         synchronized(this)
684         {
685             return new ArrayList<Client>(_clients.values());
686         }
687     }
688 
689     /* ------------------------------------------------------------ */
690     /**
691      * @return
692      */
693     public int getClientCount()
694     {
695         synchronized(this)
696         {
697             return _clients.size();
698         }
699     }
700 
701     /* ------------------------------------------------------------ */
702     public boolean hasClient(String clientId)
703     {
704         synchronized(this)
705         {
706             if (clientId==null)
707                 return false;
708             return _clients.containsKey(clientId);
709         }
710     }
711 
712     /* ------------------------------------------------------------ */
713     public Channel removeChannel(String channelId)
714     {
715         Channel channel = getChannel(channelId);
716 
717         boolean removed = false;
718         if (channel!=null)
719             removed = channel.remove();
720 
721         if (removed)
722             return channel;
723         else
724             return null;
725     }
726 
727     /* ------------------------------------------------------------ */
728     protected void initialize(ServletContext context)
729     {
730         synchronized(this)
731         {
732             _initialized=true;
733             _context=context;
734             try
735             {
736                 _random=SecureRandom.getInstance("SHA1PRNG");
737             }
738             catch (Exception e)
739             {
740                 context.log("Could not get secure random for ID generation",e);
741                 _random=new Random();
742             }
743             _random.setSeed(_random.nextLong()^hashCode()^System.nanoTime()^Runtime.getRuntime().freeMemory());
744             _channelIdCache=new ConcurrentHashMap<String, ChannelId>();
745 
746             _root.addChild(new ServiceChannel(Bayeux.SERVICE));
747 
748         }
749     }
750 
751     /* ------------------------------------------------------------ */
752     long getRandom()
753     {
754         long l=_random.nextLong();
755         return l<0?-l:l;
756     }
757 
758     /* ------------------------------------------------------------ */
759     void clientOnBrowser(String browserId,String clientId)
760     {
761         List<String> clients=_browser2client.get(browserId);
762         if (clients==null)
763         {
764             List<String> new_clients=new CopyOnWriteArrayList<String>();
765             clients=_browser2client.putIfAbsent(browserId,new_clients);
766             if (clients==null)
767                 clients=new_clients;
768         }
769         clients.add(clientId);
770     }
771 
772     /* ------------------------------------------------------------ */
773     void clientOffBrowser(String browserId,String clientId)
774     {
775         List<String> clients=_browser2client.get(browserId);
776         
777         if (clients!=null)
778             clients.remove(clientId);
779     }
780 
781     /* ------------------------------------------------------------ */
782     List<String> clientsOnBrowser(String browserId)
783     {
784         List<String> clients=_browser2client.get(browserId);
785         
786         if (clients==null)
787             return Collections.emptyList();
788         return clients;
789     }
790 
791     /* ------------------------------------------------------------ */
792     public void addListener(BayeuxListener listener)
793     {
794         if (listener instanceof ClientBayeuxListener)
795             _clientListeners.add((ClientBayeuxListener)listener);
796         else if(listener instanceof ChannelBayeuxListener)
797             _channelListeners.add((ChannelBayeuxListener)listener);
798     }
799 
800     /* ------------------------------------------------------------ */
801     public int getMaxClientQueue()
802     {
803         return _maxClientQueue;
804     }
805 
806     /* ------------------------------------------------------------ */
807     public void setMaxClientQueue(int size)
808     {
809         _maxClientQueue=size;
810     }
811 
812     /* ------------------------------------------------------------ */
813     protected Message extendRcv(ClientImpl from, Message message)
814     {
815         if (_extensions!=null)
816         {
817             for (int i=_extensions.length;message!=null && i-->0;)
818                 message=_extensions[i].rcv(from, message);
819         }
820 
821         if (from!=null)
822         {
823             Extension[] client_exs = from.getExtensions();
824             if (client_exs!=null)
825             {
826                 for (int i=client_exs.length;message!=null && i-->0;)
827                     message=client_exs[i].rcv(from, message);
828             }
829         }
830 
831         return message;
832     }
833 
834     /* ------------------------------------------------------------ */
835     protected Message extendRcvMeta(ClientImpl from, Message message)
836     {
837         if (_extensions!=null)
838         {
839             for (int i=_extensions.length;message!=null && i-->0;)
840                 message=_extensions[i].rcvMeta(from, message);
841         }
842 
843         if (from!=null)
844         {
845             Extension[] client_exs = from.getExtensions();
846             if (client_exs!=null)
847             {
848                 for (int i=client_exs.length;message!=null && i-->0;)
849                     message=client_exs[i].rcvMeta(from, message);
850             }
851         }
852         return message;
853     }
854 
855     /* ------------------------------------------------------------ */
856     protected Message extendSendBayeux(Client from, Message message)
857     {
858         if (_extensions!=null)
859         {
860             for (int i=0;message!=null && i<_extensions.length;i++)
861             {
862                 Message m=_extensions[i].send(from, message);
863             }
864         }
865 
866         return message;
867     }
868 
869     /* ------------------------------------------------------------ */
870     public Message extendSendClient(Client from, ClientImpl to, Message message)
871     {
872         if (to!=null)
873         {
874             Extension[] client_exs = to.getExtensions();
875             if (client_exs!=null)
876             {
877                 for (int i=0;message!=null && i<client_exs.length;i++)
878                     message=client_exs[i].send(from, message);
879             }
880         }
881 
882         return message;
883     }
884 
885     /* ------------------------------------------------------------ */
886     public Message extendSendMeta(ClientImpl from, Message message)
887     {
888         if (_extensions!=null)
889         {
890             for (int i=0;message!=null && i<_extensions.length;i++)
891                 message=_extensions[i].sendMeta(from, message);
892         }
893 
894         if (from!=null)
895         {
896             Extension[] client_exs = from.getExtensions();
897             if (client_exs!=null)
898             {
899                 for (int i=0;message!=null && i<client_exs.length;i++)
900                     message=client_exs[i].sendMeta(from, message);
901             }
902         }
903 
904         return message;
905     }
906 
907 
908     /* ------------------------------------------------------------ */
909     /* ------------------------------------------------------------ */
910     public static class DefaultPolicy implements SecurityPolicy
911     {
912         public boolean canHandshake(Message message)
913         {
914             return true;
915         }
916 
917         public boolean canCreate(Client client, String channel, Message message)
918         {
919             return client!=null && !channel.startsWith(Bayeux.META_SLASH);
920         }
921 
922         public boolean canSubscribe(Client client, String channel, Message message)
923         {
924 	    if (client!=null && ("/**".equals(channel) || "/*".equals(channel)))
925 	        return false;
926             return client!=null && !channel.startsWith(Bayeux.META_SLASH);
927         }
928 
929         public boolean canPublish(Client client, String channel, Message message)
930         {
931             return client!=null || client==null && Bayeux.META_HANDSHAKE.equals(channel);
932         }
933 
934     }
935 
936 
937     /* ------------------------------------------------------------ */
938     /* ------------------------------------------------------------ */
939     protected abstract class Handler
940     {
941         abstract void handle(ClientImpl client, Transport transport, Message message) throws IOException;
942         abstract ChannelId getMetaChannelId();
943         void unknownClient(Transport transport,String channel) throws IOException
944         {
945             MessageImpl reply=newMessage();
946 
947             reply.put(CHANNEL_FIELD,channel);
948             reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
949             reply.put(ERROR_FIELD,"402::Unknown client");
950             reply.put("advice",_handshakeAdvice);
951             transport.send(reply);
952             reply.decRef();
953         }
954         
955         void sendMetaReply(final ClientImpl client,Message reply, final Transport transport) throws IOException
956         {
957             reply=extendSendMeta(client,reply);
958             if (reply!=null)
959             {
960                 transport.send(reply);
961                 if (reply instanceof MessageImpl)
962                     ((MessageImpl)reply).decRef();
963             }
964         }
965     }
966 
967     /* ------------------------------------------------------------ */
968     /* ------------------------------------------------------------ */
969     protected class ConnectHandler extends Handler
970     {
971         protected String _metaChannel=META_CONNECT;
972 
973         @Override
974         ChannelId getMetaChannelId()
975         {
976             return META_CONNECT_ID;
977         }
978 
979         @Override
980         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
981         {
982             if (client==null)
983             {
984                 unknownClient(transport,_metaChannel);
985                 return;
986             }
987 
988             // is this the first connect message?
989             String type=client.getConnectionType();
990             boolean polling=true;
991             if (type==null)
992             {
993                 type=(String)message.get(Bayeux.CONNECTION_TYPE_FIELD);
994                 client.setConnectionType(type);
995                 polling=false;
996             }
997 
998             Object advice = message.get(ADVICE_FIELD);
999             if (advice!=null)
1000             {
1001                 Long timeout=(Long)((Map)advice).get("timeout");
1002                 if (timeout!=null && timeout.longValue()>0)
1003                     client.setTimeout(timeout.longValue());
1004                 else
1005                     client.setTimeout(0);
1006             }
1007             else
1008                 client.setTimeout(0);
1009 
1010             advice=null;
1011 
1012             // Work out if multiple clients from some browser?
1013             if (polling && _multiFrameInterval>0 && client.getBrowserId()!=null)
1014             {
1015                 List<String> clients=clientsOnBrowser(client.getBrowserId());
1016                 int count=clients.size();
1017                 if (count>1)
1018                 {
1019                     polling=clients.get(0).equals(client.getId());
1020                     advice=client.getAdvice();
1021                     if (advice==null)
1022                         advice=_multiFrameAdvice;
1023                     else // could probably cache this
1024                         advice=multiFrameAdvice((JSON.Literal)advice);
1025                 }
1026             }
1027 
1028             synchronized(this)
1029             {
1030                 if (advice==null)
1031                 {
1032                     if (_adviceVersion!=client._adviseVersion)
1033                     {
1034                         advice=_advice;
1035                         client._adviseVersion=_adviceVersion;
1036                     }
1037                 }
1038                 else
1039                     client._adviseVersion=-1; // clear so it is reset after multi state clears
1040             }
1041 
1042             // reply to connect message
1043             String id=message.getId();
1044 
1045             Message reply=newMessage(message);
1046 
1047             reply.put(CHANNEL_FIELD,META_CONNECT);
1048             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1049             if (advice!=null)
1050                 reply.put(ADVICE_FIELD,advice);
1051             if (id!=null)
1052                 reply.put(ID_FIELD,id);
1053 
1054             if (polling)
1055                 transport.setMetaConnnectReply(reply);
1056             else
1057                 sendMetaReply(client,reply,transport);
1058         }
1059     }
1060 
1061     /* ------------------------------------------------------------ */
1062     /* ------------------------------------------------------------ */
1063     protected class DisconnectHandler extends Handler
1064     {
1065         @Override
1066         ChannelId getMetaChannelId()
1067         {
1068             return META_DISCONNECT_ID;
1069         }
1070 
1071         @Override
1072         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1073         {
1074             if (client==null)
1075             {
1076                 unknownClient(transport,META_DISCONNECT);
1077                 return;
1078             }
1079             if (isLogInfo())
1080                 logInfo("Disconnect "+client.getId());
1081 
1082             client.remove(false);
1083 
1084             Message reply=newMessage(message);
1085             reply.put(CHANNEL_FIELD,META_DISCONNECT);
1086             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1087             String id=message.getId();
1088             if (id!=null)
1089                 reply.put(ID_FIELD,id);
1090 
1091             reply=extendSendMeta(client,reply);
1092 
1093             Message pollReply = transport.getMetaConnectReply();
1094             if (pollReply!=null)
1095             {
1096                 transport.setMetaConnnectReply(null);
1097                 sendMetaReply(client,pollReply,transport);
1098             }
1099             sendMetaReply(client,reply,transport);
1100         }
1101     }
1102 
1103 
1104     /* ------------------------------------------------------------ */
1105     /* ------------------------------------------------------------ */
1106     protected class HandshakeHandler extends Handler
1107     {
1108         @Override
1109         ChannelId getMetaChannelId()
1110         {
1111             return META_HANDSHAKE_ID;
1112         }
1113 
1114         @Override
1115         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1116         {
1117             if (client!=null)
1118                 throw new IllegalStateException();
1119 
1120             if (_securityPolicy!=null && !_securityPolicy.canHandshake(message))
1121             {
1122                 Message reply=newMessage(message);
1123                 reply.put(CHANNEL_FIELD,META_HANDSHAKE);
1124                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1125                 reply.put(ERROR_FIELD,"403::Handshake denied");
1126 
1127                 sendMetaReply(client,reply,transport);
1128                 return;
1129             }
1130 
1131             client=newRemoteClient();
1132 
1133             Message reply=newMessage(message);
1134             reply.put(CHANNEL_FIELD, META_HANDSHAKE);
1135             reply.put(VERSION_FIELD, "1.0");
1136             reply.put(MIN_VERSION_FIELD, "0.9");
1137 
1138             if (client!=null)
1139             {
1140                 reply.put(SUPP_CONNECTION_TYPE_FIELD, _transports);
1141                 reply.put(SUCCESSFUL_FIELD, Boolean.TRUE);
1142                 reply.put(CLIENT_FIELD, client.getId());
1143                 if (_advice!=null)
1144                     reply.put(ADVICE_FIELD,_advice);
1145             }
1146             else
1147             {
1148                 reply.put(Bayeux.SUCCESSFUL_FIELD,Boolean.FALSE);
1149                 if (_advice!=null)
1150                     reply.put(ADVICE_FIELD,_advice);
1151             }
1152 
1153             if (isLogDebug())
1154                 logDebug("handshake.handle: reply="+reply);
1155 
1156             String id=message.getId();
1157             if (id!=null)
1158                 reply.put(ID_FIELD,id);
1159 
1160             sendMetaReply(client,reply,transport);
1161         }
1162     }
1163 
1164     /* ------------------------------------------------------------ */
1165     /* ------------------------------------------------------------ */
1166     protected class PublishHandler extends Handler
1167     {
1168         @Override
1169         ChannelId getMetaChannelId()
1170         {
1171             return null;
1172         }
1173 
1174         @Override
1175         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1176         {
1177             String channel_id=message.getChannel();
1178 
1179             if (client==null && message.containsKey(CLIENT_FIELD))
1180             {
1181                 unknownClient(transport,channel_id);
1182                 return;
1183             }
1184 
1185             String id=message.getId();
1186 
1187             ChannelId cid=getChannelId(channel_id);
1188             Object data=message.get(Bayeux.DATA_FIELD);
1189 
1190             Message reply=newMessage(message);
1191             reply.put(CHANNEL_FIELD,channel_id);
1192             if (id!=null)
1193                 reply.put(ID_FIELD,id);
1194 
1195             if (data!=null&&_securityPolicy.canPublish(client,channel_id,message))
1196             {
1197                 message.remove(CLIENT_FIELD);
1198                 message=extendSendBayeux(client,message);
1199 
1200                 if (message!=null)
1201                 {
1202                     reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1203                 }
1204                 else
1205                 {
1206                     reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1207                     reply.put(ERROR_FIELD,"404::Message deleted");
1208                 }
1209             }
1210             else
1211             {
1212                 message=null;
1213                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1214                 reply.put(ERROR_FIELD,"403::Publish denied");
1215             }
1216 
1217             sendMetaReply(client,reply,transport);
1218 
1219             if (message!=null)
1220                 _root.doDelivery(cid,client,message);
1221         }
1222     }
1223 
1224     /* ------------------------------------------------------------ */
1225     /* ------------------------------------------------------------ */
1226     protected class MetaPublishHandler extends Handler
1227     {
1228         @Override
1229         ChannelId getMetaChannelId()
1230         {
1231             return null;
1232         }
1233 
1234         @Override
1235         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1236         {
1237             String channel_id=message.getChannel();
1238 
1239             if (client==null && !META_HANDSHAKE.equals(channel_id))
1240             {
1241                 // unknown client
1242                 return;
1243             }
1244 
1245             if(_securityPolicy.canPublish(client,channel_id,message))
1246             {
1247                 _root.doDelivery(getChannelId(channel_id),client,message);
1248             }
1249         }
1250     }
1251 
1252     /* ------------------------------------------------------------ */
1253     /* ------------------------------------------------------------ */
1254     protected class SubscribeHandler extends Handler
1255     {
1256         @Override
1257         ChannelId getMetaChannelId()
1258         {
1259             return META_SUBSCRIBE_ID;
1260         }
1261 
1262         @Override
1263         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1264         {
1265             if (client==null)
1266             {
1267                 unknownClient(transport,META_SUBSCRIBE);
1268                 return;
1269             }
1270 
1271             String subscribe_id=(String)message.get(SUBSCRIPTION_FIELD);
1272 
1273             // select a random channel ID if none specifified
1274             if (subscribe_id==null)
1275             {
1276                 subscribe_id=Long.toString(getRandom(),36);
1277                 while (getChannel(subscribe_id)!=null)
1278                     subscribe_id=Long.toString(getRandom(),36);
1279             }
1280 
1281             ChannelId cid=null;
1282             boolean can_subscribe=false;
1283 
1284             if (subscribe_id.startsWith(Bayeux.SERVICE_SLASH))
1285             {
1286                 can_subscribe=true;
1287             }
1288             else if (subscribe_id.startsWith(Bayeux.META_SLASH))
1289             {
1290                 can_subscribe=false;
1291             }
1292             else
1293             {
1294                 cid=getChannelId(subscribe_id);
1295                 can_subscribe=_securityPolicy.canSubscribe(client,subscribe_id,message);
1296             }
1297 
1298             Message reply=newMessage(message);
1299             reply.put(CHANNEL_FIELD,META_SUBSCRIBE);
1300             reply.put(SUBSCRIPTION_FIELD,subscribe_id);
1301 
1302             if (can_subscribe)
1303             {
1304                 if (cid!=null)
1305                 {
1306                     ChannelImpl channel=getChannel(cid);
1307                     if (channel==null&&_securityPolicy.canCreate(client,subscribe_id,message))
1308                         channel=(ChannelImpl)getChannel(subscribe_id, true);
1309 
1310                     if (channel!=null)
1311                         channel.subscribe(client);
1312                     else
1313                         can_subscribe=false;
1314                 }
1315 
1316                 if (can_subscribe)
1317                 {
1318                     reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1319                 }
1320                 else
1321                 {
1322                     reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1323                     reply.put(ERROR_FIELD,"403::cannot create");
1324                 }
1325             }
1326             else
1327             {
1328                 reply.put(SUCCESSFUL_FIELD,Boolean.FALSE);
1329                 reply.put(ERROR_FIELD,"403::cannot subscribe");
1330 
1331             }
1332 
1333             String id=message.getId();
1334             if (id!=null)
1335                 reply.put(ID_FIELD,id);
1336 
1337             sendMetaReply(client,reply,transport);
1338         }
1339     }
1340 
1341     /* ------------------------------------------------------------ */
1342     /* ------------------------------------------------------------ */
1343     protected class UnsubscribeHandler extends Handler
1344     {
1345         @Override
1346         ChannelId getMetaChannelId()
1347         {
1348             return META_UNSUBSCRIBE_ID;
1349         }
1350 
1351         @Override
1352         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1353         {
1354             if (client==null)
1355             {
1356                 unknownClient(transport,META_UNSUBSCRIBE);
1357                 return;
1358             }
1359 
1360             String channel_id=(String)message.get(SUBSCRIPTION_FIELD);
1361             ChannelImpl channel=getChannel(channel_id);
1362             if (channel!=null)
1363                 channel.unsubscribe(client);
1364 
1365             Message reply=newMessage(message);
1366             reply.put(CHANNEL_FIELD,META_UNSUBSCRIBE);
1367             reply.put(SUBSCRIPTION_FIELD,channel_id);
1368             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1369 
1370             String id=message.getId();
1371             if (id!=null)
1372                 reply.put(ID_FIELD,id);
1373 
1374             sendMetaReply(client,reply,transport);
1375         }
1376     }
1377 
1378     /* ------------------------------------------------------------ */
1379     /* ------------------------------------------------------------ */
1380     protected class PingHandler extends Handler
1381     {
1382         @Override
1383         ChannelId getMetaChannelId()
1384         {
1385             return META_PING_ID;
1386         }
1387 
1388         @Override
1389         public void handle(ClientImpl client, Transport transport, Message message) throws IOException
1390         {
1391             Message reply=newMessage(message);
1392             reply.put(CHANNEL_FIELD,META_PING);
1393             reply.put(SUCCESSFUL_FIELD,Boolean.TRUE);
1394 
1395             String id=message.getId();
1396             if (id!=null)
1397                 reply.put(ID_FIELD,id);
1398 
1399             sendMetaReply(client,reply,transport);
1400         }
1401     }
1402 
1403 
1404     /* ------------------------------------------------------------ */
1405     /* ------------------------------------------------------------ */
1406     protected class ServiceChannel extends ChannelImpl
1407     {
1408         ServiceChannel(String id)
1409         {
1410             super(id,AbstractBayeux.this);
1411         }
1412 
1413         /* ------------------------------------------------------------ */
1414         /* (non-Javadoc)
1415          * @see org.mortbay.cometd.ChannelImpl#addChild(org.mortbay.cometd.ChannelImpl)
1416          */
1417         @Override
1418         public void addChild(ChannelImpl channel)
1419         {
1420             super.addChild(channel);
1421             setPersistent(true);
1422         }
1423 
1424         /* ------------------------------------------------------------ */
1425         @Override
1426         public void subscribe(Client client)
1427         {
1428             if (client.isLocal())
1429                 super.subscribe(client);
1430         }
1431 
1432     }
1433 }