博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Http请求连接池-HttpClient的AbstractConnPool源码分析
阅读量:7155 次
发布时间:2019-06-29

本文共 10545 字,大约阅读时间需要 35 分钟。

背景

在做服务化拆分的时候,若不是性能要求特别高的场景,我们一般对外暴露Http服务。Spring里提供了一个模板类RestTemplate,通过配置RestTemplate,我们可以快速地访问外部的Http服务。Http底层是通过Tcp的三次握手建立连接的,若每个请求都要重新建立连接,那开销是很大的,特别是对于消息体非常小的场景,开销更大。

若使用连接池的方式,来管理连接对象,能极大地提高服务的吞吐量。

RestTemplate底层是封装了HttpClient(笔者的版本是4.3.6),它提供了连接池机制来处理高并发网络请求。

示例

通常,我们采用如下的样板代码来构建HttpClient:

HttpClientBuilder builder = HttpClientBuilder.create();    builder.setMaxConnTotal(maxConnections).setMaxConnPerRoute(maxConnectionsPerRoute);    if (!connectionReuse) {        builder.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE);    }    if (!automaticRetry) {        builder.disableAutomaticRetries();    }    if (!compress) {        builder.disableContentCompression();    }    HttpClient httpClient = builder.build();

从上面的代码可以看出,HttpClient使用建造者设计模式来构造对象,最后一行代码构建对象,前面的代码是用来设置客户端的最大连接数、单路由最大连接数、是否使用长连接、压缩等特性。

源码分析

我们进入HttpClientBuilder的build()方法,会看到如下代码:

# 构造Http连接池管理器    final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager(            RegistryBuilder.
create() .register("http", PlainConnectionSocketFactory.getSocketFactory()) .register("https", sslSocketFactory) .build()); if (defaultSocketConfig != null) { poolingmgr.setDefaultSocketConfig(defaultSocketConfig); } if (defaultConnectionConfig != null) { poolingmgr.setDefaultConnectionConfig(defaultConnectionConfig); } if (systemProperties) { String s = System.getProperty("http.keepAlive", "true"); if ("true".equalsIgnoreCase(s)) { s = System.getProperty("http.maxConnections", "5"); final int max = Integer.parseInt(s); poolingmgr.setDefaultMaxPerRoute(max); poolingmgr.setMaxTotal(2 * max); } } if (maxConnTotal > 0) { poolingmgr.setMaxTotal(maxConnTotal); } if (maxConnPerRoute > 0) { poolingmgr.setDefaultMaxPerRoute(maxConnPerRoute); } # Http连接管理器采用连接池的方式实现 connManager = poolingmgr;

默认情况下构造出的Http连接管理器是采用连接池的方式实现的。

我们进入 PoolingHttpClientConnectionManager的代码,其连接池的核心实现是依赖于 CPool类,而 CPool又继承了抽象类AbstractConnPool AbstractConnPool@ThreadSafe的注解,说明它是线程安全类,所以 HttpClient线程安全地获取、释放连接都依赖于 AbstractConnPool

接下来我来看最核心的AbstractConnPool类,以下是连接池的结构图:

alt text

连接池最重要的两个公有方法是 leaserelease,即获取连接和释放连接的两个方法。

lease 获取连接

@Override    public Future
lease(final T route, final Object state, final FutureCallback
callback) { Args.notNull(route, "Route"); Asserts.check(!this.isShutDown, "Connection pool shut down"); return new PoolEntryFuture
(this.lock, callback) { @Override public E getPoolEntry( final long timeout, final TimeUnit tunit) throws InterruptedException, TimeoutException, IOException { final E entry = getPoolEntryBlocking(route, state, timeout, tunit, this); onLease(entry); return entry; } }; }

lease方法返回的是一个 Future对象,即需要调用 Futureget方法,才可以得到PoolEntry的对象,它包含了一个连接的具体信息。

而获取连接是通过 getPoolEntryBlocking方法实现的,通过函数名可以知道,这是一个阻塞的方法,即该route所对应的连接池中的连接不够用时,该方法就会阻塞,直到该 route所对应的连接池有连接释放,方法才会被唤醒;或者方法一直等待,直到连接超时抛出异常。

private E getPoolEntryBlocking(            final T route, final Object state,            final long timeout, final TimeUnit tunit,            final PoolEntryFuture
future) throws IOException, InterruptedException, TimeoutException { Date deadline = null; // 设置连接超时时间戳 if (timeout > 0) { deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout)); } // 获取连接,并修改修改连接池,所以加锁--->线程安全 this.lock.lock(); try { // 从Map中获取该route对应的连接池,若Map中没有,则创建该route对应的连接池 final RouteSpecificPool
pool = getPool(route); E entry = null; while (entry == null) { Asserts.check(!this.isShutDown, "Connection pool shut down"); for (;;) { // 获取 同一状态的 空闲连接,即从available链表的头部中移除,添加到leased集合中 entry = pool.getFree(state); // 若返回连接为空,跳出循环 if (entry == null) { break; } // 若连接已过期,则关闭连接 if (entry.isExpired(System.currentTimeMillis())) { entry.close(); } else if (this.validateAfterInactivity > 0) { if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) { if (!validate(entry)) { entry.close(); } } } if (entry.isClosed()) { // 若该连接已关闭,则总的available链表中删除该连接 this.available.remove(entry); // 从该route对应的连接池的leased集合中删除该连接,并且不回收到available链表中 pool.free(entry, false); } else { break; } } // 跳出for循环 if (entry != null) { // 若获取的连接不为空,将连接从总的available链表移除,并添加到leased集合中 // 获取连接成功,直接返回 this.available.remove(entry); this.leased.add(entry); onReuse(entry); return entry; } // 计算该route的最大连接数 // New connection is needed final int maxPerRoute = getMax(route); // Shrink the pool prior to allocating a new connection // 计算该route连接池中的连接数 是否 大于等于 route最大连接数 final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute); // 若大于等于 route最大连接数,则收缩该route的连接池 if (excess > 0) { for (int i = 0; i < excess; i++) { // 获取该route连接池中最不常用的空闲连接,即available链表末尾的连接 // 因为回收连接时,总是将连接添加到available链表的头部,所以链表尾部的连接是最有可能过期的 final E lastUsed = pool.getLastUsed(); if (lastUsed == null) { break; } // 关闭连接,并从总的空闲链表以及route对应的连接池中删除 lastUsed.close(); this.available.remove(lastUsed); pool.remove(lastUsed); } } // 该route的连接池大小 小于 route最大连接数 if (pool.getAllocatedCount() < maxPerRoute) { final int totalUsed = this.leased.size(); final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0); if (freeCapacity > 0) { final int totalAvailable = this.available.size(); // 总的空闲连接数 大于等于 总的连接池剩余容量 if (totalAvailable > freeCapacity - 1) { if (!this.available.isEmpty()) { // 从总的available链表中 以及 route对应的连接池中 删除连接,并关闭连接 final E lastUsed = this.available.removeLast(); lastUsed.close(); final RouteSpecificPool
otherpool = getPool(lastUsed.getRoute()); otherpool.remove(lastUsed); } } // 创建新连接,并添加到总的leased集合以及route连接池的leased集合中,函数返回 final C conn = this.connFactory.create(route); entry = pool.add(conn); this.leased.add(entry); return entry; } } //route的连接池已满,无法分配连接 boolean success = false; try { // 将该获取连接的任务放入pending队列 pool.queue(future); this.pending.add(future); // 阻塞等待,若在超时之前被唤醒,则返回true;若直到超时才返回,则返回false success = future.await(deadline); } finally { // In case of 'success', we were woken up by the // connection pool and should now have a connection // waiting for us, or else we're shutting down. // Just continue in the loop, both cases are checked. // 无论是 被唤醒返回、超时返回 还是被 中断异常返回,都会进入finally代码段 // 从pending队列中移除 pool.unqueue(future); this.pending.remove(future); } // check for spurious wakeup vs. timeout // 判断是伪唤醒 还是 连接超时 // 若是 连接超时,则跳出while循环,并抛出 连接超时的异常; // 若是 伪唤醒,则继续循环获取连接 if (!success && (deadline != null) && (deadline.getTime() <= System.currentTimeMillis())) { break; } } throw new TimeoutException("Timeout waiting for connection"); } finally { // 释放锁 this.lock.unlock(); } }

release 释放连接

@Override    public void release(final E entry, final boolean reusable) {        // 获取锁        this.lock.lock();        try {            // 从总的leased集合中移除连接            if (this.leased.remove(entry)) {                final RouteSpecificPool
pool = getPool(entry.getRoute()); // 回收连接 pool.free(entry, reusable); if (reusable && !this.isShutDown) { this.available.addFirst(entry); onRelease(entry); } else { entry.close(); } // 获取pending队列队头的任务(先进先出原则),唤醒该阻塞的任务 PoolEntryFuture
future = pool.nextPending(); if (future != null) { this.pending.remove(future); } else { future = this.pending.poll(); } if (future != null) { future.wakeup(); } } } finally { // 释放锁 this.lock.unlock(); } }

总结

AbstractConnPool其实就是通过在获取连接、释放连接时加锁,来实现线程安全,思路非常简单,但它没有在route对应的连接池中加锁对象,即 RouteSpecificPool的获取连接、释放连接操作是不加锁的,因为已经在 AbstractConnPool的外部调用中加锁,所以是线程安全的,简化了设计。

另外每个route对应一个连接池,实现了在host级别的隔离,若下游的某台提供服务的主机挂了,无效的连接最多只占用该route对应的连接池,不会占用整个连接池,从而拖垮整个服务。

以上。

原文链接

转载地址:http://uwrgl.baihongyu.com/

你可能感兴趣的文章
硬盘的读写原理
查看>>
eclipse svn时忽略target .project .classpath等目录文件
查看>>
iOS多点触控与手势识别
查看>>
Sql server--索引
查看>>
UML建模工具
查看>>
视频合成软件哪个好,怎么把多个视频快速合并成一个视频
查看>>
在Linux系统中创建SSH服务器别名
查看>>
【JMS 4】spring 整合activemq
查看>>
PDF文档页码怎么设置
查看>>
java单例模式
查看>>
多线程基础 (八)NSOperation相关
查看>>
【已解决】PHP项目需求:在现有网站中每个页面增加一个get参数
查看>>
Linux下安装oracle10g全解
查看>>
软件分层思想
查看>>
JAVA测试实际代码多少行,注释多少行,空格多少行?
查看>>
css与css3对比
查看>>
day7-select Port multiplexing, multiple IO
查看>>
查看哪些用户登录了ftp服务器
查看>>
学习Nagios(1)监控门禁
查看>>
30个优秀的大自然风格网页设计作品欣赏
查看>>