PooledDataSource
PooledDataSource 内部实现了连接池功能,用于复用数据库连接。因此,从效率上来说,
PooledDataSource 要高于UnpooledDataSource。PooledDataSource需要借助一些辅助类帮助它完成连接池的功能,所以接下来,我们先来认识一下相关的辅助类。
辅助类介绍
PooledDataSource 需要借助两个辅助类帮其完成功能,这两个辅助类分别是:
- PoolState
- 用于记录连接池运行时的状态,比如连接获取次数,无效连接数量等
- 内部定义了两个 PooledConnection 集合,用于存储空闲连接和活跃连接
- PooledConnection
- 内部定义了一个 Connection 类型的变量,用于指向真实的数据库连接
- 内部还定义了一个 Connection 的代理类,用于对部分方法调用进行拦截
PooledConnection
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| class PooledConnection implements InvocationHandler { private static final String CLOSE = "close"; private static final Class<?>[] IFACES = new Class<?>[]{Connection.class}; private final int hashCode; private final PooledDataSource dataSource; // 真实的数据库连接 private final Connection realConnection; // 数据库连接代理 private final Connection proxyConnection; // 从连接池中取出连接时的时间戳 private long checkoutTimestamp; // 数据库连接创建时间 private long createdTimestamp; // 数据库连接最后使用时间 private long lastUsedTimestamp; // connectionTypeCode = (url + username + password).hashCode() private int connectionTypeCode; // 表示连接是否有效 private boolean valid; public PooledConnection( Connection connection, PooledDataSource dataSource) { this.hashCode = connection.hashCode(); this.realConnection = connection; this.dataSource = dataSource; this.createdTimestamp = System.currentTimeMillis(); this.lastUsedTimestamp = System.currentTimeMillis(); this.valid = true; // 创建 Connection 的代理类对象 this.proxyConnection = (Connection) Proxy.newProxyInstance( Connection.class.getClassLoader(), IFACES, this); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {...} }
|
PoolState
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class PoolState { protected PooledDataSource dataSource; // 空闲连接列表 protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>(); // 活跃连接列表 protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>(); // 从连接池中获取连接的次数 protected long requestCount = 0; // 请求连接总耗时(单位:毫秒) protected long accumulatedRequestTime = 0; // 连接执行时间总耗时 protected long accumulatedCheckoutTime = 0; // 执行时间超时的连接数 protected long claimedOverdueConnectionCount = 0; // 超时时间累加值 protected long accumulatedCheckoutTimeOfOverdueConnections = 0; // 等待时间累加值 protected long accumulatedWaitTime = 0; // 等待次数 protected long hadToWaitCount = 0; // 无效连接数 protected long badConnectionCount = 0; }
|
获取连接
PooledDataSource 会将用过的连接进行回收,以便可以复用连接。因此从 PooledDataSource 获取连接时:
- 如果空闲链接列表里有连接时,可直接取用。
- 如果没有空闲连接怎么办呢?此时有两种解决办法:
具体怎么做,需视情况而定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
| public Connection getConnection() throws SQLException { // 返回 Connection 的代理对象 return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection(); }
private PooledConnection popConnection(String username, String password) throws SQLException { boolean countedWait = false; PooledConnection conn = null; long t = System.currentTimeMillis(); int localBadConnectionCount = 0; while (conn == null) { synchronized (state) { // 检测空闲连接集合(idleConnections)是否为空 if (!state.idleConnections.isEmpty()) { // idleConnections 不为空,表示有空闲连接可以使用 conn = state.idleConnections.remove(0); } else { // 暂无空闲连接可用,但如果活跃连接数还未超出限制 // poolMaximumActiveConnections,则可创建新的连接 if (state.activeConnections.size() < poolMaximumActiveConnections) { // 创建新连接 conn = new PooledConnection(dataSource.getConnection(), this); } else { // 连接池已满,不能创建新连接 // 取出运行时间最长的连接 PooledConnection oldestActiveConnection = state.activeConnections.get(0); // 获取运行时长 long longestCheckoutTime = oldestActiveConnection.getCheckoutTime(); // 检测运行时长是否超出限制,即超时 if (longestCheckoutTime > poolMaximumCheckoutTime) { // 累加超时相关的统计字段 state.claimedOverdueConnectionCount++; state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; state.accumulatedCheckoutTime += longestCheckoutTime; // 从活跃连接集合中移除超时连接 state.activeConnections.remove(oldestActiveConnection); // 若连接未设置自动提交,此处进行回滚操作 if (!oldestActiveConnection .getRealConnection().getAutoCommit()){ try { oldestActiveConnection .getRealConnection().rollback(); } catch (SQLException e) {...} } // 创建一个新的 PooledConnection,注意,此处复用 // oldestActiveConnection 的 realConnection 变量 conn = new PooledConnection( oldestActiveConnection.getRealConnection(),this);。 // 复用 oldestActiveConnection 的一些信息,注意 // PooledConnection 中的 createdTimestamp 用于记录 // Connection 的创建时间,而非 PooledConnection // 的创建时间。所以这里要复用原连接的时间信息。 conn.setCreatedTimestamp( oldestActiveConnection.getCreatedTimestamp()); conn.setLastUsedTimestamp( oldestActiveConnection.getLastUsedTimestamp()); // 设置连接为无效状态 oldestActiveConnection.invalidate(); } else { // 运行时间最长的连接并未超时 try { if (!countedWait) { state.hadToWaitCount++; countedWait = true; } long wt = System.currentTimeMillis(); // 当前线程进入等待状态 state.wait(poolTimeToWait); state.accumulatedWaitTime += System.currentTimeMillis() - wt; } catch (InterruptedException e) { break; } } } } if (conn != null) { // 检测连接是否有效,isValid 方法除了会检测 valid 是否为 true, // 还会通过 PooledConnection 的 pingConnection 方法执行 SQL 语句, // 检测连接是否可用。pingConnection 方法的逻辑不复杂,大家自行分析。 if (conn.isValid()) { if (!conn.getRealConnection().getAutoCommit()) { // 进行回滚操作 conn.getRealConnection().rollback(); } conn.setConnectionTypeCode(assembleConnectionTypeCode( dataSource.getUrl(), username, password)); // 设置统计字段 conn.setCheckoutTimestamp(System.currentTimeMillis()); conn.setLastUsedTimestamp(System.currentTimeMillis()); state.activeConnections.add(conn); state.requestCount++; state.accumulatedRequestTime += System.currentTimeMillis() - t; } else { // 连接无效,此时累加无效连接相关的统计字段 state.badConnectionCount++; localBadConnectionCount++; conn = null; if(localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) { throw new SQLException(...); } } } } } if (conn == null) { throw new SQLException(...); } return conn; }
|
从连接池中获取连接首先会遇到两种情况:
- 连接池中有空闲连接
- 连接池中无空闲连接
- 活跃连接数没有超出最大活跃连接数
- 活跃连接数超出最大活跃连接数
- 活跃连接的运行时间超出限制,即超时
- 活跃连接未超时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| if (连接池中有空闲连接) { 1. 将连接从空闲连接集合中移除 } else { if (活跃连接数未超出限制) { 1. 创建新连接 } else { 1. 从活跃连接集合中取出第一个元素 2. 获取连接运行时长 if (连接超时) { 1. 将连接从活跃集合中移除 2. 复用原连接的成员变量,并创建新的 PooledConnection 对象 } else { 1. 线程进入等待状态 2. 线程被唤醒后,重新执行以上逻辑 } } }
|
流程图大致描绘
回收连接
相比获取连接,回收连接的逻辑要简单的多。回收连接成功与否只取决于空闲连接集
合的状态,所需处理情况很少,因此比较简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| protected void pushConnection(PooledConnection conn) throws SQLException { synchronized (state) { // 从活跃连接池中移除连接 state.activeConnections.remove(conn); if (conn.isValid()) { // 空闲连接集合未满 if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode()==expectedConnectionTypeCode){ state.accumulatedCheckoutTime += conn.getCheckoutTime(); // 回滚未提交的事务 if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } // 创建新的 PooledConnection PooledConnection newConn = new PooledConnection( conn.getRealConnection(), this); state.idleConnections.add(newConn); // 复用时间信息 newConn.setCreatedTimestamp(conn.getCreatedTimestamp()); newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp()); // 将原连接置为无效状态 conn.invalidate(); // 通知等待的线程 state.notifyAll(); } else { // 空闲连接集合已满 state.accumulatedCheckoutTime += conn.getCheckoutTime(); // 回滚未ᨀ交的事务 if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } // 关闭数据库连接 conn.getRealConnection().close(); conn.invalidate(); } } else { state.badConnectionCount++; } } }
|
- 首先将连接从活跃连接集合中移除
- 再根据空闲集合是否有空闲空间进行后续处理
- 如果空闲集合未满,此时复用原连接的字段信息创建新的连接,并将其放入空闲集合中即可
- 若空闲集合已满,此时无需回收连接,直接关闭即可
我们知道获取连接的方法 popConnection 是由 getConnection 方法调用的,那回收连接
的方法 pushConnection 是由谁调用的呢?答案是 PooledConnection 中的代理逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| // -☆- PooledConnection public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); // 检测 close 方法是否被调用,若被调用则拦截之 if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)){ // 将回收连接中,而不是直接将连接关闭 dataSource.pushConnection(this); return null; } else { try { if (!Object.class.equals(method.getDeclaringClass())) { checkConnection(); } // 调用真实连接的目标方法 return method.invoke(realConnection, args); } catch (Throwable t) { throw ExceptionUtil.unwrapThrowable(t); } } }
|
getConnection 方法返回的是Connection代理对象。代理对象中的方法被调用时,会被上面的代理逻辑所拦截。如果代理对象的close方法被调用,MyBatis并不会直接调用真实连接的close方法关闭连接,而是调用pushConnection方法回收连接。同时会唤醒处于睡眠中的线程,使其恢复运行