您好,欢迎来到华佗小知识。
搜索
您的当前位置:首页用JAVA实现无等待数据库连接池

用JAVA实现无等待数据库连接池

来源:华佗小知识
我们都知道数据库连接是一种有限和非常昂贵的应用资源,怎样对这些资源进行高效的管理,能有效的改善整个系统的性能和健壮性。数据库连接池正是针对这个问题而提出来的。 数据库连接负责分配、释放和管理数据库连接。使数据库连接可以重复利用,而不是用一次建立一次数据库连接。

基本思路

建立一个容器

每次到这个容器里得到连接,如果为空则建立一个新连接。 当连接使用完后归还给这个容器

这里就有二个难点

1. 容器必需是同步的,线程安全的。 2. 连接怎归还连接池

方案:

针对这二个难点,我们分别提出了二个解决方法

1.使用ConcurrentLinkedQueue实现先进先出队列

ConcurrentLinkedQueue无界线程安全队列介绍

这个类在java.util.concurrent包中,我们来看看官方是怎描述这个类的 一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素.此实现采用了有效的“无等待 (wait-free)”算法 2.动态代理实现连接归还连接池

大家也可以参考刘冬在IBM发表的文章

http://www.ibm.com/developerworks/cn/java/l-connpoolproxy/

接下来我们来看看整体代码

import java.io.PrintWriter;

import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.sql.Connection; import java.sql.Driver;

import java.sql.SQLException; import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock;

import javax.sql.DataSource;

public class JavaGGDataSource implements DataSource { //连接队列

private ConcurrentLinkedQueue<_Connection> connQueue = new ConcurrentLinkedQueue<_Connection>(); //存放所有连接容器

private List<_Connection> conns = new ArrayList<_Connection>(); private Driver driver = null;

private String jdbcUrl = null; private String user = null; private String password = null;

private int maxActive = -1;// -1为不连接数 private String driverClass = null;

private int timeout = 1000 * 60 * 60 * 4;// 默认为4小时,即4小时没有任何sql操作就把所有连接重新建立连接

private AtomicLong lastCheckout = new AtomicLong(System.currentTimeMillis());

private AtomicInteger connCount = new AtomicInteger(); //线程锁,主要用于新建连接和清空连接时

private ReentrantLock lock = new ReentrantLock();

public void closeAllConnection() { }

/**

* 归还连接给连接池 *

* @param conn *@date 2009-8-13 *@author eric.chan */

public void offerConnection(_Connection conn) { connQueue.offer(conn); }

@Override

public Connection getConnection() throws SQLException { return getConnection(user, password); }

/**

* 从池中得到连接,如果池中没有连接,则建立新的sql连接 *

* @param username * @param password * @author eric.chan */

@Override

public Connection getConnection(String username, String password) throws SQLException { checkTimeout();

_Connection conn = connQueue.poll(); if (conn == null) {

if (maxActive > 0 && connCount.get() >= maxActive) { for (;;) {// 采用自旋方法 从已满的池中得到一个连接 conn = connQueue.poll(); if (conn != null) break; else

continue; } }

lock.lock(); try {

if (maxActive > 0 && connCount.get() >= maxActive) { // 处理并发问题

return getConnection(username, password); }

Properties info = new Properties(); info.put(\"user\ info.put(\"password\

Connection conn1 = loadDriver().connect(jdbcUrl, info); conn = new _Connection(conn1, this);

int c = connCount.incrementAndGet();// 当前连接数加1 conns.add(conn);

System.out.println(\"info : init no. \" + c + \" connectioned\");

} finally {

lock.unlock();

}

} }

lastCheckout.getAndSet(System.currentTimeMillis()); return conn.getConnection();

/**

* 检查最后一次的连接时间 *

* @throws SQLException *@date 2009-8-13 *@author eric.chan */

private void checkTimeout() throws SQLException { long now = System.currentTimeMillis(); long lt = lastCheckout.get(); if ((now - lt) > timeout) { _Connection conn = null; lock.lock(); try {

if(connCount.get()==0)return;

while ((conn = connQueue.poll()) != null) {

System.out.println(\"connection \" + conn + \" close \"); conn.close(); conn = null; }

for(_Connection con:conns){ con.close(); }

conns.clear();

System.out.println(\"info : reset all connections\"); connCount.getAndSet(0);// 重置连接数计数器

lastCheckout.getAndSet(System.currentTimeMillis()); } finally {

lock.unlock(); } } }

/** *

* @return

*@date 2009-8-13 *@author eric.chan

*/

private Driver loadDriver() { if (driver == null) { try {

driver = (Driver)

Class.forName(driverClass).newInstance();

} catch (ClassNotFoundException e) {

System.out.println(\"error : can not find driver class \" + driverClass);

} catch (Exception e) { e.printStackTrace(); } }

return driver; }

@Override

public PrintWriter getLogWriter() throws SQLException { return null; }

@Override

public int getLoginTimeout() throws SQLException { return 0; }

@Override

public void setLogWriter(PrintWriter out) throws SQLException { }

@Override

public void setLoginTimeout(int seconds) throws SQLException { }

@Override

public boolean isWrapperFor(Class iface) throws SQLException { throw new SQLException(\"no Implemented isWrapperFor method\"); }

@Override

public T unwrap(Class iface) throws SQLException {

throw new SQLException(\"no Implemented unwrap method\"); }

public String getJdbcUrl() { return jdbcUrl; }

public void setJdbcUrl(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

public String getUsername() { return user; }

public void setUsername(String user) { this.user = user; }

public String getPassword() { return password; }

public void setPassword(String password) { this.password = password; }

public String getDriverClass() { return driverClass; }

public void setDriverClass(String driverClass) { this.driverClass = driverClass; }

public int getTimeout() { return timeout; }

public void setTimeout(int timeout) { this.timeout = timeout * 1000; }

public void setMaxActive(int maxActive) { this.maxActive = maxActive; }

public int getMaxActive() { return maxActive; } } /**

* 数据连接的自封装 ,是java.sql.Connection的一个钩子,主要是处理close方法 *

* @author eric * */

class _Connection implements InvocationHandler {

private final static String CLOSE_METHOD_NAME = \"close\";

private final Connection conn; private final JavaGGDataSource ds;

_Connection(Connection conn, JavaGGDataSource ds) { this.conn = conn; this.ds = ds; }

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object obj = null;

// 判断是否调用了close的方法,如果调用close方法则把连接置为无用状态 if (CLOSE_METHOD_NAME.equals(method.getName())) { // 归还连接给连接池

ds.offerConnection(this); } else {

// 运行非close的方法

obj = method.invoke(conn, args); }

return obj; }

public Connection getConnection() {

// 返回数据库连接conn的接管类,以便截住close方法 Connection conn2 = (Connection)

Proxy.newProxyInstance(conn.getClass().getClassLoader(), new Class[] { Connection.class }, this); return conn2; }

public void close() throws SQLException {

// 调用真正的close方法,一但调用此方法就直接关闭连接 if(conn!=null&&!conn.isClosed()) conn.close(); } }

_Connection类是一个私有类,主要实现一个对Connection动态代理的功能(有点象windows的钩子)

说白了就是实现调用connection.close方法时我们映射到另一个方法上. 呵呵,是不是好简单呢,代码没有多复杂。

这里有一个问题要说明一吓:如果设置的maxActive值小于当前线程总数,那么当并发非常大时会出现资源争夺情况,一吓子cpu就会提高不小,所以建议设为无,或大于线程总数的值。

接下来我们测试测试

开五十个线程,对同一个表进行select/insert 10000次操作,每次select/insert一条记录

代码如下:

public static void main(String[] args) {

JavaGGDataSource ds = new JavaGGDataSource();

ds.setDriverClass(\"com.mysql.jdbc.Driver\");

ds.setJdbcUrl(\"jdbc:mysql://192.168.1.6:3306/test\"); ds.setPassword(\"ps\"); ds.setUsername(\"name\"); ds.setTimeout(300); // ds.setMaxActive(60);

for (int i = 0; i < 50; i++) { new GG(ds).start(); } }

class GG extends Thread {

JavaGGDataSource ds = null;

long l = System.currentTimeMillis();

public GG(JavaGGDataSource ds) { this.ds = ds; }

static String sql = \"insert into testgg(col1,cols) values (?,?)\"; static String selectsql = \"select * from testgg where id=?\";

public void run() {

for (int t = 0; t < 10000; t++) { Connection conn = null; try {

conn = ds.getConnection();

PreparedStatement ps = conn.prepareStatement(sql); //以下为insert

ps.setInt(1, 1336); ps.setString(2, \"ddd\"); ps.executeUpdate(); //以下为select

ResultSet rs=ps.executeQuery(); while(rs.next()){ rs.getInt(\"id\"); rs.getInt(\"col1\"); }

rs.close(); ps.close();

} catch (SQLException e) {

// TODO Auto-generated catch block e.printStackTrace(); } finally { try {

if (conn != null) {

// ds.offerConnection(conn); conn.close(); }

} catch (Exception e) { e.printStackTrace(); } } }

System.out.println(System.currentTimeMillis() - l); }

测试结果

50个线程select 10000*50次结果

Javaggds 4062156毫秒 连接池有50个连接(和线程数一样)

C3p0 12351657毫秒 连接池有500个连接(和设置的最大连接数一样 ) 50个线程insert 10000*50次结果

Javaggds 3912552734 连接池有50个连接(和线程数一样)

C3p0 6000065141毫秒 连接池有500个连接(和设置的最大连接数一样 )

测试分析:

c3p0是使用同锁或同步块对连接池进行同步的,所以它的时间会控制在一定范围之内

但带来的问题是线程竞争和线程等待。

Javaggds是使用了concurrent包中的无等待算法队列,这个同步是在cpu层面上做的,所以同步的块非常小,大家有兴趣可以看看CAS同步算法。

Hibernate结合

编辑hibernate 加入/修改配置为

com.javagg.datasource.DataSourceConnectionProvider name=\"db.driverClass\">com.mysql.jdbc.Driver name=\"db.jdbcUrl\">jdbc:mysql://192.168.1.6:3306/test name password

com.javagg.datasource.JavaGGDataSource

-1< 无连接数 >

3600< 一小时timeout 单位为秒 >

DataSourceConnectonProvider代码如下:

import java.lang.reflect.Method; import java.sql.Connection; import java.sql.SQLException; import java.util.Iterator; import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.beanutils.BeanUtils; import org.hibernate.HibernateException;

import org.hibernate.connection.ConnectionProvider;

public class DataSourceConnectionProvider implements ConnectionProvider {

private final static String BASE_KEY = \"db.\";

private final static String DATASOURCE_KEY = \"db.datasource\";

protected DataSource dataSource;

/*

* (non-Javadoc) * * @see *

org.hibernate.connection.ConnectionProvider#configure(java.util.Properties * ) */

public void configure(Properties props) throws HibernateException { initDataSource(props); }

/*

* (non-Javadoc) * * @see

org.hibernate.connection.ConnectionProvider#getConnection() */

public Connection getConnection() throws SQLException { return dataSource.getConnection(); }

/*

* (non-Javadoc) * * @see *

org.hibernate.connection.ConnectionProvider#closeConnection(java.sql. * Connection) */

public void closeConnection(Connection conn) throws SQLException { if (conn != null) conn.close(); }

/*

* (non-Javadoc) *

* @see org.hibernate.connection.ConnectionProvider#close() */

public void close() throws HibernateException {

if (dataSource != null) try {

Method mClose =

dataSource.getClass().getMethod(\"close\"); mClose.invoke(dataSource); } catch (Exception e) {

throw new HibernateException(e); }

dataSource = null; }

/*

* (non-Javadoc) * * @see *

org.hibernate.connection.ConnectionProvider#supportsAggressiveRelease()

*/

public boolean supportsAggressiveRelease() { return false; }

/**

* Initialize the datasource *

* @param props

* @throws HibernateException */

protected void initDataSource(Properties props) throws HibernateException {

String dataSourceClass = null;

Properties new_props = new Properties(); Iterator keys = props.keySet().iterator(); while (keys.hasNext()) {

String key = (String) keys.next(); if (key.equals(DATASOURCE_KEY)) {

dataSourceClass=props.getProperty(key); } else if (key.startsWith(BASE_KEY)) { String value = props.getProperty(key);

new_props.setProperty(key.substring(BASE_KEY.length()), value);

} }

if (dataSourceClass == null)

throw new HibernateException(\"Property 'db.datasource' no defined.\"); try {

dataSource = (DataSource)

Class.forName(dataSourceClass).newInstance();

BeanUtils.populate(dataSource, new_props); } catch (Exception e) {

throw new HibernateException(e); } } }

接下来我们测试配置有没有成功 代码如下:

public static void main(String args[]) {

Configuration cfg = new Configuration(); cfg.configure();

SessionFactory sf = cfg.buildSessionFactory(); for (int i = 0; i < 100; i++) {

Session sess = sf.openSession();

TestGGBean pc = new TestGGBean(); pc.setCol1(1111);

pc.setCols(\"ddaaaa\"); sess.save(pc); sess.flush(); sess.close(); } }

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- huatuo0.cn 版权所有 湘ICP备2023017654号-2

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务