A Detailed Introduction to SimpleMessageListenerContainer and DirectMessageListenerContainer
SimpleMessageListenerContainer

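How a consumer consumes messages: single-threaded
- An application usually creates a single connection (over TCP), although it can create several
- One connection is multiplexed by many channels
- A channel is single-threaded
- Each channel consumes its messages serially on that one thread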
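How a consumer consumes messages: multi-threaded
- An application usually creates a single connection (over TCP), although it can create several
- In a distributed deployment, each service creates its own connection
- One connection is multiplexed by many channels
- The concurrency parameter controls the number of threads, which amounts to creating several channels for each consumer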
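- The channels that serve one consumer may live in different service instances: for example, Channel-a and Channel-b for consumer-a may belong to Application-b and Application-b1 respectively, each running on its own thread. A channel itself belongs to a single connection and is not shared between processes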
- Each channel still consumes its messages serially on a single thread
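The multi-threaded model above can be sketched with plain JDK classes. This is a simplified model rather than Spring AMQP code: the `LinkedBlockingQueue` stands in for a RabbitMQ queue, each worker thread stands in for one consumer with its own channel, and the names `ConcurrentConsumersSketch` and `consume` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Simplified model: `concurrency` consumers, each draining the queue serially on its own thread. */
final class ConcurrentConsumersSketch {

    /** Returns how many messages each (hypothetical) consumer handled. */
    static Map<String, Integer> consume(List<String> messages, int concurrency) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        Map<String, Integer> handledByConsumer = new ConcurrentHashMap<>();
        Thread[] consumers = new Thread[concurrency];

        for (int i = 0; i < concurrency; i++) {
            String name = "consumer-" + i;
            // One thread per consumer; within a thread, messages are handled one at a time.
            consumers[i] = new Thread(() -> {
                while (queue.poll() != null) {
                    handledByConsumer.merge(name, 1, Integer::sum);
                }
            }, name);
            consumers[i].start();
        }
        for (Thread consumer : consumers) {
            try {
                consumer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return handledByConsumer;
    }

    public static void main(String[] args) {
        // How the messages are split between consumers varies from run to run.
        System.out.println(consume(Arrays.asList("m1", "m2", "m3", "m4", "m5", "m6"), 3));
    }
}
```

Raising `concurrency` adds threads, but each thread still works through its messages serially, which is the point the list makes.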
Basic parameters
```java
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
    private static final long DEFAULT_START_CONSUMER_MIN_INTERVAL = 10000L;
    private static final long DEFAULT_STOP_CONSUMER_MIN_INTERVAL = 60000L;
    private static final long DEFAULT_CONSUMER_START_TIMEOUT = 60000L;
    private static final int DEFAULT_CONSECUTIVE_ACTIVE_TRIGGER = 10;
    private static final int DEFAULT_CONSECUTIVE_IDLE_TRIGGER = 10;
    public static final long DEFAULT_RECEIVE_TIMEOUT = 1000L;
    private final AtomicLong lastNoMessageAlert = new AtomicLong();
    private final AtomicReference<Thread> containerStoppingForAbort = new AtomicReference();
    private final BlockingQueue<ListenerContainerConsumerFailedEvent> abortEvents = new LinkedBlockingQueue();
    private volatile long startConsumerMinInterval = 10000L;
    private volatile long stopConsumerMinInterval = 60000L;
    private volatile int consecutiveActiveTrigger = 10;
    private volatile int consecutiveIdleTrigger = 10;
    private volatile int txSize = 1;
    private volatile int concurrentConsumers = 1;
    private volatile Integer maxConcurrentConsumers;
    private volatile long lastConsumerStarted;
    private volatile long lastConsumerStopped;
    private long receiveTimeout = 1000L;
    private Set<BlockingQueueConsumer> consumers;
    private final ActiveObjectCounter<BlockingQueueConsumer> cancellationLock = new ActiveObjectCounter();
    private Integer declarationRetries;
    private Long retryDeclarationInterval;
    private TransactionTemplate transactionTemplate;
    private long consumerStartTimeout = 60000L;
    // ... (rest of the class omitted)
}
```
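- concurrentConsumers: the number of consumers started when the container initializes; defaults to 1
- maxConcurrentConsumers: the upper limit on the number of consumers; when it is set, the consumer count floats between concurrentConsumers and this value according to message volume
- consumers: the set of consumers; each consumer has one channel and one thread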
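The scaling between concurrentConsumers and maxConcurrentConsumers can be pictured with the sketch below. It is a simplified model and an assumption about the general shape of the decision, not the Spring AMQP implementation: it borrows the field names consecutiveActiveTrigger and consecutiveIdleTrigger from the excerpt above, leaves out startConsumerMinInterval and stopConsumerMinInterval entirely, and the class name `ConsumerScalingSketch` is hypothetical.

```java
/** Simplified model of a scale-up/scale-down decision; not the Spring AMQP implementation. */
final class ConsumerScalingSketch {
    private final int concurrentConsumers;      // lower bound
    private final int maxConcurrentConsumers;   // upper bound
    private final int consecutiveActiveTrigger; // busy cycles in a row before adding a consumer
    private final int consecutiveIdleTrigger;   // idle cycles in a row before removing one
    private int current;
    private int consecutiveActive;
    private int consecutiveIdle;

    ConsumerScalingSketch(int concurrentConsumers, int maxConcurrentConsumers,
                          int consecutiveActiveTrigger, int consecutiveIdleTrigger) {
        this.concurrentConsumers = concurrentConsumers;
        this.maxConcurrentConsumers = maxConcurrentConsumers;
        this.consecutiveActiveTrigger = consecutiveActiveTrigger;
        this.consecutiveIdleTrigger = consecutiveIdleTrigger;
        this.current = concurrentConsumers;
    }

    /** Records one receive cycle and returns the consumer count after it. */
    int onReceiveCycle(boolean receivedMessage) {
        if (receivedMessage) {
            consecutiveIdle = 0;
            if (++consecutiveActive >= consecutiveActiveTrigger) {
                consecutiveActive = 0;
                if (current < maxConcurrentConsumers) {
                    current++; // busy for long enough: add a consumer, up to the maximum
                }
            }
        } else {
            consecutiveActive = 0;
            if (++consecutiveIdle >= consecutiveIdleTrigger) {
                consecutiveIdle = 0;
                if (current > concurrentConsumers) {
                    current--; // idle for long enough: drop a consumer, down to the minimum
                }
            }
        }
        return current;
    }
}
```

With both triggers set to 2, a minimum of 1 and a maximum of 3, two busy cycles in a row raise the count to 2, two more raise it to 3, and further busy cycles leave it capped at 3.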
Consumer initialization
```java
protected int initializeConsumers() {
    int count = 0;
    synchronized(this.consumersMonitor) {
        if (this.consumers == null) {
            this.cancellationLock.reset();
            this.consumers = new HashSet(this.concurrentConsumers);

            // Create one BlockingQueueConsumer per configured concurrent consumer.
            for(int i = 0; i < this.concurrentConsumers; ++i) {
                BlockingQueueConsumer consumer = this.createBlockingQueueConsumer();
                this.consumers.add(consumer);
                ++count;
            }
        }

        return count;
    }
}
```
The container creates as many consumers as concurrentConsumers specifies.
BlockingQueueConsumer basic parameters
```java
private static Log logger = LogFactory.getLog(BlockingQueueConsumer.class);
private final BlockingQueue<Delivery> queue;
private volatile ShutdownSignalException shutdown;
private final String[] queues;
private final int prefetchCount;
private final boolean transactional;
private Channel channel;
private RabbitResourceHolder resourceHolder;
private BlockingQueueConsumer.InternalConsumer consumer;
private final AtomicBoolean cancelled;
private final AcknowledgeMode acknowledgeMode;
private final ConnectionFactory connectionFactory;
private final MessagePropertiesConverter messagePropertiesConverter;
private final ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter;
private final Map<String, Object> consumerArgs;
private final boolean noLocal;
private final boolean exclusive;
private final Set<Long> deliveryTags;
private final boolean defaultRequeueRejected;
private final Map<String, String> consumerTags;
private final Set<String> missingQueues;
private long retryDeclarationInterval;
private long failedDeclarationRetryInterval;
private int declarationRetries;
private long lastRetryDeclaration;
private ConsumerTagStrategy tagStrategy;
private BackOffExecution backOffExecution;
private long shutdownTimeout;
private boolean locallyTransacted;
private ApplicationEventPublisher applicationEventPublisher;
private volatile long abortStarted;
private volatile boolean normalCancel;
volatile Thread thread;
volatile boolean declaring;
```
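- One consumer can listen to several queues
- prefetchCount: controls how many messages the consumer pulls from the queue at a time
- channel: the channel this consumer uses
- thread: the thread this consumer runs on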
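The `queue` field in the excerpt is an in-memory `BlockingQueue` that sits between two threads: the RabbitMQ client thread puts deliveries in, and the consumer thread takes them out. A minimal sketch of that hand-off follows, assuming the buffer is bounded by prefetchCount and that the consumer thread polls with a receive timeout. Both are assumptions made for illustration, and `HandOffSketch` with its methods is a hypothetical stand-in, not the BlockingQueueConsumer API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Simplified model of the client-thread to consumer-thread hand-off. */
final class HandOffSketch {
    private final BlockingQueue<String> queue;

    HandOffSketch(int prefetchCount) {
        // Assumption: the local buffer holds at most prefetchCount deliveries.
        this.queue = new LinkedBlockingQueue<>(prefetchCount);
    }

    /** Called on the client (I/O) thread; returns false when the buffer is full. */
    boolean handleDelivery(String body) {
        return queue.offer(body);
    }

    /** Called on the consumer thread; returns null if nothing arrives within the timeout. */
    String nextMessage(long receiveTimeoutMillis) {
        try {
            return queue.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Every message crosses from one thread to another through this buffer, which is the thread switch that the comparison at the end of the article says DirectMessageListenerContainer avoids.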
Core thread-handling code
```java
public void start() throws AmqpException {
    // ......
    try {
        if (!this.cancelled()) {
            String[] var2 = this.queues;
            int var3 = var2.length;

            // Register this consumer on every queue that is not known to be missing.
            for(int var4 = 0; var4 < var3; ++var4) {
                String queueName = var2[var4];
                if (!this.missingQueues.contains(queueName)) {
                    this.consumeFromQueue(queueName);
                }
            }
        }
    } catch (IOException var9) {
        throw RabbitExceptionTranslator.convertRabbitAccessException(var9);
    }
    // ......
}

private void consumeFromQueue(String queue) throws IOException {
    String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
            this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : "",
            this.noLocal, this.exclusive, this.consumerArgs,
            new BlockingQueueConsumer.ConsumerDecorator(queue, this.consumer, this.applicationEventPublisher));
    if (consumerTag != null) {
        this.consumerTags.put(consumerTag, queue);
        if (logger.isDebugEnabled()) {
            logger.debug("Started on queue '" + queue + "' with tag " + consumerTag + ": " + this);
        }
    } else {
        logger.error("Null consumer tag received for queue " + queue);
    }
}
```
Underlying message consumption logic
```java
public String basicConsume(String queue, final boolean autoAck, String consumerTag, boolean noLocal,
        boolean exclusive, Map<String, Object> arguments, final Consumer callback) throws IOException {
    Method m = (new com.rabbitmq.client.AMQP.Basic.Consume.Builder())
            .queue(queue)
            .consumerTag(consumerTag)
            .noLocal(noLocal)
            .noAck(autoAck)
            .exclusive(exclusive)
            .arguments(arguments)
            .build();
    BlockingRpcContinuation<String> k = new BlockingRpcContinuation<String>(m) {
        public String transformReply(AMQCommand replyCommand) {
            String actualConsumerTag = ((ConsumeOk)replyCommand.getMethod()).getConsumerTag();
            // Register the callback under the tag returned by the broker.
            ChannelN.this._consumers.put(actualConsumerTag, callback);
            ChannelN.this.metricsCollector.basicConsume(ChannelN.this, actualConsumerTag, autoAck);
            ChannelN.this.dispatcher.handleConsumeOk(callback, actualConsumerTag);
            return actualConsumerTag;
        }
    };
    this.rpc(m, k);

    try {
        if (this._rpcTimeout == 0) {
            return (String)k.getReply();
        } else {
            try {
                return (String)k.getReply(this._rpcTimeout);
            } catch (TimeoutException var11) {
                throw this.wrapTimeoutException(m, var11);
            }
        }
    } catch (ShutdownSignalException var12) {
        throw wrap(var12);
    }
}
```
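- Consumption is set up through an RPC to the broker: the client sends Basic.Consume and blocks until the ConsumeOk reply arrives or the RPC timeout expires
- The callback is registered under the consumer tag that the broker returns and then receives deliveries through the dispatcher. Reflection comes in further up the stack, where Spring invokes the listener's handler method (InvocableHandlerMethod calling Method.invoke)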
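Summary:
- SimpleMessageListenerContainer is the simplest listener container; it is easy to use and suits scenarios without high concurrency requirements
- Each consumer processes its messages serially on a single thread, so a slow handler can easily cause messages to back up
- If the code acks the same message twice, the channel is closed and an exception is raised; once the channel is closed, messages back up
- Because the consumers do not share a thread pool, switching between many threads hurts efficiency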
```
java.lang.IllegalStateException: Channel closed; cannot ack/nack
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:982)
	at com.sun.proxy.$Proxy248.basicAck(Unknown Source)
	at com.yqn.pac.zuul.dispatch.mq.TaskResultConsumerTest.dispatch(TaskResultConsumerTest.java:27)
	at sun.reflect.GeneratedMethodAccessor130.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
	at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182)
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
	at java.lang.Thread.run(Thread.java:748)
```
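The duplicate-ack failure behind this stack trace can be reproduced in miniature. The sketch below is a simplified stand-in for a channel in manual-ack mode, not the RabbitMQ client API: `AckTrackingChannelSketch` and `deliver` are hypothetical names. It models the rule that acknowledging an unknown or already-acknowledged delivery tag closes the channel, after which every further ack fails with the message seen in the trace.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified stand-in for a channel in manual-ack mode; not the RabbitMQ client API. */
final class AckTrackingChannelSketch {
    private final Set<Long> unacked = new HashSet<>();
    private boolean open = true;

    /** Models the broker delivering a message with the given delivery tag. */
    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    void basicAck(long deliveryTag) {
        if (!open) {
            throw new IllegalStateException("Channel closed; cannot ack/nack");
        }
        if (!unacked.remove(deliveryTag)) {
            // Unknown or already-acked tag: the channel gets closed.
            open = false;
        }
    }

    boolean isOpen() {
        return open;
    }
}
```

In this model the second `basicAck` for the same tag closes the channel, and the messages still outstanding on it can no longer be acknowledged through it. A listener should therefore ack or nack each delivery exactly once.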
DirectMessageListenerContainer

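Shared thread pool
- Each consumer method annotated with @RabbitListener gets its own listener container
- When the application configures a single RabbitListenerContainerFactory and all queues are handled by the same @RabbitListener method, every queue uses the same thread pool
- A queue can still set its consumer concurrency with the consumersPerQueue parameter, that is, use several channels
- The threads that serve the channels are managed by the thread pool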
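Dedicated thread pool
- Each consumer method annotated with @RabbitListener gets its own listener container
- When the application configures several RabbitListenerContainerFactory beans, pointing each consumer at a different container factory gives each one its own thread pool
- When the application configures a single RabbitListenerContainerFactory, giving each queue its own consumer method also gives each one its own thread pool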
Basic parameters
```java
private static final int DEFAULT_MONITOR_INTERVAL = 10000;
private static final int DEFAULT_ACK_TIMEOUT = 20000;
protected final List<DirectMessageListenerContainer.SimpleConsumer> consumers = new LinkedList();
private final List<DirectMessageListenerContainer.SimpleConsumer> consumersToRestart = new LinkedList();
private final MultiValueMap<String, DirectMessageListenerContainer.SimpleConsumer> consumersByQueue = new LinkedMultiValueMap();
private final ActiveObjectCounter<DirectMessageListenerContainer.SimpleConsumer> cancellationLock = new ActiveObjectCounter();
private TaskScheduler taskScheduler;
private boolean taskSchedulerSet;
private long monitorInterval = 10000L;
private int messagesPerAck;
private long ackTimeout = 20000L;
private volatile boolean started;
private volatile boolean aborted;
private volatile boolean hasStopped;
private volatile CountDownLatch startedLatch = new CountDownLatch(1);
private volatile int consumersPerQueue = 1;
private volatile ScheduledFuture<?> consumerMonitorTask;
private volatile long lastAlertAt;
private volatile long lastRestartAttempt;
```
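- consumers: the consumers in the container; each consumer object has its own channel and queue
- consumersByQueue: a map of consumers keyed by queue name; one queue may have several consumers
- taskScheduler: a thread pool (ThreadPoolTaskScheduler by default) that serves all consumers in the container; the code that follows uses it to run the consumer monitor task
- consumersPerQueue: the number of consumers, and therefore channels, configured for each queue
- monitorInterval: the interval at which the container tries to restart consumers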
Thread pool initialization
```java
public void setTaskScheduler(TaskScheduler taskScheduler) {
    this.taskScheduler = taskScheduler;
    this.taskSchedulerSet = true;
}

protected void doInitialize() throws Exception {
    // Fall back to a ThreadPoolTaskScheduler when no scheduler was supplied.
    if (this.taskScheduler == null) {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setThreadNamePrefix(this.getBeanName() + "-consumerMonitor-");
        threadPoolTaskScheduler.afterPropertiesSet();
        this.taskScheduler = threadPoolTaskScheduler;
    }

    if (this.messagesPerAck > 0) {
        Assert.state(!this.isChannelTransacted(), "'messagesPerAck' is not allowed with transactions");
    }

}
```
You are free to supply your own TaskScheduler; if none is set, a ThreadPoolTaskScheduler is used by default.
Using the thread pool
```java
protected void actualStart() throws Exception {
    this.aborted = false;
    this.hasStopped = false;
    if (this.getPrefetchCount() < this.messagesPerAck) {
        this.setPrefetchCount(this.messagesPerAck);
    }

    super.doStart();
    String[] queueNames = this.getQueueNames();
    this.checkMissingQueues(queueNames);
    if (this.getTaskExecutor() == null) {
        this.afterPropertiesSet();
    }

    long idleEventInterval = this.getIdleEventInterval();
    if (this.taskScheduler == null) {
        this.afterPropertiesSet();
    }

    if (idleEventInterval > 0L && this.monitorInterval > idleEventInterval) {
        this.monitorInterval = idleEventInterval / 2L;
    }

    if (this.getFailedDeclarationRetryInterval() < this.monitorInterval) {
        this.monitorInterval = this.getFailedDeclarationRetryInterval();
    }

    this.lastRestartAttempt = System.currentTimeMillis();
    this.consumerMonitorTask = this.taskScheduler.scheduleAtFixedRate(() -> {
        long now = System.currentTimeMillis();
        if (idleEventInterval > 0L && now - this.getLastReceive() > idleEventInterval && now - this.lastAlertAt > idleEventInterval) {
            this.publishIdleContainerEvent(now - this.getLastReceive());
            this.lastAlertAt = now;
        }

        List consumersToCancel;
        synchronized(this.consumersMonitor) {
            consumersToCancel = (List)this.consumers.stream().filter((c) -> {
                boolean open = c.getChannel().isOpen();
                if (open && this.messagesPerAck > 1) {
                    try {
                        c.ackIfNecessary(now);
                    } catch (IOException var6) {
                        this.logger.error("Exception while sending delayed ack", var6);
                    }
                }

                return !open;
            }).collect(Collectors.toList());
        }

        consumersToCancel.forEach((c) -> {
            try {
                RabbitUtils.closeMessageConsumer(c.getChannel(), Collections.singletonList(c.getConsumerTag()), this.isChannelTransacted());
            } catch (Exception var3) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Error closing consumer " + c, var3);
                }
            }

            this.logger.error("Consumer canceled - channel closed " + c);
            c.cancelConsumer("Consumer " + c + " channel closed");
        });
        if (this.lastRestartAttempt + this.getFailedDeclarationRetryInterval() < now) {
            synchronized(this.consumersMonitor) {
                List<DirectMessageListenerContainer.SimpleConsumer> restartableConsumers = new ArrayList(this.consumersToRestart);
                this.consumersToRestart.clear();
                if (this.started) {
                    if (restartableConsumers.size() > 0) {
                        this.doRedeclareElementsIfNecessary();
                    }

                    Iterator var8 = restartableConsumers.iterator();

                    while(var8.hasNext()) {
                        DirectMessageListenerContainer.SimpleConsumer consumer = (DirectMessageListenerContainer.SimpleConsumer)var8.next();
                        if (this.logger.isDebugEnabled() && restartableConsumers.size() > 0) {
                            this.logger.debug("Attempting to restart consumer " + consumer);
                        }

                        try {
                            this.doConsumeFromQueue(consumer.getQueue());
                        } catch (AmqpIOException | AmqpConnectException var13) {
                            this.logger.error("Cannot connect to server", var13);
                            if (var13.getCause() instanceof AmqpApplicationContextClosedException) {
                                this.logger.error("Application context is closed, terminating");
                                this.taskScheduler.schedule(this::stop, new Date());
                            }
                            break;
                        }
                    }

                    this.lastRestartAttempt = now;
                }
            }
        }

        this.processMonitorTask();
    }, this.monitorInterval);
    if (queueNames.length > 0) {
        this.doRedeclareElementsIfNecessary();
        this.getTaskExecutor().execute(() -> {
            synchronized(this.consumersMonitor) {
                if (this.hasStopped) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Consumer start aborted - container stopping");
                    }
                } else {
                    BackOffExecution backOffExecution = this.getRecoveryBackOff().start();

                    while(true) {
                        label54:
                        while(true) {
                            if (this.started || !this.isRunning()) {
                                return;
                            }

                            this.cancellationLock.reset();

                            try {
                                String[] var4 = queueNames;
                                int var12 = queueNames.length;
                                int var6 = 0;

                                while(true) {
                                    if (var6 >= var12) {
                                        break label54;
                                    }

                                    String queue = var4[var6];
                                    this.consumeFromQueue(queue);
                                    ++var6;
                                }
                            } catch (AmqpIOException | AmqpConnectException var10) {
                                long nextBackOff = backOffExecution.nextBackOff();
                                if (nextBackOff < 0L || var10.getCause() instanceof AmqpApplicationContextClosedException) {
                                    this.aborted = true;
                                    this.shutdown();
                                    this.logger.error("Failed to start container - fatal error or backOffs exhausted", var10);
                                    this.taskScheduler.schedule(this::stop, new Date());
                                    return;
                                }

                                this.logger.error("Error creating consumer; retrying in " + nextBackOff, var10);
                                this.doShutdown();

                                try {
                                    Thread.sleep(nextBackOff);
                                } catch (InterruptedException var9) {
                                    Thread.currentThread().interrupt();
                                }
                            }
                        }

                        this.started = true;
                        this.startedLatch.countDown();
                    }
                }

            }
        });
    } else {
        this.started = true;
        this.startedLatch.countDown();
    }

    if (this.logger.isInfoEnabled()) {
        this.logger.info("Container initialized for queues: " + Arrays.asList(queueNames));
    }

}
```
Consumer concurrency
```java
private void consumeFromQueue(String queue) {
    List<DirectMessageListenerContainer.SimpleConsumer> list = (List)this.consumersByQueue.get(queue);
    // Only start consumers for a queue that has none yet: consumersPerQueue of them.
    if (CollectionUtils.isEmpty(list)) {
        for(int i = 0; i < this.consumersPerQueue; ++i) {
            this.doConsumeFromQueue(queue);
        }
    }

}
```
Consuming messages
```java
private void doConsumeFromQueue(String queue) {
    if (!this.isActive()) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Consume from queue " + queue + " ignore, container stopping");
        }

    } else {
        String routingLookupKey = this.getRoutingLookupKey();
        if (routingLookupKey != null) {
            SimpleResourceHolder.bind(this.getRoutingConnectionFactory(), routingLookupKey);
        }

        Connection connection = null;

        try {
            connection = this.getConnectionFactory().createConnection();
        } catch (Exception var14) {
            this.addConsumerToRestart(new DirectMessageListenerContainer.SimpleConsumer((Connection)null, (Channel)null, queue));
            throw var14 instanceof AmqpConnectException ? (AmqpConnectException)var14 : new AmqpConnectException(var14);
        } finally {
            if (routingLookupKey != null) {
                SimpleResourceHolder.unbind(this.getRoutingConnectionFactory());
            }

        }

        Channel channel = null;
        DirectMessageListenerContainer.SimpleConsumer consumer = null;

        try {
            // One channel per consumer: set prefetch, check the queue exists, then register the consumer.
            channel = connection.createChannel(this.isChannelTransacted());
            channel.basicQos(this.getPrefetchCount());
            consumer = new DirectMessageListenerContainer.SimpleConsumer(connection, channel, queue);
            channel.queueDeclarePassive(queue);
            consumer.consumerTag = channel.basicConsume(queue, this.getAcknowledgeMode().isAutoAck(),
                    this.getConsumerTagStrategy() != null ? this.getConsumerTagStrategy().createConsumerTag(queue) : "",
                    this.isNoLocal(), this.isExclusive(), this.getConsumerArguments(), consumer);
        } catch (AmqpApplicationContextClosedException var16) {
            throw new AmqpConnectException(var16);
        } catch (AmqpConnectException | IOException var17) {
            RabbitUtils.closeChannel(channel);
            RabbitUtils.closeConnection(connection);
            if (var17.getCause() instanceof ShutdownSignalException && var17.getCause().getMessage().contains("in exclusive use")) {
                this.getExclusiveConsumerExceptionLogger().log(this.logger, "Exclusive consumer failure", var17.getCause());
                this.publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, var17);
            } else if (var17.getCause() instanceof ShutdownSignalException && RabbitUtils.isPassiveDeclarationChannelClose((ShutdownSignalException)var17.getCause())) {
                this.logger.error("Queue not present, scheduling consumer " + consumer + " for restart", var17);
            } else if (this.logger.isWarnEnabled()) {
                this.logger.warn("basicConsume failed, scheduling consumer " + consumer + " for restart", var17);
            }

            // Failed consumers are queued so that the monitor task can retry them later.
            if (consumer == null) {
                this.addConsumerToRestart(new DirectMessageListenerContainer.SimpleConsumer((Connection)null, (Channel)null, queue));
            } else {
                this.addConsumerToRestart(consumer);
                consumer = null;
            }
        }

        synchronized(this.consumersMonitor) {
            if (consumer != null) {
                this.cancellationLock.add(consumer);
                this.consumers.add(consumer);
                this.consumersByQueue.add(queue, consumer);
                if (this.logger.isInfoEnabled()) {
                    this.logger.info(consumer + " started");
                }

                if (this.getApplicationEventPublisher() != null) {
                    this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                }
            }

        }
    }
}
```
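Summary
DirectMessageListenerContainer uses thread pools to manage the threads behind its channels, but the underlying consumption logic on each thread is the same as in SimpleMessageListenerContainer: both register their consumer on a channel through basicConsume.
SimpleMessageListenerContainer provides the following features that DirectMessageListenerContainer does not: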
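- txSize: with SimpleMessageListenerContainer you can set this to control how many messages are delivered in a transaction and/or to reduce the number of acks, although it may increase the number of duplicate deliveries after a failure. (DirectMessageListenerContainer does have messagesPerAck, which can be used to reduce acks in the same way as txSize with SimpleMessageListenerContainer, but it cannot be used with transactions: each message is delivered and acked in a separate transaction.)
- maxConcurrentConsumers and the consumer scaling intervals and triggers: DirectMessageListenerContainer has no auto-scaling. It does, however, let you change the consumersPerQueue property programmatically, and the consumers are adjusted accordingly.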
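How messagesPerAck cuts down the number of acks can be sketched as follows. This is a simplified model under stated assumptions, not the DirectMessageListenerContainer code: the names messagesPerAck and ackTimeout come from the excerpts above, while `DeferredAckSketch`, its methods and the exact conditions are illustrative guesses at the general technique (ack once per batch, plus a timer-driven check so that a partial batch is not held forever).

```java
/** Simplified model of deferred acknowledgements; the conditions are illustrative assumptions. */
final class DeferredAckSketch {
    private final int messagesPerAck;
    private final long ackTimeoutMillis;
    private int pendingAcks;
    private long latestDeliveryTag;
    private long lastAckedTag = -1L;
    private long lastAckAt;
    private int acksSent;

    DeferredAckSketch(int messagesPerAck, long ackTimeoutMillis, long now) {
        this.messagesPerAck = messagesPerAck;
        this.ackTimeoutMillis = ackTimeoutMillis;
        this.lastAckAt = now;
    }

    /** Called after a message has been processed; acks only once the batch is full. */
    void onMessageProcessed(long deliveryTag, long now) {
        latestDeliveryTag = deliveryTag;
        pendingAcks++;
        if (pendingAcks >= messagesPerAck) {
            sendAck(now);
        }
    }

    /** Called periodically, for example from a monitor task, to flush a partial batch after the timeout. */
    void ackIfNecessary(long now) {
        if (pendingAcks > 0 && now - lastAckAt > ackTimeoutMillis) {
            sendAck(now);
        }
    }

    private void sendAck(long now) {
        // A real channel would call basicAck(latestDeliveryTag, true) here,
        // which acknowledges every delivery up to and including that tag.
        lastAckedTag = latestDeliveryTag;
        acksSent++;
        pendingAcks = 0;
        lastAckAt = now;
    }

    int acksSent() { return acksSent; }
    int pendingAcks() { return pendingAcks; }
    long lastAckedTag() { return lastAckedTag; }
}
```

With messagesPerAck set to 3, three processed messages produce a single ack instead of three. The trade-off is the one the list describes for txSize: if the consumer fails before the batch is acked, those messages are delivered again.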
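However, DirectMessageListenerContainer has the following advantages over SimpleMessageListenerContainer:
- Adding and removing queues at runtime is more efficient. With SimpleMessageListenerContainer, the entire consumer thread is restarted (all consumers are canceled and re-created); with DirectMessageListenerContainer, unaffected consumers are not canceled.
- The context switch between the RabbitMQ client thread and the consumer thread is avoided.
- Threads are shared across consumers, rather than each consumer having a dedicated thread as in SimpleMessageListenerContainer. However, see the important note about connection factory configuration in the "Threading and Asynchronous Consumers" section of the Spring AMQP reference.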
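Copyright notice: this is an original article by the blogger. Reposting is welcome; please credit the author and link to the original. Thanks for reading!
Source: monkeyGeek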