Wednesday, 14 December 2016

C#: ZeroMQ: Publish-subscribe model sockets

In the publish-subscribe model, subscribers (clients) subscribe to a server (the publisher), and the server sends each message to all of its subscribers. In simple terms, the server broadcasts messages to every registered client.

A subscriber can connect to more than one publisher and subscribe to a pattern, which ZeroMQ matches against the beginning of each message. Whenever a publisher has a message that matches the pattern, it sends that message to every subscriber registered for it.

If a publisher has no subscribers, it drops the message.
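
The behaviour described so far can be sketched with a small in-memory model. This is not ZeroMQ code: ToyPublisher and its methods are hypothetical names made up for illustration, and the sketch assumes a pattern matches when the message starts with it.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of publish-subscribe; hypothetical, not the ZeroMQ API.
public class ToyPublisher
{
    // Each subscriber is stored as a (pattern, inbox) pair.
    private readonly List<KeyValuePair<string, List<string>>> subscribers =
        new List<KeyValuePair<string, List<string>>>();

    // Registers a subscriber and returns the inbox its messages land in.
    public List<string> Subscribe(string pattern)
    {
        var inbox = new List<string>();
        subscribers.Add(new KeyValuePair<string, List<string>>(pattern, inbox));
        return inbox;
    }

    // Returns how many subscribers received the message; 0 means it was dropped.
    public int Publish(string message)
    {
        int delivered = 0;
        foreach (var subscriber in subscribers)
        {
            if (message.StartsWith(subscriber.Key, StringComparison.Ordinal))
            {
                subscriber.Value.Add(message);
                delivered++;
            }
        }
        return delivered;
    }
}

public static class ToyPublisherDemo
{
    public static void Main()
    {
        var publisher = new ToyPublisher();

        // Nobody has subscribed yet, so this message is lost.
        Console.WriteLine(publisher.Publish("Odd Message 1"));   // 0

        var evenInbox = publisher.Subscribe("Even");
        var oddInbox = publisher.Subscribe("Odd");

        Console.WriteLine(publisher.Publish("Even Message 2"));  // 1
        Console.WriteLine(publisher.Publish("Odd Message 3"));   // 1
        Console.WriteLine(evenInbox.Count);                      // 1
        Console.WriteLine(oddInbox.Count);                       // 1
    }
}
```

The first Publish call returns 0 because nothing is stored for subscribers that join later, which is the drop behaviour described above.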

If you are using TCP and a subscriber is slow, messages will queue up on the publisher.
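
ZeroMQ does not let this queue grow without limit: it caps it with a high-water mark, and a PUB socket drops further messages for that subscriber once the cap is reached. The following is only a toy model of that idea, not ZeroMQ code; BoundedOutbox, its limit parameter, and the value 3 are hypothetical and chosen purely for illustration.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a publisher-side queue for one slow subscriber; not ZeroMQ code.
public class BoundedOutbox
{
    private readonly Queue<string> pending = new Queue<string>();
    private readonly int limit;   // hypothetical cap, standing in for a high-water mark

    public int Dropped { get; private set; }
    public int Count { get { return pending.Count; } }

    public BoundedOutbox(int limit)
    {
        this.limit = limit;
    }

    // Queues the message, or drops it when the queue is already full.
    public bool Enqueue(string message)
    {
        if (pending.Count >= limit)
        {
            Dropped++;
            return false;
        }
        pending.Enqueue(message);
        return true;
    }

    // Called when the slow subscriber finally reads one message.
    public string Dequeue()
    {
        return pending.Dequeue();
    }
}

public static class BoundedOutboxDemo
{
    public static void Main()
    {
        var outbox = new BoundedOutbox(3);
        for (int i = 1; i <= 5; i++)
        {
            outbox.Enqueue("Message " + i);
        }

        Console.WriteLine(outbox.Count);      // 3
        Console.WriteLine(outbox.Dropped);    // 2
        Console.WriteLine(outbox.Dequeue());  // Message 1
    }
}
```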

Let me explain with an example.

Publisher: keeps publishing messages prefixed with either Even or Odd.

oddSubScriber: interested in receiving Odd messages
evenSubScriber: interested in receiving Even messages

Designing a publisher
To create a publisher socket, define a ZSocket of type 'PUB'.

var context = new ZContext();
var publisher = new ZSocket(context, ZSocketType.PUB);

Designing a subscriber
You need to define a ZSocket of type 'SUB'.

var context = new ZContext();
var subscriber = new ZSocket(context, ZSocketType.SUB);

After that, specify the pattern that this subscriber is interested in.

subscriber.Subscribe(patternToSubscribe);
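
A ZeroMQ subscription is a prefix filter: a message is delivered when it begins with the subscribed pattern, and an empty pattern matches every message. The sketch below only illustrates that rule in plain C#; PrefixFilterSketch and Matches are hypothetical names, and the ordinal string comparison is an approximation of the byte-wise comparison ZeroMQ performs.

```csharp
using System;

// Illustrates prefix filtering only; hypothetical helper, not part of the ZeroMQ API.
public static class PrefixFilterSketch
{
    // Returns true when the message starts with the subscribed pattern.
    // An empty pattern matches every message.
    public static bool Matches(string pattern, string message)
    {
        return message.StartsWith(pattern, StringComparison.Ordinal);
    }

    public static void Main()
    {
        Console.WriteLine(Matches("Even", "Even Message 2"));  // True
        Console.WriteLine(Matches("Even", "Odd Message 3"));   // False
        Console.WriteLine(Matches("Even", "Evening news"));    // True
        Console.WriteLine(Matches("", "Odd Message 3"));       // True
    }
}
```

The third call shows why prefixes should be chosen with care: "Even" also matches any other message that happens to start with those four characters.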

Following is the complete working application.

ZeroMQServer.cs
using System;
using System.Threading;
using ZeroMQ;

namespace zeroMQTutorial
{
    public class ZeroMQServer
    {
        private String protocol;
        private int port;
        private String url;

        public ZeroMQServer(String protocol, int port)
        {
            this.protocol = protocol;
            this.port = port;
            this.url = protocol + "://127.0.0.1:" + port;
        }

        public String processData(String data)
        {
            return data.ToUpper();
        }

        public void startService()
        {
            Console.WriteLine("Server Started");
            using (var context = new ZContext())
            using (var publisher = new ZSocket(context, ZSocketType.PUB))
            {
                publisher.Bind(url);
                Console.WriteLine("Sending messages to subscribers");

                int counter = 1;

                while (true)
                {
                    String update;

                    if (counter % 2 == 0)
                    {
                        update = string.Format("{0} {1}", "Even", "Message " + counter);
                    }
                    else
                    {
                        update = string.Format("{0} {1}", "Odd", "Message " + counter);
                    }
                     
                    using (var updateFrame = new ZFrame(update))
                    {
                        publisher.Send(updateFrame);
                    }
                    counter++;
                    Thread.Sleep(1000);
                }
            }
        }
    }
}

ZeroMQClient.cs
using System;
using ZeroMQ;

namespace zeroMQTutorial
{
    class ZeroMQClient
    {
        private String protocol;
        private int port;
        private String url;
        private String patternToSubscribe;
        private String subScriberName;

        public void setSubScriberName(String subScriberName)
        {
            this.subScriberName = subScriberName;
        }

        public ZeroMQClient(String protocol, int port, String patternToSubscribe)
        {
            this.protocol = protocol;
            this.port = port;
            this.url = protocol + "://127.0.0.1:" + port;
            this.patternToSubscribe = patternToSubscribe;
        }

        /* Receive messages that match the subscribed pattern and print them */
        public void listen()
        {
            using (var context = new ZContext())
            using (var subscriber = new ZSocket(context, ZSocketType.SUB))
            {
                subscriber.Connect(url);
                subscriber.Subscribe(patternToSubscribe);
                Console.WriteLine("Subscribing to the url {0} & pattern {1}", url, patternToSubscribe);

                while (true)
                {
                    // Dispose each frame after reading it, as the publisher does with the frames it sends.
                    using (var replyFrame = subscriber.ReceiveFrame())
                    {
                        string message = replyFrame.ReadString();
                        Console.WriteLine("{0} received : {1}", subScriberName, message);
                    }
                }
                
            }
        }

    }
}

Program.cs
using System;
using System.Threading;

namespace zeroMQTutorial
{
    class Program
    {

        static void Main(string[] args)
        {
            ZeroMQServer publisher = new ZeroMQServer("tcp", 12345);
            ZeroMQClient evenSubScriber = new ZeroMQClient("tcp", 12345, "Even");
            ZeroMQClient oddSubScriber = new ZeroMQClient("tcp", 12345, "Odd");

            evenSubScriber.setSubScriberName("EvenSubScriber");
            oddSubScriber.setSubScriberName("OddSubScriber");

            Thread t1 = new Thread(publisher.startService);
            Thread t2 = new Thread(evenSubScriber.listen);
            Thread t3 = new Thread(oddSubScriber.listen);

            t1.Start();
            t2.Start();
            t3.Start();
        }
    }
}

Output
Server Started
Subscribing to the url tcp://127.0.0.1:12345 & pattern Odd
Sending messages to subscribers
Subscribing to the url tcp://127.0.0.1:12345 & pattern Even
EvenSubScriber received : Even Message 2
OddSubScriber received : Odd Message 3
EvenSubScriber received : Even Message 4
OddSubScriber received : Odd Message 5
EvenSubScriber received : Even Message 6
OddSubScriber received : Odd Message 7
EvenSubScriber received : Even Message 8
OddSubScriber received : Odd Message 9

Notice that 'Odd Message 1' never appears in the output. The publisher sent it before either subscriber had finished connecting, so the message was dropped.

What happens if more than one subscriber registers for the same pattern?
All of those subscribers receive the messages. For example, update Program.cs as shown below.

Program.cs
using System;
using System.Threading;

namespace zeroMQTutorial
{
    class Program
    {

        static void Main(string[] args)
        {
            ZeroMQServer publisher = new ZeroMQServer("tcp", 12345);
            ZeroMQClient evenSubScriber1 = new ZeroMQClient("tcp", 12345, "Even");
            ZeroMQClient evenSubScriber2 = new ZeroMQClient("tcp", 12345, "Even");

            evenSubScriber1.setSubScriberName("EvenSubScriber1");
            evenSubScriber2.setSubScriberName("EvenSubScriber2");

            Thread t1 = new Thread(publisher.startService);
            Thread t2 = new Thread(evenSubScriber1.listen);
            Thread t3 = new Thread(evenSubScriber2.listen);

            t1.Start();
            t2.Start();
            t3.Start();
        }
    }
}

Both subscribers, evenSubScriber1 and evenSubScriber2, register for the same pattern, 'Even'.

Run 'Program.cs' and you will see the following output.

Output
Server Started
Sending messages to subscribers
Subscribing to the url tcp://127.0.0.1:12345 & pattern Even
Subscribing to the url tcp://127.0.0.1:12345 & pattern Even
EvenSubScriber2 received : Even Message 2
EvenSubScriber1 received : Even Message 2
EvenSubScriber2 received : Even Message 4
EvenSubScriber1 received : Even Message 4
EvenSubScriber2 received : Even Message 6
EvenSubScriber1 received : Even Message 6
EvenSubScriber2 received : Even Message 8
EvenSubScriber1 received : Even Message 8






