From 8f3646a9c93a06f76f6abb31020fdb74b4b1fc59 Mon Sep 17 00:00:00 2001 From: Andrew Dolgov Date: Sun, 9 Apr 2023 20:50:33 +0300 Subject: exp: jaeger tracing --- .../src/Jaeger/Sender/UdpSender.php | 305 +++++++++++++++++++++ 1 file changed, 305 insertions(+) create mode 100644 vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/UdpSender.php (limited to 'vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/UdpSender.php') diff --git a/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/UdpSender.php b/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/UdpSender.php new file mode 100644 index 000000000..ff0e4cf3e --- /dev/null +++ b/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/UdpSender.php @@ -0,0 +1,305 @@ +client = $client; + $this->maxBufferLength = $maxBufferLength; + $this->logger = $logger ?? new NullLogger(); + } + + /** + * @param JaegerSpan $span + */ + public function append(JaegerSpan $span) + { + $this->spans[] = $span; + } + + /** + * @return int the number of flushed spans + */ + public function flush(): int + { + $count = count($this->spans); + if ($count === 0) { + return 0; + } + + $zipkinSpans = $this->makeZipkinBatch($this->spans); + + try { + $this->send($zipkinSpans); + } catch (Exception $e) { + $this->logger->warning($e->getMessage()); + } + + $this->spans = []; + + return $count; + } + + public function close() + { + } + + /** + * Emits the thrift-objects. + * + * @param array|ThriftSpan[]|TBase[] $thrifts + */ + private function send(array $thrifts) + { + foreach ($this->chunkSplit($thrifts) as $chunk) { + /* @var $chunk ThriftSpan[] */ + $this->client->emitZipkinBatch($chunk); + } + } + + /** + * @param JaegerSpan[] $spans + * @return ThriftSpan[] + */ + private function makeZipkinBatch(array $spans): array + { + /** @var ThriftSpan[] */ + $zipkinSpans = []; + + foreach ($spans as $span) { + /** @var JaegerSpan $span */ + + $endpoint = $this->makeEndpoint( + $span->getTracer()->getIpAddress(), + 0, // span.port, + $span->getTracer()->getServiceName() + ); + + $timestamp = $span->getStartTime(); + $duration = $span->getEndTime() - $span->getStartTime(); + + $this->addZipkinAnnotations($span, $endpoint); + + $zipkinSpan = new ThriftSpan([ + 'name' => $span->getOperationName(), + 'id' => $span->getContext()->getSpanId(), + 'parent_id' => $span->getContext()->getParentId() ?? null, + 'trace_id' => $span->getContext()->getTraceId(), + 'annotations' => $this->createAnnotations($span, $endpoint), + 'binary_annotations' => $span->getTags(), + 'debug' => $span->isDebug(), + 'timestamp' => $timestamp, + 'duration' => $duration, + ]); + + $zipkinSpans[] = $zipkinSpan; + } + + return $zipkinSpans; + } + + private function addZipkinAnnotations(JaegerSpan $span, Endpoint $endpoint) + { + if ($span->isRpc() && $span->peer) { + $isClient = $span->isRpcClient(); + + $host = $this->makeEndpoint( + $span->peer['ipv4'] ?? 0, + $span->peer['port'] ?? 0, + $span->peer['service_name'] ?? '' + ); + + $key = ($isClient) ? self::SERVER_ADDR : self::CLIENT_ADDR; + + $peer = $this->makePeerAddressTag($key, $host); + $span->tags[$key] = $peer; + } else { + $tag = $this->makeLocalComponentTag( + $span->getComponent() ?? $span->getTracer()->getServiceName(), + $endpoint + ); + + $span->tags[COMPONENT] = $tag; + } + } + + private function makeLocalComponentTag(string $componentName, Endpoint $endpoint): BinaryAnnotation + { + return new BinaryAnnotation([ + 'key' => "lc", + 'value' => $componentName, + 'annotation_type' => AnnotationType::STRING, + 'host' => $endpoint, + ]); + } + + private function makeEndpoint(string $ipv4, int $port, string $serviceName): Endpoint + { + $ipv4 = $this->ipv4ToInt($ipv4); + + return new Endpoint([ + 'ipv4' => $ipv4, + 'port' => $port, + 'service_name' => $serviceName, + ]); + } + + private function ipv4ToInt(string $ipv4): int + { + if ($ipv4 == 'localhost') { + $ipv4 = '127.0.0.1'; + } elseif ($ipv4 == '::1') { + $ipv4 = '127.0.0.1'; + } + + $long = ip2long($ipv4); + if (PHP_INT_SIZE === 8) { + return $long >> 31 ? $long - (1 << 32) : $long; + } + return $long; + } + + // Used for Zipkin binary annotations like CA/SA (client/server address). + // They are modeled as Boolean type with '0x01' as the value. + private function makePeerAddressTag(string $key, Endpoint $host): BinaryAnnotation + { + return new BinaryAnnotation([ + "key" => $key, + "value" => '0x01', + "annotation_type" => AnnotationType::BOOL, + "host" => $host, + ]); + } + + /** + * Splits an array of thrift-objects into several chunks when the buffer limit has been reached. + * + * @param array|ThriftSpan[]|TBase[] $thrifts + * + * @return array + */ + private function chunkSplit(array $thrifts): array + { + $actualBufferSize = $this->zipkinBatchOverheadLength; + $chunkId = 0; + $chunks = []; + + foreach ($thrifts as $thrift) { + $spanBufferLength = $this->getBufferLength($thrift); + + if (!empty($chunks[$chunkId]) && ($actualBufferSize + $spanBufferLength) > $this->maxBufferLength) { + // point to next chunk + ++$chunkId; + + // reset buffer size + $actualBufferSize = $this->zipkinBatchOverheadLength; + } + + if (!isset($chunks[$chunkId])) { + $chunks[$chunkId] = []; + } + + $chunks[$chunkId][] = $thrift; + $actualBufferSize += $spanBufferLength; + } + + return $chunks; + } + + /** + * Returns the length of a thrift-object. + * + * @param ThriftSpan|TBase $thrift + * + * @return int + */ + private function getBufferLength($thrift): int + { + $memoryBuffer = new TMemoryBuffer(); + + $thrift->write(new TCompactProtocol($memoryBuffer)); + + return $memoryBuffer->available(); + } + + /* + * @param JaegerSpan $span + * @param Endpoint $endpoint + * + * @return array|Annotation[] + */ + private function createAnnotations(JaegerSpan $span, Endpoint $endpoint): array + { + $annotations = []; + + foreach ($span->getLogs() as $values) { + $annotations[] = new Annotation([ + 'timestamp' => $values['timestamp'], + 'value' => json_encode($values['fields']), + 'host' => $endpoint, + ]); + } + + return $annotations; + } +} -- cgit v1.2.3