From 8777ae7dfde57f47151aa201654d911b49074186 Mon Sep 17 00:00:00 2001 From: Sami Mussbach Date: Fri, 26 Jun 2026 10:07:56 +0200 Subject: [PATCH] fix: make populate reply wait an idle timeout so large indexes can finish --- Persister/QueuePagerPersister.php | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/Persister/QueuePagerPersister.php b/Persister/QueuePagerPersister.php index 9c2c471..378c182 100644 --- a/Persister/QueuePagerPersister.php +++ b/Persister/QueuePagerPersister.php @@ -119,10 +119,15 @@ public function insert(PagerInterface $pager, array $options = array()) } while ($page <= $lastPage); $consumer = $this->context->createConsumer($replyQueue); + // Treat limit_overall_reply_time as an idle timeout rather than a hard overall cap: as long as + // replies keep arriving the consumers are making progress, so the deadline is pushed forward on + // each reply. This lets large indexes (whose population takes far longer than the timeout) finish + // while still aborting if the consumers go silent (e.g. all workers died). $limitTime = microtime(true) + $options['limit_overall_reply_time']; while ($sentCount) { if ($message = $consumer->receive($options['reply_receive_timeout'])) { $sentCount--; + $limitTime = microtime(true) + $options['limit_overall_reply_time']; $data = JSON::decode($message->getBody()); @@ -139,10 +144,8 @@ public function insert(PagerInterface $pager, array $options = array()) $data['options'] ); $this->dispatcher->dispatch($event); - } - - if (microtime(true) > $limitTime) { - throw new \LogicException(sprintf('Overall reply time (%s seconds) has been exceeded.', $options['limit_overall_reply_time'])); + } elseif (microtime(true) > $limitTime) { + throw new \LogicException(sprintf('No populate reply received within %s seconds; the consumers appear to be stuck.', $options['limit_overall_reply_time'])); } }