1 import java.io.IOException;
2 import java.util.HashMap;
3 import java.util.Map;
4
5 import com.ibm.mq.MQC;
6 import com.ibm.mq.MQEnvironment;
7 import com.ibm.mq.MQException;
8 import com.ibm.mq.MQGetMessageOptions;
9 import com.ibm.mq.MQMessage;
10 import com.ibm.mq.MQPutMessageOptions;
11 import com.ibm.mq.MQQueue;
12 import com.ibm.mq.MQQueueManager;
13
14 public class CLIENT_MQ{
15 //定义队列管理器和队列的名称
16 private static final String qmName = "MQ_SERVICE";
//MQ的队列管理器名称 ;
17 //private static final String qName = "MIDDLE_SEND_QUEUE"; //MQ远程队列的名称
18 private static MQQueueManager qMgr;
//队列管理器
19 public static void init(){
20 //设置环境:
21 //MQEnvironment中包含控制MQQueueManager对象中的环境的构成的静态变量,MQEnvironment的值的设定会在MQQueueManager的构造函数加载的时候起作用,
22 //因此必须在建立MQQueueManager对象之前设定MQEnvironment中的值.
23 MQEnvironment.hostname="10.172.12.156";
//MQ服务器的IP地址
24 MQEnvironment.channel="SERVICE_JAVA";
//通道类型:服务器连接
25 MQEnvironment.CCSID=1381;
//437 //服务器MQ服务使用的编码1381代表GBK、1208代表UTF(Coded Character Set Identifier:CCSID)
26 MQEnvironment.port=1456;
//MQ端口
27 try {
28 //定义并初始化队列管理器对象并连接
29 //MQQueueManager可以被多线程共享,但是从MQ获取信息的时候是同步的,任何时候只有一个线程可以和MQ通信。
30 qMgr =
new MQQueueManager(qmName);
31 }
catch (MQException e) {
32 // TODO Auto-generated catch block
33 System.out.println("初使化MQ出错"
);
34 e.printStackTrace();
35 }
36 }
37 /**
38 * 往MQ发送消息
39 * @param message
40 * @return
41 */
42 public static Map<String,Object>
sendMessage(Object message,String qName){
43 Map<String,Object> map=
new HashMap<String,Object>
();
44 try{
45 //设置将要连接的队列属性
46 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
47 //(except for completion code constants and error code constants).
48 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
49 //MQOO_OUTPUT:Open the queue to put messages.
50 /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/
51 //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
52 /*以下选项可适合远程队列与本地队列*/
53 //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用
54 //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
55 int openOptions = MQC.MQOO_OUTPUT |
MQC.MQOO_FAIL_IF_QUIESCING;
56 //连接队列
57 //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.
58 //The inquire and set capabilities are inherited from MQManagedObject.
59 /*关闭了就重新打开*/
60 if(qMgr==
null || !
qMgr.isConnected()){
61 qMgr =
new MQQueueManager(qmName);
62 }
63 MQQueue queue =
qMgr.accessQueue(qName, openOptions);
64 //定义一个简单的消息
65 MQMessage putMessage =
new MQMessage();
66 map.put("messageId"
,putMessage);
67 //String uuid=java.util.UUID.randomUUID().toString();
68 //将数据放入消息缓冲区
69 putMessage.writeObject(message);
70 //设置写入消息的属性(默认属性)
71 MQPutMessageOptions pmo =
new MQPutMessageOptions();
72
73 //将消息写入队列
74 queue.put(putMessage,pmo);
75 map.put("message"
,message.toString());
76 queue.close();
77 }
catch (MQException ex) {
78 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " +
ex.reasonCode);
79 ex.printStackTrace();
80 }
catch (IOException ex) {
81 System.out.println("An error occurred whilst writing to the message buffer: " +
ex);
82 }
catch(Exception ex){
83 ex.printStackTrace();
84 }
finally{
85 try {
86 qMgr.disconnect();
87 }
catch (MQException e) {
88 e.printStackTrace();
89 }
90 }
91 return map;
92 }
93
94
95
96
97
98 /**
99 * 处理完消息回放到MQ队列
100 * @param message
101 * @return
102 */
103 public static Map<String,Object>
sendReplyMessage(Object message,String qName,MQMessage mqMessage){
104 Map<String,Object> map=
new HashMap<String,Object>
();
105 try{
106 //设置将要连接的队列属性
107 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
108 //(except for completion code constants and error code constants).
109 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
110 //MQOO_OUTPUT:Open the queue to put messages.
111 /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/
112 //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
113 /*以下选项可适合远程队列与本地队列*/
114 //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用
115 //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
116 int openOptions = MQC.MQOO_OUTPUT |
MQC.MQOO_FAIL_IF_QUIESCING;
117 //连接队列
118 //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.
119 //The inquire and set capabilities are inherited from MQManagedObject.
120 /*关闭了就重新打开*/
121 if(qMgr==
null || !
qMgr.isConnected()){
122 qMgr =
new MQQueueManager(qmName);
123 }
124 MQQueue queue =
qMgr.accessQueue(qName, openOptions);
125 //定义一个简单的消息
126 MQMessage putMessage =
new MQMessage();
127 putMessage.messageId=
mqMessage.messageId;
128 map.put("messageId"
,putMessage);
129 //String uuid=java.util.UUID.randomUUID().toString();
130 //将数据放入消息缓冲区
131 putMessage.writeObject(message);
132 //设置写入消息的属性(默认属性)
133 MQPutMessageOptions pmo =
new MQPutMessageOptions();
134
135 //将消息写入队列
136 queue.put(putMessage,pmo);
137 map.put("message"
,message.toString());
138 queue.close();
139 }
catch (MQException ex) {
140 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " +
ex.reasonCode);
141 ex.printStackTrace();
142 }
catch (IOException ex) {
143 System.out.println("An error occurred whilst writing to the message buffer: " +
ex);
144 }
catch(Exception ex){
145 ex.printStackTrace();
146 }
finally{
147 try {
148 qMgr.disconnect();
149 }
catch (MQException e) {
150 e.printStackTrace();
151 }
152 }
153 return map;
154 }
155
156
157
158
159 /**
160 * 从队列中去获取消息,如果队列中没有消息,就会发生异常,不过没有关系,有TRY...CATCH,如果是第三方程序调用方法,如果无返回则说明无消息
161 * 第三方可以将该方法放于一个无限循环的while(true){...}之中,不需要设置等待,因为在该方法内部在没有消息的时候会自动等待。
162 * @return
163 */
164 public static String getMessage(String qName,MQMessage mqMessage){
165 String message=""
;
166 try{
167 //设置将要连接的队列属性
168 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface
169 //(except for completion code constants and error code constants).
170 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.
171 //MQOO_OUTPUT:Open the queue to put messages.
172 //int qOptioin = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; 发送时使用
173 //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用
174
175 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF |
MQC.MQOO_OUTPUT;
176 MQMessage retrieve =
new MQMessage();
177 //设置取出消息的属性(默认属性)
178 //Set the put message options.(设置放置消息选项)
179 MQGetMessageOptions gmo =
new MQGetMessageOptions();
180
181 gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;
//Get messages under sync point control(在同步点控制下获取消息)
182 gmo.options = gmo.options + MQC.MQGMO_WAIT;
// Wait if no messages on the Queue(如果在队列上没有消息则等待)
183 gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;
// Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败)
184 gmo.waitInterval = 3000 ;
// Sets the time limit for the wait.(设置等待的毫秒时间限制)
185 /*关闭了就重新打开*/
186 if(qMgr==
null || !
qMgr.isConnected()){
187 qMgr =
new MQQueueManager(qmName);
188 }
189 MQQueue queue =
qMgr.accessQueue(qName, openOptions);
190
191 MQMessage retrievedMessage =
new MQMessage();
192 //从队列中取出对应messageId的消息
193 retrieve.messageId =
mqMessage.messageId;
194 // 从队列中取出消息
195 queue.get(retrieve, gmo);
196
197
198 Object obj =
retrieve.readObject();
199 message=obj.toString();
//解决中文乱码问题
200 /*
201
202 //int size = rcvMessage.getMessageLength();
203 //byte[] p = new byte[size];
204 //rcvMessage.readFully(p);
205
206 int len=retrieve.getDataLength();
207 byte[] str = new byte[len];
208 retrieve.readFully(str,0,len);
209 message = new String(str);//readUTF();
210 */
211
212 queue.close();
213 }
catch (MQException ex) {
214 int reason=
ex.reasonCode;
215 if(reason==2033)
//no messages
216 {
217 message="nomessage"
;
218 }
else{
219 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " +
ex.reasonCode);
220 }
221 }
catch (IOException ex) {
222 System.out.println("An error occurred whilst writing to the message buffer: " +
ex);
223 }
catch(Exception ex){
224 ex.printStackTrace();
225 }
finally{
226 try {
227 qMgr.disconnect();
228 }
catch (MQException e) {
229 e.printStackTrace();
230 }
231 }
232 return message;
233 }
234
235
236
237 public static void main(String args[]) {
238 init();
239 Map<String,Object> map =
new HashMap<String,Object>
();
240 map=sendMessage("{name: test get message id 123}","SERVICE_TRANSFER_QUEUE"
);
241 MQMessage mqMessage = (MQMessage)map.get("messageId"
);
242 outSys("传输消息:"
,mqMessage.messageId.toString());
243
244 outSys("接收传输队列:",getMessage("SERVICE_TRANSFER_QUEUE"
,mqMessage));
245 Map<String,Object> reply_map =
new HashMap<String,Object>
();
246 reply_map=sendReplyMessage("{name: local queue 008}","SERVICE_RECEIVE_QUEUE"
,mqMessage);
247 outSys("放入正常队列:",reply_map.get("message"
).toString());
248
249 outSys("接收正常队列:",getMessage("SERVICE_RECEIVE_QUEUE"
,mqMessage));
250
251
252 }
253
254
255
256 public static void outSys(String display,String val){
257 System.out.println(display+
val);
258 }
259
260 }
转载于:https://www.cnblogs.com/yanduanduan/p/7156370.html