agentClient = $agentClient; $this->logger = $logger ?? new NullLogger(); $this->mapper = $mapper ?? new SpanToJaegerMapper(); } public function flush(): int { $count = count($this->spans); if ($count === 0) { return 0; } $jaegerThriftSpans = $this->makeJaegerBatch($this->spans); try { $this->send($jaegerThriftSpans); } catch (\Exception $e) { $this->logger->warning($e->getMessage()); } $this->spans = []; return $count; } public function setMaxBufferLength($maxBufferLength) { $this->maxBufferLength = $maxBufferLength; } /** * @param JaegerSpan[] $spans * @return array */ private function makeJaegerBatch(array $spans) : array { /** @var JaegerThriftSpan[] $jaegerSpans */ $jaegerSpans = []; foreach ($spans as $span) { if (empty($this->tracer)) { $this->tracer = $span->getTracer(); } $jaegerSpans[] = $this->mapper->mapSpanToJaeger($span); } return $jaegerSpans; } /** * @param JaegerThriftSpan[] $spans */ private function send(array $spans) { if (empty($this->tracer)) { return ; } $chunks = $this->chunkSplit($spans); foreach ($chunks as $chunk) { /** @var JaegerThriftSpan[] $chunk */ $this->emitJaegerBatch($chunk); } } /** * @param JaegerThriftSpan $span */ private function getBufferLength($span) { $memoryBuffer = new TMemoryBuffer(); $span->write(new TBinaryProtocol($memoryBuffer)); return $memoryBuffer->available(); } private function chunkSplit(array $spans): array { $actualBufferSize = $this->jaegerBatchOverheadLength; $chunkId = 0; $chunks = []; foreach ($spans as $span) { $spanBufferLength = $this->getBufferLength($span); if (!empty($chunks[$chunkId]) && ($actualBufferSize + $spanBufferLength) > $this->maxBufferLength) { // point to next chunk ++$chunkId; // reset buffer size $actualBufferSize = $this->jaegerBatchOverheadLength; } if (!isset($chunks[$chunkId])) { $chunks[$chunkId] = []; } $chunks[$chunkId][] = $span; $actualBufferSize += $spanBufferLength; } return $chunks; } protected function emitJaegerBatch(array $spans) { /** @var Tag[] $tags */ $tags = []; foreach ($this->tracer->getTags() as $k => $v) { if (!in_array($k, $this->mapper->getSpecialSpanTags())) { if (strpos($k, $this->mapper->getProcessTagsPrefix()) !== 0) { continue ; } $quoted = preg_quote($this->mapper->getProcessTagsPrefix()); $k = preg_replace(sprintf('/^%s/', $quoted), '', $k); } if ($k === JAEGER_HOSTNAME_TAG_KEY) { $k = "hostname"; } $tags[] = new Tag([ "key" => $k, "vType" => TagType::STRING, "vStr" => $v ]); } $tags[] = new Tag([ "key" => "format", "vType" => TagType::STRING, "vStr" => "jaeger.thrift" ]); $tags[] = new Tag([ "key" => "ip", "vType" => TagType::STRING, "vStr" => $this->tracer->getIpAddress() ]); $batch = new Batch([ "spans" => $spans, "process" => new Process([ "serviceName" => $this->tracer->getServiceName(), "tags" => $tags ]) ]); $this->agentClient->emitBatch($batch); } /** * @param JaegerSpan $span */ public function append(JaegerSpan $span) { $this->spans[] = $span; } public function close() { } }