如何重rabbitmq 数据持久化中读取大量数据然后将读取的数据同过接口发送

rabbitmq可靠确认模式的java封装及示例 - 简书
rabbitmq可靠确认模式的java封装及示例
rabbitmq.png
最近的一个计费项目,在rpc调用和流式处理之间徘徊了许久,后来选择流式处理。一是可以增加吞吐量,二是事务的控制相比于rpc要容易很多。
确定了流式处理的方式,后续是技术的选型。刚开始倾向于用storm,无奈文档实在太少,折腾起来着实费劲。最终放弃,改用消息队列+微服务的方式实现。
消息队列的选型上,有activemq,rabbitmq,kafka等。最开始倾向于用activemq,因为以前的项目用过,很多代码都是可直接复用的。后来看了不少文章对比,发现rabbitmq对多语言的支持更好一点,同时相比于kafka,牺牲了部分的性能换取了更好的稳定性安全性以及持久化。
最终决定使用rabbitmq。rabbitmq的官网如下:
对rabbitmq的封装,有几个目标:1 提供send接口2 提供consume接口3 保证消息的事务性处理
所谓事务性处理,是指对一个消息的处理必须严格可控,必须满足原子性,只有两种可能的处理结果:(1) 处理成功,从队列中删除消息(2) 处理失败(网络问题,程序问题,服务挂了),将消息重新放回队列为了做到这点,我们使用rabbitmq的手动ack模式,这个后面细说。
1 send接口
public interface MessageSender {
DetailRes send(Object message);
send接口相对简单,我们使用spring的RabbitTemplate来实现,代码如下:
//1 构造template, exchange, routingkey等
//2 设置message序列化方法
//3 设置发送确认
//4 构造sender方法
public MessageSender buildMessageSender(final String exchange, final String routingKey, final String queue) throws IOException, TimeoutException {
Connection connection = connectionFactory.createConnection();
buildQueue(exchange, routingKey, queue, connection);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setExchange(exchange);
rabbitTemplate.setRoutingKey(routingKey);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
("send message failed: " + cause); //+ correlationData.toString());
throw new RuntimeException("send error " + cause);
return new MessageSender() {
public DetailRes send(Object message) {
rabbitTemplate.convertAndSend(message);
} catch (RuntimeException e) {
e.printStackTrace();
("send failed " + e);
rabbitTemplate.convertAndSend(message);
} catch (RuntimeException error) {
error.printStackTrace();
("send failed again " + error);
return new DetailRes(false, error.toString());
return new DetailRes(true, "");
2 consume接口
public interface MessageConsumer {
DetailRes consume();
在consume接口中,会调用用户自己的MessageProcess,接口定义如下:
public interface MessageProcess&T& {
DetailRes process(T message);
consume的实现相对来说复杂一点,代码如下:
//1 创建连接和channel
//2 设置message序列化方法
//3 构造consumer
public &T& MessageConsumer buildMessageConsumer(String exchange, String routingKey,
final String queue, final MessageProcess&T& messageProcess) throws IOException {
final Connection connection = connectionFactory.createConnection();
buildQueue(exchange, routingKey, queue, connection);
final MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
final MessageConverter messageConverter = new Jackson2JsonMessageConverter();
return new MessageConsumer() {
consumer = buildQueueConsumer(connection, queue);
//1 通过delivery获取原始数据
//2 将原始数据转换为特定类型的包
//3 处理数据
//4 手动发送ack确认
public DetailRes consume() {
QueueingConsumer.Delivery delivery =
Channel channel = consumer.getChannel();
delivery = consumer.nextDelivery();
Message message = new Message(delivery.getBody(),
messagePropertiesConverter.toMessageProperties(delivery.getProperties(), delivery.getEnvelope(), "UTF-8"));
@SuppressWarnings("unchecked")
T messageBean = (T) messageConverter.fromMessage(message);
DetailRes detailRes = messageProcess.process(messageBean);
if (detailRes.isSuccess()) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
("send message failed: " + detailRes.getErrMsg());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
return detailR
} catch (InterruptedException e) {
e.printStackTrace();
return new DetailRes(false, "interrupted exception " + e.toString());
} catch (IOException e) {
e.printStackTrace();
retry(delivery, channel);
("io exception : " + e);
return new DetailRes(false, "io exception " + e.toString());
} catch (ShutdownSignalException e) {
e.printStackTrace();
channel.close();
} catch (IOException io) {
io.printStackTrace();
} catch (TimeoutException timeout) {
timeout.printStackTrace();
consumer = buildQueueConsumer(connection, queue);
return new DetailRes(false, "shutdown exception " + e.toString());
} catch (Exception e) {
e.printStackTrace();
("exception : " + e);
retry(delivery, channel);
return new DetailRes(false, "exception " + e.toString());
3 保证消息的事务性处理rabbitmq默认的处理方式为auto ack,这意味着当你从消息队列取出一个消息时,ack自动发送,mq就会将消息删除。而为了保证消息的正确处理,我们需要将消息处理修改为手动确认的方式。(1) sender的手工确认模式首先将ConnectionFactory的模式设置为publisherConfirms,如下
connectionFactory.setPublisherConfirms(true);
之后设置rabbitTemplate的confirmCallback,如下:
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
("send message failed: " + cause); //+ correlationData.toString());
throw new RuntimeException("send error " + cause);
(2) consume的手工确认模式首先在queue创建中指定模式
channel.exchangeDeclare(exchange, "direct", true, false, null);
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
channel.queueDeclare(queue, true, false, false, null);
只有在消息处理成功后发送ack确认,或失败后发送nack使信息重新投递
if (detailRes.isSuccess()) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
("send message failed: " + detailRes.getErrMsg());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
4 自动重连机制为了保证rabbitmq的高可用性,我们使用rabbitmq Cluster模式,并配合haproxy。这样,在一台机器down掉时或者网络发生抖动时,就会发生当前连接失败的情况,如果不对这种情况做处理,就会造成当前的服务不可用。在spring-rabbitmq中,已实现了connection的自动重连,但是connection重连后,channel的状态并不正确。因此我们需要自己捕捉ShutdownSignalException异常,并重新生成channel。如下:
catch (ShutdownSignalException e) {
e.printStackTrace();
channel.close();
//recreate channel
consumer = buildQueueConsumer(connection, queue);
5 consumer线程池在对消息处理的过程中,我们期望多线程并行执行来增加效率,因此对consumer做了一个线程池的封装。线程池通过builder模式构造,需要准备如下参数:
//线程数量
int threadC
//处理间隔(每个线程处理完成后休息的时间)
long intervalM
//exchange及queue信息
String routingK
//用户自定义处理接口
MessageProcess&T& messageP
核心循环也较为简单,代码如下:
public void run() {
while (!stop) {
DetailRes detailRes = messageConsumer.consume();
if (infoHolder.intervalMils & 0) {
Thread.sleep(infoHolder.intervalMils);
} catch (InterruptedException e) {
e.printStackTrace();
("interrupt " + e);
if (!detailRes.isSuccess()) {
("run error " + detailRes.getErrMsg());
} catch (Exception e) {
e.printStackTrace();
("run exception " + e);
6 使用示例最后,我们还是用一个例子做结。(1) 定义model
//参考lombok
@AllArgsConstructor
@NoArgsConstructor
public class UserMessage {
(2) rabbitmq配置配置我们使用@Configuration实现,如下:
@Configuration
public class RabbitMQConf {
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPublisherConfirms(true); // enable confirm mode
return connectionF
(3) sender示例
public class SenderExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionF
private MessageSender messageS
@PostConstruct
public void init() throws IOException, TimeoutException {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
messageSender = mqAccessBuilder.buildMessageSender(EXCHANGE, ROUTING, QUEUE);
public DetailRes send(UserMessage userMessage) {
return messageSender.send(userMessage);
(4) MessageProcess(用户自定义处理接口)示例,本例中我们只是简单的将信息打印出来
public class UserMessageProcess implements MessageProcess&UserMessage& {
public DetailRes process(UserMessage userMessage) {
System.out.println(userMessage);
return new DetailRes(true, "");
(5) consumer示例
public class ConsumerExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionF
private MessageConsumer messageC
@PostConstruct
public void init() throws IOException, TimeoutException {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
messageConsumer = mqAccessBuilder.buildMessageConsumer(EXCHANGE, ROUTING, QUEUE, new UserMessageProcess());
public DetailRes consume() {
return messageConsumer.consume();
(6) 线程池consumer示例在main函数中,我们使用一个独立线程发送数据,并使用线程池接收数据。
public class PoolExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionF
private ThreadPoolConsumer&UserMessage& threadPoolC
@PostConstruct
public void init() {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
MessageProcess&UserMessage& messageProcess = new UserMessageProcess();
threadPoolConsumer = new ThreadPoolConsumer.ThreadPoolConsumerBuilder&UserMessage&()
.setThreadCount(Constants.THREAD_COUNT).setIntervalMils(Constants.INTERVAL_MILS)
.setExchange(EXCHANGE).setRoutingKey(ROUTING).setQueue(QUEUE)
.setMQAccessBuilder(mqAccessBuilder).setMessageProcess(messageProcess)
public void start() throws IOException {
threadPoolConsumer.start();
public void stop() {
threadPoolConsumer.stop();
public static void main(String[] args) throws IOException {
ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
PoolExample poolExample = ac.getBean(PoolExample.class);
final SenderExample senderExample = ac.getBean(SenderExample.class);
poolExample.start();
new Thread(new Runnable() {
int id = 0;
public void run() {
while (true) {
senderExample.send(new UserMessage(id++, "" + System.nanoTime()));
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}).start();
7 github地址,路过的帮忙点个星星,谢谢^_^。
附:rabbitmq安装过程:mac版安装可以使用homebrew。brew install就可以,安装好之后通过brew services start rabbitmq启动服务。通过
就可以在页面端看到rabbitmq了,如下:
rabbitmq_manager.png
想来快手的同学可以把简历发到我邮箱:
“大圣,此去欲何?”
“踏南天,碎凌霄。”
“若一去不回……”
“便一去不回!”2012年8月 总版技术专家分月排行榜第三2012年7月 总版技术专家分月排行榜第三
2014年8月 高性能开发大版内专家分月排行榜第一2013年11月 高性能开发大版内专家分月排行榜第一2012年12月 Java大版内专家分月排行榜第一2012年11月 Java大版内专家分月排行榜第一2012年10月 Java大版内专家分月排行榜第一2012年9月 Java大版内专家分月排行榜第一2012年8月 Java大版内专家分月排行榜第一2012年7月 Java大版内专家分月排行榜第一2012年6月 Java大版内专家分月排行榜第一2012年5月 Java大版内专家分月排行榜第一2012年4月 Java大版内专家分月排行榜第一2012年3月 Java大版内专家分月排行榜第一2012年2月 Java大版内专家分月排行榜第一2012年1月 Java大版内专家分月排行榜第一
2012年8月 总版技术专家分月排行榜第三2012年7月 总版技术专家分月排行榜第三
2014年8月 高性能开发大版内专家分月排行榜第一2013年11月 高性能开发大版内专家分月排行榜第一2012年12月 Java大版内专家分月排行榜第一2012年11月 Java大版内专家分月排行榜第一2012年10月 Java大版内专家分月排行榜第一2012年9月 Java大版内专家分月排行榜第一2012年8月 Java大版内专家分月排行榜第一2012年7月 Java大版内专家分月排行榜第一2012年6月 Java大版内专家分月排行榜第一2012年5月 Java大版内专家分月排行榜第一2012年4月 Java大版内专家分月排行榜第一2012年3月 Java大版内专家分月排行榜第一2012年2月 Java大版内专家分月排行榜第一2012年1月 Java大版内专家分月排行榜第一
2012年8月 总版技术专家分月排行榜第三2012年7月 总版技术专家分月排行榜第三
2014年8月 高性能开发大版内专家分月排行榜第一2013年11月 高性能开发大版内专家分月排行榜第一2012年12月 Java大版内专家分月排行榜第一2012年11月 Java大版内专家分月排行榜第一2012年10月 Java大版内专家分月排行榜第一2012年9月 Java大版内专家分月排行榜第一2012年8月 Java大版内专家分月排行榜第一2012年7月 Java大版内专家分月排行榜第一2012年6月 Java大版内专家分月排行榜第一2012年5月 Java大版内专家分月排行榜第一2012年4月 Java大版内专家分月排行榜第一2012年3月 Java大版内专家分月排行榜第一2012年2月 Java大版内专家分月排行榜第一2012年1月 Java大版内专家分月排行榜第一
2012年8月 总版技术专家分月排行榜第三2012年7月 总版技术专家分月排行榜第三
2014年8月 高性能开发大版内专家分月排行榜第一2013年11月 高性能开发大版内专家分月排行榜第一2012年12月 Java大版内专家分月排行榜第一2012年11月 Java大版内专家分月排行榜第一2012年10月 Java大版内专家分月排行榜第一2012年9月 Java大版内专家分月排行榜第一2012年8月 Java大版内专家分月排行榜第一2012年7月 Java大版内专家分月排行榜第一2012年6月 Java大版内专家分月排行榜第一2012年5月 Java大版内专家分月排行榜第一2012年4月 Java大版内专家分月排行榜第一2012年3月 Java大版内专家分月排行榜第一2012年2月 Java大版内专家分月排行榜第一2012年1月 Java大版内专家分月排行榜第一
2015年7月 Web 开发大版内专家分月排行榜第三
2017年2月 总版技术专家分月排行榜第三
2017年5月 .NET技术大版内专家分月排行榜第一2017年4月 .NET技术大版内专家分月排行榜第一2017年3月 .NET技术大版内专家分月排行榜第一2017年2月 .NET技术大版内专家分月排行榜第一2016年10月 .NET技术大版内专家分月排行榜第一2016年8月 .NET技术大版内专家分月排行榜第一2016年7月 .NET技术大版内专家分月排行榜第一
2017年2月 总版技术专家分月排行榜第三
2017年5月 .NET技术大版内专家分月排行榜第一2017年4月 .NET技术大版内专家分月排行榜第一2017年3月 .NET技术大版内专家分月排行榜第一2017年2月 .NET技术大版内专家分月排行榜第一2016年10月 .NET技术大版内专家分月排行榜第一2016年8月 .NET技术大版内专家分月排行榜第一2016年7月 .NET技术大版内专家分月排行榜第一
2017年2月 总版技术专家分月排行榜第三
2017年5月 .NET技术大版内专家分月排行榜第一2017年4月 .NET技术大版内专家分月排行榜第一2017年3月 .NET技术大版内专家分月排行榜第一2017年2月 .NET技术大版内专家分月排行榜第一2016年10月 .NET技术大版内专家分月排行榜第一2016年8月 .NET技术大版内专家分月排行榜第一2016年7月 .NET技术大版内专家分月排行榜第一
2017年2月 总版技术专家分月排行榜第三
2017年5月 .NET技术大版内专家分月排行榜第一2017年4月 .NET技术大版内专家分月排行榜第一2017年3月 .NET技术大版内专家分月排行榜第一2017年2月 .NET技术大版内专家分月排行榜第一2016年10月 .NET技术大版内专家分月排行榜第一2016年8月 .NET技术大版内专家分月排行榜第一2016年7月 .NET技术大版内专家分月排行榜第一
本帖子已过去太久远了,不再提供回复功能。NET下RabbitMQ实践 [WCF发布篇]-阿里云资讯网
NET下RabbitMQ实践 [WCF发布篇]
发布时间:
更新时间:
来源:网络
上传者:用户
  在之前的两篇文章中,主要介绍了RabbitMQ环境配置,简单示例的编写。今天将会介绍如何使用WCF将RabbitMQ列队以服务的方式进行发布。  注:因为RabbitMQ的官方.net客户端中包括了WCF的SAMPLE代码演示,很适合初学者,所以我就偷了个懒,直接对照它的SAMPLE来说明了,算是借花献佛吧,呵呵。我们下载相应源码(基于.NET 3.0),本文主要对该源码包中的代码进行讲解,链接如下:&&&   Binary, compiled for .NET 3.0 and newer (zip) - includes example code, the WCF binding and WCF examples   当然官方还提供了基本.NET 2.0 版本的示例版本,但其中只是一些简单的示例,并不包括WCF部分,这里只发个链接,感兴趣的朋友可自行研究。&&&   Binary, compiled for .NET 2.0 (zip) - includes example code&& &&&   下载基于.NET 3.0的版本源码之后,解压其中的projects\examples\wcf目录,可看到如下的项目:&&&   几个文件夹分别对应如下应用场景:  OneWay: 单向通信(无返回值)  TwoWay: 双向通信(请求/响应)  Session:会话方式  Duplex: 双向通信(可以指定一个Callback回调函数)  OneWay&&   在OneWayTest示例中,演示了插入日志数据,因为日志操作一般只是单纯的写入操作,不考虑返回值,所以使用OneWay方式。下面是其WCF接口声明和实例代码,如下:&& &&&&&&&[ServiceContract]
&&&&public&interface&ILogServiceContract
&&&&&&&&[OperationContract(IsOneWay=true)]
&&&&&&&&void&Log(LogData&entry);
&&&&[ServiceBehavior(InstanceContextMode&=&InstanceContextMode.Single)]
&&&&public&class&LogService&:&ILogServiceContract
&&&&&&&&public&int&m_i;
&&&&&&&&public&void&Log(LogData&entry)
&&&&&&&&&&&&Util.WriteLine(ConsoleColor.Magenta,&&&&[SVC]&{3}&[{0,-6}]&{1,&12}:&{2}&,&entry.Level,&entry.TimeStamp,&entry.Message,&m_i++);
  其只包含一个方法:Log(LogData entry) ---用于添加日志记录,可以看出它与我们以往写WCF代码没什么两样。这里要说明一下,在类属性InstanceContextMode枚举类型中,使用了“Single”模式,而该枚举提供了如下三种情况: &&&&&&   Single - 为所有客户端调用分配一个服务实例。  PerCall – 为每个客户端调用分配一个服务实例。  PerSession – 为每个客户端会话分配一个服务实例。每个Session内多线程操作实例的话会有并发问题。 &&&&&&   InstanceContextMode 的默认设置为 PerSession。  这三个值通常是要与并发模式(ConcurrencyMode)搭配使用,以解决并发效率,共享资源等场景下的问题的。下面是并发模式的说明:  ConcurrencyMode 控制一次允许个线程进入服务。ConcurrencyMode 可以设置为以下值之一:&&&   Single - 一次可以有一个线程进入服务。  Reentrant - 一次可以有一个线程进入服务,但允许回调。  Multiple - 一次可以有多个线程进入服务。&& &&&&   ConcurrencyMode 的默认设置为 Single。&&&   InstanceContextMode 和 ConcurrencyMode 设置会相互影响,因此为了提升并发效能,必须协调这两项设置。&&&   ,将 InstanceContextMode 设置为 PerCall 时,会 ConcurrencyMode 设置。这是因为,每个客户端调用都将路由到新的服务实例,因此一次只会有一个线程在服务实例中运行。对于PerCall的实例模型,每个客户端请求都会与服务端的一个独立的服务实例进行交互,就不会出现多个客户端请求争用一个服务实例的情况,也就不会出现并发冲突,不会影响吞吐量的问题。但对于实例内部的共享变量(static)还是会可能出现冲突。  但对于当前Single设置,原因很多,可能包括:  1. 创建服务实例需要的处理工作。当多个客户端访问服务时,仅允许创建一个服务实例可以降低所需处理量。  2. 可以降低垃圾回收成本,因为不必为每个调用创建和销毁服务创建的对象。  3. 可以在多个客户端之间共享服务实例。  4. 避免对static静态属性的访问冲突。&&&   但如果使用Single,问题也就出来了---就是性能,因为如果 ConcurrencyMode也同时设置成Single时,当前示例中的(唯一)服务实例不会同时处理多个(单线程客户端)请求。因为服务在处理请求时会对当前服务加锁,如果再有其它请求需要该服务处理的时候,需要排队等候。如果有大量客户端访问,这可能会导致较大的瓶颈。  当然如果考虑到多线程客户端使用的情况,可能问题会更严重。&   聊了这些,无非就是要结合具体应用场景来灵活搭配ConcurrencyMode,InstanceContextMode这两个枚举值。下面言归正传,一下如何将该服务与RabbitMQ进行绑定,以实现以WCF方式访问RabbitMQ服务的效果。这里暂且略过LogData数据结构信息类,直接看一下如果绑定服务代码(位于OneWayTest.cs):&&&private&ServiceHost&m_
public&void&StartService(Binding&binding)
&&&&m_host&=&new&ServiceHost(typeof(LogService),&new&Uri(&soap.amqp:///&));
&&&&((RabbitMQBinding)binding).OneWayOnly&=&&&&&
&&&&m_host.AddServiceEndpoint(typeof(ILogServiceContract),&binding,&&LogService&);
&&&&m_host.Open();
&&&&m_serviceStarted&=&&&
  StartService方法的主体与我们平时启动WCF服务的方法差不多,只不过是将其中的URL协议部分换成了“soap.amqp”形式,而其中的传入参数binding则是RabbitMQBinding类型,该类型是rabbitmq客户端类库提供的用于对应Binding类的RabbitMQBinding实现。下面就是其类实始化代码:&&&&&&&&return&new&RabbitMQBinding(System.Configuration.ConfigurationManager.AppSettings[&manual-test-broker-uri&],RabbitMQ.Client.Protocols.FromConfiguration(&manual-test-broker-protocol&));
&&& 其包括两个参数,一个是rabbitmq服务地址,一个是所用的协议,其对应示例app.config文件中的如下结点:&&&&&add&key=&manual-test-broker-uri&&value=&amqp://10.0.4.79:5672/&/&&&!--本系列第一篇中的环境设置--&
&add&key=&manual-test-broker-protocol&&value=&AMQP_0_8&/&
  这样,我们就完成了初始化服务实例工作。接着来构造客户端代码,如下:&&&private&ChannelFactory&ILogServiceContract&&m_
private&ILogServiceContract&m_&&&&&&&&&&&&&&&&&&&&&&&&&&
public&ILogServiceContract&GetClient(Binding&binding)
&&&&((RabbitMQBinding)binding).OneWayOnly&=&
&&&&m_factory&=&new&ChannelFactory&ILogServiceContract&(binding,&&soap.amqp:///LogService&);
&&&&m_factory.Open();
&&&&return&m_factory.CreateChannel();
  与平时写的代码相似,但传入参数就是上面提到的那个RabbitMQBinding实例,这样通过下面代码访问WCF中的LOG方法:&&&&m_client&=&GetClient(Program.GetBinding());
&&&&m_client.Log(new&LogData(LogLevel.High,&&Hello&Rabbit&));
&&&&m_client.Log(new&LogData(LogLevel.Medium,&&Hello&Rabbit&));
  到这里,我们可以看出,它的实现还是很简单的。我们只要把10.0.4.79:5672上的rabbitmq的环境跑起来,就可以看出最终的效果了。 之后我将C#的服务端(startservice)与客户端(getclient)分开布署到不同IP的主机上,也实现了示例中的结果。&&&   TwoWay&&&   下面介绍一下 TwoWay双向通信示例,首先是WCF接口声明和实现:&&&&&&&&&[ServiceContract]
&&&&public&interface&ICalculator
&&&&&&&&[OperationContract]
&&&&&&&&int&Add(int&x,&int&y);
&&&&&&&&[OperationContract]
&&&&&&&&int&Subtract(int&x,&int&y);
&& [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerCall)]&/*为每个客户端调用分配一个服务实例*/
&&&&public&sealed&class&Calculator&:&ICalculator
&&&&&&&&public&int&Add(int&x,&int&y)
&&&&&&&&&&&&return&x&+&y;
&&&&&&&&public&int&Subtract(int&x,&int&y)
&&&&&&&&&&&&return&x&-&y;
  因为其服务的启动startservice和客户端实例构造与oneway方法类似,为了节约篇幅,这时就略过了,下面是其最终调用代码(位于TwoWayTest.cs):& public&void&Run()
&&&&&StartService(Program.GetBinding());
&&&&&ICalculator&calc&=&GetClient(Program.GetBinding());
&&&&&int&result&=&0,&x&=&3,&y&=&4;
&&&&&Util.WriteLine(ConsoleColor.Magenta,&&&&{0}&+&{1}&=&{2}&,&x,&y,&result&=&calc.Add(x,&y));
&&&&&if&(result&!=&x&+&y)
&&&&&&&&&throw&new&Exception(&Test&Failed&);
&&&&......
  与普通的WCF TWOWAY 返回调用方式相同,就不多说了。&&&   Session&&&   下面是基于Session会话方式的代码,WCF接口声明和实现:&&&& [ServiceContract(SessionMode=&SessionMode.Required)]
&&&&public&interface&ICart
&&&&&&&&[OperationContract]
&&&&&&&&void&Add(CartItem&item);
&&&&&&&&[OperationContract]
&&&&&&&&double&GetTotal();
&&&&&&&&Guid&Id&{&[OperationContract]&}
&&&&[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
&&&&public&class&Cart&:&ICart
&&&&&&&&public&Cart()
&&&&&&&&&&&&Items&=&new&List&CartItem&();
&&&&&&&&&&&&m_id&=&Guid.NewGuid();
&&&&&&&&private&Guid&m_
&&&&&&&&private&List&CartItem&&m_
&&&&&&&&private&List&CartItem&&Items&{
&&&&&&&&&&&&get&{&return&m_&}
&&&&&&&&&&&&set&{&m_items&=&&}
&&&&&&&&public&void&Add(CartItem&item)
&&&&&&&&&&&&Items.Add(item);
&&&&&&&&public&double&GetTotal()
&&&&&&&&&&&&double&total&=&0;
&&&&&&&&&&&&foreach&(CartItem&i&in&Items)
&&&&&&&&&&&&&&&&total&+=&i.P
&&&&&&&&&&&&return&
&&&&&&&&public&Guid&Id&{&get&{&return&m_&}&}
  该接口实现一个购物车功能,可以添加商品并计算总价,考虑到并发场景,这里将例为PerSession枚举类型,即为每个客户端会话分配一个服务实例。这样就可以在用户点击购买一件商品里,为其购物车商品列表List&CartItem&添加一条信息,而不会与其它用户的购物车商品列表相冲突。其最终的调用方法如下:&&&public&void&Run()
&&&&StartService(Program.GetBinding());
&&&&ICart&cart&=&GetClient(Program.GetBinding());
&&&&AddToCart(cart,&&Beans&,&0.49);//添加商品到购物车
&&&&AddToCart(cart,&&Bread&,&0.89);
&&&&AddToCart(cart,&&Toaster&,&4.99);
&&&&double&total&=&cart.GetTotal();//计算总价
&&&&if&(total&!=&(0.49&+&0.89&+&4.99))
&&&&&&&&throw&new&Exception(&Incorrect&Total&);
&&&&......
  Duplex&&&   最后,再介绍一下如何基于Duplex双向通信模式进行开发,DuplexTest这是个“PIZZA订单”的场景,用户下单之后,等待服务端将PIZZA加工完毕,然后服务端用callback方法通知客户端PIZZA已做好,相应WCF接口声明和实现如下:&&& [ServiceContract(CallbackContract=typeof(IPizzaCallback))]&/*绑定回调接口*/
&&&&public&interface&IPizzaService
&&&&&&&&[OperationContract(IsOneWay=true)]
&&&&&&&&void&PlaceOrder(Order&order);
&&&&[ServiceContract]
&&&&public&interface&IPizzaCallback
&&&&&&&&[OperationContract(IsOneWay=true)]
&&&&&&&&void&OrderReady(Guid&id);&/*用于通知客户端*/
&&&&public&class&PizzaService&:&IPizzaService
&&&&&&&&public&void&PlaceOrder(Order&order)
&&&&&&&&&&&&foreach&(Pizza&p&in&order.Items)
&&&&&&&&&&&&{
&&&&&&&&&&&&&&&&Util.WriteLine(ConsoleColor.Magenta,&&&&[SVC]&Cooking&a&{0}&{1}&Pizza...&,&p.Base,&p.Toppings);
&&&&&&&&&&&&}
&&&&&&&&&&&&Util.WriteLine(ConsoleColor.Magenta,&&&&[SVC]&Order&{0}&is&Ready!&,&order.Id);
&&&&&&&&&&&&Callback.OrderReady(order.Id);
&&&&&&&&IPizzaCallback&Callback
&&&&&&&&&&&&get&{&return&OperationContext.Current.GetCallbackChannel&IPizzaCallback&();&}&//当前上下文中调用客户端绑定的回调方法
  这里要说明的是IPizzaCallback接口的OrderReady方法被绑定了IsOneWay=true属性,主要是因为如果使用“请求-响应”模式,客户端必须等服务端“响应”完成上一次“请求”后才能发出下一步“请求”。因此虽然客户端可以使用多线程方式来调用服务,但最后的执行结果仍然表现出顺序处理(效率低)。要想使服务端能够并行处理客户端请求的话,那我们就不能使用“请求-响应”的调用模式,所以这里使用One-Way的方式来调用服务。  下面是客户端回调接口实现:&&&&&&&&& public&class&PizzaClient&:&DuplexClientBase&IPizzaService&,&IPizzaService
&&&&&&&&public&PizzaClient(InstanceContext&context,&Binding&binding,&EndpointAddress&remoteAddress)
&&&&&&&&&&&&:&base(context,&binding,&remoteAddress)&{&}
&&&&&&&&public&void&PlaceOrder(Order&order)
&&&&&&&&&&&&Channel.PlaceOrder(order);
  最终客户端实例化(startservice)略过,因与之前示例类似。&&&&&& public&IPizzaService&GetClient(Binding&binding)
&&&&&&&&PizzaClient&client&=&new&PizzaClient(new&InstanceContext(this),&binding,&new&EndpointAddress(serverUri.ToString()));
&&&&&&&&client.Open();
&&&&&&&&return&
  上面的方法中将当前客户端实例this(实现了IServiceTest&IPizzaService&, IPizzaCallback接口)注册到上下文中,目的是为了将其方法的回传调用传递到服务端(还记得服务端的这行代码吗?=&Callback.OrderReady(order.Id))public&void&OrderReady(Guid&id)
&&&&Util.WriteLine(ConsoleColor.Magenta,&&&&[CLI]&Order&{0}&has&been&delivered.&,id);
&&&&mre.Set();
  这样,服务端完成pizza时,就可以调用客户端的OrderReady方法来实现通知功能了。下面就是一个整个的下单流程实现:
本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:zixun-group@ 进行举报,并提供相关证据,工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。
1、递规法:利用递归一层一层的删。 以下为引用的内容: deleteDir($dir) { if (rmdir($dir)==false && is_dir($dir)) { &if ($dp = opendir($dir)) { & while (($file=readdir($dp)) != false) { && if ...
PHP中从一个页面重定向到另外一个页面三种办法,供参考。 一、用HTTP头信息 也就是用PHP的HEADER函数。PHP里的HEADER函数的作用就是向浏览器发出由HTTP协议规定的本来应该通过WEB服务器的控制指令,例如声明返回信息的类型(&Context-type: xxx/xxx&),页面的属性(&No cache&, &Expire&...
前天去面试的时候也是这样,不过我当时记不起来了.就记得MYSQL里面的date_sub(now(),‘interval 1 day‘);date(‘Y/m/d h:i:s‘,mktime(date(‘h‘), date(‘i‘), date(‘s‘), date(‘m‘) , date(‘d‘)+1, date(‘Y‘))); --------------------------------...
下面带大家走进PHP的入门之路 说明: 我这里暂时是以 Apache web server 和 MY SQL 作为WEB服务器和数据库,在php-4.3.3下的环境做的程序。当然要简单的构建和访问查看数据库 PHPMYADMIN 不可少 这里需要懂得HTML基础知识!没有HTML基础知识的!可以去百度或者GOOGLE搜下!很简单的!这里就不多说了 好了我们开始吧!我们就把PHP入门当成一个苹...
十四条令PHP初学者头疼问题大总结 今天为大家奉上令PHP初学者头疼问题大总结,下面提出的14个问题希望对PHP初学者有所帮助。      1、页面之间无法传递变量      get,post,session在最新的php版本中自动全局变量是关闭的,所以要从上一页面取得提交过来得变量要使用$_GET[‘foo‘],$_POST[‘foo‘],$_SESSION[‘foo‘]来得到。当然也可以...
学习php中10个基础知识总结 看了些PHP的基础知识,自己在这里总结下: 1,在HTML嵌入PHP脚本有三种办法: 以下为引用的内容: &script language=&php&& //嵌入方式一 echo(&test&); &/script& &? //嵌入方式二 echo &&br&g...
若您要投稿、删除文章请联系邮箱:zixun-group@,工作人员会在5个工作日内回复。
售前咨询热线
服务与支持
账号与支持
关注阿里云
International

我要回帖

更多关于 rabbitmq 数据一致性 的文章

 

随机推荐