diff options
Diffstat (limited to 'vendor/open-telemetry/sdk/Common/Export/Stream')
-rw-r--r-- | vendor/open-telemetry/sdk/Common/Export/Stream/StreamTransport.php | 97 | ||||
-rw-r--r-- | vendor/open-telemetry/sdk/Common/Export/Stream/StreamTransportFactory.php | 118 |
2 files changed, 215 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Common/Export/Stream/StreamTransport.php b/vendor/open-telemetry/sdk/Common/Export/Stream/StreamTransport.php new file mode 100644 index 000000000..4b99cf756 --- /dev/null +++ b/vendor/open-telemetry/sdk/Common/Export/Stream/StreamTransport.php @@ -0,0 +1,97 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Common\Export\Stream; + +use BadMethodCallException; +use ErrorException; +use function fflush; +use function fwrite; +use OpenTelemetry\SDK\Common\Export\TransportInterface; +use OpenTelemetry\SDK\Common\Future\CancellationInterface; +use OpenTelemetry\SDK\Common\Future\CompletedFuture; +use OpenTelemetry\SDK\Common\Future\ErrorFuture; +use OpenTelemetry\SDK\Common\Future\FutureInterface; +use function restore_error_handler; +use RuntimeException; +use function set_error_handler; +use function strlen; +use Throwable; + +/** + * @internal + * + * @psalm-template CONTENT_TYPE of string + * @template-implements TransportInterface<CONTENT_TYPE> + */ +final class StreamTransport implements TransportInterface +{ + /** + * @var resource|null + */ + private $stream; + private string $contentType; + + /** + * @param resource $stream + * + * @psalm-param CONTENT_TYPE $contentType + */ + public function __construct($stream, string $contentType) + { + $this->stream = $stream; + $this->contentType = $contentType; + } + + public function contentType(): string + { + return $this->contentType; + } + + public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface + { + if (!$this->stream) { + return new ErrorFuture(new BadMethodCallException('Transport closed')); + } + + set_error_handler(static function (int $errno, string $errstr, string $errfile, int $errline): bool { + throw new ErrorException($errstr, 0, $errno, $errfile, $errline); + }); + + try { + $bytesWritten = fwrite($this->stream, $payload); + } catch (Throwable $e) { + return new ErrorFuture($e); + } finally { + restore_error_handler(); + } + + if ($bytesWritten !== strlen($payload)) { + return new ErrorFuture(new RuntimeException(sprintf('Write failure, wrote %d of %d bytes', $bytesWritten, strlen($payload)))); + } + + return new CompletedFuture(null); + } + + public function shutdown(?CancellationInterface $cancellation = null): bool + { + if (!$this->stream) { + return false; + } + + $flush = @fflush($this->stream); + $this->stream = null; + + return $flush; + } + + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + if (!$this->stream) { + return false; + } + + return @fflush($this->stream); + } +} diff --git a/vendor/open-telemetry/sdk/Common/Export/Stream/StreamTransportFactory.php b/vendor/open-telemetry/sdk/Common/Export/Stream/StreamTransportFactory.php new file mode 100644 index 000000000..59e411318 --- /dev/null +++ b/vendor/open-telemetry/sdk/Common/Export/Stream/StreamTransportFactory.php @@ -0,0 +1,118 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Common\Export\Stream; + +use ErrorException; +use function fopen; +use function implode; +use function is_resource; +use LogicException; +use OpenTelemetry\SDK\Common\Export\TransportFactoryInterface; +use OpenTelemetry\SDK\Common\Export\TransportInterface; +use function restore_error_handler; +use function set_error_handler; +use function sprintf; +use function stream_context_create; + +/** + * @psalm-internal \OpenTelemetry + */ +final class StreamTransportFactory implements TransportFactoryInterface +{ + /** + * @param string|resource $endpoint + * @param array<string, string|string[]> $headers + * @param string|string[]|null $compression + * + * @psalm-template CONTENT_TYPE of string + * @psalm-param CONTENT_TYPE $contentType + * @psalm-return TransportInterface<CONTENT_TYPE> + */ + public function create( + $endpoint, + string $contentType, + array $headers = [], + $compression = null, + float $timeout = 10., + int $retryDelay = 100, + int $maxRetries = 3, + ?string $cacert = null, + ?string $cert = null, + ?string $key = null + ): TransportInterface { + assert(!empty($endpoint)); + $stream = is_resource($endpoint) + ? $endpoint + : self::createStream( + $endpoint, + $contentType, + $headers, + $timeout, + $cacert, + $cert, + $key, + ); + + return new StreamTransport($stream, $contentType); + } + + /** + * @throws ErrorException + * @return resource + */ + private static function createStream( + string $endpoint, + string $contentType, + array $headers = [], + float $timeout = 10., + ?string $cacert = null, + ?string $cert = null, + ?string $key = null + ) { + $context = stream_context_create([ + 'http' => [ + 'method' => 'POST', + 'header' => self::createHeaderArray($contentType, $headers), + 'timeout' => $timeout, + ], + 'ssl' => [ + 'cafile' => $cacert, + 'local_cert' => $cert, + 'local_pk' => $key, + ], + ]); + + set_error_handler(static function (int $errno, string $errstr, string $errfile, int $errline): bool { + throw new ErrorException($errstr, 0, $errno, $errfile, $errline); + }); + + /** + * @psalm-suppress PossiblyNullArgument + */ + try { + $stream = fopen($endpoint, 'ab', false, $context); + } finally { + restore_error_handler(); + } + + /** @phan-suppress-next-line PhanPossiblyUndeclaredVariable */ + if (!$stream) { + throw new LogicException(sprintf('Failed opening stream "%s"', $endpoint)); + } + + return $stream; + } + + private static function createHeaderArray(string $contentType, array $headers): array + { + $header = []; + $header[] = sprintf('Content-Type: %s', $contentType); + foreach ($headers as $name => $value) { + $header[] = sprintf('%s: %s', $name, implode(', ', (array) $value)); + } + + return $header; + } +} |