rabbitmq学习5:Topics_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > rabbitmq学习5:Topics

rabbitmq学习5:Topics

 2013/12/10 12:26:12  yimeng528  程序员俱乐部  我要评论(0)
  • 摘要:在前面的《rabbitmq学习4:Routing》中使用一般的名字的路由,现在想通过一些路由规则让消费者来接受符合规则的消息?那应当怎么样呢?那就要用到类型为topic的Exchange了。Topics的工作示意图如下:我们可能从图中看到有*和#两个通配符。*表示通配一个词;#表示通配0个或多个词。下面让我们来看看Topics的程序如何实现的吧!P端的程序如下:Java代码packagecom.abin.rabbitmq;importcom.rabbitmq.client.Channel
  • 标签:学习

在前面的《rabbitmq学习4:Routing?》中使用一般的名字的路由,现在想通过一些路由规则让消费者来接受符合规则的消息?那应当怎么样呢?那就要用到类型为topic的Exchange了。

Topics的工作示意图如下:


我们可能从图中看到有*和#两个通配符。*表示通配一个词;#表示通配0个或多个词。

下面让我们来看看Topics的程序如何实现的吧!

P端的程序如下 :

?

Java代码??收藏代码class="star">
  1. package?com.abin.rabbitmq;??
  2. ??
  3. import?com.rabbitmq.client.Channel;??
  4. import?com.rabbitmq.client.Connection;??
  5. import?com.rabbitmq.client.ConnectionFactory;??
  6. ??
  7. public?class?EmitLogTopic?{??
  8. ????private?static?final?String?EXCHANGE_NAME?=?"topic_logs";??
  9. ??
  10. ????public?static?void?main(String[]?argv)?throws?Exception?{??
  11. ??
  12. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  13. ????????factory.setHost("localhost");??
  14. ????????Connection?connection?=?factory.newConnection();??
  15. ????????Channel?channel?=?connection.createChannel();??
  16. ??
  17. ????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic");//声明topic类型的Exchange??
  18. ??
  19. ????????String?routingKeyOne?=?"logs.error.one";//?定义一个路由名为“error”??
  20. ????????for?(int?i?=?0;?i?<=?1;?i++)?{??
  21. ????????????String?messageOne?=?"this?is?one?error?logs:"?+?i;??
  22. ????????????channel.basicPublish(EXCHANGE_NAME,?routingKeyOne,?null,?messageOne??
  23. ????????????????????.getBytes());??
  24. ????????????System.out.println("?[x]?Sent?'"?+?routingKeyOne?+?"':'"??
  25. ????????????????????+?messageOne?+?"'");??
  26. ????????}??
  27. ??
  28. ????????System.out.println("################################");??
  29. ????????String?routingKeyTwo?=?"logs.error.two";??
  30. ????????for?(int?i?=?0;?i?<=?2;?i++)?{??
  31. ????????????String?messageTwo?=?"this?is?two?error?logs:"?+?i;??
  32. ????????????channel.basicPublish(EXCHANGE_NAME,?routingKeyTwo,?null,?messageTwo??
  33. ????????????????????.getBytes());??
  34. ????????????System.out.println("?[x]?Sent?'"?+?routingKeyTwo?+?"':'"??
  35. ????????????????????+?messageTwo?+?"'");??
  36. ????????}??
  37. ??
  38. ????????System.out.println("################################");??
  39. ????????String?routingKeyThree?=?"logs.info.one";??
  40. ????????for?(int?i?=?0;?i?<=?3;?i++)?{??
  41. ????????????String?messageThree?=?"this?is?one?info?logs:"?+?i;??
  42. ????????????channel.basicPublish(EXCHANGE_NAME,?routingKeyThree,?null,??
  43. ????????????????????messageThree.getBytes());??
  44. ????????????System.out.println("?[x]?Sent?'"?+?routingKeyThree?+?"':'"??
  45. ????????????????????+?messageThree?+?"'");??
  46. ????????}??
  47. ??
  48. ????????channel.close();??
  49. ????????connection.close();??
  50. ????}??
  51. }??

?

运行结果可能如下:

Java代码??收藏代码
  1. ?[x]?Sent?'logs.error.one':'this?is?one?error?logs:0'??
  2. ?[x]?Sent?'logs.error.one':'this?is?one?error?logs:1'??
  3. ################################??
  4. ?[x]?Sent?'logs.error.two':'this?is?two?error?logs:0'??
  5. ?[x]?Sent?'logs.error.two':'this?is?two?error?logs:1'??
  6. ?[x]?Sent?'logs.error.two':'this?is?two?error?logs:2'??
  7. ################################??
  8. ?[x]?Sent?'logs.info.one':'this?is?one?info?logs:0'??
  9. ?[x]?Sent?'logs.info.one':'this?is?one?info?logs:1'??
  10. ?[x]?Sent?'logs.info.one':'this?is?one?info?logs:2'??
  11. ?[x]?Sent?'logs.info.one':'this?is?one?info?logs:3'??

?

第一个C端的代码如下:

Java代码??收藏代码
  1. package?com.abin.rabbitmq;??
  2. ??
  3. import?com.rabbitmq.client.Channel;??
  4. import?com.rabbitmq.client.Connection;??
  5. import?com.rabbitmq.client.ConnectionFactory;??
  6. import?com.rabbitmq.client.QueueingConsumer;??
  7. ??
  8. public?class?ReceiveLogsTopic?{??
  9. ????private?static?final?String?EXCHANGE_NAME?=?"topic_logs";//?定义Exchange名称??
  10. ??
  11. ????public?static?void?main(String[]?argv)?throws?Exception?{??
  12. ??
  13. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  14. ????????factory.setHost("localhost");??
  15. ????????Connection?connection?=?factory.newConnection();??
  16. ????????Channel?channel?=?connection.createChannel();??
  17. ??
  18. ????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic");//?声明topic类型的Exchange??
  19. ??
  20. ????????String?queueName?=?"queue_topic_logs1";//?定义队列名为“queue_topic_logs1”的Queue??
  21. ????????channel.queueDeclare(queueName,?false,?false,?false,?null);??
  22. //??????String?routingKeyOne?=?"*.error.two";//?"error"路由规则??
  23. //??????channel.queueBind(queueName,?EXCHANGE_NAME,?routingKeyOne);//?把Queue、Exchange及路由绑定??
  24. ????????String?routingKeyTwo?=?"logs.*.one";//通配所有logs下第三词(最后一个)词为one的消息??
  25. ????????channel.queueBind(queueName,?EXCHANGE_NAME,?routingKeyTwo);??
  26. ??
  27. ????????System.out.println("?[*]?Waiting?for?messages.");??
  28. ??
  29. ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);??
  30. ????????channel.basicConsume(queueName,?true,?consumer);??
  31. ??
  32. ????????while?(true)?{??
  33. ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();??
  34. ????????????String?message?=?new?String(delivery.getBody());??
  35. ????????????String?routingKey?=?delivery.getEnvelope().getRoutingKey();??
  36. ??
  37. ????????????System.out.println("?[x]?Received?'"?+?routingKey?+?"':'"?+?message??
  38. ????????????????????+?"'");??
  39. ????????}??
  40. ????}??
  41. }??

?

第一个C端的运行结果如下:

Java代码??收藏代码
  1. [*]?Waiting?for?messages.??
  2. [x]?Received?'logs.error.one':'this?is?one?error?logs:0'??
  3. [x]?Received?'logs.error.one':'this?is?one?error?logs:1'??
  4. [x]?Received?'logs.info.one':'this?is?one?info?logs:0'??
  5. [x]?Received?'logs.info.one':'this?is?one?info?logs:1'??
  6. [x]?Received?'logs.info.one':'this?is?one?info?logs:2'??
  7. [x]?Received?'logs.info.one':'this?is?one?info?logs:3'??

?

第二个C端的程序如下:?

Java代码??收藏代码
  1. package?com.abin.rabbitmq;??
  2. ??
  3. import?com.rabbitmq.client.Channel;??
  4. import?com.rabbitmq.client.Connection;??
  5. import?com.rabbitmq.client.ConnectionFactory;??
  6. import?com.rabbitmq.client.QueueingConsumer;??
  7. ??
  8. public?class?ReceiveLogsTopicTwo?{??
  9. ????private?static?final?String?EXCHANGE_NAME?=?"topic_logs";//定义Exchange名称??
  10. ??
  11. ????public?static?void?main(String[]?argv)?throws?Exception?{??
  12. ??
  13. ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
  14. ????????factory.setHost("localhost");??
  15. ????????Connection?connection?=?factory.newConnection();??
  16. ????????Channel?channel?=?connection.createChannel();??
  17. ??
  18. ????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic");//声明topic类型的Exchange??
  19. ??
  20. ????????String?queueName?=?"queue_topic_logs2";//定义队列名为“queue_topic_logs2”的Queue??
  21. ????????channel.queueDeclare(queueName,?false,?false,?false,?null);??
  22. ????????String?routingKeyOne?=?"logs.#";//通配所有logs下的消息??
  23. ????????channel.queueBind(queueName,?EXCHANGE_NAME,?routingKeyOne);//把Queue、Exchange及路由绑定??
  24. ??
  25. ????????System.out.println("?[*]?Waiting?for?messages.");??
  26. ??
  27. ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);??
  28. ????????channel.basicConsume(queueName,?true,?consumer);??
  29. ??
  30. ????????while?(true)?{??
  31. ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();??
  32. ????????????String?message?=?new?String(delivery.getBody());??
  33. ????????????String?routingKey?=?delivery.getEnvelope().getRoutingKey();??
  34. ??
  35. ????????????System.out.println("?[x]?Received?'"?+?routingKey?+?"':'"?+?message??
  36. ????????????????????+?"'");??
  37. ????????}??
  38. ????}??
  39. }??

?

第二个C端的运行结果如下:

Java代码??收藏代码
  1. [*]?Waiting?for?messages.??
  2. [x]?Received?'logs.error.one':'this?is?one?error?logs:0'??
  3. [x]?Received?'logs.error.one':'this?is?one?error?logs:1'??
  4. [x]?Received?'logs.error.two':'this?is?two?error?logs:0'??
  5. [x]?Received?'logs.error.two':'this?is?two?error?logs:1'??
  6. [x]?Received?'logs.error.two':'this?is?two?error?logs:2'??
  7. [x]?Received?'logs.info.one':'this?is?one?info?logs:0'??
  8. [x]?Received?'logs.info.one':'this?is?one?info?logs:1'??
  9. [x]?Received?'logs.info.one':'this?is?one?info?logs:2'??
  10. [x]?Received?'logs.info.one':'this?is?one?info?logs:3'??

?

发表评论
用户名: 匿名