From cdd7ad020e165fe680703b6d3319b908b682fb7a Mon Sep 17 00:00:00 2001 From: Andrew Dolgov Date: Fri, 20 Oct 2023 17:12:29 +0300 Subject: jaeger-client -> opentelemetry --- .../sdk/Common/Export/Http/PsrTransport.php | 168 +++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 vendor/open-telemetry/sdk/Common/Export/Http/PsrTransport.php (limited to 'vendor/open-telemetry/sdk/Common/Export/Http/PsrTransport.php') diff --git a/vendor/open-telemetry/sdk/Common/Export/Http/PsrTransport.php b/vendor/open-telemetry/sdk/Common/Export/Http/PsrTransport.php new file mode 100644 index 000000000..a53e5b80a --- /dev/null +++ b/vendor/open-telemetry/sdk/Common/Export/Http/PsrTransport.php @@ -0,0 +1,168 @@ + + */ +final class PsrTransport implements TransportInterface +{ + private ClientInterface $client; + private RequestFactoryInterface $requestFactory; + private StreamFactoryInterface $streamFactory; + + private string $endpoint; + private string $contentType; + private array $headers; + private array $compression; + private int $retryDelay; + private int $maxRetries; + + private bool $closed = false; + + /** + * @psalm-param CONTENT_TYPE $contentType + */ + public function __construct( + ClientInterface $client, + RequestFactoryInterface $requestFactory, + StreamFactoryInterface $streamFactory, + string $endpoint, + string $contentType, + array $headers, + array $compression, + int $retryDelay, + int $maxRetries + ) { + $this->client = $client; + $this->requestFactory = $requestFactory; + $this->streamFactory = $streamFactory; + $this->endpoint = $endpoint; + $this->contentType = $contentType; + $this->headers = $headers; + $this->compression = $compression; + $this->retryDelay = $retryDelay; + $this->maxRetries = $maxRetries; + } + + public function contentType(): string + { + return $this->contentType; + } + + public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface + { + if ($this->closed) { + return new ErrorFuture(new BadMethodCallException('Transport closed')); + } + + $body = PsrUtils::encode($payload, $this->compression, $appliedEncodings); + $request = $this->requestFactory + ->createRequest('POST', $this->endpoint) + ->withBody($this->streamFactory->createStream($body)) + ->withHeader('Content-Type', $this->contentType) + ; + if ($appliedEncodings) { + $request = $request->withHeader('Content-Encoding', $appliedEncodings); + } + foreach ($this->headers as $header => $value) { + $request = $request->withAddedHeader($header, $value); + } + + for ($retries = 0;; $retries++) { + $response = null; + $e = null; + + try { + $response = $this->client->sendRequest($request); + if ($response->getStatusCode() >= 200 && $response->getStatusCode() < 300) { + break; + } + + if ($response->getStatusCode() >= 400 && $response->getStatusCode() < 500 && !in_array($response->getStatusCode(), [408, 429], true)) { + throw new RuntimeException($response->getReasonPhrase(), $response->getStatusCode()); + } + } catch (NetworkExceptionInterface $e) { + } catch (Throwable $e) { + return new ErrorFuture($e); + } + + if ($retries >= $this->maxRetries) { + return new ErrorFuture(new RuntimeException('Export retry limit exceeded', 0, $e)); + } + + $delay = PsrUtils::retryDelay($retries, $this->retryDelay, $response); + $sec = (int) $delay; + $nsec = (int) (($delay - $sec) * 1e9); + + /** @psalm-suppress ArgumentTypeCoercion */ + if (time_nanosleep($sec, $nsec) !== true) { + return new ErrorFuture(new RuntimeException('Export cancelled', 0, $e)); + } + } + + assert(isset($response)); + + try { + $body = PsrUtils::decode( + $response->getBody()->__toString(), + self::parseContentEncoding($response), + ); + } catch (Throwable $e) { + return new ErrorFuture($e); + } + + return new CompletedFuture($body); + } + + private static function parseContentEncoding(ResponseInterface $response): array + { + $encodings = []; + foreach (explode(',', $response->getHeaderLine('Content-Encoding')) as $encoding) { + if (($encoding = trim($encoding, " \t")) !== '') { + $encodings[] = strtolower($encoding); + } + } + + return $encodings; + } + + public function shutdown(?CancellationInterface $cancellation = null): bool + { + if ($this->closed) { + return false; + } + + $this->closed = true; + + return true; + } + + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + return !$this->closed; + } +} -- cgit v1.2.3