diff --git a/Persister/QueuePagerPersister.php b/Persister/QueuePagerPersister.php index 9c2c471..378c182 100644 --- a/Persister/QueuePagerPersister.php +++ b/Persister/QueuePagerPersister.php @@ -119,10 +119,15 @@ public function insert(PagerInterface $pager, array $options = array()) } while ($page <= $lastPage); $consumer = $this->context->createConsumer($replyQueue); + // Treat limit_overall_reply_time as an idle timeout rather than a hard overall cap: as long as + // replies keep arriving the consumers are making progress, so the deadline is pushed forward on + // each reply. This lets large indexes (whose population takes far longer than the timeout) finish + // while still aborting if the consumers go silent (e.g. all workers died). $limitTime = microtime(true) + $options['limit_overall_reply_time']; while ($sentCount) { if ($message = $consumer->receive($options['reply_receive_timeout'])) { $sentCount--; + $limitTime = microtime(true) + $options['limit_overall_reply_time']; $data = JSON::decode($message->getBody()); @@ -139,10 +144,8 @@ public function insert(PagerInterface $pager, array $options = array()) $data['options'] ); $this->dispatcher->dispatch($event); - } - - if (microtime(true) > $limitTime) { - throw new \LogicException(sprintf('Overall reply time (%s seconds) has been exceeded.', $options['limit_overall_reply_time'])); + } elseif (microtime(true) > $limitTime) { + throw new \LogicException(sprintf('No populate reply received within %s seconds; the consumers appear to be stuck.', $options['limit_overall_reply_time'])); } }