(1)根据xml文件来管理线程池的最大最小线程数 (2)对线程池通过Timer定期扫描以防止线程未激活; (3)通过某一个变量(本程序中是freeThreadCount)来得到空闲线程的数目;
一、配置xml(listen.xml)是: <?xml version="1.0" encoding="UTF-8"?> <config> <ConsumeThreadPool> <minPools>10</minPools> <!--线程池最小线程--> <maxPools>100</maxPools> <!--线程池最大线程--> <checkThreadPeriod>5</checkThreadPeriod> <!--检查线程池中线程的周期5分钟--> </ConsumeThreadPool> </config>
二、对于ConsumeThreadPoolPara的javabean:
import java.io.*; public class ConsumeThreadPoolPara implements Serializable{ private int minPools; private int maxPools; private int checkThreadPeriod;
public int getMinPools(){ return minPools; } public int getMaxPools(){ return maxPools; } public int getCheckThreadPeriod(){ return checkThreadPeriod; } public void setMinPools(int minPools){ this.minPools = minPools; } public void setMaxPools(int maxPools){ this.maxPools = maxPools; } public void setCheckThreadPeriod(int checkThreadPeriod){ this.checkThreadPeriod = checkThreadPeriod; } public String toString(){ return minPools+" " + maxPools+" "+checkThreadPeriod; } public ConsumeThreadPoolPara() { } public static void main(String[] args) { ConsumeThreadPoolPara consumeThreadPool1 = new ConsumeThreadPoolPara(); }
}
三、解析xml程序代码(生成ConsumeThreadPoolPara): 使用jdom解析: import org.jdom.*; import org.jdom.input.SAXBuilder; import java.io.*; import java.util.*;
public class ParseConfig { static Hashtable Listens = null; static ConnPara connpara = null; static ConsumeThreadPoolPara consumeThreadPoolPara = null; private static String configxml = "listen.xml";
static{ getConsumeThreadPoolPara(); //得到消费的线程池的参数 }
/** * 装载文档 * @return 返回根结点 * @throws JDOMException */ public static Element loadDocument() throws JDOMException{ SAXBuilder parser = new SAXBuilder(); // 新建立构造器 try { Document document = parser.build(configxml); Element root = document.getRootElement(); return root; }catch(JDOMException e){ logger.error("listen.xml文件格式非法!"); throw new JDOMException(); } }
public static ConsumeThreadPoolPara getConsumeThreadPoolPara(){ if(consumeThreadPoolPara ==null){ try { Element root = loadDocument(); Element consumeThreadPool = root.getChild("ConsumeThreadPool"); if (consumeThreadPool != null) { //代表有数据库配置 consumeThreadPoolPara = new ConsumeThreadPoolPara(); Element minPools = consumeThreadPool.getChild("minPools"); consumeThreadPoolPara.setMinPools(Integer.parseInt(minPools.getTextTrim())); Element maxPools = consumeThreadPool.getChild("maxPools"); consumeThreadPoolPara.setMaxPools(Integer.parseInt(maxPools.getTextTrim())); Element checkThreadPeriod = consumeThreadPool.getChild("checkThreadPeriod"); consumeThreadPoolPara.setCheckThreadPeriod(Integer.parseInt(checkThreadPeriod.getTextTrim())); } } catch (JDOMException e) { } } return consumeThreadPoolPara; } }
四、线程池源代码: import java.util.*;
/** * <p>Title: 线程池</p> * <p>Description: 采集消费模块</p> * <p>Copyright: Copyright (c) 2004</p> * <p>Company: </p> * @author 张荣斌 * @version 1.0 */
public class ThreadPool { private static int minPools = 10; //最小连接池数目 private static int maxPools = 100; //最大连接池数目 private static int checkThreadPeriod = 5; //检查连接池的周期 ArrayList m_ThreadList; //工作线程列表 LinkedList m_RunList = null; //工作任务列表 int totalThread = 0; //总线程数 static int freeThreadCount = 0; //未被使用的线程数目 private java.util.Timer timer = null; //定时器 static Object o = new Object();
static{ //先初始化线程池的参数 ConsumeThreadPoolPara consumeThreadPoolPara = ParseConfig.getConsumeThreadPoolPara(); if(consumeThreadPoolPara!=null){ minPools = consumeThreadPoolPara.getMinPools(); maxPools = consumeThreadPoolPara.getMaxPools(); checkThreadPeriod = consumeThreadPoolPara.getCheckThreadPeriod()*60*1000; } } public void setMinPools(int minPools){ this.minPools = minPools; } public void setMaxPools(int maxPools){ this.maxPools = maxPools; } public void setCheckThreadPeriod(int checkThreadPeriod){ this.checkThreadPeriod = checkThreadPeriod; } public ThreadPool() {
m_ThreadList=new ArrayList(); m_RunList=new LinkedList(); for(int i=0;i<minPools;i++){ WorkerThread temp=new WorkerThread(); totalThread = totalThread + 1; m_ThreadList.add(temp); temp.start(); try{ Thread.sleep(100); }catch(Exception e){ } } timer = new Timer(true); //启动定时器 timer.schedule(new CheckThreadTask(this),0,checkThreadPeriod); }
/** * 当有一个工作来的时候启动线程池的线程 * 1.当空闲线程数为0的时候,看总线程是否小于最大线程池的数目,就new一个新的线程,否则sleep,直到有空闲线程为止; * 2.当空闲线程不为0,则将任务丢给空闲线程去完成 * @param work */ public synchronized void run(String work) { if (freeThreadCount == 0) { if(totalThread<maxPools){ WorkerThread temp = new WorkerThread(); totalThread = totalThread + 1; m_ThreadList.add(temp); temp.start(); synchronized(m_RunList){ m_RunList.add(work); m_RunList.notify(); } }else{ while (freeThreadCount == 0) { try { Thread.sleep(200); } catch (InterruptedException e) { } } synchronized(m_RunList){ m_RunList.add(work); m_RunList.notify(); } } } else { synchronized(m_RunList){ m_RunList.add(work); m_RunList.notify(); } } }
/** * 检查所有的线程的有效性 */ public synchronized void checkAllThreads() {
Iterator lThreadIterator = m_ThreadList.iterator();
while (lThreadIterator.hasNext()) { //逐个遍厉 WorkerThread lTestThread = (WorkerThread) lThreadIterator.next();
if (! (lTestThread.isAlive())) { //如果处在非活动状态时 lTestThread = new WorkerThread(); //重新生成个线程 lTestThread.start(); //启动 } } }
/** * 打印调试信息 */ public void printDebugInfo(){ System.out.println("totalThread="+totalThread); System.out.println("m_ThreadList.size()="+m_ThreadList.size()); }
/** * * <p>Title: 工作线程类</p> * @author 张荣斌 * @version 1.0 */ class WorkerThread extends Thread{ boolean running = true; String work;
public void run(){ while(running){ synchronized(o){ freeThreadCount++; } synchronized(m_RunList){ while(m_RunList.size() == 0){ try{ m_RunList.wait(); if(!running) return; }catch(InterruptedException e){ } } synchronized(o){ freeThreadCount--; } work = (String)m_RunList.removeLast(); if(work==null) return; }
// 得到了work 进行工作,这里work可以换成自己的工作类 } } }
} /** * * <p>Title: 定时器调动的任务</p> * @author 张荣斌 * @version 1.0 */ class CheckThreadTask extends TimerTask{ private static boolean isRunning = false; private ThreadPool pool;
public CheckThreadTask(ThreadPool pool){ this.pool = pool; } public void run() { if (!isRunning) { isRunning = true; pool.checkAllThreads(); isRunning = false; } } } 
|