1 package org.cometd.oort;
2
3 import java.io.IOException;
4 import java.util.Arrays;
5 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.util.Map;
8 import java.util.Set;
9
10 import org.cometd.Bayeux;
11 import org.cometd.Channel;
12 import org.cometd.Client;
13 import org.cometd.Message;
14 import org.cometd.MessageListener;
15 import org.mortbay.cometd.MessageImpl;
16 import org.mortbay.cometd.client.BayeuxClient;
17 import org.mortbay.jetty.client.Address;
18
19
20
21
22
23
24
25
26 public class OortComet extends BayeuxClient
27 {
28 protected Oort _oort;
29 protected String _cometUrl;
30 protected String _cometSecret;
31 protected boolean _connected;
32 protected boolean _handshook;
33
34 OortComet(Oort oort,String cometUrl)
35 {
36 super(oort._httpClient,cometUrl,oort._timer);
37 _cometUrl=cometUrl;
38 _oort=oort;
39 addListener(new OortCometListener());
40 }
41
42 public boolean isConnected()
43 {
44 return _connected;
45 }
46
47 public boolean isHandshook()
48 {
49 return _handshook;
50 }
51
52 @Override
53 protected String extendOut(String message)
54 {
55 if (message==BayeuxClient.Handshake.__HANDSHAKE)
56 {
57 try
58 {
59 Message[] msg = _msgPool.parse(message);
60
61 Map<String,Object> oort = new HashMap<String,Object>();
62 oort.put("oort",_oort.getURL());
63 oort.put("oortSecret",_oort.getSecret());
64 oort.put("comet",_cometUrl);
65 Map<String,Object> ext = msg[0].getExt(true);
66 ext.put("oort",oort);
67
68 super.extendOut(msg[0]);
69 message= _msgPool.getJSON().toJSON(msg);
70
71 for (Message m:msg)
72 if (m instanceof MessageImpl)
73 ((MessageImpl)m).decRef();
74
75 }
76 catch (IOException e)
77 {
78 throw new IllegalArgumentException(e);
79 }
80
81 }
82 else
83 message=super.extendOut(message);
84
85 System.err.println(_oort.getURL()+" ==> "+message);
86 return message;
87 }
88
89 @Override
90 protected void metaConnect(boolean success, Message message)
91 {
92 _connected=success;
93 super.metaConnect(success,message);
94 }
95
96 @Override
97 protected void metaHandshake(boolean success, boolean reestablish, Message message)
98 {
99 synchronized (_oort)
100 {
101 _handshook=success;
102 super.metaHandshake(success,reestablish,message);
103 if (success)
104 {
105 Map<String,Object> ext = (Map<String,Object>)message.get("ext");
106 if (ext!=null)
107 {
108 Map<String,Object> oort = (Map<String,Object>)ext.get("oort");
109 if (oort!=null)
110 {
111 _cometSecret=(String)oort.get("cometSecret");
112
113 startBatch();
114 subscribe("/oort/cloud");
115 for (String channel : _oort._channels)
116 subscribe(channel);
117 publish("/oort/cloud",_oort.getKnownComets(),_cometSecret);
118 endBatch();
119 }
120 }
121 System.err.println(_oort.getURL()+" <== "+ext);
122 }
123 }
124 }
125
126 @Override
127 protected void metaPublishFail(Throwable e, Message[] messages)
128 {
129
130 super.metaPublishFail(e,messages);
131 }
132
133
134 protected class OortCometListener implements MessageListener
135 {
136 public void deliver(Client fromClient, Client toClient, Message msg)
137 {
138 String channelId = msg.getChannel();
139 if (msg.getData()!=null)
140 {
141 if (channelId.startsWith("/oort/"))
142 {
143 if (channelId.equals("/oort/cloud"))
144 {
145 Object[] data = (Object[])msg.getData();
146 Set<String> comets = new HashSet<String>();
147 for (Object o:data)
148 comets.add(o.toString());
149 _oort.observedComets(comets);
150 }
151
152 synchronized (_oort)
153 {
154 for( MessageListener listener : _oort._oortMessageListeners)
155 listener.deliver(fromClient,toClient,msg);
156 }
157 }
158 else
159 {
160 Channel channel = _oort._bayeux.getChannel(msg.getChannel(),false);
161 if (channel!=null)
162 channel.publish(_oort._oortClient,msg.getData(),msg.getId());
163 }
164 }
165 }
166 }
167 }