[Bug] The number of threads in an ordered consumer threadpool cannot be expanded. #8356

Open
CLFutureX opened this issue Jul 3, 2024 · 0 comments · May be fixed by #8357

CLFutureX commented Jul 3, 2024

Before Creating the Bug Report

  • I found a bug, not just asking a question, which should be created in GitHub Discussions.

  • I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.

  • I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

Runtime platform environment

Windows

RocketMQ version

develop

JDK Version

JDK 1.8

Describe the Bug

issue: In pull consumption mode, during orderly consumption, the thread count cannot be expanded to the maximum thread count.

Background:
The first report describes the preconditions shared by this class of problems; the problem here is likewise that the thread count cannot be expanded to the maximum.

Problem:
To guarantee orderly consumption, RocketMQ locks at the granularity of the queue (partition) inside the consume task, and in the run method it pulls the messages at the front of the current partition for consumption, which keeps consumption in order within each partition.
To avoid contention, before consuming the pulled messages it checks whether the current partition is already being consumed; if it is, the messages are not handed to the thread pool again (sketched below).
As a result, the number of tasks in the thread pool stays small; it should be <= the number of partitions assigned to the current consumer.
So in this mode, the maximum parallelism is also <= the current partition count.
This means the approach used before cannot be reused here; this case must be handled differently.
org.apache.rocketmq.client.impl.consumer.ConsumeMessagePopConcurrentlyService
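To make that gating concrete, here is a minimal sketch (hypothetical names, not the actual client code): each queue carries an in-flight flag, and a consume task is submitted only when the flag flips from false to true, so the pool never holds more tasks than assigned queues.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the per-queue gating described above: at most
// one in-flight consume task per message queue, so the number of pool
// tasks is bounded by the number of assigned queues (partitions).
class OrderlyDispatchSketch {
    private final Map<String, AtomicBoolean> inFlight = new ConcurrentHashMap<>();
    private final ExecutorService consumeExecutor;

    OrderlyDispatchSketch(ExecutorService consumeExecutor) {
        this.consumeExecutor = consumeExecutor;
    }

    void submitConsumeRequest(String queueId) {
        AtomicBoolean flag = inFlight.computeIfAbsent(queueId, q -> new AtomicBoolean(false));
        // If this queue is already being consumed, do not submit again.
        if (flag.compareAndSet(false, true)) {
            consumeExecutor.submit(() -> {
                try {
                    // ... take the queue lock and consume its pending messages in order ...
                } finally {
                    flag.set(false);
                }
            });
        }
    }
}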

Proposed solution:
Dynamically adjust the thread pool's core thread count based on the number of assigned partitions.
Parameters:
consumeMessageBatchMaxSize = 1
pullBatchSize = 32
pullThresholdSizeForQueue = 1000
consumeThreadMin = 4
consumeThreadMax = 10
subscribed topics = 5, partitions assigned per topic = 2
With the parameters above, the consumer instance is assigned 10 partitions. Under the current logic the thread count can expand to at most 4, so the maximum parallelism is 4, even though a parallelism of 10 is achievable (the demo after this paragraph shows why the pool stalls at 4).
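One detail worth noting (my reading of how the consume executor is constructed, so treat it as an assumption): the pool is built on an unbounded LinkedBlockingQueue, and a ThreadPoolExecutor only creates threads beyond corePoolSize when its work queue is full, so the pool can never grow past the core size on its own. A small standalone demo:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Demonstrates why the pool is stuck at consumeThreadMin: with an
// unbounded work queue, ThreadPoolExecutor never creates threads
// beyond corePoolSize, so consumeThreadMax (10) is unreachable.
public class CorePoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            4, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 10; i++) {              // 10 partitions -> 10 gated tasks
            pool.submit(() -> sleepQuietly(1000));
        }
        Thread.sleep(200);
        System.out.println("poolSize = " + pool.getPoolSize()); // prints 4, not 10
        pool.shutdown();
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}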
Since the number of topic partitions changes dynamically, we could update the thread pool's core thread count with a scheduled task. But because changes are rare, a scheduled task would mostly run to no effect, so we can instead add a post-change event: after a rebalance completes, invoke a callback that updates the core thread count.
Conveniently, rebalance already provides a post-processing hook:
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#messageQueueChanged
So the adjusted logic is: set the core thread count to the number of currently assigned topic partitions.
this.defaultMQPushConsumer.setMessageQueueListener(new MessageQueueListener() {
    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqAssigned) {
        int queueSize = defaultMQPushConsumerImpl.getRebalanceImpl().processQueueTable.size();
        updateCorePoolSize(queueSize);
    }
});
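A note on this listener: if I read the rebalance code correctly, messageQueueChanged is invoked per topic (mqAssigned holds only that topic's queues), while processQueueTable spans all subscribed topics, so reading its size gives the consumer-wide partition count that the core pool size should track.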

Now, since starting a consumer instance also triggers a rebalance, could this cause frequent updates of the thread pool's core thread count?
With the current update logic it indeed can: the update fires whenever corePoolSize lies anywhere between 0 and Math.min(consumeThreadMax, Short.MAX_VALUE).

public void updateCorePoolSize(int corePoolSize) {
    if (corePoolSize > 0
        && corePoolSize <= Short.MAX_VALUE
        && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
        this.consumeExecutor.setCorePoolSize(corePoolSize);
    }
}

Proposed improvement: tighten the lower bound to corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin().
For this scenario it is only reasonable to keep corePoolSize within the configured min~max range (a sketch of the adjusted check follows).
With that bound in place, there is no longer any worry about corePoolSize being updated repeatedly at startup.
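A minimal sketch of the adjusted check, mirroring the method quoted above (the actual change lives in the linked PR; this only illustrates the bound):

public void updateCorePoolSize(int corePoolSize) {
    // Sketch: only resize within the configured (min, max] range, so that
    // startup rebalances cannot drive the core size below consumeThreadMin.
    if (corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin()
        && corePoolSize <= this.defaultMQPushConsumer.getConsumeThreadMax()) {
        this.consumeExecutor.setCorePoolSize(corePoolSize);
    }
}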

A simple test:

1. In the rebalance listener of the orderly consumption service ConsumeMessageOrderlyService, print the state of the consume thread pool.
2. Start the consumer, which triggers a rebalance.
3. Adjust the topic's partition count in the console to trigger another rebalance.

Test result: matches the expected behavior.

Connected to the target VM, address: '127.0.0.1:54211', transport: 'socket'
Consumer Started.
consumeExecutor queueSize: 24, corePoolSize: 24
consumeExecutor queueSize: 25, corePoolSize: 25
consumeExecutor queueSize: 29, corePoolSize: 29

Steps to Reproduce

Start the consumer for orderly consumption

What Did You Expect to See?

The number of threads can dynamically change with the quantity of subscribed topic partitions to enhance the parallelism of orderly consumption.

What Did You See Instead?

Read the code.

Additional Context

No response

CLFutureX added a commit to CLFutureX/rocketmq that referenced this issue Jul 3, 2024
CLFutureX changed the title to [Bug] In pull consumption mode, during orderly consumption, the number of threads cannot be expanded to the maximum thread count. Jul 3, 2024