From feca32e46c95cb576d9a4b02c103f1cd5c310ee0 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 27 Jun 2026 08:19:10 +0300 Subject: [PATCH 01/10] fix(client): detect streamed ClickHouse exceptions --- src/Client/PsrClickHouseAsyncClient.php | 21 ++- src/Client/PsrClickHouseClient.php | 39 ++++- src/Exception/ServerError.php | 22 ++- ...rClickHouseClientStreamedExceptionTest.php | 159 ++++++++++++++++++ tests/Exception/ServerErrorTest.php | 41 +++++ 5 files changed, 267 insertions(+), 15 deletions(-) create mode 100644 tests/Client/PsrClickHouseClientStreamedExceptionTest.php diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index f0938bb..85fd1a7 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -71,15 +71,13 @@ public function selectWithParams( CLICKHOUSE, params: $params, settings: $settings, - processResponse: static fn (ResponseInterface $response): Output => $outputFormat::output( - $response->getBody()->__toString(), - ), + processResponse: static fn (string $bodyContent): Output => $outputFormat::output($bodyContent), ); } /** * @param array $params - * @param (callable(ResponseInterface):mixed)|null $processResponse + * @param (callable(string):mixed)|null $processResponse * * @throws Exception */ @@ -110,15 +108,26 @@ private function executeRequest( function (ResponseInterface $response) use ($id, $processResponse) { $this->sqlLogger?->stopQuery($id); + $bodyContent = $response->getBody()->__toString(); + if ($response->getStatusCode() !== 200) { - throw ServerError::fromResponse($response); + throw ServerError::fromBody($bodyContent, $response->getStatusCode()); + } + + if ( + ServerError::bodyContainsStreamedException( + $bodyContent, + $response->getHeaderLine('X-ClickHouse-Exception-Tag'), + ) + ) { + throw ServerError::fromBody($bodyContent, $response->getStatusCode()); } if ($processResponse === null) { return $response; } - return $processResponse($response); + return $processResponse($bodyContent); }, fn () => $this->sqlLogger?->stopQuery($id), ); diff --git a/src/Client/PsrClickHouseClient.php b/src/Client/PsrClickHouseClient.php index 08aa348..842d6aa 100644 --- a/src/Client/PsrClickHouseClient.php +++ b/src/Client/PsrClickHouseClient.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Client; +use GuzzleHttp\Psr7\Utils; use InvalidArgumentException; use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Client\ClientInterface; @@ -138,6 +139,7 @@ public function selectStreamWithParams( CLICKHOUSE, params: $params, settings: $settings, + detectStreamedException: false, ); return $response->getBody(); @@ -313,8 +315,12 @@ public function insertPayload( * @throws ClientExceptionInterface * @throws UnsupportedParamType */ - private function executeRequest(string $sql, array $params, SettingsProvider $settings): ResponseInterface - { + private function executeRequest( + string $sql, + array $params, + SettingsProvider $settings, + bool $detectStreamedException = true, + ): ResponseInterface { $request = $this->requestFactory->prepareSqlRequest( $sql, new RequestSettings( @@ -326,15 +332,18 @@ private function executeRequest(string $sql, array $params, SettingsProvider $se ), ); - return $this->sendHttpRequest($request, $sql); + return $this->sendHttpRequest($request, $sql, $detectStreamedException); } /** * @throws ClientExceptionInterface * @throws ServerError */ - private function sendHttpRequest(RequestInterface $request, string $sql): ResponseInterface - { + private function sendHttpRequest( + RequestInterface $request, + string $sql, + bool $detectStreamedException = true, + ): ResponseInterface { $id = uniqid('', true); $this->sqlLogger?->startQuery($id, $sql); @@ -348,6 +357,24 @@ private function sendHttpRequest(RequestInterface $request, string $sql): Respon throw ServerError::fromResponse($response); } - return $response; + if (! $detectStreamedException) { + return $response; + } + + $bodyContent = $response->getBody()->__toString(); + if ( + ServerError::bodyContainsStreamedException( + $bodyContent, + $response->getHeaderLine('X-ClickHouse-Exception-Tag'), + ) + ) { + throw ServerError::fromBody($bodyContent, $response->getStatusCode()); + } + + try { + return $response->withBody(Utils::streamFor($bodyContent)); + } catch (InvalidArgumentException) { + absurd(); + } } } diff --git a/src/Exception/ServerError.php b/src/Exception/ServerError.php index 1ffe240..3f1078f 100644 --- a/src/Exception/ServerError.php +++ b/src/Exception/ServerError.php @@ -8,6 +8,7 @@ use Psr\Http\Message\ResponseInterface; use function preg_match; +use function preg_quote; final class ServerError extends Exception { @@ -22,9 +23,12 @@ private function __construct( public static function fromResponse(ResponseInterface $response): self { - $bodyContent = $response->getBody()->__toString(); + return self::fromBody($response->getBody()->__toString(), $response->getStatusCode()); + } - $errorCode = preg_match('~^Code: (\d+). DB::Exception:~', $bodyContent, $codeMatches) === 1 + public static function fromBody(string $bodyContent, int $httpStatusCode): self + { + $errorCode = preg_match('~(?:^|\R)Code: (\d+)\. DB::Exception:~', $bodyContent, $codeMatches) === 1 ? (int) $codeMatches[1] : 0; @@ -35,8 +39,20 @@ public static function fromResponse(ResponseInterface $response): self return new self( $bodyContent, $errorCode, - $response->getStatusCode(), + $httpStatusCode, $exceptionName, ); } + + public static function bodyContainsStreamedException(string $bodyContent, string $exceptionTag = ''): bool + { + if ($exceptionTag !== '') { + return preg_match( + '~(?:^|\R)__exception__\R' . preg_quote($exceptionTag, '~') . '\RCode: \d+\. DB::Exception:~', + $bodyContent, + ) === 1; + } + + return preg_match('~(?:^|\R)__exception__\R[[:alnum:]]{16}\RCode: \d+\. DB::Exception:~', $bodyContent) === 1; + } } diff --git a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php new file mode 100644 index 0000000..5ad5593 --- /dev/null +++ b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php @@ -0,0 +1,159 @@ +createResponse(200) + ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + + $httpClient = new class ($response) implements ClientInterface { + public function __construct(private ResponseInterface $response) + { + } + + public function sendRequest(RequestInterface $request): ResponseInterface + { + return $this->response; + } + }; + + $logger = new class implements SqlLogger { + public int $startCount = 0; + + public int $stopCount = 0; + + public function startQuery(string $id, string $sql): void + { + ++$this->startCount; + } + + public function stopQuery(string $id): void + { + ++$this->stopCount; + } + }; + + $client = new PsrClickHouseClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + $logger, + ); + + try { + $client->select('SELECT throwIf(number = 2) FROM numbers(5)', new TabSeparated()); + self::fail('ServerError was not thrown.'); + } catch (ServerError $serverError) { + self::assertSame(395, $serverError->getCode()); + self::assertSame(200, $serverError->httpStatusCode); + self::assertSame('FUNCTION_THROW_IF_VALUE_IS_NON_ZERO', $serverError->clickHouseExceptionName); + } + + self::assertSame(1, $logger->startCount); + self::assertSame(1, $logger->stopCount); + } + + public function testAsyncSelectThrowsServerErrorWhenOkResponseContainsStreamedException(): void + { + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + + $httpClient = new class ($response) implements HttpAsyncClient { + public function __construct(private ResponseInterface $response) + { + } + + /** @return FulfilledPromise */ + public function sendAsyncRequest(RequestInterface $request): FulfilledPromise + { + return new FulfilledPromise($this->response); + } + }; + + $logger = new class implements SqlLogger { + public int $startCount = 0; + + public int $stopCount = 0; + + public function startQuery(string $id, string $sql): void + { + ++$this->startCount; + } + + public function stopQuery(string $id): void + { + ++$this->stopCount; + } + }; + + $client = new PsrClickHouseAsyncClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + $logger, + ); + + try { + $client->select('SELECT throwIf(number = 2) FROM numbers(5)', new TabSeparated())->wait(); + self::fail('ServerError was not thrown.'); + } catch (ServerError $serverError) { + self::assertSame(395, $serverError->getCode()); + self::assertSame(200, $serverError->httpStatusCode); + self::assertSame('FUNCTION_THROW_IF_VALUE_IS_NON_ZERO', $serverError->clickHouseExceptionName); + } + + self::assertSame(1, $logger->startCount); + self::assertSame(1, $logger->stopCount); + } + + private static function streamedExceptionBody(): string + { + return <<<'CLICKHOUSE' +0 0 +1 0 +__exception__ +abcdefghijklmnop +Code: 395. DB::Exception: Error while streaming. (FUNCTION_THROW_IF_VALUE_IS_NON_ZERO) +111 abcdefghijklmnop +__exception__ + +CLICKHOUSE; + } +} diff --git a/tests/Exception/ServerErrorTest.php b/tests/Exception/ServerErrorTest.php index 5e726f1..8775d55 100644 --- a/tests/Exception/ServerErrorTest.php +++ b/tests/Exception/ServerErrorTest.php @@ -44,4 +44,45 @@ public function testParseWithoutExceptionName(): void self::assertSame(500, $serverError->httpStatusCode); self::assertNull($serverError->clickHouseExceptionName); } + + public function testParseStreamedException(): void + { + $serverError = ServerError::fromBody(self::streamedExceptionBody(), 200); + + self::assertSame(395, $serverError->getCode()); + self::assertSame(200, $serverError->httpStatusCode); + self::assertSame('FUNCTION_THROW_IF_VALUE_IS_NON_ZERO', $serverError->clickHouseExceptionName); + } + + public function testDetectStreamedExceptionWithHeaderTag(): void + { + self::assertTrue(ServerError::bodyContainsStreamedException( + self::streamedExceptionBody(), + 'abcdefghijklmnop', + )); + self::assertFalse(ServerError::bodyContainsStreamedException( + self::streamedExceptionBody(), + 'ponmlkjihgfedcba', + )); + } + + public function testDetectStreamedExceptionWithoutHeaderTag(): void + { + self::assertTrue(ServerError::bodyContainsStreamedException(self::streamedExceptionBody())); + self::assertFalse(ServerError::bodyContainsStreamedException("1\n2\n3\n")); + } + + private static function streamedExceptionBody(): string + { + return <<<'CLICKHOUSE' +0 0 +1 0 +__exception__ +abcdefghijklmnop +Code: 395. DB::Exception: Error while streaming. (FUNCTION_THROW_IF_VALUE_IS_NON_ZERO) +111 abcdefghijklmnop +__exception__ + +CLICKHOUSE; + } } From cc6b96f2abef0b7234d7a04ea214ddb229a76526 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 27 Jun 2026 08:48:36 +0300 Subject: [PATCH 02/10] fix(client): satisfy static analysis --- src/Client/PsrClickHouseAsyncClient.php | 3 +-- tests/Client/PsrClickHouseClientStreamedExceptionTest.php | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 85fd1a7..f60d06e 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -15,7 +15,6 @@ use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Logger\SqlLogger; -use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; @@ -71,7 +70,7 @@ public function selectWithParams( CLICKHOUSE, params: $params, settings: $settings, - processResponse: static fn (string $bodyContent): Output => $outputFormat::output($bodyContent), + processResponse: static fn (string $bodyContent) => $outputFormat::output($bodyContent), ); } diff --git a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php index 5ad5593..f5130d2 100644 --- a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php +++ b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php @@ -96,7 +96,6 @@ public function __construct(private ResponseInterface $response) { } - /** @return FulfilledPromise */ public function sendAsyncRequest(RequestInterface $request): FulfilledPromise { return new FulfilledPromise($this->response); From 468b56d20f8407b732dd2ada591b26f1f324828d Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 27 Jun 2026 08:56:01 +0300 Subject: [PATCH 03/10] test(client): cover streamed exception sync branches --- ...rClickHouseClientStreamedExceptionTest.php | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php index f5130d2..c3a055c 100644 --- a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php +++ b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php @@ -84,6 +84,71 @@ public function stopQuery(string $id): void self::assertSame(1, $logger->stopCount); } + public function testSelectReturnsSuccessfulOkResponseAfterStreamedExceptionInspection(): void + { + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withBody($psr17Factory->createStream("1\n")); + + $httpClient = new class ($response) implements ClientInterface { + public function __construct(private ResponseInterface $response) + { + } + + public function sendRequest(RequestInterface $request): ResponseInterface + { + return $this->response; + } + }; + + $client = new PsrClickHouseClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + ); + + $output = $client->select('SELECT 1', new TabSeparated()); + + self::assertSame("1\n", $output->contents); + } + + public function testSelectStreamDoesNotPreScanResponseBody(): void + { + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + + $httpClient = new class ($response) implements ClientInterface { + public function __construct(private ResponseInterface $response) + { + } + + public function sendRequest(RequestInterface $request): ResponseInterface + { + return $this->response; + } + }; + + $client = new PsrClickHouseClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + ); + + $stream = $client->selectStream('SELECT throwIf(number = 2) FROM numbers(5)', new TabSeparated()); + + self::assertSame(self::streamedExceptionBody(), $stream->__toString()); + } + public function testAsyncSelectThrowsServerErrorWhenOkResponseContainsStreamedException(): void { $psr17Factory = new Psr17Factory(); From 1a05d3efda82151eb1ca0f18fe4dedacb78da408 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 27 Jun 2026 09:00:53 +0300 Subject: [PATCH 04/10] refactor(client): rewind inspected response body --- src/Client/PsrClickHouseClient.php | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Client/PsrClickHouseClient.php b/src/Client/PsrClickHouseClient.php index 842d6aa..dc8fbdf 100644 --- a/src/Client/PsrClickHouseClient.php +++ b/src/Client/PsrClickHouseClient.php @@ -4,7 +4,7 @@ namespace SimPod\ClickHouseClient\Client; -use GuzzleHttp\Psr7\Utils; +use GuzzleHttp\Psr7\Message; use InvalidArgumentException; use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Client\ClientInterface; @@ -371,10 +371,8 @@ private function sendHttpRequest( throw ServerError::fromBody($bodyContent, $response->getStatusCode()); } - try { - return $response->withBody(Utils::streamFor($bodyContent)); - } catch (InvalidArgumentException) { - absurd(); - } + Message::rewindBody($response); + + return $response; } } From 80d08b7b8e40465f472dd966844e07c578011605 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 27 Jun 2026 09:12:51 +0300 Subject: [PATCH 05/10] refactor(client): pass async response to processor --- src/Client/PsrClickHouseAsyncClient.php | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index f60d06e..382fee5 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -7,6 +7,7 @@ use Exception; use GuzzleHttp\Promise\Create; use GuzzleHttp\Promise\PromiseInterface; +use GuzzleHttp\Psr7\Message; use Http\Client\HttpAsyncClient; use Psr\Http\Message\ResponseInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; @@ -70,13 +71,15 @@ public function selectWithParams( CLICKHOUSE, params: $params, settings: $settings, - processResponse: static fn (string $bodyContent) => $outputFormat::output($bodyContent), + processResponse: static fn (ResponseInterface $response) => $outputFormat::output( + $response->getBody()->__toString(), + ), ); } /** * @param array $params - * @param (callable(string):mixed)|null $processResponse + * @param (callable(ResponseInterface):mixed)|null $processResponse * * @throws Exception */ @@ -122,11 +125,13 @@ function (ResponseInterface $response) use ($id, $processResponse) { throw ServerError::fromBody($bodyContent, $response->getStatusCode()); } + Message::rewindBody($response); + if ($processResponse === null) { return $response; } - return $processResponse($bodyContent); + return $processResponse($response); }, fn () => $this->sqlLogger?->stopQuery($id), ); From aba6af0b291989ec68598eb90aacf8996aebc43f Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 27 Jun 2026 10:14:46 +0300 Subject: [PATCH 06/10] refactor(client): use response factory for async errors --- src/Client/PsrClickHouseAsyncClient.php | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 382fee5..6cf27b8 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -110,12 +110,11 @@ private function executeRequest( function (ResponseInterface $response) use ($id, $processResponse) { $this->sqlLogger?->stopQuery($id); - $bodyContent = $response->getBody()->__toString(); - if ($response->getStatusCode() !== 200) { - throw ServerError::fromBody($bodyContent, $response->getStatusCode()); + throw ServerError::fromResponse($response); } + $bodyContent = $response->getBody()->__toString(); if ( ServerError::bodyContainsStreamedException( $bodyContent, From 2363fe7c3ee06f32e41f48955a938e7ff8c7f31d Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 27 Jun 2026 10:33:12 +0300 Subject: [PATCH 07/10] refactor(client): avoid redundant async rewind --- src/Client/PsrClickHouseAsyncClient.php | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 6cf27b8..d40649f 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -7,7 +7,6 @@ use Exception; use GuzzleHttp\Promise\Create; use GuzzleHttp\Promise\PromiseInterface; -use GuzzleHttp\Psr7\Message; use Http\Client\HttpAsyncClient; use Psr\Http\Message\ResponseInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; @@ -124,8 +123,6 @@ function (ResponseInterface $response) use ($id, $processResponse) { throw ServerError::fromBody($bodyContent, $response->getStatusCode()); } - Message::rewindBody($response); - if ($processResponse === null) { return $response; } From cc42ccd66351a0a1e49b7d275b361119cead68e9 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 27 Jun 2026 10:48:05 +0300 Subject: [PATCH 08/10] refactor(exception): use response factory consistently --- src/Client/PsrClickHouseAsyncClient.php | 2 +- src/Client/PsrClickHouseClient.php | 5 +---- src/Exception/ServerError.php | 7 ++----- tests/Exception/ServerErrorTest.php | 6 +++++- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index d40649f..7965a1e 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -120,7 +120,7 @@ function (ResponseInterface $response) use ($id, $processResponse) { $response->getHeaderLine('X-ClickHouse-Exception-Tag'), ) ) { - throw ServerError::fromBody($bodyContent, $response->getStatusCode()); + throw ServerError::fromResponse($response); } if ($processResponse === null) { diff --git a/src/Client/PsrClickHouseClient.php b/src/Client/PsrClickHouseClient.php index dc8fbdf..da264f8 100644 --- a/src/Client/PsrClickHouseClient.php +++ b/src/Client/PsrClickHouseClient.php @@ -4,7 +4,6 @@ namespace SimPod\ClickHouseClient\Client; -use GuzzleHttp\Psr7\Message; use InvalidArgumentException; use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Client\ClientInterface; @@ -368,11 +367,9 @@ private function sendHttpRequest( $response->getHeaderLine('X-ClickHouse-Exception-Tag'), ) ) { - throw ServerError::fromBody($bodyContent, $response->getStatusCode()); + throw ServerError::fromResponse($response); } - Message::rewindBody($response); - return $response; } } diff --git a/src/Exception/ServerError.php b/src/Exception/ServerError.php index 3f1078f..c2ef5d1 100644 --- a/src/Exception/ServerError.php +++ b/src/Exception/ServerError.php @@ -23,11 +23,8 @@ private function __construct( public static function fromResponse(ResponseInterface $response): self { - return self::fromBody($response->getBody()->__toString(), $response->getStatusCode()); - } + $bodyContent = $response->getBody()->__toString(); - public static function fromBody(string $bodyContent, int $httpStatusCode): self - { $errorCode = preg_match('~(?:^|\R)Code: (\d+)\. DB::Exception:~', $bodyContent, $codeMatches) === 1 ? (int) $codeMatches[1] : 0; @@ -39,7 +36,7 @@ public static function fromBody(string $bodyContent, int $httpStatusCode): self return new self( $bodyContent, $errorCode, - $httpStatusCode, + $response->getStatusCode(), $exceptionName, ); } diff --git a/tests/Exception/ServerErrorTest.php b/tests/Exception/ServerErrorTest.php index 8775d55..760bb05 100644 --- a/tests/Exception/ServerErrorTest.php +++ b/tests/Exception/ServerErrorTest.php @@ -47,7 +47,11 @@ public function testParseWithoutExceptionName(): void public function testParseStreamedException(): void { - $serverError = ServerError::fromBody(self::streamedExceptionBody(), 200); + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + + $serverError = ServerError::fromResponse($response); self::assertSame(395, $serverError->getCode()); self::assertSame(200, $serverError->httpStatusCode); From 804ee0d5d840a596b9babe6546ee93976cb627c1 Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 27 Jun 2026 11:35:00 +0300 Subject: [PATCH 09/10] fix(client): preserve inspected response bodies --- src/Client/PsrClickHouseAsyncClient.php | 23 ++++ src/Client/PsrClickHouseClient.php | 22 ++++ ...rClickHouseClientStreamedExceptionTest.php | 109 +++++++++++++++++- 3 files changed, 150 insertions(+), 4 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index 7965a1e..d17d360 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -7,6 +7,7 @@ use Exception; use GuzzleHttp\Promise\Create; use GuzzleHttp\Promise\PromiseInterface; +use GuzzleHttp\Psr7\Stream; use Http\Client\HttpAsyncClient; use Psr\Http\Message\ResponseInterface; use SimPod\ClickHouseClient\Client\Http\RequestFactory; @@ -20,6 +21,10 @@ use SimPod\ClickHouseClient\Sql\SqlFactory; use SimPod\ClickHouseClient\Sql\ValueFormatter; +use function fopen; +use function fwrite; +use function rewind; +use function SimPod\ClickHouseClient\absurd; use function uniqid; class PsrClickHouseAsyncClient implements ClickHouseAsyncClient @@ -114,6 +119,7 @@ function (ResponseInterface $response) use ($id, $processResponse) { } $bodyContent = $response->getBody()->__toString(); + $response = self::withBodyContent($response, $bodyContent); if ( ServerError::bodyContainsStreamedException( $bodyContent, @@ -132,4 +138,21 @@ function (ResponseInterface $response) use ($id, $processResponse) { fn () => $this->sqlLogger?->stopQuery($id), ); } + + private static function withBodyContent(ResponseInterface $response, string $bodyContent): ResponseInterface + { + $body = fopen('php://temp', 'r+'); + if ($body === false) { + absurd(); + } + + fwrite($body, $bodyContent); + rewind($body); + + /** @phpstan-ignore missingType.checkedException */ + $bodyStream = new Stream($body); + + /** @phpstan-ignore missingType.checkedException */ + return $response->withBody($bodyStream); + } } diff --git a/src/Client/PsrClickHouseClient.php b/src/Client/PsrClickHouseClient.php index da264f8..31f7465 100644 --- a/src/Client/PsrClickHouseClient.php +++ b/src/Client/PsrClickHouseClient.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Client; +use GuzzleHttp\Psr7\Stream; use InvalidArgumentException; use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Client\ClientInterface; @@ -31,9 +32,12 @@ use function array_keys; use function array_map; use function array_values; +use function fopen; +use function fwrite; use function implode; use function is_array; use function is_int; +use function rewind; use function SimPod\ClickHouseClient\absurd; use function sprintf; use function uniqid; @@ -361,6 +365,7 @@ private function sendHttpRequest( } $bodyContent = $response->getBody()->__toString(); + $response = self::withBodyContent($response, $bodyContent); if ( ServerError::bodyContainsStreamedException( $bodyContent, @@ -372,4 +377,21 @@ private function sendHttpRequest( return $response; } + + private static function withBodyContent(ResponseInterface $response, string $bodyContent): ResponseInterface + { + $body = fopen('php://temp', 'r+'); + if ($body === false) { + absurd(); + } + + fwrite($body, $bodyContent); + rewind($body); + + /** @phpstan-ignore missingType.checkedException */ + $bodyStream = new Stream($body); + + /** @phpstan-ignore missingType.checkedException */ + return $response->withBody($bodyStream); + } } diff --git a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php index c3a055c..e63d3e0 100644 --- a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php +++ b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php @@ -11,6 +11,8 @@ use Psr\Http\Client\ClientInterface; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\StreamInterface; +use RuntimeException; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient; use SimPod\ClickHouseClient\Client\PsrClickHouseClient; @@ -20,6 +22,10 @@ use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use SimPod\ClickHouseClient\Tests\TestCaseBase; +use function strlen; + +use const SEEK_SET; + #[CoversClass(RequestFactory::class)] #[CoversClass(PsrClickHouseAsyncClient::class)] #[CoversClass(PsrClickHouseClient::class)] @@ -31,7 +37,7 @@ public function testSelectThrowsServerErrorWhenOkResponseContainsStreamedExcepti $psr17Factory = new Psr17Factory(); $response = $psr17Factory->createResponse(200) ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') - ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + ->withBody(self::nonSeekableStream(self::streamedExceptionBody())); $httpClient = new class ($response) implements ClientInterface { public function __construct(private ResponseInterface $response) @@ -88,7 +94,7 @@ public function testSelectReturnsSuccessfulOkResponseAfterStreamedExceptionInspe { $psr17Factory = new Psr17Factory(); $response = $psr17Factory->createResponse(200) - ->withBody($psr17Factory->createStream("1\n")); + ->withBody(self::nonSeekableStream("1\n")); $httpClient = new class ($response) implements ClientInterface { public function __construct(private ResponseInterface $response) @@ -121,7 +127,7 @@ public function testSelectStreamDoesNotPreScanResponseBody(): void $psr17Factory = new Psr17Factory(); $response = $psr17Factory->createResponse(200) ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') - ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + ->withBody(self::nonSeekableStream(self::streamedExceptionBody())); $httpClient = new class ($response) implements ClientInterface { public function __construct(private ResponseInterface $response) @@ -154,7 +160,7 @@ public function testAsyncSelectThrowsServerErrorWhenOkResponseContainsStreamedEx $psr17Factory = new Psr17Factory(); $response = $psr17Factory->createResponse(200) ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') - ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + ->withBody(self::nonSeekableStream(self::streamedExceptionBody())); $httpClient = new class ($response) implements HttpAsyncClient { public function __construct(private ResponseInterface $response) @@ -220,4 +226,99 @@ private static function streamedExceptionBody(): string CLICKHOUSE; } + + private static function nonSeekableStream(string $contents): StreamInterface + { + return new class ($contents) implements StreamInterface { + private bool $consumed = false; + + public function __construct(private string $contents) + { + } + + public function __toString(): string + { + return $this->getContents(); + } + + public function close(): void + { + $this->consumed = true; + } + + /** @return resource|null */ + public function detach() + { + $this->close(); + + return null; + } + + public function getSize(): int|null + { + return null; + } + + public function tell(): int + { + return $this->consumed ? strlen($this->contents) : 0; + } + + public function eof(): bool + { + return $this->consumed; + } + + public function isSeekable(): bool + { + return false; + } + + public function seek(int $offset, int $whence = SEEK_SET): void + { + throw new RuntimeException('Stream is not seekable.'); + } + + public function rewind(): void + { + throw new RuntimeException('Stream is not seekable.'); + } + + public function isWritable(): bool + { + return false; + } + + public function write(string $string): int + { + throw new RuntimeException('Stream is not writable.'); + } + + public function isReadable(): bool + { + return true; + } + + public function read(int $length): string + { + return $this->getContents(); + } + + public function getContents(): string + { + if ($this->consumed) { + return ''; + } + + $this->consumed = true; + + return $this->contents; + } + + public function getMetadata(string|null $key = null): mixed + { + return null; + } + }; + } } From 22e74d9791ab462e3f4edca13da13d2fc18c0bde Mon Sep 17 00:00:00 2001 From: Simon Podlipsky Date: Sat, 27 Jun 2026 11:48:23 +0300 Subject: [PATCH 10/10] fix(client): reject non-seekable inspected bodies --- src/Client/PsrClickHouseAsyncClient.php | 36 ++-- src/Client/PsrClickHouseClient.php | 35 ++-- ...rClickHouseClientStreamedExceptionTest.php | 175 +++++++----------- 3 files changed, 95 insertions(+), 151 deletions(-) diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index d17d360..de6b51c 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -7,9 +7,10 @@ use Exception; use GuzzleHttp\Promise\Create; use GuzzleHttp\Promise\PromiseInterface; -use GuzzleHttp\Psr7\Stream; +use GuzzleHttp\Psr7\Message; use Http\Client\HttpAsyncClient; use Psr\Http\Message\ResponseInterface; +use RuntimeException; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; @@ -21,10 +22,6 @@ use SimPod\ClickHouseClient\Sql\SqlFactory; use SimPod\ClickHouseClient\Sql\ValueFormatter; -use function fopen; -use function fwrite; -use function rewind; -use function SimPod\ClickHouseClient\absurd; use function uniqid; class PsrClickHouseAsyncClient implements ClickHouseAsyncClient @@ -118,8 +115,14 @@ function (ResponseInterface $response) use ($id, $processResponse) { throw ServerError::fromResponse($response); } - $bodyContent = $response->getBody()->__toString(); - $response = self::withBodyContent($response, $bodyContent); + $body = $response->getBody(); + if (! $body->isSeekable()) { + throw new RuntimeException( + 'Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.', + ); + } + + $bodyContent = $body->__toString(); if ( ServerError::bodyContainsStreamedException( $bodyContent, @@ -129,6 +132,8 @@ function (ResponseInterface $response) use ($id, $processResponse) { throw ServerError::fromResponse($response); } + Message::rewindBody($response); + if ($processResponse === null) { return $response; } @@ -138,21 +143,4 @@ function (ResponseInterface $response) use ($id, $processResponse) { fn () => $this->sqlLogger?->stopQuery($id), ); } - - private static function withBodyContent(ResponseInterface $response, string $bodyContent): ResponseInterface - { - $body = fopen('php://temp', 'r+'); - if ($body === false) { - absurd(); - } - - fwrite($body, $bodyContent); - rewind($body); - - /** @phpstan-ignore missingType.checkedException */ - $bodyStream = new Stream($body); - - /** @phpstan-ignore missingType.checkedException */ - return $response->withBody($bodyStream); - } } diff --git a/src/Client/PsrClickHouseClient.php b/src/Client/PsrClickHouseClient.php index 31f7465..fcad52c 100644 --- a/src/Client/PsrClickHouseClient.php +++ b/src/Client/PsrClickHouseClient.php @@ -4,13 +4,14 @@ namespace SimPod\ClickHouseClient\Client; -use GuzzleHttp\Psr7\Stream; +use GuzzleHttp\Psr7\Message; use InvalidArgumentException; use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Client\ClientInterface; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\StreamInterface; +use RuntimeException; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; @@ -32,12 +33,9 @@ use function array_keys; use function array_map; use function array_values; -use function fopen; -use function fwrite; use function implode; use function is_array; use function is_int; -use function rewind; use function SimPod\ClickHouseClient\absurd; use function sprintf; use function uniqid; @@ -364,8 +362,14 @@ private function sendHttpRequest( return $response; } - $bodyContent = $response->getBody()->__toString(); - $response = self::withBodyContent($response, $bodyContent); + $body = $response->getBody(); + if (! $body->isSeekable()) { + throw new RuntimeException( + 'Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.', + ); + } + + $bodyContent = $body->__toString(); if ( ServerError::bodyContainsStreamedException( $bodyContent, @@ -375,23 +379,8 @@ private function sendHttpRequest( throw ServerError::fromResponse($response); } - return $response; - } - - private static function withBodyContent(ResponseInterface $response, string $bodyContent): ResponseInterface - { - $body = fopen('php://temp', 'r+'); - if ($body === false) { - absurd(); - } - - fwrite($body, $bodyContent); - rewind($body); - - /** @phpstan-ignore missingType.checkedException */ - $bodyStream = new Stream($body); + Message::rewindBody($response); - /** @phpstan-ignore missingType.checkedException */ - return $response->withBody($bodyStream); + return $response; } } diff --git a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php index e63d3e0..456627a 100644 --- a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php +++ b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php @@ -4,6 +4,7 @@ namespace SimPod\ClickHouseClient\Tests\Client; +use GuzzleHttp\Psr7\NoSeekStream; use Http\Client\HttpAsyncClient; use Http\Promise\FulfilledPromise; use Nyholm\Psr7\Factory\Psr17Factory; @@ -11,7 +12,6 @@ use Psr\Http\Client\ClientInterface; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; -use Psr\Http\Message\StreamInterface; use RuntimeException; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\PsrClickHouseAsyncClient; @@ -22,10 +22,6 @@ use SimPod\ClickHouseClient\Param\ParamValueConverterRegistry; use SimPod\ClickHouseClient\Tests\TestCaseBase; -use function strlen; - -use const SEEK_SET; - #[CoversClass(RequestFactory::class)] #[CoversClass(PsrClickHouseAsyncClient::class)] #[CoversClass(PsrClickHouseClient::class)] @@ -37,7 +33,7 @@ public function testSelectThrowsServerErrorWhenOkResponseContainsStreamedExcepti $psr17Factory = new Psr17Factory(); $response = $psr17Factory->createResponse(200) ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') - ->withBody(self::nonSeekableStream(self::streamedExceptionBody())); + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); $httpClient = new class ($response) implements ClientInterface { public function __construct(private ResponseInterface $response) @@ -94,7 +90,7 @@ public function testSelectReturnsSuccessfulOkResponseAfterStreamedExceptionInspe { $psr17Factory = new Psr17Factory(); $response = $psr17Factory->createResponse(200) - ->withBody(self::nonSeekableStream("1\n")); + ->withBody($psr17Factory->createStream("1\n")); $httpClient = new class ($response) implements ClientInterface { public function __construct(private ResponseInterface $response) @@ -122,12 +118,45 @@ public function sendRequest(RequestInterface $request): ResponseInterface self::assertSame("1\n", $output->contents); } + public function testSelectThrowsWhenStreamedExceptionInspectionWouldConsumeNonSeekableBody(): void + { + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withBody(new NoSeekStream($psr17Factory->createStream("1\n"))); + + $httpClient = new class ($response) implements ClientInterface { + public function __construct(private ResponseInterface $response) + { + } + + public function sendRequest(RequestInterface $request): ResponseInterface + { + return $this->response; + } + }; + + $client = new PsrClickHouseClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + ); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.'); + + $client->select('SELECT 1', new TabSeparated()); + } + public function testSelectStreamDoesNotPreScanResponseBody(): void { $psr17Factory = new Psr17Factory(); $response = $psr17Factory->createResponse(200) ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') - ->withBody(self::nonSeekableStream(self::streamedExceptionBody())); + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); $httpClient = new class ($response) implements ClientInterface { public function __construct(private ResponseInterface $response) @@ -160,7 +189,7 @@ public function testAsyncSelectThrowsServerErrorWhenOkResponseContainsStreamedEx $psr17Factory = new Psr17Factory(); $response = $psr17Factory->createResponse(200) ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') - ->withBody(self::nonSeekableStream(self::streamedExceptionBody())); + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); $httpClient = new class ($response) implements HttpAsyncClient { public function __construct(private ResponseInterface $response) @@ -213,112 +242,50 @@ public function stopQuery(string $id): void self::assertSame(1, $logger->stopCount); } - private static function streamedExceptionBody(): string - { - return <<<'CLICKHOUSE' -0 0 -1 0 -__exception__ -abcdefghijklmnop -Code: 395. DB::Exception: Error while streaming. (FUNCTION_THROW_IF_VALUE_IS_NON_ZERO) -111 abcdefghijklmnop -__exception__ - -CLICKHOUSE; - } - - private static function nonSeekableStream(string $contents): StreamInterface + public function testAsyncSelectThrowsWhenStreamedExceptionInspectionWouldConsumeNonSeekableBody(): void { - return new class ($contents) implements StreamInterface { - private bool $consumed = false; - - public function __construct(private string $contents) - { - } - - public function __toString(): string - { - return $this->getContents(); - } - - public function close(): void - { - $this->consumed = true; - } - - /** @return resource|null */ - public function detach() - { - $this->close(); - - return null; - } - - public function getSize(): int|null - { - return null; - } - - public function tell(): int - { - return $this->consumed ? strlen($this->contents) : 0; - } - - public function eof(): bool - { - return $this->consumed; - } - - public function isSeekable(): bool - { - return false; - } - - public function seek(int $offset, int $whence = SEEK_SET): void - { - throw new RuntimeException('Stream is not seekable.'); - } - - public function rewind(): void - { - throw new RuntimeException('Stream is not seekable.'); - } - - public function isWritable(): bool - { - return false; - } + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withBody(new NoSeekStream($psr17Factory->createStream("1\n"))); - public function write(string $string): int + $httpClient = new class ($response) implements HttpAsyncClient { + public function __construct(private ResponseInterface $response) { - throw new RuntimeException('Stream is not writable.'); } - public function isReadable(): bool + public function sendAsyncRequest(RequestInterface $request): FulfilledPromise { - return true; + return new FulfilledPromise($this->response); } + }; - public function read(int $length): string - { - return $this->getContents(); - } + $client = new PsrClickHouseAsyncClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + ); - public function getContents(): string - { - if ($this->consumed) { - return ''; - } + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.'); - $this->consumed = true; + $client->select('SELECT 1', new TabSeparated())->wait(); + } - return $this->contents; - } + private static function streamedExceptionBody(): string + { + return <<<'CLICKHOUSE' +0 0 +1 0 +__exception__ +abcdefghijklmnop +Code: 395. DB::Exception: Error while streaming. (FUNCTION_THROW_IF_VALUE_IS_NON_ZERO) +111 abcdefghijklmnop +__exception__ - public function getMetadata(string|null $key = null): mixed - { - return null; - } - }; +CLICKHOUSE; } }