View Javadoc

1   //========================================================================
2   //Copyright 2006-2007 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //Licensed under the Apache License, Version 2.0 (the "License");
5   //you may not use this file except in compliance with the License.
6   //You may obtain a copy of the License at
7   //http://www.apache.org/licenses/LICENSE-2.0
8   //Unless required by applicable law or agreed to in writing, software
9   //distributed under the License is distributed on an "AS IS" BASIS,
10  //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  //See the License for the specific language governing permissions and
12  //limitations under the License.
13  //========================================================================
14  package org.mortbay.jetty.client;
15  
16  
17  import java.io.IOException;
18  import java.lang.reflect.Constructor;
19  import java.util.ArrayList;
20  import java.util.LinkedList;
21  import java.util.List;
22  import java.util.concurrent.ArrayBlockingQueue;
23  
24  import javax.servlet.http.Cookie;
25  
26  import org.mortbay.io.Buffer;
27  import org.mortbay.io.ByteArrayBuffer;
28  import org.mortbay.jetty.HttpHeaders;
29  import org.mortbay.jetty.client.security.Authorization;
30  import org.mortbay.jetty.client.security.SecurityListener;
31  import org.mortbay.jetty.servlet.PathMap;
32  import org.mortbay.log.Log;
33  
34  /**
35  * @author Greg Wilkins
36  * @author Guillaume Nodet
37  */
38  public class HttpDestination
39  {
40      private ByteArrayBuffer _hostHeader;
41      private final Address _address;
42      private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
43      private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
44      private final HttpClient _client;
45      private final boolean _ssl;
46      private int _maxConnections;
47      private int _pendingConnections=0;
48      private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10,true);
49      private int _newConnection=0;
50      private Address _proxy;
51      private Authorization _proxyAuthentication;
52      private PathMap _authorizations;
53      private List<Cookie> _cookies;
54  
55      public void dump() throws IOException
56      {
57          synchronized (this)
58          {
59              System.err.println(this);
60              System.err.println("connections="+_connections.size());
61              System.err.println("idle="+_idle.size());
62              System.err.println("pending="+_pendingConnections);
63              for (HttpConnection c : _connections)
64              {
65                  if (!c.isIdle())
66                      c.dump();
67              }
68          }
69      }
70      
71      /* The queue of exchanged for this destination if connections are limited */
72      private LinkedList<HttpExchange> _queue=new LinkedList<HttpExchange>();
73  
74      /* ------------------------------------------------------------ */
75      HttpDestination(HttpClient pool, Address address, boolean ssl, int maxConnections)
76      {
77          _client=pool;
78          _address=address;
79          _ssl=ssl;
80          _maxConnections=maxConnections;
81          String addressString = address.getHost();
82          if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
83          _hostHeader = new ByteArrayBuffer(addressString);
84      }
85  
86      /* ------------------------------------------------------------ */
87      public Address getAddress()
88      {
89          return _address;
90      }
91  
92      /* ------------------------------------------------------------ */
93      public Buffer getHostHeader()
94      {
95          return _hostHeader;
96      }
97      
98      /* ------------------------------------------------------------ */
99      public HttpClient getHttpClient()
100     {
101         return _client;
102     }
103 
104     /* ------------------------------------------------------------ */
105     public boolean isSecure()
106     {
107         return _ssl;
108     }
109     
110     /* ------------------------------------------------------------ */
111     public void addAuthorization(String pathSpec,Authorization authorization)
112     {
113         synchronized (this)
114         {
115             if (_authorizations==null)
116                 _authorizations=new PathMap();
117             _authorizations.put(pathSpec,authorization);
118         }
119         
120         // TODO query and remove methods
121     }
122 
123     /* ------------------------------------------------------------------------------- */
124     public void addCookie(Cookie cookie)
125     {
126         synchronized (this)
127         {
128             if (_cookies==null)
129                 _cookies=new ArrayList<Cookie>();
130             _cookies.add(cookie);
131         }
132         
133         // TODO query, remove and age methods
134     }
135     
136     /* ------------------------------------------------------------------------------- */
137     /**
138      * Get a connection. We either get an idle connection if one is available, or
139      * we make a new connection, if we have not yet reached maxConnections. If we
140      * have reached maxConnections, we wait until the number reduces.
141      * @param timeout max time prepared to block waiting to be able to get a connection
142      * @return
143      * @throws IOException
144      */
145     private HttpConnection getConnection(long timeout) throws IOException
146     {
147         HttpConnection connection = null;
148 
149         while ((connection == null) && (connection = getIdleConnection()) == null && timeout>0)
150         {
151             int totalConnections = 0;
152             boolean starting = false;
153             synchronized (this)
154             {
155                 totalConnections = _connections.size() + _pendingConnections;
156                 if (totalConnections < _maxConnections)
157                 {
158                     _newConnection++;
159                     startNewConnection();
160                     starting = true;
161                 }
162             }
163             
164             if (!starting)
165             {
166                 try
167                 {
168                     Thread.currentThread().sleep(200);
169                     timeout-=200;
170                 }
171                 catch (InterruptedException e)
172                 {
173                     Log.ignore(e);
174                 }
175             }
176             else
177             {
178                try
179                {
180                    Object o = _newQueue.take();
181                    if (o instanceof HttpConnection)
182                    {
183                        connection = (HttpConnection)o;
184                    }
185                    else
186                        throw (IOException)o;
187                }
188                catch (InterruptedException e)
189                {
190                    Log.ignore(e);
191                }
192            }
193         }
194         return connection;
195     }
196     
197     /* ------------------------------------------------------------------------------- */
198     public HttpConnection reserveConnection(long timeout) throws IOException
199     {
200         HttpConnection connection = getConnection(timeout);
201         if (connection != null)
202             connection.setReserved(true);
203         return connection;
204     }
205 
206     /* ------------------------------------------------------------------------------- */
207     public HttpConnection getIdleConnection() throws IOException
208     {
209         synchronized (this)
210         {
211             long now = System.currentTimeMillis();
212             long idleTimeout=_client.getIdleTimeout();
213  
214             // Find an idle connection
215             while (_idle.size() > 0)
216             {
217                 HttpConnection connection = _idle.remove(_idle.size()-1);
218                 long last = connection.getLast();
219                 if (connection.getEndPoint().isOpen() && (last==0 || ((now-last)<idleTimeout)) )
220                     return connection;
221                 else
222                 {
223                     _connections.remove(connection);
224                     connection.getEndPoint().close();
225                 }
226             }
227 
228             return null;
229         }
230     }
231 
232     /* ------------------------------------------------------------------------------- */
233     protected void startNewConnection() 
234     {
235         try
236         {
237             synchronized (this)
238             {
239                 _pendingConnections++;
240             }
241             _client._connector.startConnection(this);
242         }
243         catch(Exception e)
244         {
245             onConnectionFailed(e);
246         }
247     }
248 
249     /* ------------------------------------------------------------------------------- */
250     public void onConnectionFailed(Throwable throwable)
251     {
252         Throwable connect_failure=null;
253         
254         synchronized (this)
255         {
256             _pendingConnections--;
257             if (_newConnection>0)
258             {
259                 connect_failure=throwable;
260                 _newConnection--;
261             }
262             else if (_queue.size()>0)
263             {
264                 HttpExchange ex=_queue.removeFirst();
265                 ex.getEventListener().onConnectionFailed(throwable);
266             }
267         }
268 
269         if(connect_failure!=null)
270         {
271             try
272             {
273                 _newQueue.put(connect_failure);
274             }
275             catch (InterruptedException e)
276             {
277                 Log.ignore(e);
278             }
279         }
280     }
281 
282     /* ------------------------------------------------------------------------------- */
283     public void onException(Throwable throwable)
284     {
285         synchronized (this)
286         {
287             _pendingConnections--;
288             if (_queue.size()>0)
289             {
290                 HttpExchange ex=_queue.removeFirst();
291                 ex.getEventListener().onException(throwable);
292                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
293             }
294         }
295     }
296     
297     /* ------------------------------------------------------------------------------- */
298     public void onNewConnection(HttpConnection connection) throws IOException
299     {
300         HttpConnection q_connection=null;
301         
302         synchronized (this)
303         {
304             _pendingConnections--;
305             _connections.add(connection);
306             
307             if (_newConnection>0)
308             {
309                 q_connection=connection;
310                 _newConnection--;
311             }
312             else if (_queue.size()==0)
313             {
314                 _idle.add(connection);
315             }
316             else
317             {
318                 HttpExchange ex=_queue.removeFirst();
319                 connection.send(ex);
320             }
321         }
322 
323         if (q_connection!=null)
324         {
325             try
326             {
327                 _newQueue.put(q_connection);
328             }
329             catch (InterruptedException e)
330             {
331                 Log.ignore(e);
332             }
333         }
334     }
335 
336     /* ------------------------------------------------------------------------------- */
337     public void returnConnection(HttpConnection connection, boolean close) throws IOException
338     {
339         if (connection.isReserved())
340             connection.setReserved(false);
341         
342         if (close)
343         {
344             try
345             {
346                 connection.close();
347             }
348             catch(IOException e)
349             {
350                 Log.ignore(e);
351             }
352         }
353         
354         if (!_client.isStarted())
355             return;
356         
357         if (!close && connection.getEndPoint().isOpen())
358         {
359             synchronized (this)
360             {
361                 if (_queue.size()==0)
362                 {
363                     connection.setLast(System.currentTimeMillis());
364                     _idle.add(connection);
365                 }
366                 else
367                 {
368                     HttpExchange ex = _queue.removeFirst();
369                     connection.send(ex);
370                 }
371                 this.notifyAll();
372             }
373         }
374         else 
375         {
376             synchronized (this)
377             {
378                 _connections.remove(connection);
379                 if (!_queue.isEmpty())
380                     startNewConnection();
381             }
382         }
383     }
384     
385     /* ------------------------------------------------------------ */
386     public void send(HttpExchange ex) throws IOException
387     {
388         LinkedList<String> listeners = _client.getRegisteredListeners();
389 
390         if (listeners != null)
391         {
392             // Add registered listeners, fail if we can't load them
393             for (int i = listeners.size(); i > 0; --i)
394             {
395                 String listenerClass = listeners.get(i - 1);
396 
397                 try
398                 {
399                     Class listener = Class.forName(listenerClass);
400                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
401                     HttpEventListener elistener = (HttpEventListener) constructor.newInstance(this, ex);
402                     ex.setEventListener(elistener);
403                 }
404                 catch (Exception e)
405                 {
406                     Log.debug(e);
407                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass );
408                 }
409             }
410         }
411 
412         // Security is supported by default and should be the first consulted
413         if ( _client.hasRealms() )
414         {
415             ex.setEventListener( new SecurityListener( this, ex ) );
416         }
417         
418         doSend(ex);
419     }
420     
421     /* ------------------------------------------------------------ */
422     public void resend(HttpExchange ex) throws IOException
423     {
424         ex.getEventListener().onRetry();
425         doSend(ex);
426     }
427     
428     /* ------------------------------------------------------------ */
429     protected void doSend(HttpExchange ex) throws IOException
430     {
431         // add cookies
432         // TODO handle max-age etc.
433         if (_cookies!=null)
434         {
435             StringBuilder buf=null;
436             for (Cookie cookie : _cookies)
437             {
438                 if (buf==null)
439                     buf=new StringBuilder();
440                 else
441                     buf.append("; ");
442                 buf.append(cookie.getName()); // TODO quotes
443                 buf.append("=");
444                 buf.append(cookie.getValue()); // TODO quotes
445             }
446             if (buf!=null)
447                 ex.addRequestHeader(HttpHeaders.COOKIE,buf.toString());
448         }
449         
450         // Add any known authorizations
451         if (_authorizations!=null)
452         {
453             Authorization auth= (Authorization)_authorizations.match(ex.getURI());
454             if (auth !=null)
455                 ((Authorization)auth).setCredentials(ex);
456         }
457        
458         synchronized(this)
459         {
460             //System.out.println( "Sending: " + ex.toString() );
461 
462             HttpConnection connection=null;
463             if (_queue.size()>0 || (connection=getIdleConnection())==null || !connection.send(ex))
464             {
465                 _queue.add(ex);
466                 if (_connections.size()+_pendingConnections <_maxConnections)
467                 {
468                      startNewConnection();
469                 }
470             }
471         }
472     }
473 
474     /* ------------------------------------------------------------ */
475     public synchronized String toString()
476     {
477         return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
478     }
479     
480     /* ------------------------------------------------------------ */
481     public synchronized String toDetailString()
482     {
483         StringBuilder b = new StringBuilder();
484         b.append(toString());
485         b.append('\n');
486         synchronized(this)
487         {
488             for (HttpConnection connection : _connections)
489             {
490                 if (connection._exchange!=null)
491                 {
492                     b.append(connection.toDetailString());
493                     if (_idle.contains(connection))
494                         b.append(" IDLE");
495                     b.append('\n');
496                 }
497             }
498         }
499         b.append("--");
500         b.append('\n');
501         
502         return b.toString();
503     }
504 
505     /* ------------------------------------------------------------ */
506     public void setProxy(Address proxy)
507     {
508         _proxy=proxy;
509     }
510 
511     /* ------------------------------------------------------------ */
512     public Address getProxy()
513     {
514         return _proxy;
515     }
516 
517     /* ------------------------------------------------------------ */
518     public Authorization getProxyAuthentication()
519     {
520         return _proxyAuthentication;
521     }
522 
523     /* ------------------------------------------------------------ */
524     public void setProxyAuthentication(Authorization authentication)
525     {
526         _proxyAuthentication = authentication;
527     }
528 
529     /* ------------------------------------------------------------ */
530     public boolean isProxied()
531     {
532         return _proxy!=null;
533     }
534 
535     /* ------------------------------------------------------------ */
536     public void close() throws IOException
537     {
538         synchronized (this)
539         {
540             for (HttpConnection connection : _connections)
541             {
542                 connection.close();
543             }
544         }
545     }
546     
547 }