*/ final class StreamTransport implements TransportInterface { /** * @var resource|null */ private $stream; private string $contentType; /** * @param resource $stream * * @psalm-param CONTENT_TYPE $contentType */ public function __construct($stream, string $contentType) { $this->stream = $stream; $this->contentType = $contentType; } public function contentType(): string { return $this->contentType; } public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface { if (!$this->stream) { return new ErrorFuture(new BadMethodCallException('Transport closed')); } set_error_handler(static function (int $errno, string $errstr, string $errfile, int $errline): bool { throw new ErrorException($errstr, 0, $errno, $errfile, $errline); }); try { $bytesWritten = fwrite($this->stream, $payload); } catch (Throwable $e) { return new ErrorFuture($e); } finally { restore_error_handler(); } if ($bytesWritten !== strlen($payload)) { return new ErrorFuture(new RuntimeException(sprintf('Write failure, wrote %d of %d bytes', $bytesWritten, strlen($payload)))); } return new CompletedFuture(null); } public function shutdown(?CancellationInterface $cancellation = null): bool { if (!$this->stream) { return false; } $flush = @fflush($this->stream); $this->stream = null; return $flush; } public function forceFlush(?CancellationInterface $cancellation = null): bool { if (!$this->stream) { return false; } return @fflush($this->stream); } }