summaryrefslogtreecommitdiff
path: root/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender')
-rw-r--r--vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/JaegerSender.php236
-rw-r--r--vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/SenderInterface.php12
-rw-r--r--vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/UdpSender.php305
3 files changed, 553 insertions, 0 deletions
diff --git a/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/JaegerSender.php b/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/JaegerSender.php
new file mode 100644
index 000000000..dfce510b8
--- /dev/null
+++ b/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/JaegerSender.php
@@ -0,0 +1,236 @@
+<?php
+
+
+namespace Jaeger\Sender;
+
+use Jaeger\Mapper\SpanToJaegerMapper;
+use Jaeger\Span as JaegerSpan;
+use Jaeger\Thrift\Agent\AgentClient;
+use Jaeger\Thrift\Agent\AgentIf;
+use Jaeger\Thrift\Batch;
+use Jaeger\Thrift\Process;
+use Jaeger\Thrift\Span as JaegerThriftSpan;
+use Jaeger\Thrift\Tag;
+use Jaeger\Thrift\TagType;
+use Jaeger\Tracer;
+use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+use Thrift\Protocol\TBinaryProtocol;
+use Thrift\Protocol\TCompactProtocol;
+use Thrift\Transport\TMemoryBuffer;
+use const Jaeger\JAEGER_HOSTNAME_TAG_KEY;
+
+class JaegerSender implements SenderInterface
+{
+ /**
+ * @var JaegerSpan[]
+ */
+ private $spans = [];
+
+ /**
+ * @var AgentIf
+ */
+ private $agentClient;
+
+ /**
+ * @var LoggerInterface
+ */
+ private $logger;
+
+ /**
+ * @var Tracer
+ */
+ private $tracer;
+
+ /**
+ * @var SpanToJaegerMapper
+ */
+ private $mapper;
+
+ /**
+ * @var int
+ */
+ private $jaegerBatchOverheadLength = 512;
+
+ /**
+ * The maximum length of the thrift-objects for a jaeger-batch.
+ *
+ * @var int
+ */
+ private $maxBufferLength = 64000;
+
+ /**
+ * @param AgentIf $agentClient
+ * @param LoggerInterface|null $logger
+ * @param SpanToJaegerMapper|null $mapper
+ */
+ public function __construct(
+ AgentIf $agentClient,
+ LoggerInterface $logger = null,
+ SpanToJaegerMapper $mapper = null
+ ) {
+ $this->agentClient = $agentClient;
+ $this->logger = $logger ?? new NullLogger();
+ $this->mapper = $mapper ?? new SpanToJaegerMapper();
+ }
+
+
+ public function flush(): int
+ {
+ $count = count($this->spans);
+ if ($count === 0) {
+ return 0;
+ }
+
+ $jaegerThriftSpans = $this->makeJaegerBatch($this->spans);
+
+ try {
+ $this->send($jaegerThriftSpans);
+ } catch (\Exception $e) {
+ $this->logger->warning($e->getMessage());
+ }
+
+ $this->spans = [];
+
+ return $count;
+ }
+
+ public function setMaxBufferLength($maxBufferLength)
+ {
+ $this->maxBufferLength = $maxBufferLength;
+ }
+
+ /**
+ * @param JaegerSpan[] $spans
+ * @return array
+ */
+ private function makeJaegerBatch(array $spans) : array
+ {
+ /** @var JaegerThriftSpan[] $jaegerSpans */
+ $jaegerSpans = [];
+
+ foreach ($spans as $span) {
+ if (empty($this->tracer)) {
+ $this->tracer = $span->getTracer();
+ }
+
+ $jaegerSpans[] = $this->mapper->mapSpanToJaeger($span);
+ }
+
+ return $jaegerSpans;
+ }
+
+ /**
+ * @param JaegerThriftSpan[] $spans
+ */
+ private function send(array $spans)
+ {
+ if (empty($this->tracer)) {
+ return ;
+ }
+
+ $chunks = $this->chunkSplit($spans);
+ foreach ($chunks as $chunk) {
+ /** @var JaegerThriftSpan[] $chunk */
+ $this->emitJaegerBatch($chunk);
+ }
+ }
+
+ /**
+ * @param JaegerThriftSpan $span
+ */
+ private function getBufferLength($span)
+ {
+ $memoryBuffer = new TMemoryBuffer();
+ $span->write(new TBinaryProtocol($memoryBuffer));
+ return $memoryBuffer->available();
+ }
+
+ private function chunkSplit(array $spans): array
+ {
+ $actualBufferSize = $this->jaegerBatchOverheadLength;
+ $chunkId = 0;
+ $chunks = [];
+
+ foreach ($spans as $span) {
+ $spanBufferLength = $this->getBufferLength($span);
+ if (!empty($chunks[$chunkId]) && ($actualBufferSize + $spanBufferLength) > $this->maxBufferLength) {
+ // point to next chunk
+ ++$chunkId;
+
+ // reset buffer size
+ $actualBufferSize = $this->jaegerBatchOverheadLength;
+ }
+
+ if (!isset($chunks[$chunkId])) {
+ $chunks[$chunkId] = [];
+ }
+
+ $chunks[$chunkId][] = $span;
+ $actualBufferSize += $spanBufferLength;
+ }
+
+ return $chunks;
+ }
+
+ protected function emitJaegerBatch(array $spans)
+ {
+ /** @var Tag[] $tags */
+ $tags = [];
+
+ foreach ($this->tracer->getTags() as $k => $v) {
+ if (!in_array($k, $this->mapper->getSpecialSpanTags())) {
+ if (strpos($k, $this->mapper->getProcessTagsPrefix()) !== 0) {
+ continue ;
+ }
+
+ $quoted = preg_quote($this->mapper->getProcessTagsPrefix());
+ $k = preg_replace(sprintf('/^%s/', $quoted), '', $k);
+ }
+
+ if ($k === JAEGER_HOSTNAME_TAG_KEY) {
+ $k = "hostname";
+ }
+
+ $tags[] = new Tag([
+ "key" => $k,
+ "vType" => TagType::STRING,
+ "vStr" => $v
+ ]);
+ }
+
+ $tags[] = new Tag([
+ "key" => "format",
+ "vType" => TagType::STRING,
+ "vStr" => "jaeger.thrift"
+ ]);
+
+ $tags[] = new Tag([
+ "key" => "ip",
+ "vType" => TagType::STRING,
+ "vStr" => $this->tracer->getIpAddress()
+ ]);
+
+ $batch = new Batch([
+ "spans" => $spans,
+ "process" => new Process([
+ "serviceName" => $this->tracer->getServiceName(),
+ "tags" => $tags
+ ])
+ ]);
+
+ $this->agentClient->emitBatch($batch);
+ }
+
+ /**
+ * @param JaegerSpan $span
+ */
+ public function append(JaegerSpan $span)
+ {
+ $this->spans[] = $span;
+ }
+
+ public function close()
+ {
+ }
+}
diff --git a/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/SenderInterface.php b/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/SenderInterface.php
new file mode 100644
index 000000000..24dc4fbfa
--- /dev/null
+++ b/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/SenderInterface.php
@@ -0,0 +1,12 @@
+<?php
+
+namespace Jaeger\Sender;
+
+use Jaeger\Span as JaegerSpan;
+
+interface SenderInterface
+{
+ public function flush(): int;
+ public function append(JaegerSpan $span);
+ public function close();
+}
diff --git a/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/UdpSender.php b/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/UdpSender.php
new file mode 100644
index 000000000..ff0e4cf3e
--- /dev/null
+++ b/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/UdpSender.php
@@ -0,0 +1,305 @@
+<?php
+
+namespace Jaeger\Sender;
+
+use Exception;
+use Jaeger\Thrift\Agent\AgentClient;
+use Jaeger\Thrift\Agent\Zipkin\Annotation;
+use Jaeger\Thrift\Agent\Zipkin\AnnotationType;
+use Jaeger\Thrift\Agent\Zipkin\BinaryAnnotation;
+use Jaeger\Thrift\Agent\Zipkin\Endpoint;
+use Jaeger\Thrift\Agent\Zipkin\Span as ThriftSpan;
+use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+use Thrift\Base\TBase;
+use Thrift\Protocol\TCompactProtocol;
+use Thrift\Transport\TMemoryBuffer;
+use Jaeger\Span as JaegerSpan;
+
+use const OpenTracing\Tags\COMPONENT;
+
+class UdpSender
+{
+ const CLIENT_ADDR = "ca";
+ const SERVER_ADDR = "sa";
+
+ /**
+ * @var JaegerSpan[]
+ */
+ private $spans = [];
+
+ /**
+ * @var AgentClient
+ */
+ private $client;
+
+ /**
+ * @var LoggerInterface
+ */
+ private $logger;
+
+ /**
+ * The maximum length of the thrift-objects for a zipkin-batch.
+ *
+ * @var int
+ */
+ private $maxBufferLength;
+
+ /**
+ * The length of the zipkin-batch overhead.
+ *
+ * @var int
+ */
+ private $zipkinBatchOverheadLength = 30;
+
+ /**
+ * UdpSender constructor.
+ *
+ * @param AgentClient $client
+ * @param int $maxBufferLength
+ * @param LoggerInterface|null $logger
+ */
+ public function __construct(
+ AgentClient $client,
+ int $maxBufferLength,
+ LoggerInterface $logger = null
+ ) {
+ $this->client = $client;
+ $this->maxBufferLength = $maxBufferLength;
+ $this->logger = $logger ?? new NullLogger();
+ }
+
+ /**
+ * @param JaegerSpan $span
+ */
+ public function append(JaegerSpan $span)
+ {
+ $this->spans[] = $span;
+ }
+
+ /**
+ * @return int the number of flushed spans
+ */
+ public function flush(): int
+ {
+ $count = count($this->spans);
+ if ($count === 0) {
+ return 0;
+ }
+
+ $zipkinSpans = $this->makeZipkinBatch($this->spans);
+
+ try {
+ $this->send($zipkinSpans);
+ } catch (Exception $e) {
+ $this->logger->warning($e->getMessage());
+ }
+
+ $this->spans = [];
+
+ return $count;
+ }
+
+ public function close()
+ {
+ }
+
+ /**
+ * Emits the thrift-objects.
+ *
+ * @param array|ThriftSpan[]|TBase[] $thrifts
+ */
+ private function send(array $thrifts)
+ {
+ foreach ($this->chunkSplit($thrifts) as $chunk) {
+ /* @var $chunk ThriftSpan[] */
+ $this->client->emitZipkinBatch($chunk);
+ }
+ }
+
+ /**
+ * @param JaegerSpan[] $spans
+ * @return ThriftSpan[]
+ */
+ private function makeZipkinBatch(array $spans): array
+ {
+ /** @var ThriftSpan[] */
+ $zipkinSpans = [];
+
+ foreach ($spans as $span) {
+ /** @var JaegerSpan $span */
+
+ $endpoint = $this->makeEndpoint(
+ $span->getTracer()->getIpAddress(),
+ 0, // span.port,
+ $span->getTracer()->getServiceName()
+ );
+
+ $timestamp = $span->getStartTime();
+ $duration = $span->getEndTime() - $span->getStartTime();
+
+ $this->addZipkinAnnotations($span, $endpoint);
+
+ $zipkinSpan = new ThriftSpan([
+ 'name' => $span->getOperationName(),
+ 'id' => $span->getContext()->getSpanId(),
+ 'parent_id' => $span->getContext()->getParentId() ?? null,
+ 'trace_id' => $span->getContext()->getTraceId(),
+ 'annotations' => $this->createAnnotations($span, $endpoint),
+ 'binary_annotations' => $span->getTags(),
+ 'debug' => $span->isDebug(),
+ 'timestamp' => $timestamp,
+ 'duration' => $duration,
+ ]);
+
+ $zipkinSpans[] = $zipkinSpan;
+ }
+
+ return $zipkinSpans;
+ }
+
+ private function addZipkinAnnotations(JaegerSpan $span, Endpoint $endpoint)
+ {
+ if ($span->isRpc() && $span->peer) {
+ $isClient = $span->isRpcClient();
+
+ $host = $this->makeEndpoint(
+ $span->peer['ipv4'] ?? 0,
+ $span->peer['port'] ?? 0,
+ $span->peer['service_name'] ?? ''
+ );
+
+ $key = ($isClient) ? self::SERVER_ADDR : self::CLIENT_ADDR;
+
+ $peer = $this->makePeerAddressTag($key, $host);
+ $span->tags[$key] = $peer;
+ } else {
+ $tag = $this->makeLocalComponentTag(
+ $span->getComponent() ?? $span->getTracer()->getServiceName(),
+ $endpoint
+ );
+
+ $span->tags[COMPONENT] = $tag;
+ }
+ }
+
+ private function makeLocalComponentTag(string $componentName, Endpoint $endpoint): BinaryAnnotation
+ {
+ return new BinaryAnnotation([
+ 'key' => "lc",
+ 'value' => $componentName,
+ 'annotation_type' => AnnotationType::STRING,
+ 'host' => $endpoint,
+ ]);
+ }
+
+ private function makeEndpoint(string $ipv4, int $port, string $serviceName): Endpoint
+ {
+ $ipv4 = $this->ipv4ToInt($ipv4);
+
+ return new Endpoint([
+ 'ipv4' => $ipv4,
+ 'port' => $port,
+ 'service_name' => $serviceName,
+ ]);
+ }
+
+ private function ipv4ToInt(string $ipv4): int
+ {
+ if ($ipv4 == 'localhost') {
+ $ipv4 = '127.0.0.1';
+ } elseif ($ipv4 == '::1') {
+ $ipv4 = '127.0.0.1';
+ }
+
+ $long = ip2long($ipv4);
+ if (PHP_INT_SIZE === 8) {
+ return $long >> 31 ? $long - (1 << 32) : $long;
+ }
+ return $long;
+ }
+
+ // Used for Zipkin binary annotations like CA/SA (client/server address).
+ // They are modeled as Boolean type with '0x01' as the value.
+ private function makePeerAddressTag(string $key, Endpoint $host): BinaryAnnotation
+ {
+ return new BinaryAnnotation([
+ "key" => $key,
+ "value" => '0x01',
+ "annotation_type" => AnnotationType::BOOL,
+ "host" => $host,
+ ]);
+ }
+
+ /**
+ * Splits an array of thrift-objects into several chunks when the buffer limit has been reached.
+ *
+ * @param array|ThriftSpan[]|TBase[] $thrifts
+ *
+ * @return array
+ */
+ private function chunkSplit(array $thrifts): array
+ {
+ $actualBufferSize = $this->zipkinBatchOverheadLength;
+ $chunkId = 0;
+ $chunks = [];
+
+ foreach ($thrifts as $thrift) {
+ $spanBufferLength = $this->getBufferLength($thrift);
+
+ if (!empty($chunks[$chunkId]) && ($actualBufferSize + $spanBufferLength) > $this->maxBufferLength) {
+ // point to next chunk
+ ++$chunkId;
+
+ // reset buffer size
+ $actualBufferSize = $this->zipkinBatchOverheadLength;
+ }
+
+ if (!isset($chunks[$chunkId])) {
+ $chunks[$chunkId] = [];
+ }
+
+ $chunks[$chunkId][] = $thrift;
+ $actualBufferSize += $spanBufferLength;
+ }
+
+ return $chunks;
+ }
+
+ /**
+ * Returns the length of a thrift-object.
+ *
+ * @param ThriftSpan|TBase $thrift
+ *
+ * @return int
+ */
+ private function getBufferLength($thrift): int
+ {
+ $memoryBuffer = new TMemoryBuffer();
+
+ $thrift->write(new TCompactProtocol($memoryBuffer));
+
+ return $memoryBuffer->available();
+ }
+
+ /*
+ * @param JaegerSpan $span
+ * @param Endpoint $endpoint
+ *
+ * @return array|Annotation[]
+ */
+ private function createAnnotations(JaegerSpan $span, Endpoint $endpoint): array
+ {
+ $annotations = [];
+
+ foreach ($span->getLogs() as $values) {
+ $annotations[] = new Annotation([
+ 'timestamp' => $values['timestamp'],
+ 'value' => json_encode($values['fields']),
+ 'host' => $endpoint,
+ ]);
+ }
+
+ return $annotations;
+ }
+}