Only one thread runs at a time in the ExecutorService of a RabbitMQ connection

I have created a connection with a custom thread pool of 20 threads.

        ConnectionFactory factory = new ConnectionFactory();
        // custom executor service used to dispatch consumer callbacks
        ExecutorService consumerExecutor = Executors.newFixedThreadPool(threadNum, threadFactory);
        connection = factory.newConnection(consumerExecutor, addresses);

Then I create a channel from this connection:

        final Channel channel = connection.createChannel();

and use it to create a DefaultConsumer.

However, I find that although any of the pool's threads may be used to consume messages, only one thread is consuming at any given time, even when a large backlog of messages has accumulated on the server.

I looked into the source code and found:

private final class WorkPoolRunnable implements Runnable {

    public void run() {
        int size = MAX_RUNNABLE_BLOCK_SIZE;
        List<Runnable> block = new ArrayList<Runnable>(size);
        try {
            Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
            if (key == null) return; // nothing ready to run
            try {
                for (Runnable runnable : block) {
                    runnable.run();
                }
            } finally {
                if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
                    ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
                }
            }
        } catch (RuntimeException e) {
            // ...
        }
    }
}

/* Basic work selector and state transition step */
private K readyToInProgress() {
    K key = this.ready.poll();
    if (key != null) {
        this.inProgress.add(key);
    }
    return key;
}

/**
 * Return the next <i>ready</i> client,
 * and transfer a collection of that client's items to process.
 * Mark client <i>in progress</i>.
 * If there is no <i>ready</i> client, return <code><b>null</b></code>.
 * @param to collection object in which to transfer items
 * @param size max number of items to transfer
 * @return key of client to whom items belong, or <code><b>null</b></code> if there is none.
 */
public K nextWorkBlock(Collection<W> to, int size) {
    synchronized (this) {
        K nextKey = readyToInProgress();
        if (nextKey != null) {
            VariableLinkedBlockingQueue<W> queue = this.pool.get(nextKey);
            drainTo(queue, to, size);
        }
        return nextKey;
    }
}

The key seems to be ConsumerWorkService.this.workPool.nextWorkBlock: it polls the channel from the ready queue, and the channel is only put back into the ready queue by finishWorkBlock in the finally block, after the callbacks in the block have run. Please correct me if I am wrong.

This is confusing: a consumer is bound to one channel, and the channel is not returned to the ready queue until its current block of tasks has finished, which means the thread pool only ever gives that consumer one thread at a time.
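To illustrate my understanding, here is a simplified model of that behaviour using only the JDK. It is not the library's code: the class SingleChannelDispatchModel, its scheduled flag, and the block size of 16 are all made up for this sketch. It assumes a single channel whose queued callbacks are drained in blocks, with the next block submitted only after the current one finishes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of one channel's work queue; not RabbitMQ code. */
class SingleChannelDispatchModel {
    private static final int BLOCK_SIZE = 16; // assumed value for this sketch

    private final Queue<Runnable> items = new ArrayDeque<>();
    private boolean scheduled = false; // true while a block is queued or running
    private final ExecutorService executor;

    SingleChannelDispatchModel(ExecutorService executor) {
        this.executor = executor;
    }

    /** Queue a callback; submit a runner only if none is queued or running. */
    void addWork(Runnable item) {
        boolean submit;
        synchronized (this) {
            items.add(item);
            submit = !scheduled;
            scheduled = true;
        }
        if (submit) executor.execute(this::runBlock);
    }

    /** Run one block of callbacks, then hand the channel back. */
    private void runBlock() {
        List<Runnable> block = new ArrayList<>(BLOCK_SIZE);
        synchronized (this) {
            while (block.size() < BLOCK_SIZE && !items.isEmpty()) block.add(items.poll());
        }
        try {
            for (Runnable r : block) r.run();
        } finally {
            boolean more;
            synchronized (this) {
                more = !items.isEmpty();
                scheduled = more; // the channel becomes available again only here
            }
            if (more) executor.execute(this::runBlock);
        }
    }

    /** Returns the highest number of callbacks observed running at once. */
    static int maxConcurrency(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        SingleChannelDispatchModel channel = new SingleChannelDispatchModel(pool);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                channel.addWork(() -> {
                    max.accumulateAndGet(active.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(1); // simulate message handling
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    active.decrementAndGet();
                    done.countDown();
                });
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return max.get();
    }
}
```

In this model, maxConcurrency(20, 100) never observes more than one callback running at a time, no matter how many threads the pool has. That matches what I see with the real consumer.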


  1. Why does RabbitMQ use this design?
  2. How can we work around this limitation?
  3. Is it a good idea to submit each message to a separate thread pool from handleDelivery and ack from there, so that a message is acked only after its task has finished?
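
To make point 3 concrete, this is roughly what I have in mind, sketched with only the JDK so it runs on its own. HandOffDispatcher, onDelivery and runDemo are hypothetical names. In the real consumer, onDelivery would be called from handleDelivery with envelope.getDeliveryTag(), and the ack callback would wrap channel.basicAck(deliveryTag, false).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/** Hypothetical hand-off helper: process on a worker pool, ack afterwards. */
class HandOffDispatcher {
    private final ExecutorService workers;
    private final Consumer<byte[]> task;
    private final LongConsumer ack; // stand-in for an ack of one delivery tag

    HandOffDispatcher(ExecutorService workers, Consumer<byte[]> task, LongConsumer ack) {
        this.workers = workers;
        this.task = task;
        this.ack = ack;
    }

    /** Returns immediately, so the calling (dispatch) thread is not blocked. */
    void onDelivery(long deliveryTag, byte[] body) {
        workers.execute(() -> {
            task.accept(body);       // if this throws, the ack below is skipped
            ack.accept(deliveryTag); // ack only after the task has finished
        });
    }

    /** Demo: dispatch the given number of fake messages, return the acked tags. */
    static List<Long> runDemo(int threads, int messages) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        ConcurrentLinkedQueue<Long> acked = new ConcurrentLinkedQueue<>();
        CountDownLatch done = new CountDownLatch(messages);
        HandOffDispatcher dispatcher = new HandOffDispatcher(
                pool,
                body -> {
                    try {
                        Thread.sleep(1); // simulate work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                },
                tag -> {
                    acked.add(tag);
                    done.countDown();
                });
        try {
            for (long tag = 1; tag <= messages; tag++) {
                dispatcher.onDelivery(tag, new byte[0]);
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(acked);
    }
}
```

With this approach I assume the channel would be consumed with autoAck set to false, each message acked individually (multiple = false) since acks now come from worker threads, and channel.basicQos(prefetch) used to bound how many unacked messages pile up in the worker pool. I am not sure whether this is the recommended way.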