基本思路
建立一个容器
每次到这个容器里得到连接,如果为空则建立一个新连接。 当连接使用完后归还给这个容器
这里就有二个难点
1. 容器必需是同步的,线程安全的。 2. 连接怎归还连接池
方案:
针对这二个难点,我们分别提出了二个解决方法
1.使用ConcurrentLinkedQueue实现先进先出队列
ConcurrentLinkedQueue无界线程安全队列介绍
这个类在java.util.concurrent包中,我们来看看官方是怎描述这个类的 一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素.此实现采用了有效的“无等待 (wait-free)”算法 2.动态代理实现连接归还连接池
大家也可以参考刘冬在IBM发表的文章
http://www.ibm.com/developerworks/cn/java/l-connpoolproxy/
接下来我们来看看整体代码
import java.io.PrintWriter;
import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.sql.Connection; import java.sql.Driver;
import java.sql.SQLException; import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock;
import javax.sql.DataSource;
public class JavaGGDataSource implements DataSource { //连接队列
private ConcurrentLinkedQueue<_Connection> connQueue = new ConcurrentLinkedQueue<_Connection>(); //存放所有连接容器
private List<_Connection> conns = new ArrayList<_Connection>(); private Driver driver = null;
private String jdbcUrl = null; private String user = null; private String password = null;
private int maxActive = -1;// -1为不连接数 private String driverClass = null;
private int timeout = 1000 * 60 * 60 * 4;// 默认为4小时,即4小时没有任何sql操作就把所有连接重新建立连接
private AtomicLong lastCheckout = new AtomicLong(System.currentTimeMillis());
private AtomicInteger connCount = new AtomicInteger(); //线程锁,主要用于新建连接和清空连接时
private ReentrantLock lock = new ReentrantLock();
public void closeAllConnection() { }
/**
* 归还连接给连接池 *
* @param conn *@date 2009-8-13 *@author eric.chan */
public void offerConnection(_Connection conn) { connQueue.offer(conn); }
@Override
public Connection getConnection() throws SQLException { return getConnection(user, password); }
/**
* 从池中得到连接,如果池中没有连接,则建立新的sql连接 *
* @param username * @param password * @author eric.chan */
@Override
public Connection getConnection(String username, String password) throws SQLException { checkTimeout();
_Connection conn = connQueue.poll(); if (conn == null) {
if (maxActive > 0 && connCount.get() >= maxActive) { for (;;) {// 采用自旋方法 从已满的池中得到一个连接 conn = connQueue.poll(); if (conn != null) break; else
continue; } }
lock.lock(); try {
if (maxActive > 0 && connCount.get() >= maxActive) { // 处理并发问题
return getConnection(username, password); }
Properties info = new Properties(); info.put(\"user\ info.put(\"password\
Connection conn1 = loadDriver().connect(jdbcUrl, info); conn = new _Connection(conn1, this);
int c = connCount.incrementAndGet();// 当前连接数加1 conns.add(conn);
System.out.println(\"info : init no. \" + c + \" connectioned\");
} finally {
lock.unlock();
}
} }
lastCheckout.getAndSet(System.currentTimeMillis()); return conn.getConnection();
/**
* 检查最后一次的连接时间 *
* @throws SQLException *@date 2009-8-13 *@author eric.chan */
private void checkTimeout() throws SQLException { long now = System.currentTimeMillis(); long lt = lastCheckout.get(); if ((now - lt) > timeout) { _Connection conn = null; lock.lock(); try {
if(connCount.get()==0)return;
while ((conn = connQueue.poll()) != null) {
System.out.println(\"connection \" + conn + \" close \"); conn.close(); conn = null; }
for(_Connection con:conns){ con.close(); }
conns.clear();
System.out.println(\"info : reset all connections\"); connCount.getAndSet(0);// 重置连接数计数器
lastCheckout.getAndSet(System.currentTimeMillis()); } finally {
lock.unlock(); } } }
/** *
* @return
*@date 2009-8-13 *@author eric.chan
*/
private Driver loadDriver() { if (driver == null) { try {
driver = (Driver)
Class.forName(driverClass).newInstance();
} catch (ClassNotFoundException e) {
System.out.println(\"error : can not find driver class \" + driverClass);
} catch (Exception e) { e.printStackTrace(); } }
return driver; }
@Override
public PrintWriter getLogWriter() throws SQLException { return null; }
@Override
public int getLoginTimeout() throws SQLException { return 0; }
@Override
public void setLogWriter(PrintWriter out) throws SQLException { }
@Override
public void setLoginTimeout(int seconds) throws SQLException { }
@Override
public boolean isWrapperFor(Class iface) throws SQLException { throw new SQLException(\"no Implemented isWrapperFor method\"); }
@Override
public T unwrap(Class iface) throws SQLException {
throw new SQLException(\"no Implemented unwrap method\"); }
public String getJdbcUrl() { return jdbcUrl; }
public void setJdbcUrl(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }
public String getUsername() { return user; }
public void setUsername(String user) { this.user = user; }
public String getPassword() { return password; }
public void setPassword(String password) { this.password = password; }
public String getDriverClass() { return driverClass; }
public void setDriverClass(String driverClass) { this.driverClass = driverClass; }
public int getTimeout() { return timeout; }
public void setTimeout(int timeout) { this.timeout = timeout * 1000; }
public void setMaxActive(int maxActive) { this.maxActive = maxActive; }
public int getMaxActive() { return maxActive; } } /**
* 数据连接的自封装 ,是java.sql.Connection的一个钩子,主要是处理close方法 *
* @author eric * */
class _Connection implements InvocationHandler {
private final static String CLOSE_METHOD_NAME = \"close\";
private final Connection conn; private final JavaGGDataSource ds;
_Connection(Connection conn, JavaGGDataSource ds) { this.conn = conn; this.ds = ds; }
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object obj = null;
// 判断是否调用了close的方法,如果调用close方法则把连接置为无用状态 if (CLOSE_METHOD_NAME.equals(method.getName())) { // 归还连接给连接池
ds.offerConnection(this); } else {
// 运行非close的方法
obj = method.invoke(conn, args); }
return obj; }
public Connection getConnection() {
// 返回数据库连接conn的接管类,以便截住close方法 Connection conn2 = (Connection)
Proxy.newProxyInstance(conn.getClass().getClassLoader(), new Class[] { Connection.class }, this); return conn2; }
public void close() throws SQLException {
// 调用真正的close方法,一但调用此方法就直接关闭连接 if(conn!=null&&!conn.isClosed()) conn.close(); } }
_Connection类是一个私有类,主要实现一个对Connection动态代理的功能(有点象windows的钩子)
说白了就是实现调用connection.close方法时我们映射到另一个方法上. 呵呵,是不是好简单呢,代码没有多复杂。
这里有一个问题要说明一吓:如果设置的maxActive值小于当前线程总数,那么当并发非常大时会出现资源争夺情况,一吓子cpu就会提高不小,所以建议设为无,或大于线程总数的值。
接下来我们测试测试
开五十个线程,对同一个表进行select/insert 10000次操作,每次select/insert一条记录
代码如下:
public static void main(String[] args) {
JavaGGDataSource ds = new JavaGGDataSource();
ds.setDriverClass(\"com.mysql.jdbc.Driver\");
ds.setJdbcUrl(\"jdbc:mysql://192.168.1.6:3306/test\"); ds.setPassword(\"ps\"); ds.setUsername(\"name\"); ds.setTimeout(300); // ds.setMaxActive(60);
for (int i = 0; i < 50; i++) { new GG(ds).start(); } }
class GG extends Thread {
JavaGGDataSource ds = null;
long l = System.currentTimeMillis();
public GG(JavaGGDataSource ds) { this.ds = ds; }
static String sql = \"insert into testgg(col1,cols) values (?,?)\"; static String selectsql = \"select * from testgg where id=?\";
public void run() {
for (int t = 0; t < 10000; t++) { Connection conn = null; try {
conn = ds.getConnection();
PreparedStatement ps = conn.prepareStatement(sql); //以下为insert
ps.setInt(1, 1336); ps.setString(2, \"ddd\"); ps.executeUpdate(); //以下为select
ResultSet rs=ps.executeQuery(); while(rs.next()){ rs.getInt(\"id\"); rs.getInt(\"col1\"); }
rs.close(); ps.close();
} catch (SQLException e) {
// TODO Auto-generated catch block e.printStackTrace(); } finally { try {
if (conn != null) {
// ds.offerConnection(conn); conn.close(); }
} catch (Exception e) { e.printStackTrace(); } } }
System.out.println(System.currentTimeMillis() - l); }
测试结果
50个线程select 10000*50次结果
Javaggds 4062156毫秒 连接池有50个连接(和线程数一样)
C3p0 12351657毫秒 连接池有500个连接(和设置的最大连接数一样 ) 50个线程insert 10000*50次结果
Javaggds 3912552734 连接池有50个连接(和线程数一样)
C3p0 6000065141毫秒 连接池有500个连接(和设置的最大连接数一样 )
测试分析:
c3p0是使用同锁或同步块对连接池进行同步的,所以它的时间会控制在一定范围之内
但带来的问题是线程竞争和线程等待。
Javaggds是使用了concurrent包中的无等待算法队列,这个同步是在cpu层面上做的,所以同步的块非常小,大家有兴趣可以看看CAS同步算法。
Hibernate结合
编辑hibernate 加入/修改配置为
com.javagg.datasource.DataSourceConnectionProvider com.javagg.datasource.JavaGGDataSource DataSourceConnectonProvider代码如下: import java.lang.reflect.Method; import java.sql.Connection; import java.sql.SQLException; import java.util.Iterator; import java.util.Properties; import javax.sql.DataSource; import org.apache.commons.beanutils.BeanUtils; import org.hibernate.HibernateException; import org.hibernate.connection.ConnectionProvider; public class DataSourceConnectionProvider implements ConnectionProvider { private final static String BASE_KEY = \"db.\"; private final static String DATASOURCE_KEY = \"db.datasource\"; protected DataSource dataSource; /* * (non-Javadoc) * * @see * org.hibernate.connection.ConnectionProvider#configure(java.util.Properties * ) */ public void configure(Properties props) throws HibernateException { initDataSource(props); } /* * (non-Javadoc) * * @see org.hibernate.connection.ConnectionProvider#getConnection() */ public Connection getConnection() throws SQLException { return dataSource.getConnection(); } /* * (non-Javadoc) * * @see * org.hibernate.connection.ConnectionProvider#closeConnection(java.sql. * Connection) */ public void closeConnection(Connection conn) throws SQLException { if (conn != null) conn.close(); } /* * (non-Javadoc) * * @see org.hibernate.connection.ConnectionProvider#close() */ public void close() throws HibernateException { if (dataSource != null) try { Method mClose = dataSource.getClass().getMethod(\"close\"); mClose.invoke(dataSource); } catch (Exception e) { throw new HibernateException(e); } dataSource = null; } /* * (non-Javadoc) * * @see * org.hibernate.connection.ConnectionProvider#supportsAggressiveRelease() */ public boolean supportsAggressiveRelease() { return false; } /** * Initialize the datasource * * @param props * @throws HibernateException */ protected void initDataSource(Properties props) throws HibernateException { String dataSourceClass = null; Properties new_props = new Properties(); Iterator keys = props.keySet().iterator(); while (keys.hasNext()) { String key = (String) keys.next(); if (key.equals(DATASOURCE_KEY)) { dataSourceClass=props.getProperty(key); } else if (key.startsWith(BASE_KEY)) { String value = props.getProperty(key); new_props.setProperty(key.substring(BASE_KEY.length()), value); } } if (dataSourceClass == null) throw new HibernateException(\"Property 'db.datasource' no defined.\"); try { dataSource = (DataSource) Class.forName(dataSourceClass).newInstance(); BeanUtils.populate(dataSource, new_props); } catch (Exception e) { throw new HibernateException(e); } } } 接下来我们测试配置有没有成功 代码如下: public static void main(String args[]) { Configuration cfg = new Configuration(); cfg.configure(); SessionFactory sf = cfg.buildSessionFactory(); for (int i = 0; i < 100; i++) { Session sess = sf.openSession(); TestGGBean pc = new TestGGBean(); pc.setCol1(1111); pc.setCols(\"ddaaaa\"); sess.save(pc); sess.flush(); sess.close(); } }
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- huatuo0.cn 版权所有 湘ICP备2023017654号-2
违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务