summaryrefslogtreecommitdiff
path: root/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/JaegerSender.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/JaegerSender.php')
-rw-r--r--vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/JaegerSender.php236
1 files changed, 236 insertions, 0 deletions
diff --git a/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/JaegerSender.php b/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/JaegerSender.php
new file mode 100644
index 000000000..dfce510b8
--- /dev/null
+++ b/vendor/jonahgeorge/jaeger-client-php/src/Jaeger/Sender/JaegerSender.php
@@ -0,0 +1,236 @@
+<?php
+
+
+namespace Jaeger\Sender;
+
+use Jaeger\Mapper\SpanToJaegerMapper;
+use Jaeger\Span as JaegerSpan;
+use Jaeger\Thrift\Agent\AgentClient;
+use Jaeger\Thrift\Agent\AgentIf;
+use Jaeger\Thrift\Batch;
+use Jaeger\Thrift\Process;
+use Jaeger\Thrift\Span as JaegerThriftSpan;
+use Jaeger\Thrift\Tag;
+use Jaeger\Thrift\TagType;
+use Jaeger\Tracer;
+use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+use Thrift\Protocol\TBinaryProtocol;
+use Thrift\Protocol\TCompactProtocol;
+use Thrift\Transport\TMemoryBuffer;
+use const Jaeger\JAEGER_HOSTNAME_TAG_KEY;
+
+class JaegerSender implements SenderInterface
+{
+ /**
+ * @var JaegerSpan[]
+ */
+ private $spans = [];
+
+ /**
+ * @var AgentIf
+ */
+ private $agentClient;
+
+ /**
+ * @var LoggerInterface
+ */
+ private $logger;
+
+ /**
+ * @var Tracer
+ */
+ private $tracer;
+
+ /**
+ * @var SpanToJaegerMapper
+ */
+ private $mapper;
+
+ /**
+ * @var int
+ */
+ private $jaegerBatchOverheadLength = 512;
+
+ /**
+ * The maximum length of the thrift-objects for a jaeger-batch.
+ *
+ * @var int
+ */
+ private $maxBufferLength = 64000;
+
+ /**
+ * @param AgentIf $agentClient
+ * @param LoggerInterface|null $logger
+ * @param SpanToJaegerMapper|null $mapper
+ */
+ public function __construct(
+ AgentIf $agentClient,
+ LoggerInterface $logger = null,
+ SpanToJaegerMapper $mapper = null
+ ) {
+ $this->agentClient = $agentClient;
+ $this->logger = $logger ?? new NullLogger();
+ $this->mapper = $mapper ?? new SpanToJaegerMapper();
+ }
+
+
+ public function flush(): int
+ {
+ $count = count($this->spans);
+ if ($count === 0) {
+ return 0;
+ }
+
+ $jaegerThriftSpans = $this->makeJaegerBatch($this->spans);
+
+ try {
+ $this->send($jaegerThriftSpans);
+ } catch (\Exception $e) {
+ $this->logger->warning($e->getMessage());
+ }
+
+ $this->spans = [];
+
+ return $count;
+ }
+
+ public function setMaxBufferLength($maxBufferLength)
+ {
+ $this->maxBufferLength = $maxBufferLength;
+ }
+
+ /**
+ * @param JaegerSpan[] $spans
+ * @return array
+ */
+ private function makeJaegerBatch(array $spans) : array
+ {
+ /** @var JaegerThriftSpan[] $jaegerSpans */
+ $jaegerSpans = [];
+
+ foreach ($spans as $span) {
+ if (empty($this->tracer)) {
+ $this->tracer = $span->getTracer();
+ }
+
+ $jaegerSpans[] = $this->mapper->mapSpanToJaeger($span);
+ }
+
+ return $jaegerSpans;
+ }
+
+ /**
+ * @param JaegerThriftSpan[] $spans
+ */
+ private function send(array $spans)
+ {
+ if (empty($this->tracer)) {
+ return ;
+ }
+
+ $chunks = $this->chunkSplit($spans);
+ foreach ($chunks as $chunk) {
+ /** @var JaegerThriftSpan[] $chunk */
+ $this->emitJaegerBatch($chunk);
+ }
+ }
+
+ /**
+ * @param JaegerThriftSpan $span
+ */
+ private function getBufferLength($span)
+ {
+ $memoryBuffer = new TMemoryBuffer();
+ $span->write(new TBinaryProtocol($memoryBuffer));
+ return $memoryBuffer->available();
+ }
+
+ private function chunkSplit(array $spans): array
+ {
+ $actualBufferSize = $this->jaegerBatchOverheadLength;
+ $chunkId = 0;
+ $chunks = [];
+
+ foreach ($spans as $span) {
+ $spanBufferLength = $this->getBufferLength($span);
+ if (!empty($chunks[$chunkId]) && ($actualBufferSize + $spanBufferLength) > $this->maxBufferLength) {
+ // point to next chunk
+ ++$chunkId;
+
+ // reset buffer size
+ $actualBufferSize = $this->jaegerBatchOverheadLength;
+ }
+
+ if (!isset($chunks[$chunkId])) {
+ $chunks[$chunkId] = [];
+ }
+
+ $chunks[$chunkId][] = $span;
+ $actualBufferSize += $spanBufferLength;
+ }
+
+ return $chunks;
+ }
+
+ protected function emitJaegerBatch(array $spans)
+ {
+ /** @var Tag[] $tags */
+ $tags = [];
+
+ foreach ($this->tracer->getTags() as $k => $v) {
+ if (!in_array($k, $this->mapper->getSpecialSpanTags())) {
+ if (strpos($k, $this->mapper->getProcessTagsPrefix()) !== 0) {
+ continue ;
+ }
+
+ $quoted = preg_quote($this->mapper->getProcessTagsPrefix());
+ $k = preg_replace(sprintf('/^%s/', $quoted), '', $k);
+ }
+
+ if ($k === JAEGER_HOSTNAME_TAG_KEY) {
+ $k = "hostname";
+ }
+
+ $tags[] = new Tag([
+ "key" => $k,
+ "vType" => TagType::STRING,
+ "vStr" => $v
+ ]);
+ }
+
+ $tags[] = new Tag([
+ "key" => "format",
+ "vType" => TagType::STRING,
+ "vStr" => "jaeger.thrift"
+ ]);
+
+ $tags[] = new Tag([
+ "key" => "ip",
+ "vType" => TagType::STRING,
+ "vStr" => $this->tracer->getIpAddress()
+ ]);
+
+ $batch = new Batch([
+ "spans" => $spans,
+ "process" => new Process([
+ "serviceName" => $this->tracer->getServiceName(),
+ "tags" => $tags
+ ])
+ ]);
+
+ $this->agentClient->emitBatch($batch);
+ }
+
+ /**
+ * @param JaegerSpan $span
+ */
+ public function append(JaegerSpan $span)
+ {
+ $this->spans[] = $span;
+ }
+
+ public function close()
+ {
+ }
+}