PooledDataSource

PooledDataSource 内部实现了连接池功能,用于复用数据库连接。因此,从效率上来说,
PooledDataSource 要高于UnpooledDataSource。PooledDataSource需要借助一些辅助类帮助它完成连接池的功能,所以接下来,我们先来认识一下相关的辅助类。

辅助类介绍

PooledDataSource 需要借助两个辅助类帮其完成功能,这两个辅助类分别是:

  • PoolState
    • 用于记录连接池运行时的状态,比如连接获取次数,无效连接数量等
    • 内部定义了两个 PooledConnection 集合,用于存储空闲连接和活跃连接
  • PooledConnection
    • 内部定义了一个 Connection 类型的变量,用于指向真实的数据库连接
    • 内部还定义了一个 Connection 的代理类,用于对部分方法调用进行拦截

PooledConnection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class PooledConnection implements InvocationHandler {
private static final String CLOSE = "close";
private static final Class<?>[] IFACES =
new Class<?>[]{Connection.class};
private final int hashCode;
private final PooledDataSource dataSource;
// 真实的数据库连接
private final Connection realConnection;
// 数据库连接代理
private final Connection proxyConnection;
// 从连接池中取出连接时的时间戳
private long checkoutTimestamp;
// 数据库连接创建时间
private long createdTimestamp;
// 数据库连接最后使用时间
private long lastUsedTimestamp;
// connectionTypeCode = (url + username + password).hashCode()
private int connectionTypeCode;
// 表示连接是否有效
private boolean valid;

public PooledConnection(
Connection connection, PooledDataSource dataSource) {
this.hashCode = connection.hashCode();
this.realConnection = connection;
this.dataSource = dataSource;
this.createdTimestamp = System.currentTimeMillis();
this.lastUsedTimestamp = System.currentTimeMillis();
this.valid = true;
// 创建 Connection 的代理类对象
this.proxyConnection = (Connection) Proxy.newProxyInstance(
Connection.class.getClassLoader(), IFACES, this);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {...}
}

PoolState

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class PoolState {
protected PooledDataSource dataSource;
// 空闲连接列表
protected final List<PooledConnection> idleConnections =
new ArrayList<PooledConnection>();
// 活跃连接列表
protected final List<PooledConnection> activeConnections =
new ArrayList<PooledConnection>();
// 从连接池中获取连接的次数
protected long requestCount = 0;
// 请求连接总耗时(单位:毫秒)
protected long accumulatedRequestTime = 0;
// 连接执行时间总耗时
protected long accumulatedCheckoutTime = 0;
// 执行时间超时的连接数
protected long claimedOverdueConnectionCount = 0;
// 超时时间累加值
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
// 等待时间累加值
protected long accumulatedWaitTime = 0;
// 等待次数
protected long hadToWaitCount = 0;
// 无效连接数
protected long badConnectionCount = 0;
}

获取连接

PooledDataSource 会将用过的连接进行回收,以便可以复用连接。因此从 PooledDataSource 获取连接时:

  • 如果空闲链接列表里有连接时,可直接取用。
  • 如果没有空闲连接怎么办呢?此时有两种解决办法:
    • 要么创建新连接
    • 要么等待其他连接完成任务

具体怎么做,需视情况而定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public Connection getConnection() throws SQLException {
// 返回 Connection 的代理对象
return popConnection(dataSource.getUsername(),
dataSource.getPassword()).getProxyConnection();
}

private PooledConnection popConnection(String username, String password)
throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;
while (conn == null) {
synchronized (state) {
// 检测空闲连接集合(idleConnections)是否为空
if (!state.idleConnections.isEmpty()) {
// idleConnections 不为空,表示有空闲连接可以使用
conn = state.idleConnections.remove(0);
} else {
// 暂无空闲连接可用,但如果活跃连接数还未超出限制
// poolMaximumActiveConnections,则可创建新的连接
if (state.activeConnections.size() < poolMaximumActiveConnections) {
// 创建新连接
conn = new PooledConnection(dataSource.getConnection(), this);
} else { // 连接池已满,不能创建新连接
// 取出运行时间最长的连接
PooledConnection oldestActiveConnection =
state.activeConnections.get(0);
// 获取运行时长
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
// 检测运行时长是否超出限制,即超时
if (longestCheckoutTime > poolMaximumCheckoutTime) {
// 累加超时相关的统计字段
state.claimedOverdueConnectionCount++;
state.accumulatedCheckoutTimeOfOverdueConnections +=
longestCheckoutTime;
state.accumulatedCheckoutTime += longestCheckoutTime;
// 从活跃连接集合中移除超时连接
state.activeConnections.remove(oldestActiveConnection);
// 若连接未设置自动提交,此处进行回滚操作
if (!oldestActiveConnection
.getRealConnection().getAutoCommit()){
try {
oldestActiveConnection
.getRealConnection().rollback();
} catch (SQLException e) {...}
}
// 创建一个新的 PooledConnection,注意,此处复用
// oldestActiveConnection 的 realConnection 变量
conn = new PooledConnection(
oldestActiveConnection.getRealConnection(),this);。
// 复用 oldestActiveConnection 的一些信息,注意
// PooledConnection 中的 createdTimestamp 用于记录
// Connection 的创建时间,而非 PooledConnection
// 的创建时间。所以这里要复用原连接的时间信息。
conn.setCreatedTimestamp(
oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(
oldestActiveConnection.getLastUsedTimestamp());
// 设置连接为无效状态
oldestActiveConnection.invalidate();
} else { // 运行时间最长的连接并未超时
try {
if (!countedWait) {
state.hadToWaitCount++;
countedWait = true;
}
long wt = System.currentTimeMillis();
// 当前线程进入等待状态
state.wait(poolTimeToWait);
state.accumulatedWaitTime +=
System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
if (conn != null) {
// 检测连接是否有效,isValid 方法除了会检测 valid 是否为 true,
// 还会通过 PooledConnection 的 pingConnection 方法执行 SQL 语句,
// 检测连接是否可用。pingConnection 方法的逻辑不复杂,大家自行分析。

if (conn.isValid()) {
if (!conn.getRealConnection().getAutoCommit()) {
// 进行回滚操作
conn.getRealConnection().rollback();
}
conn.setConnectionTypeCode(assembleConnectionTypeCode(
dataSource.getUrl(), username, password));
// 设置统计字段
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
state.activeConnections.add(conn);
state.requestCount++;
state.accumulatedRequestTime +=
System.currentTimeMillis() - t;
} else {
// 连接无效,此时累加无效连接相关的统计字段
state.badConnectionCount++;
localBadConnectionCount++;
conn = null;
if(localBadConnectionCount > (poolMaximumIdleConnections
+ poolMaximumLocalBadConnectionTolerance)) {
throw new SQLException(...);
}
}
}
}
}
if (conn == null) {
throw new SQLException(...);
}
return conn;
}

从连接池中获取连接首先会遇到两种情况:

  • 连接池中有空闲连接
    • 处理措施就很简单了,把连接取出返回即可
  • 连接池中无空闲连接
    • 活跃连接数没有超出最大活跃连接数
      • 直接创建新的连接即可
    • 活跃连接数超出最大活跃连接数
      • 活跃连接的运行时间超出限制,即超时
        • 超时连接强行中断,并进行回滚
      • 活跃连接未超时
        • 只能等待
          伪代码逻辑如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (连接池中有空闲连接) {
1. 将连接从空闲连接集合中移除
} else {
if (活跃连接数未超出限制) {
1. 创建新连接
} else {
1. 从活跃连接集合中取出第一个元素
2. 获取连接运行时长
if (连接超时) {
1. 将连接从活跃集合中移除
2. 复用原连接的成员变量,并创建新的 PooledConnection 对象
} else {
1. 线程进入等待状态
2. 线程被唤醒后,重新执行以上逻辑
}
}
}

流程图大致描绘

image.png

回收连接

相比获取连接,回收连接的逻辑要简单的多。回收连接成功与否只取决于空闲连接集
合的状态,所需处理情况很少,因此比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected void pushConnection(PooledConnection conn) throws SQLException {
synchronized (state) {
// 从活跃连接池中移除连接
state.activeConnections.remove(conn);
if (conn.isValid()) {
// 空闲连接集合未满
if (state.idleConnections.size() < poolMaximumIdleConnections
&& conn.getConnectionTypeCode()==expectedConnectionTypeCode){
state.accumulatedCheckoutTime += conn.getCheckoutTime();
// 回滚未提交的事务
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
// 创建新的 PooledConnection
PooledConnection newConn = new PooledConnection(
conn.getRealConnection(), this);
state.idleConnections.add(newConn);
// 复用时间信息
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
// 将原连接置为无效状态
conn.invalidate();
// 通知等待的线程
state.notifyAll();
} else { // 空闲连接集合已满
state.accumulatedCheckoutTime += conn.getCheckoutTime();
// 回滚未ᨀ交的事务
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
// 关闭数据库连接
conn.getRealConnection().close();
conn.invalidate();
}
} else {
state.badConnectionCount++;
}
}
}
  • 首先将连接从活跃连接集合中移除
  • 再根据空闲集合是否有空闲空间进行后续处理
    • 如果空闲集合未满,此时复用原连接的字段信息创建新的连接,并将其放入空闲集合中即可
    • 若空闲集合已满,此时无需回收连接,直接关闭即可

我们知道获取连接的方法 popConnection 是由 getConnection 方法调用的,那回收连接
的方法 pushConnection 是由谁调用的呢?答案是 PooledConnection 中的代理逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// -☆- PooledConnection
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
// 检测 close 方法是否被调用,若被调用则拦截之
if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)){
// 将回收连接中,而不是直接将连接关闭
dataSource.pushConnection(this);
return null;
} else {
try {
if (!Object.class.equals(method.getDeclaringClass())) {
checkConnection();
}
// 调用真实连接的目标方法
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
}

getConnection 方法返回的是Connection代理对象。代理对象中的方法被调用时,会被上面的代理逻辑所拦截。如果代理对象的close方法被调用,MyBatis并不会直接调用真实连接的close方法关闭连接,而是调用pushConnection方法回收连接。同时会唤醒处于睡眠中的线程,使其恢复运行