diff options
Diffstat (limited to 'vendor/open-telemetry/sdk/Trace/SpanProcessor')
5 files changed, 577 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Trace/SpanProcessor/BatchSpanProcessor.php b/vendor/open-telemetry/sdk/Trace/SpanProcessor/BatchSpanProcessor.php new file mode 100644 index 000000000..58032749e --- /dev/null +++ b/vendor/open-telemetry/sdk/Trace/SpanProcessor/BatchSpanProcessor.php @@ -0,0 +1,290 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Trace\SpanProcessor; + +use function assert; +use function count; +use InvalidArgumentException; +use OpenTelemetry\API\Behavior\LogsMessagesTrait; +use OpenTelemetry\API\Metrics\MeterProviderInterface; +use OpenTelemetry\API\Metrics\ObserverInterface; +use OpenTelemetry\Context\Context; +use OpenTelemetry\Context\ContextInterface; +use OpenTelemetry\SDK\Common\Future\CancellationInterface; +use OpenTelemetry\SDK\Common\Time\ClockInterface; +use OpenTelemetry\SDK\Trace\ReadableSpanInterface; +use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface; +use OpenTelemetry\SDK\Trace\SpanDataInterface; +use OpenTelemetry\SDK\Trace\SpanExporterInterface; +use OpenTelemetry\SDK\Trace\SpanProcessorInterface; +use SplQueue; +use function sprintf; +use Throwable; + +class BatchSpanProcessor implements SpanProcessorInterface +{ + use LogsMessagesTrait; + + public const DEFAULT_SCHEDULE_DELAY = 5000; + public const DEFAULT_EXPORT_TIMEOUT = 30000; + public const DEFAULT_MAX_QUEUE_SIZE = 2048; + public const DEFAULT_MAX_EXPORT_BATCH_SIZE = 512; + + private const ATTRIBUTES_PROCESSOR = ['processor' => 'batching']; + private const ATTRIBUTES_QUEUED = self::ATTRIBUTES_PROCESSOR + ['state' => 'queued']; + private const ATTRIBUTES_PENDING = self::ATTRIBUTES_PROCESSOR + ['state' => 'pending']; + private const ATTRIBUTES_PROCESSED = self::ATTRIBUTES_PROCESSOR + ['state' => 'processed']; + private const ATTRIBUTES_DROPPED = self::ATTRIBUTES_PROCESSOR + ['state' => 'dropped']; + private const ATTRIBUTES_FREE = self::ATTRIBUTES_PROCESSOR + ['state' => 'free']; + + private SpanExporterInterface $exporter; + private ClockInterface $clock; + private int $maxQueueSize; + private int $scheduledDelayNanos; + private int $maxExportBatchSize; + private bool $autoFlush; + private ContextInterface $exportContext; + + private ?int $nextScheduledRun = null; + private bool $running = false; + private int $dropped = 0; + private int $processed = 0; + private int $batchId = 0; + private int $queueSize = 0; + /** @var list<SpanDataInterface> */ + private array $batch = []; + /** @var SplQueue<list<SpanDataInterface>> */ + private SplQueue $queue; + /** @var SplQueue<array{int, string, ?CancellationInterface, bool, ContextInterface}> */ + private SplQueue $flush; + + private bool $closed = false; + + public function __construct( + SpanExporterInterface $exporter, + ClockInterface $clock, + int $maxQueueSize = self::DEFAULT_MAX_QUEUE_SIZE, + int $scheduledDelayMillis = self::DEFAULT_SCHEDULE_DELAY, + int $exportTimeoutMillis = self::DEFAULT_EXPORT_TIMEOUT, + int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE, + bool $autoFlush = true, + ?MeterProviderInterface $meterProvider = null + ) { + if ($maxQueueSize <= 0) { + throw new InvalidArgumentException(sprintf('Maximum queue size (%d) must be greater than zero', $maxQueueSize)); + } + if ($scheduledDelayMillis <= 0) { + throw new InvalidArgumentException(sprintf('Scheduled delay (%d) must be greater than zero', $scheduledDelayMillis)); + } + if ($exportTimeoutMillis <= 0) { + throw new InvalidArgumentException(sprintf('Export timeout (%d) must be greater than zero', $exportTimeoutMillis)); + } + if ($maxExportBatchSize <= 0) { + throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be greater than zero', $maxExportBatchSize)); + } + if ($maxExportBatchSize > $maxQueueSize) { + throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be less than or equal to maximum queue size (%d)', $maxExportBatchSize, $maxQueueSize)); + } + + $this->exporter = $exporter; + $this->clock = $clock; + $this->maxQueueSize = $maxQueueSize; + $this->scheduledDelayNanos = $scheduledDelayMillis * 1_000_000; + $this->maxExportBatchSize = $maxExportBatchSize; + $this->autoFlush = $autoFlush; + + $this->exportContext = Context::getCurrent(); + $this->queue = new SplQueue(); + $this->flush = new SplQueue(); + + if ($meterProvider === null) { + return; + } + + $meter = $meterProvider->getMeter('io.opentelemetry.sdk'); + $meter + ->createObservableUpDownCounter( + 'otel.trace.span_processor.spans', + '{spans}', + 'The number of sampled spans received by the span processor', + ) + ->observe(function (ObserverInterface $observer): void { + $queued = $this->queue->count() * $this->maxExportBatchSize + count($this->batch); + $pending = $this->queueSize - $queued; + $processed = $this->processed; + $dropped = $this->dropped; + + $observer->observe($queued, self::ATTRIBUTES_QUEUED); + $observer->observe($pending, self::ATTRIBUTES_PENDING); + $observer->observe($processed, self::ATTRIBUTES_PROCESSED); + $observer->observe($dropped, self::ATTRIBUTES_DROPPED); + }); + $meter + ->createObservableUpDownCounter( + 'otel.trace.span_processor.queue.limit', + '{spans}', + 'The queue size limit', + ) + ->observe(function (ObserverInterface $observer): void { + $observer->observe($this->maxQueueSize, self::ATTRIBUTES_PROCESSOR); + }); + $meter + ->createObservableUpDownCounter( + 'otel.trace.span_processor.queue.usage', + '{spans}', + 'The current queue usage', + ) + ->observe(function (ObserverInterface $observer): void { + $queued = $this->queue->count() * $this->maxExportBatchSize + count($this->batch); + $pending = $this->queueSize - $queued; + $free = $this->maxQueueSize - $this->queueSize; + + $observer->observe($queued, self::ATTRIBUTES_QUEUED); + $observer->observe($pending, self::ATTRIBUTES_PENDING); + $observer->observe($free, self::ATTRIBUTES_FREE); + }); + } + + public function onStart(ReadWriteSpanInterface $span, ContextInterface $parentContext): void + { + } + + public function onEnd(ReadableSpanInterface $span): void + { + if ($this->closed) { + return; + } + if (!$span->getContext()->isSampled()) { + return; + } + + if ($this->queueSize === $this->maxQueueSize) { + $this->dropped++; + + return; + } + + $this->queueSize++; + $this->batch[] = $span->toSpanData(); + $this->nextScheduledRun ??= $this->clock->now() + $this->scheduledDelayNanos; + + if (count($this->batch) === $this->maxExportBatchSize) { + $this->enqueueBatch(); + } + if ($this->autoFlush) { + $this->flush(); + } + } + + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + if ($this->closed) { + return false; + } + + return $this->flush(__FUNCTION__, $cancellation); + } + + public function shutdown(?CancellationInterface $cancellation = null): bool + { + if ($this->closed) { + return false; + } + + $this->closed = true; + + return $this->flush(__FUNCTION__, $cancellation); + } + + public static function builder(SpanExporterInterface $exporter): BatchSpanProcessorBuilder + { + return new BatchSpanProcessorBuilder($exporter); + } + + private function flush(?string $flushMethod = null, ?CancellationInterface $cancellation = null): bool + { + if ($flushMethod !== null) { + $flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch; + $this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running, Context::getCurrent()]); + } + + if ($this->running) { + return false; + } + + $success = true; + $exception = null; + $this->running = true; + + try { + for (;;) { + while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) { + [, $flushMethod, $cancellation, $propagateResult, $context] = $this->flush->dequeue(); + $scope = $context->activate(); + + try { + $result = $this->exporter->$flushMethod($cancellation); + if ($propagateResult) { + $success = $result; + } + } catch (Throwable $e) { + if ($propagateResult) { + $exception = $e; + } else { + self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]); + } + } finally { + $scope->detach(); + } + } + + if (!$this->shouldFlush()) { + break; + } + + if ($this->queue->isEmpty()) { + $this->enqueueBatch(); + } + $batchSize = count($this->queue->bottom()); + $this->batchId++; + $scope = $this->exportContext->activate(); + + try { + $this->exporter->export($this->queue->dequeue())->await(); + } catch (Throwable $e) { + self::logError('Unhandled export error', ['exception' => $e]); + } finally { + $this->processed += $batchSize; + $this->queueSize -= $batchSize; + $scope->detach(); + } + } + } finally { + $this->running = false; + } + + if ($exception !== null) { + throw $exception; + } + + return $success; + } + + private function shouldFlush(): bool + { + return !$this->flush->isEmpty() + || $this->autoFlush && !$this->queue->isEmpty() + || $this->autoFlush && $this->nextScheduledRun !== null && $this->clock->now() > $this->nextScheduledRun; + } + + private function enqueueBatch(): void + { + assert($this->batch !== []); + + $this->queue->enqueue($this->batch); + $this->batch = []; + $this->nextScheduledRun = null; + } +} diff --git a/vendor/open-telemetry/sdk/Trace/SpanProcessor/BatchSpanProcessorBuilder.php b/vendor/open-telemetry/sdk/Trace/SpanProcessor/BatchSpanProcessorBuilder.php new file mode 100644 index 000000000..8e81e7dd6 --- /dev/null +++ b/vendor/open-telemetry/sdk/Trace/SpanProcessor/BatchSpanProcessorBuilder.php @@ -0,0 +1,41 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Trace\SpanProcessor; + +use OpenTelemetry\SDK\Common\Time\ClockFactory; +use OpenTelemetry\SDK\Metrics\MeterProviderInterface; +use OpenTelemetry\SDK\Trace\SpanExporterInterface; + +class BatchSpanProcessorBuilder +{ + private SpanExporterInterface $exporter; + private ?MeterProviderInterface $meterProvider = null; + + public function __construct(SpanExporterInterface $exporter) + { + $this->exporter = $exporter; + } + + public function setMeterProvider(MeterProviderInterface $meterProvider): self + { + $this->meterProvider = $meterProvider; + + return $this; + } + + public function build(): BatchSpanProcessor + { + return new BatchSpanProcessor( + $this->exporter, + ClockFactory::getDefault(), + BatchSpanProcessor::DEFAULT_MAX_QUEUE_SIZE, + BatchSpanProcessor::DEFAULT_SCHEDULE_DELAY, + BatchSpanProcessor::DEFAULT_EXPORT_TIMEOUT, + BatchSpanProcessor::DEFAULT_MAX_EXPORT_BATCH_SIZE, + true, + $this->meterProvider + ); + } +} diff --git a/vendor/open-telemetry/sdk/Trace/SpanProcessor/MultiSpanProcessor.php b/vendor/open-telemetry/sdk/Trace/SpanProcessor/MultiSpanProcessor.php new file mode 100644 index 000000000..e690791f2 --- /dev/null +++ b/vendor/open-telemetry/sdk/Trace/SpanProcessor/MultiSpanProcessor.php @@ -0,0 +1,79 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Trace\SpanProcessor; + +use OpenTelemetry\Context\ContextInterface; +use OpenTelemetry\SDK\Common\Future\CancellationInterface; +use OpenTelemetry\SDK\Trace\ReadableSpanInterface; +use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface; +use OpenTelemetry\SDK\Trace\SpanProcessorInterface; + +/** + * Class SpanMultiProcessor is a SpanProcessor that forwards all events to an + * array of SpanProcessors. + */ +final class MultiSpanProcessor implements SpanProcessorInterface +{ + /** @var list<SpanProcessorInterface> */ + private array $processors = []; + + public function __construct(SpanProcessorInterface ...$spanProcessors) + { + foreach ($spanProcessors as $processor) { + $this->addSpanProcessor($processor); + } + } + + public function addSpanProcessor(SpanProcessorInterface $processor): void + { + $this->processors[] = $processor; + } + + /** @return list<SpanProcessorInterface> */ + public function getSpanProcessors(): array + { + return $this->processors; + } + + /** @inheritDoc */ + public function onStart(ReadWriteSpanInterface $span, ContextInterface $parentContext): void + { + foreach ($this->processors as $processor) { + $processor->onStart($span, $parentContext); + } + } + + /** @inheritDoc */ + public function onEnd(ReadableSpanInterface $span): void + { + foreach ($this->processors as $processor) { + $processor->onEnd($span); + } + } + + /** @inheritDoc */ + public function shutdown(?CancellationInterface $cancellation = null): bool + { + $result = true; + + foreach ($this->processors as $processor) { + $result = $result && $processor->shutdown(); + } + + return $result; + } + + /** @inheritDoc */ + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + $result = true; + + foreach ($this->processors as $processor) { + $result = $result && $processor->forceFlush(); + } + + return $result; + } +} diff --git a/vendor/open-telemetry/sdk/Trace/SpanProcessor/NoopSpanProcessor.php b/vendor/open-telemetry/sdk/Trace/SpanProcessor/NoopSpanProcessor.php new file mode 100644 index 000000000..9c4d1eabe --- /dev/null +++ b/vendor/open-telemetry/sdk/Trace/SpanProcessor/NoopSpanProcessor.php @@ -0,0 +1,47 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Trace\SpanProcessor; + +use OpenTelemetry\Context\ContextInterface; +use OpenTelemetry\SDK\Common\Future\CancellationInterface; +use OpenTelemetry\SDK\Trace\ReadableSpanInterface; +use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface; +use OpenTelemetry\SDK\Trace\SpanProcessorInterface; + +class NoopSpanProcessor implements SpanProcessorInterface +{ + private static ?SpanProcessorInterface $instance = null; + + public static function getInstance(): SpanProcessorInterface + { + if (null === self::$instance) { + self::$instance = new self(); + } + + return self::$instance; + } + + /** @inheritDoc */ + public function onStart(ReadWriteSpanInterface $span, ContextInterface $parentContext): void + { + } //@codeCoverageIgnore + + /** @inheritDoc */ + public function onEnd(ReadableSpanInterface $span): void + { + } //@codeCoverageIgnore + + /** @inheritDoc */ + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + return true; + } + + /** @inheritDoc */ + public function shutdown(?CancellationInterface $cancellation = null): bool + { + return $this->forceFlush(); + } +} diff --git a/vendor/open-telemetry/sdk/Trace/SpanProcessor/SimpleSpanProcessor.php b/vendor/open-telemetry/sdk/Trace/SpanProcessor/SimpleSpanProcessor.php new file mode 100644 index 000000000..4e86e79ab --- /dev/null +++ b/vendor/open-telemetry/sdk/Trace/SpanProcessor/SimpleSpanProcessor.php @@ -0,0 +1,120 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Trace\SpanProcessor; + +use Closure; +use OpenTelemetry\API\Behavior\LogsMessagesTrait; +use OpenTelemetry\Context\Context; +use OpenTelemetry\Context\ContextInterface; +use OpenTelemetry\SDK\Common\Future\CancellationInterface; +use OpenTelemetry\SDK\Trace\ReadableSpanInterface; +use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface; +use OpenTelemetry\SDK\Trace\SpanExporterInterface; +use OpenTelemetry\SDK\Trace\SpanProcessorInterface; +use SplQueue; +use function sprintf; +use Throwable; + +class SimpleSpanProcessor implements SpanProcessorInterface +{ + use LogsMessagesTrait; + + private SpanExporterInterface $exporter; + private ContextInterface $exportContext; + + private bool $running = false; + /** @var SplQueue<array{Closure, string, bool, ContextInterface}> */ + private SplQueue $queue; + + private bool $closed = false; + + public function __construct(SpanExporterInterface $exporter) + { + $this->exporter = $exporter; + + $this->exportContext = Context::getCurrent(); + $this->queue = new SplQueue(); + } + + public function onStart(ReadWriteSpanInterface $span, ContextInterface $parentContext): void + { + } + + public function onEnd(ReadableSpanInterface $span): void + { + if ($this->closed) { + return; + } + if (!$span->getContext()->isSampled()) { + return; + } + + $spanData = $span->toSpanData(); + $this->flush(fn () => $this->exporter->export([$spanData])->await(), 'export', false, $this->exportContext); + } + + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + if ($this->closed) { + return false; + } + + return $this->flush(fn (): bool => $this->exporter->forceFlush($cancellation), __FUNCTION__, true, Context::getCurrent()); + } + + public function shutdown(?CancellationInterface $cancellation = null): bool + { + if ($this->closed) { + return false; + } + + $this->closed = true; + + return $this->flush(fn (): bool => $this->exporter->shutdown($cancellation), __FUNCTION__, true, Context::getCurrent()); + } + + private function flush(Closure $task, string $taskName, bool $propagateResult, ContextInterface $context): bool + { + $this->queue->enqueue([$task, $taskName, $propagateResult && !$this->running, $context]); + + if ($this->running) { + return false; + } + + $success = true; + $exception = null; + $this->running = true; + + try { + while (!$this->queue->isEmpty()) { + [$task, $taskName, $propagateResult, $context] = $this->queue->dequeue(); + $scope = $context->activate(); + + try { + $result = $task(); + if ($propagateResult) { + $success = $result; + } + } catch (Throwable $e) { + if ($propagateResult) { + $exception = $e; + } else { + self::logError(sprintf('Unhandled %s error', $taskName), ['exception' => $e]); + } + } finally { + $scope->detach(); + } + } + } finally { + $this->running = false; + } + + if ($exception !== null) { + throw $exception; + } + + return $success; + } +} |