In
publish-subscribe model, subscribers (clients) subscribe to a server. Server
sends the information to all the subscribers. In simple terms, server broadcast
the messages to all the registered clients.
A
subscriber can connect to more than one publisher and subscribe to a pattern.
Whenever publisher has a message with given pattern it sends the messages to
all relevant subscribed subscribers.
If
a publisher don't have any subscribers, then it drop the message.
If
you are using TCP and a subscriber is slow, messages will queue up on the
publisher.
Let
me explain with an example.
Publisher: keep on publishing
message that prefixed with Even, Odd.
oddSubScriber:
Interested in receiving odd messages
evenSubScriber:
Interested in receiving even messages
Designing a publisher
To
create a socket of type publisher, you need to define ZSocket of type 'PUB'.
var
context = new ZContext();
var
publisher = new ZSocket(context, ZSocketType.PUB);
Designing a
subscriber
You
need to define a ZSocket of type 'SUB'.
var
context = new ZContext()
var
subscriber = new ZSocket(context, ZSocketType.SUB)
After
that you need to specify the pattern that this subscriber is interested in.
subscriber.Subscribe(patternToSubscribe);
Following
is the complete working application.
ZeroMQServer.cs
using System; using System.Threading; using ZeroMQ; namespace zeroMQTutorial { public class ZeroMQServer { private String protocol; private int port; private String url; public ZeroMQServer(String protocol, int port) { this.protocol = protocol; this.port = port; this.url = protocol + "://127.0.0.1:" + port; } public String processData(String data) { return data.ToUpper(); } public void startService() { Console.WriteLine("Server Started"); using (var context = new ZContext()) using (var publisher = new ZSocket(context, ZSocketType.PUB)) { publisher.Bind(url); Console.WriteLine("Sending messages to subscribers"); int counter = 1; while (true) { String update; if (counter % 2 == 0) { update = string.Format("{0} {1}", "Even", "Message " + counter); } else { update = string.Format("{0} {1}", "Odd", "Message " + counter); } using (var updateFrame = new ZFrame(update)) { publisher.Send(updateFrame); } counter++; Thread.Sleep(1000); } } } } }
ZeroMQClient.cs
using System; using ZeroMQ; namespace zeroMQTutorial { class ZeroMQClient { private String protocol; private int port; private String url; private String patternToSubscribe; private String subScriberName; public void setSubScriberName(String subScriberName) { this.subScriberName = subScriberName; } public ZeroMQClient(String protocol, int port, String patternToSubscribe) { this.protocol = protocol; this.port = port; this.url = protocol + "://127.0.0.1:" + port; this.patternToSubscribe = patternToSubscribe; } /* Send data to server and get the response */ public void listen() { using (var context = new ZContext()) using (var subscriber = new ZSocket(context, ZSocketType.SUB)) { subscriber.Connect(url); subscriber.Subscribe(patternToSubscribe); Console.WriteLine("Subscribing to the url {0} & pattern {1}", url, patternToSubscribe); while (true) { var replyFrame = subscriber.ReceiveFrame(); string message = replyFrame.ReadString(); Console.WriteLine("{0} received : {1}", subScriberName, message); } } } } }
Program.cs
using System; using System.Threading; namespace zeroMQTutorial { class Program { static void Main(string[] args) { ZeroMQServer publisher = new ZeroMQServer("tcp", 12345); ZeroMQClient evenSubScriber = new ZeroMQClient("tcp", 12345, "Even"); ZeroMQClient oddSubScriber = new ZeroMQClient("tcp", 12345, "Odd"); evenSubScriber.setSubScriberName("EvenSubScriber"); oddSubScriber.setSubScriberName("OddSubScriber"); Thread t1 = new Thread(publisher.startService); Thread t2 = new Thread(evenSubScriber.listen); Thread t3 = new Thread(oddSubScriber.listen); t1.Start(); t2.Start(); t3.Start(); } } }
Output
Server Started Subscribing to the url tcp://127.0.0.1:12345 & pattern Odd Sending messages to subscribers Subscribing to the url tcp://127.0.0.1:12345 & pattern Even EvenSubScriber received : Even Message 2 OddSubScriber received : Odd Message 3 EvenSubScriber received : Even Message 4 OddSubScriber received : Odd Message 5 EvenSubScriber received : Even Message 6 OddSubScriber received : Odd Message 7 EvenSubScriber received : Even Message 8 OddSubScriber received : Odd Message 9
What happen if more
than one subscriber register for same pattern?
All
the subscribers receive the messages. For example, update Program.cs like
below.
Program.cs
using System; using System.Threading; namespace zeroMQTutorial { class Program { static void Main(string[] args) { ZeroMQServer publisher = new ZeroMQServer("tcp", 12345); ZeroMQClient evenSubScriber1 = new ZeroMQClient("tcp", 12345, "Even"); ZeroMQClient evenSubScriber2 = new ZeroMQClient("tcp", 12345, "Even"); evenSubScriber1.setSubScriberName("EvenSubScriber1"); evenSubScriber2.setSubScriberName("EvenSubScriber2"); Thread t1 = new Thread(publisher.startService); Thread t2 = new Thread(evenSubScriber1.listen); Thread t3 = new Thread(evenSubScriber2.listen); t1.Start(); t2.Start(); t3.Start(); } } }
Both
the subscribers evenSubScriber1, evenSubScriber2 register for same pattern
'Even'.
Run
'Program.cs', you can able to see following output.
Output
Server Started Sending messages to subscribers Subscribing to the url tcp://127.0.0.1:12345 & pattern Even Subscribing to the url tcp://127.0.0.1:12345 & pattern Even EvenSubScriber2 received : Even Message 2 EvenSubScriber1 received : Even Message 2 EvenSubScriber2 received : Even Message 4 EvenSubScriber1 received : Even Message 4 EvenSubScriber2 received : Even Message 6 EvenSubScriber1 received : Even Message 6 EvenSubScriber2 received : Even Message 8 EvenSubScriber1 received : Even Message 8
No comments:
Post a Comment