1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.cometd;
16
17 import java.util.Arrays;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22
23 import org.cometd.Bayeux;
24 import org.cometd.Channel;
25 import org.cometd.ChannelListener;
26 import org.cometd.Client;
27 import org.cometd.DataFilter;
28 import org.cometd.Message;
29 import org.cometd.SubscriptionListener;
30 import org.mortbay.log.Log;
31 import org.mortbay.util.LazyList;
32
33
34
35
36
37
38
39
40 public class ChannelImpl implements Channel
41 {
42 protected AbstractBayeux _bayeux;
43 private volatile ClientImpl[] _subscribers=new ClientImpl[0];
44 private volatile DataFilter[] _dataFilters=new DataFilter[0];
45 private volatile SubscriptionListener[] _subscriptionListeners=new SubscriptionListener[0];
46 private ChannelId _id;
47 private ConcurrentMap<String,ChannelImpl> _children = new ConcurrentHashMap<String, ChannelImpl>();
48 private ChannelImpl _wild;
49 private ChannelImpl _wildWild;
50 private boolean _persistent;
51 private int _split;
52 private boolean _lazy;
53
54
/**
 * Creates a channel with the given identifier, bound to a Bayeux instance.
 * Package-private, so channels are constructed only from within this package.
 *
 * @param id the channel id string, parsed into a {@link ChannelId}
 * @param bayeux the Bayeux instance this channel delegates to
 */
ChannelImpl(String id,AbstractBayeux bayeux)
{
    this._bayeux = bayeux;
    this._id = new ChannelId(id);
}
60
61
62
63
64
65
66
67
68 public boolean isLazy()
69 {
70 return _lazy;
71 }
72
73
74
75
76
77
78
79
80 public void setLazy(boolean lazy)
81 {
82 _lazy = lazy;
83 }
84
85
86 public void addChild(ChannelImpl channel)
87 {
88 ChannelId child=channel.getChannelId();
89 if (!_id.isParentOf(child))
90 {
91 throw new IllegalArgumentException(_id+" not parent of "+child);
92 }
93
94 String next = child.getSegment(_id.depth());
95
96 if ((child.depth()-_id.depth())==1)
97 {
98
99 ChannelImpl old = _children.putIfAbsent(next,channel);
100 if (old!=null)
101 throw new IllegalArgumentException("Already Exists");
102
103 if (ChannelId.WILD.equals(next))
104 _wild=channel;
105 else if (ChannelId.WILDWILD.equals(next))
106 _wildWild=channel;
107 _bayeux.addChannel(channel);
108 }
109 else
110 {
111 ChannelImpl branch=_children.get(next);
112 branch=(ChannelImpl)_bayeux.getChannel((_id.depth()==0?"/":(_id.toString()+"/"))+next,true);
113 branch.addChild(channel);
114 }
115 }
116
117
118
119
120
121 public void addDataFilter(DataFilter filter)
122 {
123 synchronized(this)
124 {
125 _dataFilters=(DataFilter[])LazyList.addToArray(_dataFilters,filter,null);
126 }
127 }
128
129
130
131
132
133
134 public ChannelId getChannelId()
135 {
136 return _id;
137 }
138
139
140 public ChannelImpl getChild(ChannelId id)
141 {
142 String next=id.getSegment(_id.depth());
143 if (next==null)
144 return null;
145
146 ChannelImpl channel = _children.get(next);
147
148 if (channel==null || channel.getChannelId().depth()==id.depth())
149 {
150 return channel;
151 }
152 return channel.getChild(id);
153 }
154
155
156 public void getChannels(List<Channel> list)
157 {
158 list.add(this);
159 for (ChannelImpl channel: _children.values())
160 channel.getChannels(list);
161 }
162
163
164 public int getChannelCount()
165 {
166 int count = 1;
167
168 for(ChannelImpl channel: _children.values())
169 count += channel.getChannelCount();
170
171 return count;
172 }
173
174
175
176
177
178 public String getId()
179 {
180 return _id.toString();
181 }
182
183
184
185 public boolean isPersistent()
186 {
187 return _persistent;
188 }
189
190
191 public void deliver(Client from, Iterable<Client> to, Object data, String id)
192 {
193 MessageImpl message=_bayeux.newMessage();
194 message.put(Bayeux.CHANNEL_FIELD,getId());
195 message.put(Bayeux.DATA_FIELD,data);
196 if (id!=null)
197 message.put(Bayeux.ID_FIELD,id);
198
199 Message m=_bayeux.extendSendBayeux(from,message);
200
201 if (m!=null)
202 {
203 for (Client t : to)
204 ((ClientImpl)t).doDelivery(from,m);
205 }
206 if (m instanceof MessageImpl)
207 ((MessageImpl)m).decRef();
208 }
209
210
211 public void publish(Client fromClient, Object data, String msgId)
212 {
213 _bayeux.doPublish(getChannelId(),fromClient,data,msgId,false);
214 }
215
216
217 public void publishLazy(Client fromClient, Object data, String msgId)
218 {
219 _bayeux.doPublish(getChannelId(),fromClient,data,msgId,true);
220 }
221
222
223 public boolean remove()
224 {
225 return _bayeux.removeChannel(this);
226 }
227
228
229 public boolean doRemove(ChannelImpl channel)
230 {
231 ChannelId channelId = channel.getChannelId();
232 String key = channelId.getSegment(channelId.depth()-1);
233 if (_children.containsKey(key))
234 {
235 ChannelImpl child = _children.get(key);
236
237 synchronized (this)
238 {
239 synchronized (child)
240 {
241 if (!child.isPersistent() && child.getSubscriberCount()==0 && child.getChannelCount()==1)
242 {
243 _children.remove(key);
244 return true;
245 }
246 else
247 return false;
248 }
249 }
250 }
251 else
252 {
253 for (ChannelImpl child : _children.values())
254 {
255 if (child.doRemove(channel))
256 return true;
257 }
258 }
259 return false;
260 }
261
262
263
264
265
266
267 public DataFilter removeDataFilter(DataFilter filter)
268 {
269 synchronized(this)
270 {
271 _dataFilters=(DataFilter[])LazyList.removeFromArray(_dataFilters,filter);
272 return filter;
273 }
274 }
275
276
277 public void setPersistent(boolean persistent)
278 {
279 _persistent=persistent;
280 }
281
282
283
284
285
286 public void subscribe(Client client)
287 {
288 if (!(client instanceof ClientImpl))
289 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
290
291 synchronized (this)
292 {
293 for (ClientImpl c : _subscribers)
294 {
295 if (client.equals(c))
296 return;
297 }
298 _subscribers=(ClientImpl[])LazyList.addToArray(_subscribers,client,null);
299
300 for (SubscriptionListener l : _subscriptionListeners)
301 l.subscribed(client, this);
302 }
303
304 ((ClientImpl)client).addSubscription(this);
305 }
306
307
308 @Override
309 public String toString()
310 {
311 return _id.toString();
312 }
313
314
315
316
317
318 public void unsubscribe(Client client)
319 {
320 if (!(client instanceof ClientImpl))
321 throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
322 ((ClientImpl)client).removeSubscription(this);
323 synchronized(this)
324 {
325 _subscribers=(ClientImpl[])LazyList.removeFromArray(_subscribers,client);
326
327 for (SubscriptionListener l : _subscriptionListeners)
328 l.unsubscribed(client,this);
329
330 if (!_persistent && _subscribers.length==0 && _children.size()==0)
331 remove();
332 }
333 }
334
335
336 protected void doDelivery(ChannelId to, Client from, Message msg)
337 {
338 int tail = to.depth()-_id.depth();
339
340 Object data = msg.getData();
341 Object old = data;
342
343 try
344 {
345 switch(tail)
346 {
347 case 0:
348 {
349 final DataFilter[] filters=_dataFilters;
350 for (DataFilter filter: filters)
351 {
352 data=filter.filter(from,this,data);
353 if (data==null)
354 return;
355 }
356 }
357 break;
358
359 case 1:
360 if (_wild!=null)
361 {
362 final DataFilter[] filters=_wild._dataFilters;
363 for (DataFilter filter: filters)
364 {
365 data=filter.filter(from,this,data);
366 if (data==null)
367 return;
368 }
369 }
370
371 default:
372 if (_wildWild!=null)
373 {
374 final DataFilter[] filters=_wildWild._dataFilters;
375 for (DataFilter filter: filters)
376 {
377 data=filter.filter(from,this,data);
378 if (data==null)
379 return;
380 }
381 }
382 }
383 }
384 catch (IllegalStateException e)
385 {
386 Log.debug(e);
387 return;
388 }
389
390
391
392 if (data!=old)
393 msg.put(AbstractBayeux.DATA_FIELD,data);
394
395
396 switch(tail)
397 {
398 case 0:
399 {
400 if (_lazy && msg instanceof MessageImpl)
401 ((MessageImpl)msg).setLazy(true);
402
403 final ClientImpl[] subscribers=_subscribers;
404 if (subscribers.length>0)
405 {
406
407 int split=_split++%_subscribers.length;
408 for (int i=split;i<subscribers.length;i++)
409 subscribers[i].doDelivery(from,msg);
410 for (int i=0;i<split;i++)
411 subscribers[i].doDelivery(from,msg);
412 }
413 break;
414 }
415
416 case 1:
417 if (_wild!=null)
418 {
419 if (_wild._lazy && msg instanceof MessageImpl)
420 ((MessageImpl)msg).setLazy(true);
421 final ClientImpl[] subscribers=_wild._subscribers;
422 for (ClientImpl client: subscribers)
423 client.doDelivery(from,msg);
424 }
425
426 default:
427 {
428 if (_wildWild!=null)
429 {
430 if (_wildWild._lazy && msg instanceof MessageImpl)
431 ((MessageImpl)msg).setLazy(true);
432 final ClientImpl[] subscribers=_wildWild._subscribers;
433 for (ClientImpl client: subscribers)
434 client.doDelivery(from,msg);
435 }
436 String next = to.getSegment(_id.depth());
437 ChannelImpl channel = _children.get(next);
438 if (channel!=null)
439 channel.doDelivery(to,from,msg);
440 }
441 }
442 }
443
444
445 public Collection<Client> getSubscribers()
446 {
447 synchronized(this)
448 {
449 return Arrays.asList((Client[])_subscribers);
450 }
451 }
452
453
454 public int getSubscriberCount()
455 {
456 synchronized(this)
457 {
458 return _subscribers.length;
459 }
460 }
461
462
463
464
465
466
467 public Collection<DataFilter> getDataFilters()
468 {
469 synchronized(this)
470 {
471 return Arrays.asList(_dataFilters);
472 }
473 }
474
475
476 public void addListener(ChannelListener listener)
477 {
478 synchronized(this)
479 {
480 if (listener instanceof SubscriptionListener)
481 _subscriptionListeners=(SubscriptionListener[])LazyList.addToArray(_subscriptionListeners,listener,null);
482 }
483 }
484
485 }