diff options
Diffstat (limited to 'vendor/open-telemetry/sdk/Common/Export/Http')
3 files changed, 417 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Common/Export/Http/PsrTransport.php b/vendor/open-telemetry/sdk/Common/Export/Http/PsrTransport.php new file mode 100644 index 000000000..a53e5b80a --- /dev/null +++ b/vendor/open-telemetry/sdk/Common/Export/Http/PsrTransport.php @@ -0,0 +1,168 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Common\Export\Http; + +use function assert; +use BadMethodCallException; +use function explode; +use function in_array; +use OpenTelemetry\SDK\Common\Export\TransportInterface; +use OpenTelemetry\SDK\Common\Future\CancellationInterface; +use OpenTelemetry\SDK\Common\Future\CompletedFuture; +use OpenTelemetry\SDK\Common\Future\ErrorFuture; +use OpenTelemetry\SDK\Common\Future\FutureInterface; +use Psr\Http\Client\ClientInterface; +use Psr\Http\Client\NetworkExceptionInterface; +use Psr\Http\Message\RequestFactoryInterface; +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\StreamFactoryInterface; +use RuntimeException; +use function strtolower; +use Throwable; +use function time_nanosleep; +use function trim; + +/** + * @psalm-template CONTENT_TYPE of string + * @template-implements TransportInterface<CONTENT_TYPE> + */ +final class PsrTransport implements TransportInterface +{ + private ClientInterface $client; + private RequestFactoryInterface $requestFactory; + private StreamFactoryInterface $streamFactory; + + private string $endpoint; + private string $contentType; + private array $headers; + private array $compression; + private int $retryDelay; + private int $maxRetries; + + private bool $closed = false; + + /** + * @psalm-param CONTENT_TYPE $contentType + */ + public function __construct( + ClientInterface $client, + RequestFactoryInterface $requestFactory, + StreamFactoryInterface $streamFactory, + string $endpoint, + string $contentType, + array $headers, + array $compression, + int $retryDelay, + int $maxRetries + ) { + $this->client = $client; + $this->requestFactory = $requestFactory; + $this->streamFactory = $streamFactory; + $this->endpoint = $endpoint; + $this->contentType = $contentType; + $this->headers = $headers; + $this->compression = $compression; + $this->retryDelay = $retryDelay; + $this->maxRetries = $maxRetries; + } + + public function contentType(): string + { + return $this->contentType; + } + + public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface + { + if ($this->closed) { + return new ErrorFuture(new BadMethodCallException('Transport closed')); + } + + $body = PsrUtils::encode($payload, $this->compression, $appliedEncodings); + $request = $this->requestFactory + ->createRequest('POST', $this->endpoint) + ->withBody($this->streamFactory->createStream($body)) + ->withHeader('Content-Type', $this->contentType) + ; + if ($appliedEncodings) { + $request = $request->withHeader('Content-Encoding', $appliedEncodings); + } + foreach ($this->headers as $header => $value) { + $request = $request->withAddedHeader($header, $value); + } + + for ($retries = 0;; $retries++) { + $response = null; + $e = null; + + try { + $response = $this->client->sendRequest($request); + if ($response->getStatusCode() >= 200 && $response->getStatusCode() < 300) { + break; + } + + if ($response->getStatusCode() >= 400 && $response->getStatusCode() < 500 && !in_array($response->getStatusCode(), [408, 429], true)) { + throw new RuntimeException($response->getReasonPhrase(), $response->getStatusCode()); + } + } catch (NetworkExceptionInterface $e) { + } catch (Throwable $e) { + return new ErrorFuture($e); + } + + if ($retries >= $this->maxRetries) { + return new ErrorFuture(new RuntimeException('Export retry limit exceeded', 0, $e)); + } + + $delay = PsrUtils::retryDelay($retries, $this->retryDelay, $response); + $sec = (int) $delay; + $nsec = (int) (($delay - $sec) * 1e9); + + /** @psalm-suppress ArgumentTypeCoercion */ + if (time_nanosleep($sec, $nsec) !== true) { + return new ErrorFuture(new RuntimeException('Export cancelled', 0, $e)); + } + } + + assert(isset($response)); + + try { + $body = PsrUtils::decode( + $response->getBody()->__toString(), + self::parseContentEncoding($response), + ); + } catch (Throwable $e) { + return new ErrorFuture($e); + } + + return new CompletedFuture($body); + } + + private static function parseContentEncoding(ResponseInterface $response): array + { + $encodings = []; + foreach (explode(',', $response->getHeaderLine('Content-Encoding')) as $encoding) { + if (($encoding = trim($encoding, " \t")) !== '') { + $encodings[] = strtolower($encoding); + } + } + + return $encodings; + } + + public function shutdown(?CancellationInterface $cancellation = null): bool + { + if ($this->closed) { + return false; + } + + $this->closed = true; + + return true; + } + + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + return !$this->closed; + } +} diff --git a/vendor/open-telemetry/sdk/Common/Export/Http/PsrTransportFactory.php b/vendor/open-telemetry/sdk/Common/Export/Http/PsrTransportFactory.php new file mode 100644 index 000000000..5ef78d82c --- /dev/null +++ b/vendor/open-telemetry/sdk/Common/Export/Http/PsrTransportFactory.php @@ -0,0 +1,74 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Common\Export\Http; + +use const FILTER_VALIDATE_URL; +use function filter_var; +use Http\Discovery\Psr17FactoryDiscovery; +use Http\Discovery\Psr18ClientDiscovery; +use InvalidArgumentException; +use OpenTelemetry\SDK\Common\Export\TransportFactoryInterface; +use Psr\Http\Client\ClientInterface; +use Psr\Http\Message\RequestFactoryInterface; +use Psr\Http\Message\StreamFactoryInterface; + +final class PsrTransportFactory implements TransportFactoryInterface +{ + private ClientInterface $client; + private RequestFactoryInterface $requestFactory; + private StreamFactoryInterface $streamFactory; + + public function __construct( + ClientInterface $client, + RequestFactoryInterface $requestFactory, + StreamFactoryInterface $streamFactory + ) { + $this->client = $client; + $this->requestFactory = $requestFactory; + $this->streamFactory = $streamFactory; + } + + /** + * @phan-suppress PhanTypeMismatchArgumentNullable + */ + public function create( + string $endpoint, + string $contentType, + array $headers = [], + $compression = null, + float $timeout = 10., + int $retryDelay = 100, + int $maxRetries = 3, + ?string $cacert = null, + ?string $cert = null, + ?string $key = null + ): PsrTransport { + if (!filter_var($endpoint, FILTER_VALIDATE_URL)) { + throw new InvalidArgumentException(sprintf('Invalid endpoint url "%s"', $endpoint)); + } + assert(!empty($endpoint)); + + return new PsrTransport( + $this->client, + $this->requestFactory, + $this->streamFactory, + $endpoint, + $contentType, + $headers, + PsrUtils::compression($compression), + $retryDelay, + $maxRetries, + ); + } + + public static function discover(): self + { + return new self( + Psr18ClientDiscovery::find(), + Psr17FactoryDiscovery::findRequestFactory(), + Psr17FactoryDiscovery::findStreamFactory(), + ); + } +} diff --git a/vendor/open-telemetry/sdk/Common/Export/Http/PsrUtils.php b/vendor/open-telemetry/sdk/Common/Export/Http/PsrUtils.php new file mode 100644 index 000000000..eaf2f3b47 --- /dev/null +++ b/vendor/open-telemetry/sdk/Common/Export/Http/PsrUtils.php @@ -0,0 +1,175 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Common\Export\Http; + +use function array_filter; +use function array_map; +use function count; +use ErrorException; +use LogicException; +use function max; +use OpenTelemetry\SDK\Common\Export\TransportFactoryInterface; +use Psr\Http\Message\ResponseInterface; +use function rand; +use function restore_error_handler; +use function set_error_handler; +use function sprintf; +use function strcasecmp; +use function strtotime; +use Throwable; +use function time; +use function trim; +use UnexpectedValueException; + +/** + * @internal + */ +final class PsrUtils +{ + /** + * @param int $retry zero-indexed attempt number + * @param int $retryDelay initial delay in milliseconds + * @param ResponseInterface|null $response response of failed request + * @return float delay in seconds + */ + public static function retryDelay(int $retry, int $retryDelay, ?ResponseInterface $response = null): float + { + $delay = $retryDelay << $retry; + $delay = rand($delay >> 1, $delay) / 1000; + + return max($delay, self::parseRetryAfter($response)); + } + + private static function parseRetryAfter(?ResponseInterface $response): int + { + if (!$response || !$retryAfter = $response->getHeaderLine('Retry-After')) { + return 0; + } + + $retryAfter = trim($retryAfter, " \t"); + if ($retryAfter === (string) (int) $retryAfter) { + return (int) $retryAfter; + } + + if (($time = strtotime($retryAfter)) !== false) { + return $time - time(); + } + + return 0; + } + + /** + * @param list<string> $encodings + * @param array<int, string>|null $appliedEncodings + */ + public static function encode(string $value, array $encodings, ?array &$appliedEncodings = null): string + { + for ($i = 0, $n = count($encodings); $i < $n; $i++) { + if (!$encoder = self::encoder($encodings[$i])) { + unset($encodings[$i]); + + continue; + } + + try { + $value = $encoder($value); + } catch (Throwable $e) { + unset($encodings[$i]); + } + } + + $appliedEncodings = $encodings; + + return $value; + } + + /** + * @param list<string> $encodings + */ + public static function decode(string $value, array $encodings): string + { + for ($i = count($encodings); --$i >= 0;) { + if (strcasecmp($encodings[$i], 'identity') === 0) { + continue; + } + if (!$decoder = self::decoder($encodings[$i])) { + throw new UnexpectedValueException(sprintf('Not supported decompression encoding "%s"', $encodings[$i])); + } + + $value = $decoder($value); + } + + return $value; + } + + /** + * Resolve an array or CSV of compression types to a list + */ + public static function compression($compression): array + { + if (is_array($compression)) { + return $compression; + } + if (!$compression) { + return []; + } + if (strpos($compression, ',') === false) { + return [$compression]; + } + + return array_map('trim', explode(',', $compression)); + } + + private static function encoder(string $encoding): ?callable + { + static $encoders; + + /** @noinspection SpellCheckingInspection */ + $encoders ??= array_map(fn (callable $callable): callable => self::throwOnErrorOrFalse($callable), array_filter([ + TransportFactoryInterface::COMPRESSION_GZIP => 'gzencode', + TransportFactoryInterface::COMPRESSION_DEFLATE => 'gzcompress', + TransportFactoryInterface::COMPRESSION_BROTLI => 'brotli_compress', + ], 'function_exists')); + + return $encoders[$encoding] ?? null; + } + + private static function decoder(string $encoding): ?callable + { + static $decoders; + + /** @noinspection SpellCheckingInspection */ + $decoders ??= array_map(fn (callable $callable): callable => self::throwOnErrorOrFalse($callable), array_filter([ + TransportFactoryInterface::COMPRESSION_GZIP => 'gzdecode', + TransportFactoryInterface::COMPRESSION_DEFLATE => 'gzuncompress', + TransportFactoryInterface::COMPRESSION_BROTLI => 'brotli_uncompress', + ], 'function_exists')); + + return $decoders[$encoding] ?? null; + } + + private static function throwOnErrorOrFalse(callable $callable): callable + { + return static function (...$args) use ($callable) { + set_error_handler(static function (int $errno, string $errstr, string $errfile, int $errline): bool { + throw new ErrorException($errstr, 0, $errno, $errfile, $errline); + }); + + try { + $result = $callable(...$args); + } finally { + restore_error_handler(); + } + + /** @phan-suppress-next-line PhanPossiblyUndeclaredVariable */ + if ($result === false) { + throw new LogicException(); + } + + /** @phan-suppress-next-line PhanPossiblyUndeclaredVariable */ + return $result; + }; + } +} |