View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.util.Arrays;
18  import java.util.Collection;
19  import java.util.List;
20  import java.util.concurrent.ConcurrentHashMap;
21  import java.util.concurrent.ConcurrentMap;
22  
23  import org.cometd.Bayeux;
24  import org.cometd.Channel;
25  import org.cometd.ChannelListener;
26  import org.cometd.Client;
27  import org.cometd.DataFilter;
28  import org.cometd.Message;
29  import org.cometd.SubscriptionListener;
30  import org.mortbay.log.Log;
31  import org.mortbay.util.LazyList;
32  
33  
34  /* ------------------------------------------------------------ */
35  /** A Bayuex Channel
36   * 
37   * @author gregw
38   *
39   */
40  public class ChannelImpl implements Channel
41  {
42      protected AbstractBayeux _bayeux;
43      private volatile ClientImpl[] _subscribers=new ClientImpl[0]; // copy on write
44      private volatile DataFilter[] _dataFilters=new DataFilter[0]; // copy on write
45      private volatile SubscriptionListener[] _subscriptionListeners=new SubscriptionListener[0]; // copy on write
46      private ChannelId _id;
47      private ConcurrentMap<String,ChannelImpl> _children = new ConcurrentHashMap<String, ChannelImpl>();
48      private ChannelImpl _wild;
49      private ChannelImpl _wildWild;
50      private boolean _persistent;
51      private int _split;
52      private boolean _lazy;
53      
54      /* ------------------------------------------------------------ */
55      ChannelImpl(String id,AbstractBayeux bayeux)
56      {
57          _id=new ChannelId(id);
58          _bayeux=bayeux;
59      }
60  
61      /* ------------------------------------------------------------ */
62      /**
63       * A Lazy channel marks published messages as lazy.
64       * Lazy messages are queued but do not wake up
65       * waiting clients.
66       * @return true if message is lazy
67       */
68      public boolean isLazy()
69      {
70          return _lazy;
71      }
72  
73      /* ------------------------------------------------------------ */
74      /**
75       * A Lazy channel marks published messages as lazy.
76       * Lazy messages are queued but do not wake up
77       * waiting clients.
78       * @param lazy true if message is lazy
79       */
80      public void setLazy(boolean lazy)
81      {
82          _lazy = lazy;
83      }
84      
85      /* ------------------------------------------------------------ */
86      public void addChild(ChannelImpl channel)
87      {
88          ChannelId child=channel.getChannelId();
89          if (!_id.isParentOf(child))
90          {
91              throw new IllegalArgumentException(_id+" not parent of "+child);
92          }
93          
94          String next = child.getSegment(_id.depth());
95  
96          if ((child.depth()-_id.depth())==1)
97          {
98              // add the channel to this channels
99              ChannelImpl old = _children.putIfAbsent(next,channel);
100             if (old!=null)
101                 throw new IllegalArgumentException("Already Exists");
102 
103             if (ChannelId.WILD.equals(next))
104                 _wild=channel;
105             else if (ChannelId.WILDWILD.equals(next))
106                 _wildWild=channel;
107             _bayeux.addChannel(channel);
108         }
109         else
110         {
111             ChannelImpl branch=_children.get(next);
112             branch=(ChannelImpl)_bayeux.getChannel((_id.depth()==0?"/":(_id.toString()+"/"))+next,true);
113             branch.addChild(channel);
114         }
115     }
116     
117     /* ------------------------------------------------------------ */
118     /**
119      * @param filter
120      */
121     public void addDataFilter(DataFilter filter)
122     {
123         synchronized(this)
124         {
125             _dataFilters=(DataFilter[])LazyList.addToArray(_dataFilters,filter,null);
126         }
127     }
128 
129     /* ------------------------------------------------------------ */
130     /* ------------------------------------------------------------ */
131     /**
132      * @return
133      */
134     public ChannelId getChannelId()
135     {
136         return _id;
137     }
138     
139     /* ------------------------------------------------------------ */
140     public ChannelImpl getChild(ChannelId id)
141     {
142         String next=id.getSegment(_id.depth());
143         if (next==null)
144             return null;
145         
146         ChannelImpl channel = _children.get(next);
147         
148         if (channel==null || channel.getChannelId().depth()==id.depth())
149         {
150             return channel;
151         }
152         return channel.getChild(id);
153     }
154 
155     /* ------------------------------------------------------------ */
156      public void getChannels(List<Channel> list)
157      {
158          list.add(this);
159          for (ChannelImpl channel: _children.values())
160              channel.getChannels(list);
161      }
162 
163      /* ------------------------------------------------------------ */
164      public int getChannelCount()
165      {
166          int count = 1;
167          
168          for(ChannelImpl channel: _children.values())
169              count += channel.getChannelCount();
170          
171          return count;
172      }
173      
174     /* ------------------------------------------------------------ */
175     /**
176      * @return
177      */
178     public String getId()
179     {
180         return _id.toString();
181     }
182 
183     
184     /* ------------------------------------------------------------ */
185     public boolean isPersistent()
186     {
187         return _persistent;
188     }
189 
190     /* ------------------------------------------------------------ */
191     public void deliver(Client from, Iterable<Client> to, Object data, String id)
192     {
193         MessageImpl message=_bayeux.newMessage();
194         message.put(Bayeux.CHANNEL_FIELD,getId());
195         message.put(Bayeux.DATA_FIELD,data);
196         if (id!=null)   
197             message.put(Bayeux.ID_FIELD,id);
198 
199         Message m=_bayeux.extendSendBayeux(from,message);
200         
201         if (m!=null)
202         {
203             for (Client t : to)
204                 ((ClientImpl)t).doDelivery(from,m);
205         }
206         if (m instanceof MessageImpl)
207             ((MessageImpl)m).decRef();
208     }
209     
210     /* ------------------------------------------------------------ */
211     public void publish(Client fromClient, Object data, String msgId)
212     {
213         _bayeux.doPublish(getChannelId(),fromClient,data,msgId,false);   
214     }
215 
216     /* ------------------------------------------------------------ */
217     public void publishLazy(Client fromClient, Object data, String msgId)
218     {
219         _bayeux.doPublish(getChannelId(),fromClient,data,msgId,true);   
220     }
221     
222     /* ------------------------------------------------------------ */
223     public boolean remove()
224     {
225         return _bayeux.removeChannel(this);
226     }
227     
228     /* ------------------------------------------------------------ */
229     public boolean doRemove(ChannelImpl channel)
230     {
231         ChannelId channelId = channel.getChannelId();
232         String key = channelId.getSegment(channelId.depth()-1);
233         if (_children.containsKey(key))
234         {
235             ChannelImpl child = _children.get(key);
236             
237             synchronized (this)
238             {
239                 synchronized (child)
240                 {
241                     if (!child.isPersistent() && child.getSubscriberCount()==0 && child.getChannelCount()==1)
242                     {
243                         _children.remove(key);
244                         return true;
245                     }
246                     else
247                         return false;
248                 }       
249             }
250         }
251         else
252         {
253             for (ChannelImpl child : _children.values())
254             {
255                 if (child.doRemove(channel))
256                     return true;
257             }
258         }
259         return false;
260     }
261     
262     /* ------------------------------------------------------------ */
263     /* ------------------------------------------------------------ */
264     /**
265      * @param filter
266      */
267     public DataFilter removeDataFilter(DataFilter filter)
268     {
269         synchronized(this)
270         {
271             _dataFilters=(DataFilter[])LazyList.removeFromArray(_dataFilters,filter);
272             return filter;
273         }
274     }
275 
276     /* ------------------------------------------------------------ */
277     public void setPersistent(boolean persistent)
278     {
279         _persistent=persistent;
280     }
281 
282     /* ------------------------------------------------------------ */
283     /**
284      * @param client
285      */
286     public void subscribe(Client client)
287     {
288         if (!(client instanceof ClientImpl))
289             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
290         
291         synchronized (this)
292         {
293             for (ClientImpl c : _subscribers)
294             {
295                 if (client.equals(c))
296                     return;
297             }
298             _subscribers=(ClientImpl[])LazyList.addToArray(_subscribers,client,null);
299             
300             for (SubscriptionListener l : _subscriptionListeners)
301                 l.subscribed(client, this);
302         }
303         
304         ((ClientImpl)client).addSubscription(this);
305     }
306 
307     /* ------------------------------------------------------------ */
308     @Override
309     public String toString()
310     {
311         return _id.toString();
312     }
313 
314     /* ------------------------------------------------------------ */
315     /**
316      * @param client
317      */
318     public void unsubscribe(Client client)
319     {
320         if (!(client instanceof ClientImpl))
321             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
322         ((ClientImpl)client).removeSubscription(this);
323         synchronized(this)
324         {
325             _subscribers=(ClientImpl[])LazyList.removeFromArray(_subscribers,client);
326             
327             for (SubscriptionListener l : _subscriptionListeners)
328                 l.unsubscribed(client,this);
329             
330             if (!_persistent && _subscribers.length==0 && _children.size()==0)
331                 remove();
332         }
333     }
334 
335     /* ------------------------------------------------------------ */
336     protected void doDelivery(ChannelId to, Client from, Message msg)
337     {   
338         int tail = to.depth()-_id.depth();
339         
340         Object data = msg.getData();
341         Object old = data;
342         
343         try
344         {
345             switch(tail)
346             {
347                 case 0:      
348                 {
349                     final DataFilter[] filters=_dataFilters;
350                     for (DataFilter filter: filters)
351                     {
352                         data=filter.filter(from,this,data);
353                         if (data==null)
354                             return;
355                     }
356                 }
357                 break;
358 
359                 case 1:
360                     if (_wild!=null)  
361                     {
362                         final DataFilter[] filters=_wild._dataFilters;
363                         for (DataFilter filter: filters)
364                         {
365                             data=filter.filter(from,this,data);
366                             if (data==null)
367                                 return;
368                         }
369                     }
370 
371                 default:
372                     if (_wildWild!=null)  
373                     {
374                         final DataFilter[] filters=_wildWild._dataFilters;
375                         for (DataFilter filter: filters)
376                         {
377                             data=filter.filter(from,this,data);
378                             if (data==null)
379                                 return;
380                         }
381                     }
382             }
383         }
384         catch (IllegalStateException e)
385         {
386             Log.debug(e);
387             return;
388         }
389 
390         // TODO this may not be correct if the message is reused.
391         // probably should close message ?
392         if (data!=old)
393             msg.put(AbstractBayeux.DATA_FIELD,data);
394         
395 
396         switch(tail)
397         {
398             case 0:
399             {
400                 if (_lazy && msg instanceof MessageImpl)
401                     ((MessageImpl)msg).setLazy(true);
402                 
403                 final ClientImpl[] subscribers=_subscribers;
404                 if (subscribers.length>0)
405                 {
406                     // fair delivery 
407                     int split=_split++%_subscribers.length;
408                     for (int i=split;i<subscribers.length;i++)
409                         subscribers[i].doDelivery(from,msg);
410                     for (int i=0;i<split;i++)
411                         subscribers[i].doDelivery(from,msg);
412                 }                
413                 break;
414             }
415 
416             case 1:
417                 if (_wild!=null)
418                 {
419                     if (_wild._lazy && msg instanceof MessageImpl)
420                         ((MessageImpl)msg).setLazy(true);
421                     final ClientImpl[] subscribers=_wild._subscribers;
422                     for (ClientImpl client: subscribers)
423                         client.doDelivery(from,msg);
424                 }
425 
426             default:
427             {
428                 if (_wildWild!=null)
429                 {
430                     if (_wildWild._lazy && msg instanceof MessageImpl)
431                         ((MessageImpl)msg).setLazy(true);
432                     final ClientImpl[] subscribers=_wildWild._subscribers;
433                     for (ClientImpl client: subscribers)
434                         client.doDelivery(from,msg);
435                 }
436                 String next = to.getSegment(_id.depth());
437                 ChannelImpl channel = _children.get(next);
438                 if (channel!=null)
439                     channel.doDelivery(to,from,msg);
440             }
441         }
442     }
443     
444     /* ------------------------------------------------------------ */
445     public Collection<Client> getSubscribers()
446     {
447         synchronized(this)
448         {
449             return Arrays.asList((Client[])_subscribers);
450         }
451     }
452 
453     /* ------------------------------------------------------------ */
454     public int getSubscriberCount()
455     {
456         synchronized(this)
457         {
458             return _subscribers.length;
459         }
460     }
461 
462 
463     /* ------------------------------------------------------------ */
464     /* (non-Javadoc)
465      * @see dojox.cometd.Channel#getFilters()
466      */
467     public Collection<DataFilter> getDataFilters()
468     {
469         synchronized(this)
470         {
471             return Arrays.asList(_dataFilters);
472         }
473     }
474 
475     /* ------------------------------------------------------------ */
476     public void addListener(ChannelListener listener)
477     {
478         synchronized(this)
479         {
480             if (listener instanceof SubscriptionListener)
481                 _subscriptionListeners=(SubscriptionListener[])LazyList.addToArray(_subscriptionListeners,listener,null);
482         }
483     }
484     
485 }