private E getPoolEntryBlocking( final T route, final Object state, finallong timeout, final TimeUnit tunit, final Future<E> future)throws IOException, InterruptedException, TimeoutException {
Datedeadline=null; if (timeout > 0) { // 指获得连接要等待多久,这个timeout就是ConnectionTimeout。 // 要追溯这个timeout是ConnectionTimeToLive deadline = newDate (System.currentTimeMillis() + tunit.toMillis(timeout)); } this.lock.lock(); try { // 获得一个特定路由的连接池,细节实际就是从可用队列里获得一个连接 final RouteSpecificPool<T, C, E> pool = getPool(route); E entry; for (;;) { Asserts.check(!this.isShutDown, "Connection pool shut down"); for (;;) { // 尝试从其中获得可用的连接,这个操作在第一次执行时一定返回null entry = pool.getFree(state); if (entry == null) { break; } // 如果超时了,关闭这个入口。因为不可能无限保持TCP连接,至少这是个不好的操作 // 所以超时了以后就关闭连接比较合理。这里的超时是指ConnectionTimeToLive if (entry.isExpired(System.currentTimeMillis())) { entry.close(); } // 如果已经被关闭,那就从可用队列里移除。 if (entry.isClosed()) { this.available.remove(entry); pool.free(entry, false); } else { break; } } // 这里一连串的if还是很严谨的 if (entry != null) { // 被使用了,当然要移出可用队列 this.available.remove(entry); // 加入已分配队列 this.leased.add(entry); onReuse(entry); return entry; }
// New connection is needed。 // 因为第一次的时候free一定是空的,所以会新建。 // 根据默认设置,这里会得到2,一会会详细看一下这个方法 finalintmaxPerRoute= getMax(route); // Shrink the pool prior to allocating a new connection // 超出了最大允许的连接数,即maxPerRoute,就回收一下 finalintexcess= Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute); if (excess > 0) { for (inti=0; i < excess; i++) { finalElastUsed= pool.getLastUsed(); if (lastUsed == null) { break; } lastUsed.close(); this.available.remove(lastUsed); pool.remove(lastUsed); } } // 如果这个route的已分配连接没有超过允许的最大连接,就分配。 if (pool.getAllocatedCount() < maxPerRoute) { finalinttotalUsed=this.leased.size(); finalintfreeCapacity= Math.max(this.maxTotal - totalUsed, 0); if (freeCapacity > 0) { finalinttotalAvailable=this.available.size(); if (totalAvailable > freeCapacity - 1) { if (!this.available.isEmpty()) { finalElastUsed=this.available.removeLast(); lastUsed.close(); final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute()); otherpool.remove(lastUsed); } } // 真正新建一个connection,在这一步,为entry设置了超时时间。 // 这个超时时间在上面一个代码块中会用 finalCconn=this.connFactory.create(route); entry = pool.add(conn); this.leased.add(entry); return entry; } } booleansuccess=false; try { if (future.isCancelled()) { thrownewInterruptedException("Operation interrupted"); } // 否则就将这个申请的请求加到等待队列里 pool.queue(future); this.pending.add(future); if (deadline != null) { success = this.condition.awaitUntil(deadline); } else { this.condition.await(); success = true; } if (future.isCancelled()) { thrownewInterruptedException("Operation interrupted"); } } finally { // In case of 'success', we were woken up by the // connection pool and should now have a connection // waiting for us, or else we're shutting down. // Just continue in the loop, both cases are checked. // 所以接下来就重试了,注意最上面那个for (;;)无限循环 pool.unqueue(future); this.pending.remove(future); } // check for spurious wakeup vs. timeout if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) { break; } } thrownewTimeoutException("Timeout waiting for connection"); } finally { this.lock.unlock(); } }
然后再来稍微看一下这个final int maxPerRoute = getMax(route);方法:
1 2 3 4 5 6 7 8
privateintgetMax(final T route) { finalIntegerv=this.maxPerRoute.get(route); if (v != null) { return v.intValue(); } else { returnthis.defaultMaxPerRoute; } }
/** * {@code ClientConnectionPoolManager} maintains a pool of * {@link HttpClientConnection}s and is able to service connection requests * from multiple execution threads. Connections are pooled on a per route * basis. A request for a route which already the manager has persistent * connections for available in the pool will be services by leasing * a connection from the pool rather than creating a brand new connection. * <p> * {@code ClientConnectionPoolManager} maintains a maximum limit of connection * on a per route basis and in total. Per default this implementation will * create no more than than 2 concurrent connections per given route * and no more 20 connections in total. For many real-world applications * these limits may prove too constraining, especially if they use HTTP * as a transport protocol for their services. Connection limits, however, * can be adjusted using {@link ConnPoolControl} methods. * 上面这一段划重点,意思是默认值对于生产环境可能太小了,特别是当我们把HTTP当RPC用的时候 * </p> * <p> * Total time to live (TTL) set at construction time defines maximum life span * of persistent connections regardless of their expiration setting. No persistent * connection will be re-used past its TTL value. * </p> * <p> * The handling of stale connections was changed in version 4.4. * Previously, the code would check every connection by default before re-using it. * The code now only checks the connection if the elapsed time since * the last use of the connection exceeds the timeout that has been set. * The default timeout is set to 2000ms * </p> * * @since 4.3 */ @Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL) publicclassPoolingHttpClientConnectionManager implementsHttpClientConnectionManager, ConnPoolControl<HttpRoute>, Closeable { // 省略 }