diff --git a/src/Client/PsrClickHouseAsyncClient.php b/src/Client/PsrClickHouseAsyncClient.php index f0938bb..de6b51c 100644 --- a/src/Client/PsrClickHouseAsyncClient.php +++ b/src/Client/PsrClickHouseAsyncClient.php @@ -7,15 +7,16 @@ use Exception; use GuzzleHttp\Promise\Create; use GuzzleHttp\Promise\PromiseInterface; +use GuzzleHttp\Psr7\Message; use Http\Client\HttpAsyncClient; use Psr\Http\Message\ResponseInterface; +use RuntimeException; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; use SimPod\ClickHouseClient\Exception\ServerError; use SimPod\ClickHouseClient\Format\Format; use SimPod\ClickHouseClient\Logger\SqlLogger; -use SimPod\ClickHouseClient\Output\Output; use SimPod\ClickHouseClient\Settings\EmptySettingsProvider; use SimPod\ClickHouseClient\Settings\SettingsProvider; use SimPod\ClickHouseClient\Sql\SqlFactory; @@ -71,7 +72,7 @@ public function selectWithParams( CLICKHOUSE, params: $params, settings: $settings, - processResponse: static fn (ResponseInterface $response): Output => $outputFormat::output( + processResponse: static fn (ResponseInterface $response) => $outputFormat::output( $response->getBody()->__toString(), ), ); @@ -114,6 +115,25 @@ function (ResponseInterface $response) use ($id, $processResponse) { throw ServerError::fromResponse($response); } + $body = $response->getBody(); + if (! $body->isSeekable()) { + throw new RuntimeException( + 'Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.', + ); + } + + $bodyContent = $body->__toString(); + if ( + ServerError::bodyContainsStreamedException( + $bodyContent, + $response->getHeaderLine('X-ClickHouse-Exception-Tag'), + ) + ) { + throw ServerError::fromResponse($response); + } + + Message::rewindBody($response); + if ($processResponse === null) { return $response; } diff --git a/src/Client/PsrClickHouseClient.php b/src/Client/PsrClickHouseClient.php index 08aa348..fcad52c 100644 --- a/src/Client/PsrClickHouseClient.php +++ b/src/Client/PsrClickHouseClient.php @@ -4,12 +4,14 @@ namespace SimPod\ClickHouseClient\Client; +use GuzzleHttp\Psr7\Message; use InvalidArgumentException; use Psr\Http\Client\ClientExceptionInterface; use Psr\Http\Client\ClientInterface; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\StreamInterface; +use RuntimeException; use SimPod\ClickHouseClient\Client\Http\RequestFactory; use SimPod\ClickHouseClient\Client\Http\RequestOptions; use SimPod\ClickHouseClient\Client\Http\RequestSettings; @@ -138,6 +140,7 @@ public function selectStreamWithParams( CLICKHOUSE, params: $params, settings: $settings, + detectStreamedException: false, ); return $response->getBody(); @@ -313,8 +316,12 @@ public function insertPayload( * @throws ClientExceptionInterface * @throws UnsupportedParamType */ - private function executeRequest(string $sql, array $params, SettingsProvider $settings): ResponseInterface - { + private function executeRequest( + string $sql, + array $params, + SettingsProvider $settings, + bool $detectStreamedException = true, + ): ResponseInterface { $request = $this->requestFactory->prepareSqlRequest( $sql, new RequestSettings( @@ -326,15 +333,18 @@ private function executeRequest(string $sql, array $params, SettingsProvider $se ), ); - return $this->sendHttpRequest($request, $sql); + return $this->sendHttpRequest($request, $sql, $detectStreamedException); } /** * @throws ClientExceptionInterface * @throws ServerError */ - private function sendHttpRequest(RequestInterface $request, string $sql): ResponseInterface - { + private function sendHttpRequest( + RequestInterface $request, + string $sql, + bool $detectStreamedException = true, + ): ResponseInterface { $id = uniqid('', true); $this->sqlLogger?->startQuery($id, $sql); @@ -348,6 +358,29 @@ private function sendHttpRequest(RequestInterface $request, string $sql): Respon throw ServerError::fromResponse($response); } + if (! $detectStreamedException) { + return $response; + } + + $body = $response->getBody(); + if (! $body->isSeekable()) { + throw new RuntimeException( + 'Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.', + ); + } + + $bodyContent = $body->__toString(); + if ( + ServerError::bodyContainsStreamedException( + $bodyContent, + $response->getHeaderLine('X-ClickHouse-Exception-Tag'), + ) + ) { + throw ServerError::fromResponse($response); + } + + Message::rewindBody($response); + return $response; } } diff --git a/src/Exception/ServerError.php b/src/Exception/ServerError.php index 1ffe240..c2ef5d1 100644 --- a/src/Exception/ServerError.php +++ b/src/Exception/ServerError.php @@ -8,6 +8,7 @@ use Psr\Http\Message\ResponseInterface; use function preg_match; +use function preg_quote; final class ServerError extends Exception { @@ -24,7 +25,7 @@ public static function fromResponse(ResponseInterface $response): self { $bodyContent = $response->getBody()->__toString(); - $errorCode = preg_match('~^Code: (\d+). DB::Exception:~', $bodyContent, $codeMatches) === 1 + $errorCode = preg_match('~(?:^|\R)Code: (\d+)\. DB::Exception:~', $bodyContent, $codeMatches) === 1 ? (int) $codeMatches[1] : 0; @@ -39,4 +40,16 @@ public static function fromResponse(ResponseInterface $response): self $exceptionName, ); } + + public static function bodyContainsStreamedException(string $bodyContent, string $exceptionTag = ''): bool + { + if ($exceptionTag !== '') { + return preg_match( + '~(?:^|\R)__exception__\R' . preg_quote($exceptionTag, '~') . '\RCode: \d+\. DB::Exception:~', + $bodyContent, + ) === 1; + } + + return preg_match('~(?:^|\R)__exception__\R[[:alnum:]]{16}\RCode: \d+\. DB::Exception:~', $bodyContent) === 1; + } } diff --git a/tests/Client/PsrClickHouseClientStreamedExceptionTest.php b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php new file mode 100644 index 0000000..456627a --- /dev/null +++ b/tests/Client/PsrClickHouseClientStreamedExceptionTest.php @@ -0,0 +1,291 @@ +createResponse(200) + ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + + $httpClient = new class ($response) implements ClientInterface { + public function __construct(private ResponseInterface $response) + { + } + + public function sendRequest(RequestInterface $request): ResponseInterface + { + return $this->response; + } + }; + + $logger = new class implements SqlLogger { + public int $startCount = 0; + + public int $stopCount = 0; + + public function startQuery(string $id, string $sql): void + { + ++$this->startCount; + } + + public function stopQuery(string $id): void + { + ++$this->stopCount; + } + }; + + $client = new PsrClickHouseClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + $logger, + ); + + try { + $client->select('SELECT throwIf(number = 2) FROM numbers(5)', new TabSeparated()); + self::fail('ServerError was not thrown.'); + } catch (ServerError $serverError) { + self::assertSame(395, $serverError->getCode()); + self::assertSame(200, $serverError->httpStatusCode); + self::assertSame('FUNCTION_THROW_IF_VALUE_IS_NON_ZERO', $serverError->clickHouseExceptionName); + } + + self::assertSame(1, $logger->startCount); + self::assertSame(1, $logger->stopCount); + } + + public function testSelectReturnsSuccessfulOkResponseAfterStreamedExceptionInspection(): void + { + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withBody($psr17Factory->createStream("1\n")); + + $httpClient = new class ($response) implements ClientInterface { + public function __construct(private ResponseInterface $response) + { + } + + public function sendRequest(RequestInterface $request): ResponseInterface + { + return $this->response; + } + }; + + $client = new PsrClickHouseClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + ); + + $output = $client->select('SELECT 1', new TabSeparated()); + + self::assertSame("1\n", $output->contents); + } + + public function testSelectThrowsWhenStreamedExceptionInspectionWouldConsumeNonSeekableBody(): void + { + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withBody(new NoSeekStream($psr17Factory->createStream("1\n"))); + + $httpClient = new class ($response) implements ClientInterface { + public function __construct(private ResponseInterface $response) + { + } + + public function sendRequest(RequestInterface $request): ResponseInterface + { + return $this->response; + } + }; + + $client = new PsrClickHouseClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + ); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.'); + + $client->select('SELECT 1', new TabSeparated()); + } + + public function testSelectStreamDoesNotPreScanResponseBody(): void + { + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + + $httpClient = new class ($response) implements ClientInterface { + public function __construct(private ResponseInterface $response) + { + } + + public function sendRequest(RequestInterface $request): ResponseInterface + { + return $this->response; + } + }; + + $client = new PsrClickHouseClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + ); + + $stream = $client->selectStream('SELECT throwIf(number = 2) FROM numbers(5)', new TabSeparated()); + + self::assertSame(self::streamedExceptionBody(), $stream->__toString()); + } + + public function testAsyncSelectThrowsServerErrorWhenOkResponseContainsStreamedException(): void + { + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withHeader('X-ClickHouse-Exception-Tag', 'abcdefghijklmnop') + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + + $httpClient = new class ($response) implements HttpAsyncClient { + public function __construct(private ResponseInterface $response) + { + } + + public function sendAsyncRequest(RequestInterface $request): FulfilledPromise + { + return new FulfilledPromise($this->response); + } + }; + + $logger = new class implements SqlLogger { + public int $startCount = 0; + + public int $stopCount = 0; + + public function startQuery(string $id, string $sql): void + { + ++$this->startCount; + } + + public function stopQuery(string $id): void + { + ++$this->stopCount; + } + }; + + $client = new PsrClickHouseAsyncClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + $logger, + ); + + try { + $client->select('SELECT throwIf(number = 2) FROM numbers(5)', new TabSeparated())->wait(); + self::fail('ServerError was not thrown.'); + } catch (ServerError $serverError) { + self::assertSame(395, $serverError->getCode()); + self::assertSame(200, $serverError->httpStatusCode); + self::assertSame('FUNCTION_THROW_IF_VALUE_IS_NON_ZERO', $serverError->clickHouseExceptionName); + } + + self::assertSame(1, $logger->startCount); + self::assertSame(1, $logger->stopCount); + } + + public function testAsyncSelectThrowsWhenStreamedExceptionInspectionWouldConsumeNonSeekableBody(): void + { + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withBody(new NoSeekStream($psr17Factory->createStream("1\n"))); + + $httpClient = new class ($response) implements HttpAsyncClient { + public function __construct(private ResponseInterface $response) + { + } + + public function sendAsyncRequest(RequestInterface $request): FulfilledPromise + { + return new FulfilledPromise($this->response); + } + }; + + $client = new PsrClickHouseAsyncClient( + $httpClient, + new RequestFactory( + new ParamValueConverterRegistry(), + $psr17Factory, + $psr17Factory, + $psr17Factory, + ), + ); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.'); + + $client->select('SELECT 1', new TabSeparated())->wait(); + } + + private static function streamedExceptionBody(): string + { + return <<<'CLICKHOUSE' +0 0 +1 0 +__exception__ +abcdefghijklmnop +Code: 395. DB::Exception: Error while streaming. (FUNCTION_THROW_IF_VALUE_IS_NON_ZERO) +111 abcdefghijklmnop +__exception__ + +CLICKHOUSE; + } +} diff --git a/tests/Exception/ServerErrorTest.php b/tests/Exception/ServerErrorTest.php index 5e726f1..760bb05 100644 --- a/tests/Exception/ServerErrorTest.php +++ b/tests/Exception/ServerErrorTest.php @@ -44,4 +44,49 @@ public function testParseWithoutExceptionName(): void self::assertSame(500, $serverError->httpStatusCode); self::assertNull($serverError->clickHouseExceptionName); } + + public function testParseStreamedException(): void + { + $psr17Factory = new Psr17Factory(); + $response = $psr17Factory->createResponse(200) + ->withBody($psr17Factory->createStream(self::streamedExceptionBody())); + + $serverError = ServerError::fromResponse($response); + + self::assertSame(395, $serverError->getCode()); + self::assertSame(200, $serverError->httpStatusCode); + self::assertSame('FUNCTION_THROW_IF_VALUE_IS_NON_ZERO', $serverError->clickHouseExceptionName); + } + + public function testDetectStreamedExceptionWithHeaderTag(): void + { + self::assertTrue(ServerError::bodyContainsStreamedException( + self::streamedExceptionBody(), + 'abcdefghijklmnop', + )); + self::assertFalse(ServerError::bodyContainsStreamedException( + self::streamedExceptionBody(), + 'ponmlkjihgfedcba', + )); + } + + public function testDetectStreamedExceptionWithoutHeaderTag(): void + { + self::assertTrue(ServerError::bodyContainsStreamedException(self::streamedExceptionBody())); + self::assertFalse(ServerError::bodyContainsStreamedException("1\n2\n3\n")); + } + + private static function streamedExceptionBody(): string + { + return <<<'CLICKHOUSE' +0 0 +1 0 +__exception__ +abcdefghijklmnop +Code: 395. DB::Exception: Error while streaming. (FUNCTION_THROW_IF_VALUE_IS_NON_ZERO) +111 abcdefghijklmnop +__exception__ + +CLICKHOUSE; + } }