统一对象消息编程详解——线程与异步处理

mac2022-07-05  32

   对于java中的异步处理是用线程方式实现的,所以所谓异步处理或异步调用就是开启新的线程来执行一个过程。对象消息编程将异步(线程)处理变得异常简单。下面看我们是如何进行异步或线程调用。

  对于消息编程,我们执行一个消息时,首先创建一个消息  

  TLMsg msg =New TLMsg()  或 TLMsg msg =createMsg()(该方法包含在基本对象类TLBaseModule中,可以直接使用)

  msg.setAction("action")  //设置执行的消息方法

 然后给目标对象objcet发送消息来执行: putMsg(objcet,msg);

以上这是同步执行一个消息,发送方要等消息执行完毕后才能继续进行。现在我们希望执行为异步方式,既发送完消息后立刻继续执行下一步,而不必等候消息结果返回。那么在发送前对消息设置一个异步标志即可:

 msg.setWaitFlag(false) 

 这时执行过程将自动开启一个线程,目标对象objec在开启的线程中执行消息。我们通过一标志位的设置就很简单的将一个同步过程改为异步执行了。

由于内部机制还是建立了一个线程Thread,对于线程类的各种设置我们可以通过消息参数来控制:

msg.setParam(SESSIONDEAMON,true)  ——线程为deamon;

msg.setParam(SESSIONJOIN,true)     ——主线程join子线程,既线程类Thread的join方法;

msg.setParam(JOINTIME,10)     ——主线程join子线程的时间;

msg.setParam(EXCEPTIONHANDLER,hander)     ——设置线程异常处理句柄;

以上参数为传统线程类的控制项目,我们通过对消息设置这些参数就可以实现线程类的控制。

但对于线程类Thread如果我们希望获取它的返回结果,那对于传统编程就有些麻烦,我们需要传入一个回调类,并且要硬写在异步执行方法的代码里。对于消息编程则事情变得简单,我们可以对消息设置参数:

msg.setParam(TASKRESULTFOR,  handerObj)   ;

msg.setParam(TASKRESULTACTION,  “handerAction”)   ;

这里TASKRESULTFOR 是线程将结果返回的处理类 ,TASKRESULTACTION 是处理的方法。这样消息执行完毕后自动将结果反馈给处理类。

以上的异步执行是启动一个新线程来处理消息,线程的开启比较消耗时间,很多时候我们希望利用线程池,通过线程的复用来节省时间。如果要利用线程池,则设置消息线程池标志:

msg.setParam(INTHREADPOOL, true)   

这时将自动将消息放到一个线程池中,通过线程池来执行一个消息。

写到这就明白了,如果我们希望异步执行一个消息,并且要放到线程池里去执行,那么简单设置两个标志即可:

msg.setWaitFlag(false).setParam(INTHREADPOOL, true) 

这是不是非常的简单?我们不在需要繁琐的每次写代码了。

对于如何利用线程来执行消息,其实是将执行消息发送到一个线程池模块TLThreadPool中,在线程池模块里,通过线程池来执行消息。TLThreadPool 利用java的线程是管理类Executors来产生线程。因此通过配置,可以产生不同的线程类别,如固定、唯一线程池等。

现在我们看使用案例:

public class TestTHreadPool extends TLBaseModule { public TestTHreadPool(String name, TLObjectFactory moduleFactory) { super(name, moduleFactory); ifLog = false; } public static void main(String[] args) { String configdir = "/tlobjdemo/"; String path = TestTHreadPool.class.getResource(configdir).getPath(); System.setProperty("log4j.configurationFile", path + "log4j2.xml"); TLObjectFactory myfactory = TLObjectFactory.getInstance(configdir, "moduleFactory_config.xml"); myfactory.boot(); TestTHreadPool thp = new TestTHreadPool("test", myfactory); thp.run(); } private void run() { TLMsg msg = createMsg().setAction("threadmsg"); putMsg(this, msg); } @Override protected TLMsg checkMsgAction(Object fromWho, TLMsg msg) { TLMsg returnMsg = null; switch (msg.getAction()) { case "threadmsg": threadmsg(fromWho, msg); break; default: } return returnMsg; } private void threadmsg(Object fromWho, TLMsg msg) { String threadName = Thread.currentThread().getName(); System.out.println("threadName: " + threadName); } @Override protected void init() { } }

  上面这是一个简单的程序,入口方法main首先实例化模块工厂,然后实例化模块,执行run方法,在run方法中,构建一个消息msg并执行,该msg的action为“threadmsg”。在threadmsg方法中,简单打印线程名称。现在我们看执行结果:

threadName: main Process finished with exit code 0

消息在主线程main中执行,这是一个同步执行。

现在我们将msg设置为异步消息:

private void run() { TLMsg msg = createMsg().setAction("threadmsg").setWaitFlag(false); putMsg(this, msg); }

然后运行程序:

threadName: Thread-0 Process finished with exit code 0

现在我们看到仅仅设置了消息一个异步标志位,方法threadmsg则在新的线程中执行。

我们再更改代码,异步执行消息三次:

private void run() { TLMsg msg = createMsg().setAction("threadmsg").setWaitFlag(false); putMsg(this, msg); putMsg(this, msg.setWaitFlag(false)); putMsg(this, msg.setWaitFlag(false)); }

再看执行结果:

threadName: Thread-1 threadName: Thread-0 threadName: Thread-2 Process finished with exit code 0

分别在三个不同的线程中执行消息(执行方法threadmsg)。

每次启动新的线程比较消耗时间,现在我们希望在线程池中执行消息,继续更改代码,设定消息参数INTHREADPOOL为true,表明该消息在线程池中执行:

private void run() { TLMsg msg = createMsg().setAction("threadmsg").setWaitFlag(false).setParam(INTHREADPOOL,true); putMsg(this, msg); putMsg(this, msg.setWaitFlag(false).setParam(INTHREADPOOL,true)); putMsg(this, msg.setWaitFlag(false).setParam(INTHREADPOOL,true)); }

 看运行结果: 

2019-10-16 16??32,996 (TLLog.java:179) INFO main:threadPool Tag:start Content: module: threadPool start configFile:/tlobjdemo/threadPool_config.xml 2019-10-16 16??33,027 (TLLog.java:179) INFO main:threadPool Tag:getMsg Content:run action:execute 2019-10-16 16??33,032 (TLLog.java:179) INFO main:threadPool Tag:default Content:start thread task,source:test action:threadmsg 2019-10-16 16??33,033 (TLLog.java:179) INFO main:threadPool Tag:getMsg Content:run action:execute 2019-10-16 16??33,035 (TLLog.java:179) INFO main:threadPool Tag:default Content:start thread task,source:test action:threadmsg 2019-10-16 16??33,036 (TLLog.java:179) INFO main:threadPool Tag:getMsg Content:run action:execute 2019-10-16 16??33,037 (TLLog.java:179) INFO main:threadPool Tag:default Content:start thread task,source:test action:threadmsg threadName: pool-3-thread-1 threadName: pool-3-thread-3 threadName: pool-3-thread-2

我们看到程序自动启动线程池,并在通过线程池执行了三个消息。

现在我们演示如何得到异步处理结果。为此我们稍微修改代码,在子线程执行的方法threadmsg中我们不在打印线程名称,而是返回子线程名称。我们另外建立一个方法threadResult,用来取得异步处理的结果并打印出。

private void run() { TLMsg msg = createMsg().setAction("threadmsg"); putMsg(this, msg.setWaitFlag(false).setParam(INTHREADPOOL,true).setParam(TASKRESULTFOR,this).setParam(TASKRESULTACTION,"threadResult")); putMsg(this, msg.setWaitFlag(false).setParam(INTHREADPOOL,true).setParam(TASKRESULTFOR,this).setParam(TASKRESULTACTION,"threadResult")); putMsg(this, msg.setWaitFlag(false).setParam(INTHREADPOOL,true).setParam(TASKRESULTFOR,this).setParam(TASKRESULTACTION,"threadResult")); System.out.println("异步消息发送完毕"); moduleFactory.shutdown(); } @Override protected TLMsg checkMsgAction(Object fromWho, TLMsg msg) { TLMsg returnMsg = null; switch (msg.getAction()) { case "threadmsg": returnMsg =threadmsg(fromWho, msg); break; case "threadResult": threadResult(fromWho, msg); break; default: } return returnMsg; } private void threadResult(Object fromWho, TLMsg msg) { String threadName = (String) msg.getParam("threadName"); System.out.println("Result from thread,threadName: " + threadName); } private TLMsg threadmsg(Object fromWho, TLMsg msg) { String threadName = Thread.currentThread().getName(); try { sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return createMsg().setParam("threadName",threadName); }

对于执行的消息msg,我们现在增加了两个参数。参数TASKRESULTFOR设定异步处理返回结果的处理模块,本例设定this指定为当前模块。异步处理返回的结果也是消息,参数TASKRESULTACTION设定返回消息的action项,当前参数设定为"threadResult",该消息对应的处理方法为threadResult,对返回结果打印输出。现在看运行情况:

异步消息发送完毕 Result from thread,threadName: pool-3-thread-3 Result from thread,threadName: pool-3-thread-1 Result from thread,threadName: pool-3-thread-2 Process finished with exit code 0

效果和我们预期一致,三个消息分别在线程中执行并返回了结果。

在将系统将消息放入线程池执行时,我们上面没有指定线程池名字(对于消息编程,每个模块都有名字,通过名字来引用模块),采用是默认线程池threadPool。我们也可以设定参数THREADPOOLNAME来自定义线程池:

msg..setParam(THREADPOOLNAME,"mythreadPool");//设定线程池为 mythreadPool

mythreadPool为别名,指向线程池模块TLThreadPool。

在之前的线程池配置中,我们设置为固定数量线程池类型(fixed类型)。三个消息分别在三个线程中执行。我们现在更改线程池类型为单一类型(single),既线程池只一个线程,这时各任务顺序执行。现在我们在配置文件中更改线程池配置:

<module name="mythreadPool" proxyModule="threadPool" poolType="single" poolSize="200" ifLog="false"/>

配置中poolType项指定线程池类型,poolSize为固定数量线程池设定项,对于单一线程池无效。现在我们看没有改动代码而仅仅更改了配置后,程序运行情况:

异步消息发送完毕 Result from thread,threadName: pool-2-thread-1 Result from thread,threadName: pool-2-thread-1 Result from thread,threadName: pool-2-thread-1 Process finished with exit code 0

  消息还是在线程池中执行,但是与之前不同,现在使用同一个线程来顺序执行消息。

 线程池模块TLThreadPool是利用java现有的线程池类,根据配置中的poolType参数开启不同类型的线程池,其初始化代码:

@Override protected void init() { switch (poolType) { case "fixed": threadPool = Executors.newFixedThreadPool(poolSize); break; case "cached": threadPool = Executors.newCachedThreadPool(); break; case "single": threadPool = Executors.newSingleThreadExecutor(); break; default: threadPool = Executors.newFixedThreadPool(poolSize); } }

 

 至此看到,我们的消息编程对开启线程、执行异步操作、取得异步结果,仅仅是通过设定消息的参数就实现,而无需复杂的代码,编程非常简单。

 

 

 

最新回复(0)