$transport */ public function __construct(TransportInterface $transport) { if (!class_exists('\Google\Protobuf\Api')) { throw new RuntimeException('No protobuf implementation found (ext-protobuf or google/protobuf)'); } $this->transport = $transport; $this->serializer = ProtobufSerializer::forTransport($transport); } /** * @param iterable $batch */ public function export(iterable $batch, ?CancellationInterface $cancellation = null): FutureInterface { return $this->transport ->send($this->serializer->serialize((new LogsConverter($this->serializer))->convert($batch)), $cancellation) ->map(function (?string $payload): bool { if ($payload === null) { return true; } $serviceResponse = new ExportLogsServiceResponse(); $this->serializer->hydrate($serviceResponse, $payload); $partialSuccess = $serviceResponse->getPartialSuccess(); if ($partialSuccess !== null && $partialSuccess->getRejectedLogRecords()) { self::logError('Export partial success', [ 'rejected_logs' => $partialSuccess->getRejectedLogRecords(), 'error_message' => $partialSuccess->getErrorMessage(), ]); return false; } if ($partialSuccess !== null && $partialSuccess->getErrorMessage()) { self::logWarning('Export success with warnings/suggestions', ['error_message' => $partialSuccess->getErrorMessage()]); } return true; }) ->catch(static function (Throwable $throwable): bool { self::logError('Export failure', ['exception' => $throwable]); return false; }); } public function forceFlush(?CancellationInterface $cancellation = null): bool { return $this->transport->forceFlush($cancellation); } public function shutdown(?CancellationInterface $cancellation = null): bool { return $this->transport->shutdown($cancellation); } }