ZeroMQ入门教程(三)
作者 伊可 | 发布于 2015-08-09
ZeroMQ 消息队列 套接字多线程

基本通信模型的实现

      ZMQ支持多种语言,ZMQ原生语言是C,工作中使用的也是对ZMQ进行C的应用,这里也使用来实现。下面分别实现与以上对应的三种模型

1、Hello World

      最简单的例子,即 HelloWorld 项目讲起。这就是网络请求中最基本的“请求-响应”模式(Request-Reply),客户端往服务端发送“Hello”,服务端回应“World”。

      以下是 HelloWorld 项目的服务端代码(hello_world_server.c),熟悉 Socket 编程的同学应该很容易理解其中的语法,即使用 TCP 协议,监听 5555 端口,然后不停地接受、打印并返回信息,每次处理后停止 1 秒。

    // //Hello World server //Bind REP socket to tcp://*:5555 //Expects "Hello" from client ,replies with "World" // // Hello World server #include #include #include #include #include int main (void) { // Socket to talk to clients void *context = zmq_ctx_new (); void *responder = zmq_socket (context, ZMQ_REP); int rc = zmq_bind (responder, "tcp://*:5555"); assert (rc == 0); while (1) { char buffer [10]; zmq_recv (responder, buffer, 10, 0); printf ("Received Hello\n"); zmq_send (responder, "World", 5, 0); sleep (1); // Do some 'work' } return 0; }

      以下是 HelloWorld 项目的客户端代码(hello_world_client.c),逻辑也很简单,向服务端连续发送 10 条消息,接受并打印返回信息。

    // //Hello World server //Bind REP socket to tcp://*:5555 //Expects "Hello" from client ,replies with "World" // // Hello World client #include #include #include #include int main (void) { printf ("Connecting to hello world server…\n"); void *context = zmq_ctx_new (); void *requester = zmq_socket (context, ZMQ_REQ); zmq_connect (requester, "tcp://localhost:5555"); int request_nbr; for (request_nbr = 0; request_nbr != 10; request_nbr++) { char buffer [10]; printf ("Sending Hello %d…\n", request_nbr); zmq_send (requester, "Hello", 5, 0); zmq_recv (requester, buffer, 10, 0); printf ("Received World %d\n", request_nbr); } zmq_close (requester); zmq_ctx_destroy (context); return 0; }

2、Weather-update

      订阅发布模型是一种数据分发的方法,一台服务器把数据发送到一系列的客户机上。让我们来看一个发送有地点,温度和相对湿度组成的气候消息。我们将产生随机值,如真正的气象站所做的一样。

      以下是这是服务器应用程序,它监听新的消息流,并且抓取所有与特定地址有关的消息。这个消息留没有起点也没有终点,它像一个不会终止的广播。

    // Weather update server // Binds PUB socket to tcp://*:5556 // Publishes random weather updates #include "zhelpers.h" int main (void) { // Prepare our context and publisher void *context = zmq_ctx_new (); void *publisher = zmq_socket (context, ZMQ_PUB); int rc = zmq_bind (publisher, "tcp://*:5556"); assert (rc == 0); // Initialize random number generator srandom ((unsigned) time (NULL)); while (1) { // Get values that will fool the boss int zipcode, temperature, relhumidity; zipcode = randof (100000); temperature = randof (215) - 80; relhumidity = randof (50) + 10; // Send message to all subscribers char update [20]; sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity); s_send (publisher, update); } zmq_close (publisher); zmq_ctx_destroy (context); return 0; }

      PUB-SUB套接字对是异步的。客户机循环调用函数zmq_recv(3)(或者只有一次,根据需要来定)。试图发送一个消息给一个SUB套接字就会出错。相似地,服务器根据需要调用函数zmq_send(3),但是不能用PUB套接字调用函数zmq_recv(3)。

    // Weather update client // Connects SUB socket to tcp://localhost:5556 // Collects weather updates and finds avg temp in zipcode #include "zhelpers.h" int main (int argc, char *argv []) { // Socket to talk to server printf ("Collecting updates from weather server…\n"); void *context = zmq_ctx_new (); void *subscriber = zmq_socket (context, ZMQ_SUB); int rc = zmq_connect (subscriber, "tcp://localhost:5556"); assert (rc == 0); // Subscribe to zipcode, default is NYC, 10001 char *filter = (argc > 1)? argv [1]: "10001 "; rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, filter, strlen (filter)); assert (rc == 0); // Process 100 updates int update_nbr; long total_temp = 0; for (update_nbr = 0; update_nbr < 100; update_nbr++) { char *string = s_recv (subscriber); int zipcode, temperature, relhumidity; sscanf (string, "%d %d %d", &zipcode, &temperature, &relhumidity); total_temp += temperature; free (string); } printf ("Average temperature for zipcode '%s' was %dF\n", filter, (int) (total_temp / update_nbr)); zmq_close (subscriber); zmq_ctx_destroy (context); return 0; }

3、Ventilator-sink

      这是风扇(ventilator)的程序。。它产生100个任务,每个任务都是一个通知工作进程产生几微秒休眠(sleep)的消息。

    // Task ventilator // Binds PUSH socket to tcp://localhost:5557 // Sends batch of tasks to workers via that socket #include "zhelpers.h" int main (void) { void *context = zmq_ctx_new (); // Socket to send messages on void *sender = zmq_socket (context, ZMQ_PUSH); zmq_bind (sender, "tcp://*:5557"); // Socket to send start of batch message on void *sink = zmq_socket (context, ZMQ_PUSH); zmq_connect (sink, "tcp://localhost:5558"); printf ("Press Enter when the workers are ready: "); getchar (); printf ("Sending tasks to workers…\n"); // The first message is "0" and signals start of batch s_send (sink, "0"); // Initialize random number generator srandom ((unsigned) time (NULL)); // Send 100 tasks int task_nbr; int total_msec = 0; // Total expected cost in msecs for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; // Random workload from 1 to 100msecs workload = randof (100) + 1; total_msec += workload; char string [10]; sprintf (string, "%d", workload); s_send (sender, string); } printf ("Total expected cost: %d msec\n", total_msec); zmq_close (sink); zmq_close (sender); zmq_ctx_destroy (context); return 0; }

      这是工作进程应用程序,它接收一个消息,休眠(sleep)几秒后发送它已完成了的信号。

    // Task worker // Connects PULL socket to tcp://localhost:5557 // Collects workloads from ventilator via that socket // Connects PUSH socket to tcp://localhost:5558 // Sends results to sink via that socket #include "zhelpers.h" int main (void) { // Socket to receive messages on void *context = zmq_ctx_new (); void *receiver = zmq_socket (context, ZMQ_PULL); zmq_connect (receiver, "tcp://localhost:5557"); // Socket to send messages to void *sender = zmq_socket (context, ZMQ_PUSH); zmq_connect (sender, "tcp://localhost:5558"); // Process tasks forever while (1) { char *string = s_recv (receiver); printf ("%s.", string); // Show progress fflush (stdout); s_sleep (atoi (string)); // Do the work free (string); s_send (sender, ""); // Send results to sink } zmq_close (receiver); zmq_close (sender); zmq_ctx_destroy (context); return 0; }

      这是监听器的应用程序。它收集这100个任务,计算整体处理花费的时间,这样我们就能确定,当不止一个工作进程的时候,它们能够并行运行。

    // Task sink // Binds PULL socket to tcp://localhost:5558 // Collects results from workers via that socket #include "zhelpers.h" int main (void) { // Prepare our context and socket void *context = zmq_ctx_new (); void *receiver = zmq_socket (context, ZMQ_PULL); zmq_bind (receiver, "tcp://*:5558"); // Wait for start of batch char *string = s_recv (receiver); free (string); // Start our clock now int64_t start_time = s_clock (); // Process 100 confirmations int task_nbr; for (task_nbr = 0; task_nbr < 100; task_nbr++) { char *string = s_recv (receiver); free (string); if ((task_nbr / 10) * 10 == task_nbr) printf (":"); else printf ("."); fflush (stdout); } // Calculate and report duration of batch printf ("Total elapsed time: %d msec\n", (int) (s_clock () - start_time)); zmq_close (receiver); zmq_ctx_destroy (context); return 0; }