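注意:连接队列服务器时,声明队列所用的参数(队列名、durable、exclusive、autoDelete)必须与服务器上已存在的同名队列的配置完全一致,否则 QueueDeclare 会因参数不匹配而失败(PRECONDITION_FAILED),通道随即被服务器关闭。相关字段如下: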
private string queue;//队列名
private bool durable;//持久化
private bool exclusive;//独占
private bool autoDelete;//自动删除
默认帐号 guest 只允许从 localhost(本机回环地址)连接,不能用于远程访问;远程连接需要另外创建帐号并授予相应虚拟主机的权限。
默认访问队列端口是5672,后台网站端口默认是15672。
1、实现发送和接收,类RabbitMQServerT
using System;
using System.Configuration;
using System.Text;
using System.Threading.Tasks;
using MQServer;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace MQServer
{
    /// <summary>
    /// Small wrapper around one RabbitMQ queue that can publish text messages to it
    /// (via the default exchange) and consume them through a callback.
    /// </summary>
    /// <remarks>
    /// Messages are consumed with manual acknowledgement: a delivery is acked only after
    /// the callback returns, and rejected without requeue when the callback throws.
    /// </remarks>
    public class RabbitMQServerT
    {
        // Consume callback; when null, Receive() does nothing.
        protected readonly Action<string, object> receive;
        // Opaque value handed back to the consume callback as its second argument.
        private object penetrate;
        // Queue name, also used as the routing key on the default exchange.
        private string queue;
        // Queue declaration flags; they must match the queue that already exists on the broker.
        private bool durable;
        private bool exclusive;
        private bool autoDelete;
        // When true the callback runs on a thread-pool thread instead of the consumer thread.
        private bool isBeginInvoke;

        // Connection and channel owned by the consumer side; null while not receiving.
        private IConnection connection;
        private IModel channel;

        /// <summary>True while a consumer has been started successfully and not yet stopped.</summary>
        public bool IsReceive;

        private ConnectionFactory factory;

        private RabbitMQServerT() { }

        /// <summary>
        /// Creates an instance whose broker address, credentials and queue flags are read
        /// from the application's appSettings.
        /// </summary>
        /// <param name="_receive">Consume callback; pass null for a send-only instance.</param>
        /// <param name="_queue">Queue name; can be used to separate business areas.</param>
        /// <param name="_penetrate">Value passed through to the consume callback.</param>
        /// <exception cref="ConfigurationErrorsException">A required appSetting is missing or empty.</exception>
        public RabbitMQServerT(Action<string, object> _receive, string _queue = @"hello", object _penetrate = null)
        {
            queue = _queue;
            receive = _receive;
            penetrate = _penetrate;
            isBeginInvoke = false;

            durable = bool.Parse(RequireSetting("RabbitMQ_durable"));
            exclusive = bool.Parse(RequireSetting("RabbitMQ_exclusive"));
            autoDelete = bool.Parse(RequireSetting("RabbitMQ_autoDelete"));

            factory = new ConnectionFactory();
            factory.HostName = ConfigurationManager.AppSettings["RabbitMQHostName"];
            factory.UserName = ConfigurationManager.AppSettings["RabbitMQUserName"];
            factory.Password = ConfigurationManager.AppSettings["RabbitMQPassword"];
            factory.Port = int.Parse(RequireSetting("RabbitMQPort"));

            // The virtual host is optional; the factory default is kept when it is not configured.
            string virtualHost = ConfigurationManager.AppSettings["RabbitMQVirtualHost"];
            if (!string.IsNullOrWhiteSpace(virtualHost))
            {
                factory.VirtualHost = virtualHost;
            }
        }

        /// <summary>
        /// Creates an instance from explicitly supplied settings.
        /// </summary>
        /// <param name="_receive">Consume callback; pass null for a send-only instance.</param>
        /// <param name="_queue">Queue name; can be used to separate business areas.</param>
        /// <param name="_penetrate">Value passed through to the consume callback.</param>
        /// <param name="factory">Connection factory pointing at the broker.</param>
        /// <param name="durable">Queue survives a broker restart.</param>
        /// <param name="exclusive">Queue is exclusive to the declaring connection.</param>
        /// <param name="autoDelete">Queue is deleted when its last consumer goes away.</param>
        /// <param name="isBeginInvoke">Run the callback asynchronously; requires <paramref name="exclusive"/>.</param>
        /// <exception cref="ArgumentException">
        /// <paramref name="isBeginInvoke"/> is true while <paramref name="exclusive"/> is false.
        /// </exception>
        public RabbitMQServerT(Action<string, object> _receive, string _queue, object _penetrate, ConnectionFactory factory, bool durable, bool exclusive, bool autoDelete, bool isBeginInvoke)
        {
            // Asynchronous handling is only allowed on an exclusive queue, otherwise another
            // reader could pick the message up before it is acknowledged.
            if (isBeginInvoke && !exclusive)
            {
                throw new ArgumentException("接收消息队列对象RabbitMQServerT参数isBeginInvoke=true异步执行接收业务,如果要异步执行业务,请独占该消息exclusive=true,否则会被其他线程重复读取。");
            }

            queue = _queue;
            receive = _receive;
            penetrate = _penetrate;
            this.factory = factory;
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
            this.isBeginInvoke = isBeginInvoke;
        }

        /// <summary>
        /// Publishes a text message (UTF-8 encoded) to the queue.
        /// Failures are logged through TLogHelper and not rethrown.
        /// </summary>
        /// <param name="message">Message text.</param>
        public void Send(string message)
        {
            try
            {
                using (var sendConnection = factory.CreateConnection())
                using (var sendChannel = sendConnection.CreateModel())
                {
                    // Declaring is idempotent as long as the flags match the existing queue.
                    sendChannel.QueueDeclare(queue, durable, exclusive, autoDelete, null);
                    byte[] body = Encoding.UTF8.GetBytes(message);
                    sendChannel.BasicPublish("", queue, null, body);
                }
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "发送消息队列异常RabbitMQServerTSend:\n" + message);
            }
        }

        /// <summary>
        /// Serializes <paramref name="model"/> to JSON and publishes it.
        /// </summary>
        /// <param name="model">Message envelope to send.</param>
        public void Send(RabbitMQMsgModel model)
        {
            string message = JsonConvert.SerializeObject(model);
            Send(message);
        }

        /// <summary>
        /// Starts consuming the queue. Does nothing when no callback was supplied or when a
        /// consumer is already running on an open channel.
        /// </summary>
        /// <remarks>
        /// On failure the error is logged, any partially created connection is released and
        /// <see cref="IsReceive"/> is left false so that a caller polling it can retry.
        /// </remarks>
        public void Receive()
        {
            if (receive == null)
            {
                return;
            }

            // One long-lived consumer is enough; starting another would leak a connection.
            if (IsReceive && channel != null && channel.IsOpen)
            {
                return;
            }

            // Release whatever an earlier, broken attempt may have left behind.
            CloseChannelAndConnection();

            try
            {
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                channel.QueueDeclare(queue, durable, exclusive, autoDelete, null);
                // At most one unacknowledged message is delivered at a time.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                // Capture the channel so a delivery is always settled on the channel it arrived on,
                // even if the field has been replaced in the meantime.
                IModel consumerChannel = channel;
                var consumer = new EventingBasicConsumer(consumerChannel);
                consumer.Received += (model, ea) => HandleDelivery(consumerChannel, ea);

                // autoAck must be false because deliveries are acknowledged manually;
                // acknowledging an auto-acked delivery makes the broker close the channel.
                consumerChannel.BasicConsume(queue, false, consumer);
                IsReceive = true;
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "接收消息队列异常Receive");
                CloseChannelAndConnection();
                IsReceive = false;
            }
        }

        /// <summary>
        /// Stops consuming and releases the consumer's channel and connection.
        /// Safe to call when <see cref="Receive"/> was never called or has failed.
        /// </summary>
        public void EndReceive()
        {
            IsReceive = false;
            CloseChannelAndConnection();
        }

        // Reads a mandatory appSetting and fails with a message naming the missing key.
        private static string RequireSetting(string key)
        {
            string value = ConfigurationManager.AppSettings[key];
            if (string.IsNullOrWhiteSpace(value))
            {
                throw new ConfigurationErrorsException("Missing required appSetting '" + key + "'.");
            }
            return value;
        }

        // Dispatches one delivery to the callback, inline or on the thread pool.
        private void HandleDelivery(IModel deliveryChannel, BasicDeliverEventArgs ea)
        {
            // Copy the payload now: the memory behind ea.Body must not be used after this handler returns.
            byte[] body = ea.Body.ToArray();
            ulong deliveryTag = ea.DeliveryTag;

            if (isBeginInvoke)
            {
                // NOTE(review): the ack is then sent from a pool thread. With prefetchCount = 1
                // only one delivery is in flight, so channel use should not overlap — confirm.
                Task.Run(() => ProcessAndSettle(deliveryChannel, deliveryTag, body));
            }
            else
            {
                ProcessAndSettle(deliveryChannel, deliveryTag, body);
            }
        }

        // Runs the callback and acknowledges on success, or logs and rejects on failure.
        private void ProcessAndSettle(IModel deliveryChannel, ulong deliveryTag, byte[] body)
        {
            try
            {
                string message = Encoding.UTF8.GetString(body);
                receive(message, penetrate);
                deliveryChannel.BasicAck(deliveryTag, false);
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "接收消息队列业务异常Received:" + queue);
                TryReject(deliveryChannel, deliveryTag);
            }
        }

        // Rejects a delivery without requeueing so that a message which always fails cannot
        // loop forever; an unsettled delivery would otherwise block the consumer (prefetch = 1).
        private void TryReject(IModel deliveryChannel, ulong deliveryTag)
        {
            try
            {
                if (deliveryChannel.IsOpen)
                {
                    deliveryChannel.BasicNack(deliveryTag, false, false);
                }
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "接收消息队列业务异常Received:" + queue);
            }
        }

        // Disposes the consumer channel and connection if present; never throws.
        private void CloseChannelAndConnection()
        {
            IModel oldChannel = channel;
            IConnection oldConnection = connection;
            channel = null;
            connection = null;

            try
            {
                oldChannel?.Dispose();
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "关闭消息队列通道异常EndReceive:" + queue);
            }

            try
            {
                oldConnection?.Dispose();
            }
            catch (Exception ex)
            {
                TLogHelper.Error(ex.Message, "关闭消息队列连接异常EndReceive:" + queue);
            }
        }
    }
}
消息格式RabbitMQMsgModel
namespace MQServer
{
    /// <summary>
    /// Envelope for a queue message: a business name plus its payload.
    /// </summary>
    public class RabbitMQMsgModel
    {
        /// <summary>
        /// Name of the business operation the message is addressed to.
        /// </summary>
        public string BLLName { get; set; }

        /// <summary>
        /// Payload belonging to that business operation.
        /// </summary>
        public object Data { get; set; }
    }
}
2、BLL实现消息业务
BaseMQBLL
using System;

namespace MQServer
{
    /// <summary>
    /// Base class for business components that send to and consume from one queue.
    /// Derived classes implement <see cref="ReceiveBack"/> to handle incoming messages.
    /// </summary>
    public abstract class BaseMQBLL : IDisposable
    {
        /// <summary>Whether the underlying queue wrapper reports that it is receiving.</summary>
        public bool IsReceive
        {
            get { return MQ.IsReceive; }
        }

        /// <summary>Queue wrapper used for both sending and receiving.</summary>
        protected readonly RabbitMQServerT MQ;

        private BaseMQBLL() { }

        /// <summary>
        /// Creates the queue wrapper with default (configuration-based) settings and wires
        /// its consume callback to <see cref="ReceiveBack"/>.
        /// </summary>
        /// <param name="queue">Queue name.</param>
        /// <param name="_penetrate">Value handed to <see cref="ReceiveBack"/> with every message.</param>
        protected BaseMQBLL(string queue, object _penetrate)
        {
            // The pass-through value is captured by the lambda, so the wrapper itself is given null.
            // Exceptions from ReceiveBack propagate unchanged to the wrapper.
            MQ = new MQServer.RabbitMQServerT(
                (string source, object o) => ReceiveBack(source, _penetrate),
                queue,
                _penetrate: null);
        }

        /// <summary>
        /// Starts receiving.
        /// </summary>
        public void Receive()
        {
            MQ.Receive();
        }

        /// <summary>
        /// Stops receiving.
        /// </summary>
        public void EndReceive()
        {
            MQ.EndReceive();
        }

        /// <summary>
        /// Callback invoked for every received message; must be implemented by derived classes.
        /// </summary>
        /// <param name="source">Message text.</param>
        /// <param name="receiveO">Pass-through value supplied to the constructor.</param>
        protected abstract void ReceiveBack(string source, object receiveO);

        /// <summary>
        /// Stops receiving if a consumer is running. Does nothing when receiving was never
        /// started or has already been stopped, so it can be called more than once.
        /// </summary>
        public void Dispose()
        {
            // Skipping the call when nothing is running avoids tearing down a channel and
            // connection that were never created.
            if (MQ != null && MQ.IsReceive)
            {
                EndReceive();
            }
        }
    }
}
MQTestHello: BaseMQBLL
using MQServer;
using Newtonsoft.Json;

namespace BLL
{
    /// <summary>
    /// Sample business component bound to the "hello" queue.
    /// </summary>
    public class MQTestHello : BaseMQBLL
    {
        public MQTestHello() : base("hello", null) { }

        /// <summary>
        /// Parses the received JSON envelope and dispatches on its BLLName.
        /// Empty or null payloads are logged and ignored; unknown names are ignored.
        /// </summary>
        /// <param name="source">JSON text of a <see cref="RabbitMQMsgModel"/>.</param>
        /// <param name="receiveO">Pass-through value (unused here).</param>
        protected override void ReceiveBack(string source, object receiveO)
        {
            // Deserializing an empty or literal "null" payload yields null rather than throwing,
            // so guard before touching the model.
            RabbitMQMsgModel model = string.IsNullOrWhiteSpace(source)
                ? null
                : JsonConvert.DeserializeObject<RabbitMQMsgModel>(source);
            if (model == null)
            {
                TLogHelper.Error(source ?? "", "收到空消息,已忽略MQTestHello");
                return;
            }

            switch (model.BLLName)
            {
                case "Hello":
                    Hello(model.Data);
                    break;
                default:
                    break;
            }
        }

        /// <summary>
        /// Sends a "Hello" message carrying <paramref name="msg"/>.
        /// </summary>
        /// <param name="msg">Text to send as the message data.</param>
        public void SendHello(string msg)
        {
            MQ.Send(new RabbitMQMsgModel() { BLLName = "Hello", Data = msg });
        }

        /// <summary>
        /// Handler for a received "Hello" message: writes its data to the log.
        /// </summary>
        /// <param name="data">Data carried by the message.</param>
        public void Hello(object data)
        {
            TLogHelper.Info(JsonConvert.SerializeObject(data), "读取消息在MQTestHello");
        }
    }
}
3、记录日志
TLogHelper
using Newtonsoft.Json;
using System;
using System.IO;
using System.Messaging;
using System.Text;

namespace MQServer.Log
{
    /// <summary>
    /// Minimal text-file logger: one file per day named yyyyMMdd.txt inside the directory
    /// given by the "MQServerTLogPath" appSetting.
    /// </summary>
    public class TLogHelper
    {
        /// <summary>Serializes all file access performed by this class.</summary>
        public static readonly object _lock = new object();

        /// <summary>
        /// Logs the JSON form of a message's body under the "MessageQueue" title.
        /// </summary>
        /// <param name="myMessage">Message whose Body is logged.</param>
        /// <param name="detail">Optional extra line written before the body.</param>
        public static void MQ(Message myMessage, string detail = "")
        {
            string msg = JsonConvert.SerializeObject(myMessage.Body);
            Write(msg, detail, "MessageQueue");
        }

        /// <summary>Logs a text entry under the "Info" title.</summary>
        /// <param name="msg">Text to log.</param>
        /// <param name="detail">Optional extra line written before the text.</param>
        public static void Info(string msg, string detail = "")
        {
            Write(msg, detail, "Info");
        }

        /// <summary>Logs the JSON form of an object under the "Info" title.</summary>
        /// <param name="msg">Object to serialize and log.</param>
        /// <param name="detail">Optional extra line written before the JSON.</param>
        public static void Info(object msg, string detail = "")
        {
            Write(JsonConvert.SerializeObject(msg), detail, "Info");
        }

        /// <summary>Logs a text entry under the "Error" title.</summary>
        /// <param name="msg">Text to log.</param>
        /// <param name="detail">Optional extra line written before the text.</param>
        public static void Error(string msg, string detail = "")
        {
            Write(msg, detail, "Error");
        }

        /// <summary>
        /// Returns the full content of today's log file, or an empty string when nothing
        /// has been logged today.
        /// </summary>
        public static string Read()
        {
            string logFile = LogFileFor(DateTime.Now);
            lock (_lock)
            {
                if (!File.Exists(logFile))
                {
                    return "";
                }

                using (var reader = new StreamReader(logFile, Encoding.UTF8))
                {
                    return reader.ReadToEnd();
                }
            }
        }

        // Appends one entry: blank line, "timestamp title", optional detail line, message.
        private static void Write(string msg, string detail = "", string title = "Info")
        {
            DateTime now = DateTime.Now;
            string logDirectory = ResolveLogDirectory();
            string logFile = Path.Combine(logDirectory, now.ToString("yyyyMMdd") + ".txt");

            lock (_lock)
            {
                // CreateDirectory is a no-op when the directory already exists.
                Directory.CreateDirectory(logDirectory);

                // 'using' guarantees the file handle is released even if a write fails;
                // a leaked handle would block every later write to the same file.
                using (var writer = new StreamWriter(logFile, true))
                {
                    writer.WriteLine();
                    writer.WriteLine(now.ToString("yyyyMMddHHmmssfff") + " " + title);
                    if (!string.IsNullOrWhiteSpace(detail))
                    {
                        writer.WriteLine(detail);
                    }
                    writer.WriteLine(msg);
                }
            }
        }

        // Path of the log file for the given day.
        private static string LogFileFor(DateTime day)
        {
            // Path.Combine works whether or not the configured directory ends with a separator.
            return Path.Combine(ResolveLogDirectory(), day.ToString("yyyyMMdd") + ".txt");
        }

        // Configured log directory, or a "Log" folder under the application base directory
        // when the setting is absent, so that logging itself cannot fail on a missing setting.
        private static string ResolveLogDirectory()
        {
            string configured = System.Configuration.ConfigurationManager.AppSettings["MQServerTLogPath"];
            if (string.IsNullOrWhiteSpace(configured))
            {
                return Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "Log");
            }
            return configured;
        }
    }
}
4、Form窗体测试
RabbitMQForm : Form
using BLL; using Newtonsoft.Json; using System; using System.Windows.Forms; namespace WinFormActiveMQ { public partial class RabbitMQForm : Form { static MQTestHello hello; static TCPTestHello tcpHello; static TCPTestHello tcpHello2; private Label la_main; private Timer tim; private System.Threading.Thread thrListener; private System.Threading.Thread thrListener2; public RabbitMQForm() { InitializeComponent(); la_main = new Label(); la_main.Name = "la_main"; la_main.Width = 282; la_main.TabIndex = 0; Controls.Add(la_main); tim = new Timer(); tim.Interval = 1000; tim.Tick += CheckReceive; tim.Start(); } private void Form1_Load(object sender, EventArgs e) { textBox1.ScrollBars = ScrollBars.Vertical; la_main.Text = "RabbitMQ消息队列,点击开启接收。"; TCPIp1.Text = "127.0.0.1"; TCPPort1.Text = "90"; TCPIp2.Text = "127.0.0.1"; TCPPort2.Text = "91"; hello = new MQTestHello(); } /// <summary> /// 守护线程 /// 检测接收线程是否还在,不在了重新接收 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void CheckReceive(object sender, EventArgs e) { //测试Hello队列 if (TestHello.Checked) { if (hello!=null && !hello.IsReceive) { hello.Receive(); } } if (TCPHello.Checked) { if (tcpHello != null && !tcpHello.IsListening) { thrListener = tcpHello.ListenStart(ListenClientBack); } } if (TCPListen2.Checked) { if (tcpHello2 != null && !tcpHello2.IsListening) { thrListener2 = tcpHello2.ListenStart(ListenClientBack); } } } /// <summary> /// 开启接收Hello队列,并发送一条测试消息 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void TestHello_CheckedChanged(object sender, EventArgs e) { if (TestHello.Checked) { //开启接收 if (!hello.IsReceive) { hello.Receive(); } //测试发送 hello.SendHello("测试第一消息"); } else { //关闭接收 hello.EndReceive(); } } /// <summary> /// 开启接收TCP请求,hello业务的 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void TCPHello_CheckedChanged(object sender, EventArgs e) { //设置ip端口输入 TCPIp1.Enabled = !TCPHello.Checked; 
TCPPort1.Enabled = !TCPHello.Checked; if (TCPHello.Checked) { int port = 0; try { port = int.Parse(TCPPort1.Text); } catch (Exception) { TCPHello.Checked = !TCPHello.Checked; TCPIp1.Enabled = !TCPHello.Checked; TCPPort1.Enabled = !TCPHello.Checked; MessageBox.Show("请输入正确端口号"); return; } tcpHello = new TCPTestHello(TCPIp1.Text, port, "", port); ///开启监听 thrListener = tcpHello.ListenStart(ListenClientBack); } else { //关闭监听 tcpHello.CloseListen(); } } /// <summary> /// 收到终端的TCP请求后回调显示 /// </summary> private string ListenClientBack(object obj) { if (obj == null) { //监听执行回调出错 changeListenText("\r\n 监听执行回调出错"); //用选择框改变事件关闭监听 TCPHello_CheckedChanged(null,null); return ""; } string res = JsonConvert.SerializeObject(obj); changeListenText("\r\n收到终端的请求在Form:" + res); return res; }