Skip to content
24 changes: 22 additions & 2 deletions src/Client/PsrClickHouseAsyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@
use Exception;
use GuzzleHttp\Promise\Create;
use GuzzleHttp\Promise\PromiseInterface;
use GuzzleHttp\Psr7\Message;
use Http\Client\HttpAsyncClient;
use Psr\Http\Message\ResponseInterface;
use RuntimeException;
use SimPod\ClickHouseClient\Client\Http\RequestFactory;
use SimPod\ClickHouseClient\Client\Http\RequestOptions;
use SimPod\ClickHouseClient\Client\Http\RequestSettings;
use SimPod\ClickHouseClient\Exception\ServerError;
use SimPod\ClickHouseClient\Format\Format;
use SimPod\ClickHouseClient\Logger\SqlLogger;
use SimPod\ClickHouseClient\Output\Output;
use SimPod\ClickHouseClient\Settings\EmptySettingsProvider;
use SimPod\ClickHouseClient\Settings\SettingsProvider;
use SimPod\ClickHouseClient\Sql\SqlFactory;
Expand Down Expand Up @@ -71,7 +72,7 @@
CLICKHOUSE,
params: $params,
settings: $settings,
processResponse: static fn (ResponseInterface $response): Output => $outputFormat::output(
processResponse: static fn (ResponseInterface $response) => $outputFormat::output(
$response->getBody()->__toString(),
),
);
Expand Down Expand Up @@ -114,13 +115,32 @@
throw ServerError::fromResponse($response);
}

$body = $response->getBody();
if (! $body->isSeekable()) {
throw new RuntimeException(
'Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.',
);
}

$bodyContent = $body->__toString();
if (
ServerError::bodyContainsStreamedException(
$bodyContent,
$response->getHeaderLine('X-ClickHouse-Exception-Tag'),
)
) {
throw ServerError::fromResponse($response);
}

Message::rewindBody($response);

Check warning on line 135 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "MethodCallRemoval": @@ @@ throw ServerError::fromResponse($response); } - Message::rewindBody($response); + if ($processResponse === null) { return $response;

if ($processResponse === null) {
return $response;
}

return $processResponse($response);
},
fn () => $this->sqlLogger?->stopQuery($id),

Check warning on line 143 in src/Client/PsrClickHouseAsyncClient.php

View workflow job for this annotation

GitHub Actions / Infection

Escaped Mutant for Mutator "NullSafeMethodCall": @@ @@ return $processResponse($response); }, - fn () => $this->sqlLogger?->stopQuery($id), + fn () => $this->sqlLogger->stopQuery($id), ); } }
);
}
}
43 changes: 38 additions & 5 deletions src/Client/PsrClickHouseClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

namespace SimPod\ClickHouseClient\Client;

use GuzzleHttp\Psr7\Message;
use InvalidArgumentException;
use Psr\Http\Client\ClientExceptionInterface;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamInterface;
use RuntimeException;
use SimPod\ClickHouseClient\Client\Http\RequestFactory;
use SimPod\ClickHouseClient\Client\Http\RequestOptions;
use SimPod\ClickHouseClient\Client\Http\RequestSettings;
Expand Down Expand Up @@ -138,6 +140,7 @@ public function selectStreamWithParams(
CLICKHOUSE,
params: $params,
settings: $settings,
detectStreamedException: false,
);

return $response->getBody();
Expand Down Expand Up @@ -313,8 +316,12 @@ public function insertPayload(
* @throws ClientExceptionInterface
* @throws UnsupportedParamType
*/
private function executeRequest(string $sql, array $params, SettingsProvider $settings): ResponseInterface
{
private function executeRequest(
string $sql,
array $params,
SettingsProvider $settings,
bool $detectStreamedException = true,
): ResponseInterface {
$request = $this->requestFactory->prepareSqlRequest(
$sql,
new RequestSettings(
Expand All @@ -326,15 +333,18 @@ private function executeRequest(string $sql, array $params, SettingsProvider $se
),
);

return $this->sendHttpRequest($request, $sql);
return $this->sendHttpRequest($request, $sql, $detectStreamedException);
}

/**
* @throws ClientExceptionInterface
* @throws ServerError
*/
private function sendHttpRequest(RequestInterface $request, string $sql): ResponseInterface
{
private function sendHttpRequest(
RequestInterface $request,
string $sql,
bool $detectStreamedException = true,
): ResponseInterface {
$id = uniqid('', true);
$this->sqlLogger?->startQuery($id, $sql);

Expand All @@ -348,6 +358,29 @@ private function sendHttpRequest(RequestInterface $request, string $sql): Respon
throw ServerError::fromResponse($response);
}

if (! $detectStreamedException) {
return $response;
}

$body = $response->getBody();
if (! $body->isSeekable()) {
throw new RuntimeException(
'Cannot inspect streamed ClickHouse exceptions on a non-seekable response body.',
);
}

$bodyContent = $body->__toString();
if (
ServerError::bodyContainsStreamedException(
$bodyContent,
$response->getHeaderLine('X-ClickHouse-Exception-Tag'),
)
) {
throw ServerError::fromResponse($response);
}

Message::rewindBody($response);

return $response;
}
}
15 changes: 14 additions & 1 deletion src/Exception/ServerError.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Psr\Http\Message\ResponseInterface;

use function preg_match;
use function preg_quote;

final class ServerError extends Exception
{
Expand All @@ -24,7 +25,7 @@ public static function fromResponse(ResponseInterface $response): self
{
$bodyContent = $response->getBody()->__toString();

$errorCode = preg_match('~^Code: (\d+). DB::Exception:~', $bodyContent, $codeMatches) === 1
$errorCode = preg_match('~(?:^|\R)Code: (\d+)\. DB::Exception:~', $bodyContent, $codeMatches) === 1
? (int) $codeMatches[1]
: 0;

Expand All @@ -39,4 +40,16 @@ public static function fromResponse(ResponseInterface $response): self
$exceptionName,
);
}

public static function bodyContainsStreamedException(string $bodyContent, string $exceptionTag = ''): bool
{
if ($exceptionTag !== '') {
return preg_match(
'~(?:^|\R)__exception__\R' . preg_quote($exceptionTag, '~') . '\RCode: \d+\. DB::Exception:~',
$bodyContent,
) === 1;
}

return preg_match('~(?:^|\R)__exception__\R[[:alnum:]]{16}\RCode: \d+\. DB::Exception:~', $bodyContent) === 1;
}
}
Loading
Loading