当前位置 博文首页 > 文章内容

    【C#】【D】消息队列RabbitMQ,和TCP监听Demo。

    作者: 栏目:未分类 时间:2020-10-24 18:01:17

    本站于2023年9月4日。收到“大连君*****咨询有限公司”通知
    说我们IIS7站长博客,有一篇博文用了他们的图片。
    要求我们给他们一张图片6000元。要不然法院告我们

    为避免不必要的麻烦,IIS7站长博客,全站内容图片下架、并积极应诉
    博文内容全部不再显示,请需要相关资讯的站长朋友到必应搜索。谢谢!

    另祝:版权碰瓷诈骗团伙,早日弃暗投明。

    相关新闻:借版权之名、行诈骗之实,周某因犯诈骗罪被判处有期徒刑十一年六个月

    叹!百花齐放的时代,渐行渐远!



     

    注意连接队列服务器时,参数必须和服务器配置一致

      private string queue;//队列名
            private bool durable;//持久化
            private bool exclusive;//独占
            private bool autoDelete;//自动删除

    默认帐号guest不能远程。

    默认访问队列端口是5672,后台网站端口默认是15672。

     

    1、实现发送和接收,类RabbitMQServerT

    using System;
    using MQServer;
    using RabbitMQ.Client;
    using System.Text;
    using System.Configuration;
    using RabbitMQ.Client.Events;
    using Newtonsoft.Json;
    
    namespace MQServer
    {
        /// <summary>
        /// RabbitMQ消息队列类
        /// </summary>
        public class RabbitMQServerT 
        {
            protected readonly Action<string, object> receive;//接收回调
            private object penetrate;//接收回调透传参数
            private string queue;//队列名
            private bool durable;//持久化
            private bool exclusive;//独占
            private bool autoDelete;//自动删除
            private bool isBeginInvoke;//接收后业务是否异步,异步的话消息可能在确认前被其他线程读走,造成重复读。//不异步就阻塞。//异步请独占
    
            //接收消息对象
            private IConnection connection;
            private IModel channel;
    
            public bool IsReceive;
    
            private ConnectionFactory factory;
            private RabbitMQServerT()
            {
            }
    
            /// <summary>
            /// 使用默认配置参数
            /// </summary>
            /// <param name="_receive">消费事件,空则不消费</param>
            /// <param name="_queue">消息路径最后一层名字,可用于区分业务</param>
            /// <param name="_penetrate">接收回调透传参数</param>
            public RabbitMQServerT(Action<string, object> _receive, string _queue = @"hello", object _penetrate = null)
            {
                queue = _queue;
                receive = _receive;
                penetrate = _penetrate;
                isBeginInvoke = false;
    
                durable = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_durable"].ToString());//
                exclusive = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_exclusive"].ToString());//
                autoDelete = bool.Parse(ConfigurationManager.AppSettings["RabbitMQ_autoDelete"].ToString());//
    
                factory = new ConnectionFactory();
                factory.HostName = ConfigurationManager.AppSettings["RabbitMQHostName"];//RabbitMQ服务器
                factory.UserName = ConfigurationManager.AppSettings["RabbitMQUserName"];//用户名
                factory.Password = ConfigurationManager.AppSettings["RabbitMQPassword"];//密码
                factory.Port = int.Parse(ConfigurationManager.AppSettings["RabbitMQPort"].ToString());//
                if (!string.IsNullOrWhiteSpace(ConfigurationManager.AppSettings["RabbitMQVirtualHost"]))
                {
                    factory.VirtualHost = ConfigurationManager.AppSettings["RabbitMQVirtualHost"];//
                }
            }
    
            /// <summary>
            /// 使用手动参数
            /// </summary>
            /// <param name="_receive">消费事件,空则不消费</param>
            /// <param name="_queue">消息路径最后一层名字,可用于区分业务</param>
            /// <param name="_penetrate">接收回调透传参数</param>
            /// <param name="factory">连接队列服务器</param>
            /// <param name="durable">持久化</param>
            /// <param name="exclusive">独占</param>
            /// <param name="autoDelete">自动删除</param>
            /// <param name="isBeginInvoke">接收是否异步//异步请独占,否则异常</param>
            public RabbitMQServerT(Action<string, object> _receive, string _queue, object _penetrate, ConnectionFactory factory
                ,bool durable,bool exclusive, bool autoDelete,bool isBeginInvoke)
            {
                queue = _queue;
                receive = _receive;
                penetrate = _penetrate;
    
                this.factory = factory;
                this.durable = durable;
                this.exclusive = exclusive;
                this.autoDelete = autoDelete;
                this.isBeginInvoke = isBeginInvoke;
    
                //异步请独占,不然会重复读
                if (isBeginInvoke == true && exclusive == false) 
                {
                    throw new Exception("接收消息队列对象RabbitMQServerT参数isBeginInvoke=true异步执行接收业务,如果要异步执行业务,请独占该消息exclusive=true,否则会被其他线程重复读取。");
                }
            }
    
            /// <summary>
            /// 发送消息
            /// </summary>
            /// <param name="message"></param>
            public void Send(string message)
            {
                //发送消息队列
                try
                {
                    using (var connection = factory.CreateConnection())
                    {
                        using (var channel = connection.CreateModel())
                        {
                            channel.QueueDeclare(queue, durable, exclusive, autoDelete, null);//创建一个消息队列
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish("", queue, null, body); //开始传递
                            //TLogHelper.Info(message, "RabbitMQServerTSend");//发送的内容写进TXT
                        }
                    }
                }
                catch (Exception ex)
                {
                    TLogHelper.Error(ex.Message, "发送消息队列异常RabbitMQServerTSend:\n" + message);
                }
            }
            /// <summary>
            /// 发送消息
            /// </summary>
            /// <param name="message"></param>
            public void Send(RabbitMQMsgModel model)
            {
                //发送消息队列
                string message = JsonConvert.SerializeObject(model);
                Send(message);
            }
    
            /// <summary>
            /// 进行接收消息队列
            /// </summary>
            public void Receive()
            {
                if (receive == null)
                {
                    return;
                }
                IsReceive = true;
                try
                {
                    connection = factory.CreateConnection();
                    channel = connection.CreateModel();
                  
                    channel.QueueDeclare(queue, durable, exclusive, autoDelete, null);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
    
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        try
                        {
                            var body = ea.Body.ToArray();
                            var message = Encoding.UTF8.GetString(body);
    
                            //接收后业务
                            if (isBeginInvoke)
                            {
                                receive?.BeginInvoke(message, penetrate,(e)=>{
                                    //确认消息
                                    channel.BasicAck(ea.DeliveryTag, false);
                                },null);
                            }
                            else 
                            {
                                receive?.Invoke(message, penetrate);
                                //确认消息
                                channel.BasicAck(ea.DeliveryTag, false);
                            }
                        }
                        catch (Exception ex)
                        {
                            TLogHelper.Error(ex.Message, "接收消息队列业务异常Received:"+queue);
                        }
                        finally
                        {
                            //再次生成接收
                            Receive();
                        }
                    };
                    channel.BasicConsume(queue, true, consumer);
                }
                catch (Exception ex)
                {
                    TLogHelper.Error(ex.Message, "接收消息队列异常Receive");
                }
            }
    
            /// <summary>
            /// 取消接收
            /// </summary>
            public void EndReceive()
            {
                IsReceive=false;
                channel.Dispose();
                connection.Dispose();
            }
        }
    }
    View Code

     

    消息格式RabbitMQMsgModel

    namespace MQServer
    {
        public class RabbitMQMsgModel
        {
            /// <summary>
            /// 业务名
            /// </summary>
            public string BLLName { get; set; }
    
            /// <summary>
            /// 业务数据
            /// </summary>
            public object Data { get; set; }
        }
    }
    View Code

     

    2、BLL实现消息业务

    BaseMQBLL

    using System;
    
    namespace MQServer
    {
        /// <summary>
        /// 使用队列业务基类
        /// </summary>
        public abstract class BaseMQBLL : IDisposable
        {
            public bool IsReceive { get { return MQ.IsReceive; } }
    
            protected readonly RabbitMQServerT MQ;
    
            private BaseMQBLL(){ }
    
            protected BaseMQBLL(string queue,object _penetrate) 
            {
                
                MQ = new MQServer.RabbitMQServerT((string source, object o) => {
                    try
                    {
                        ReceiveBack(source, _penetrate);
                        ////test
                        //throw new Exception("测试消息异常");
                    }
                    catch (Exception)
                    {
                        throw;
                    }
                }, queue, _penetrate: null);
            }
            /// <summary>
            /// 开启接收
            /// </summary>
            public void Receive()
            {
                MQ.Receive();
            }
    
            /// <summary>
            /// 关闭接收
            /// </summary>
            public void EndReceive()
            {
                MQ.EndReceive();
            }
    
            /// <summary>
            /// 声明必须重写的接收回调
            /// </summary>
            /// <param name="source"></param>
            /// <param name="receiveO"></param>
            protected abstract void ReceiveBack(string source, object receiveO);
    
            public void Dispose() 
            {
                EndReceive();
            }
        }
    }
    View Code

     

    MQTestHello: BaseMQBLL

    using MQServer;
    using Newtonsoft.Json;
    
    namespace BLL
    {
        public class MQTestHello : BaseMQBLL
        {
            public MQTestHello()
                : base("hello", null)
            {
            }
    
            /// <summary>
            /// 重写接收回调
            /// </summary>
            /// <param name="source"></param>
            /// <param name="receiveO"></param>
            protected override void ReceiveBack(string source, object receiveO)
            {
                //解析source,根据source中的BLLName方法名,执行不同业务
                RabbitMQMsgModel model = JsonConvert.DeserializeObject<RabbitMQMsgModel>(source);
    
                switch (model.BLLName)
                {
                    case "Hello":
                        Hello(model.Data);
                        break;
                    default:
                        break;
                }
            }
    
            /// <summary>
            /// 发送Hello消息
            /// </summary>
            public void SendHello(string msg)
            {
                MQ.Send(new RabbitMQMsgModel() { BLLName = "Hello", Data = msg });
            }
    
            /// <summary>
            /// 接收到Hello消息回调
            /// </summary>
            /// <param name="data"></param>
            public void Hello(object data)
            {
                TLogHelper.Info(JsonConvert.SerializeObject(data), "读取消息在MQTestHello");
            }
        }
    }
    View Code

     

    3、记录日志

    TLogHelper

    using Newtonsoft.Json;
    using System;
    using System.IO;
    using System.Messaging;
    using System.Text;
    
    namespace MQServer.Log
    {
        public class TLogHelper
        {
            public static object _lock = new object();
            public static void MQ(Message myMessage,  string detail = "") 
            {
                string msg = JsonConvert.SerializeObject(myMessage.Body);
    
                Write(msg, detail, "MessageQueue");
            }
    
            public static void Info(string msg, string detail = "")
            {
                Write(msg, detail, "Info");
            }
            public static void Info(object msg, string detail = "")
            {
                Write(JsonConvert.SerializeObject(msg), detail, "Info");
            }
    
            public static void Error(string msg, string detail = "")
            {
                Write(msg, detail, "Error");
            }
    
            private static void Write(string msg,string detail="", string title = "Info") 
            {
                DateTime now = DateTime.Now;
                string logPath = System.Configuration.ConfigurationManager.AppSettings["MQServerTLogPath"];
    
                if (!Directory.Exists(logPath))
                {
                    Directory.CreateDirectory(logPath);
                }
    
                logPath += now.ToString("yyyyMMdd") + ".txt";
                lock (_lock)
                {
                    FileStream fs = new FileStream(@logPath, FileMode.OpenOrCreate, FileAccess.Write);
    
                    StreamWriter m_streamWriter = new StreamWriter(fs);
    
                    m_streamWriter.BaseStream.Seek(0, SeekOrigin.End);
    
                    m_streamWriter.WriteLine();
                    m_streamWriter.WriteLine(now.ToString("yyyyMMddHHmmssfff") + " " + title);
                    if (!string.IsNullOrWhiteSpace(detail))
                    {
                        m_streamWriter.WriteLine(detail);
                    }
    
                    m_streamWriter.WriteLine(msg);
    
                    m_streamWriter.Flush();
    
                    m_streamWriter.Close();
    
                    fs.Close();
                }
                
    
            }
    
            public static string Read() 
            {
                string res = "";
                string logPath = System.Configuration.ConfigurationManager.AppSettings["MQServerTLogPath"];
                
                logPath += DateTime.Now.ToString("yyyyMMdd") + ".txt";
                lock (_lock)
                {
                    StreamReader fs = new StreamReader(@logPath, Encoding.UTF8);
                    res = fs.ReadToEnd();
                    fs.Dispose();
                }
                return res;
            }
        }
    }
    View Code

     

    4、Form窗体测试

     RabbitMQForm : Form

    using BLL;
    using Newtonsoft.Json;
    using System;
    using System.Windows.Forms;
    
    namespace WinFormActiveMQ
    {
        public partial class RabbitMQForm : Form
        {
            static MQTestHello hello;
            static TCPTestHello tcpHello;
            static TCPTestHello tcpHello2;
            private Label la_main;
            private Timer tim;
            private System.Threading.Thread thrListener;
            private System.Threading.Thread thrListener2;
            public RabbitMQForm()
            {
                InitializeComponent();
    
                la_main = new Label();
                la_main.Name = "la_main";
                la_main.Width = 282;
                la_main.TabIndex = 0;
                Controls.Add(la_main);
    
                tim = new Timer();
                tim.Interval = 1000;
                tim.Tick += CheckReceive;
                tim.Start();
    
            }
    
            private void Form1_Load(object sender, EventArgs e)
            {
                textBox1.ScrollBars = ScrollBars.Vertical;
                la_main.Text = "RabbitMQ消息队列,点击开启接收。";
                TCPIp1.Text = "127.0.0.1";
                TCPPort1.Text = "90";
                TCPIp2.Text = "127.0.0.1";
                TCPPort2.Text = "91";
    
                hello = new MQTestHello(); 
            }
    
            /// <summary>
            /// 守护线程
            /// 检测接收线程是否还在,不在了重新接收
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void CheckReceive(object sender, EventArgs e)
            {
                //测试Hello队列
                if (TestHello.Checked)
                {
                    if (hello!=null && !hello.IsReceive)
                    {
                        hello.Receive();
                    }
                }
    
                if (TCPHello.Checked)
                {
                    if (tcpHello != null && !tcpHello.IsListening)
                    {
                        thrListener = tcpHello.ListenStart(ListenClientBack);
                    }
                }
                
                if (TCPListen2.Checked)
                {
                    if (tcpHello2 != null && !tcpHello2.IsListening)
                    {
                        thrListener2 = tcpHello2.ListenStart(ListenClientBack);
                    }
                }
    
            }
    
            /// <summary>
            /// 开启接收Hello队列,并发送一条测试消息
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void TestHello_CheckedChanged(object sender, EventArgs e)
            {
                if (TestHello.Checked)
                {
                    //开启接收
                    if (!hello.IsReceive)
                    {
                        hello.Receive();
                    }
    
                    //测试发送
                    hello.SendHello("测试第一消息");
                }
                else
                {
                    //关闭接收
                    hello.EndReceive();
                }
            }
    
            /// <summary>
            /// 开启接收TCP请求,hello业务的
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void TCPHello_CheckedChanged(object sender, EventArgs e)
            {
                //设置ip端口输入
                TCPIp1.Enabled = !TCPHello.Checked;
                TCPPort1.Enabled = !TCPHello.Checked;
    
                if (TCPHello.Checked)
                {
                    int port = 0;
                    try
                    {
                        port = int.Parse(TCPPort1.Text);
                    }
                    catch (Exception)
                    {
                        TCPHello.Checked = !TCPHello.Checked;
                        TCPIp1.Enabled = !TCPHello.Checked;
                        TCPPort1.Enabled = !TCPHello.Checked;
                        MessageBox.Show("请输入正确端口号");
                        return;
                    }
    
                    tcpHello = new TCPTestHello(TCPIp1.Text, port, "", port);
                    ///开启监听
                    thrListener = tcpHello.ListenStart(ListenClientBack);
                }
                else
                {
                    //关闭监听
                    tcpHello.CloseListen();
                }
            }
    
            /// <summary>
            /// 收到终端的TCP请求后回调显示
            /// </summary>
            private string ListenClientBack(object obj)
            {
                if (obj == null) 
                {
                    //监听执行回调出错
                    changeListenText("\r\n  监听执行回调出错");
    
                    //用选择框改变事件关闭监听
                    TCPHello_CheckedChanged(null,null);
                    return "";
                }
    
                string res = JsonConvert.SerializeObject(obj);
                changeListenText("\r\n收到终端的请求在Form:" + res);
                return res;
            }