diff options
Diffstat (limited to 'vendor/open-telemetry/sdk/Logs/Processor')
4 files changed, 410 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Logs/Processor/BatchLogRecordProcessor.php b/vendor/open-telemetry/sdk/Logs/Processor/BatchLogRecordProcessor.php new file mode 100644 index 000000000..fc6faca54 --- /dev/null +++ b/vendor/open-telemetry/sdk/Logs/Processor/BatchLogRecordProcessor.php @@ -0,0 +1,273 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Logs\Processor; + +use InvalidArgumentException; +use OpenTelemetry\API\Behavior\LogsMessagesTrait; +use OpenTelemetry\API\Metrics\MeterProviderInterface; +use OpenTelemetry\API\Metrics\ObserverInterface; +use OpenTelemetry\Context\Context; +use OpenTelemetry\Context\ContextInterface; +use OpenTelemetry\SDK\Common\Future\CancellationInterface; +use OpenTelemetry\SDK\Common\Time\ClockInterface; +use OpenTelemetry\SDK\Logs\LogRecordExporterInterface; +use OpenTelemetry\SDK\Logs\LogRecordProcessorInterface; +use OpenTelemetry\SDK\Logs\ReadWriteLogRecord; +use SplQueue; +use Throwable; + +class BatchLogRecordProcessor implements LogRecordProcessorInterface +{ + use LogsMessagesTrait; + + public const DEFAULT_SCHEDULE_DELAY = 1000; + public const DEFAULT_EXPORT_TIMEOUT = 30000; + public const DEFAULT_MAX_QUEUE_SIZE = 2048; + public const DEFAULT_MAX_EXPORT_BATCH_SIZE = 512; + + private const ATTRIBUTES_PROCESSOR = ['processor' => 'batching']; + private const ATTRIBUTES_QUEUED = self::ATTRIBUTES_PROCESSOR + ['state' => 'queued']; + private const ATTRIBUTES_PENDING = self::ATTRIBUTES_PROCESSOR + ['state' => 'pending']; + private const ATTRIBUTES_PROCESSED = self::ATTRIBUTES_PROCESSOR + ['state' => 'processed']; + private const ATTRIBUTES_DROPPED = self::ATTRIBUTES_PROCESSOR + ['state' => 'dropped']; + private const ATTRIBUTES_FREE = self::ATTRIBUTES_PROCESSOR + ['state' => 'free']; + + private LogRecordExporterInterface $exporter; + private ClockInterface $clock; + private int $maxQueueSize; + private int $scheduledDelayNanos; + private int $maxExportBatchSize; + private bool $autoFlush; + private ContextInterface $exportContext; + + private ?int $nextScheduledRun = null; + private bool $running = false; + private int $dropped = 0; + private int $processed = 0; + private int $batchId = 0; + private int $queueSize = 0; + /** @var list<ReadWriteLogRecord> */ + private array $batch = []; + /** @var SplQueue<list<ReadWriteLogRecord>> */ + private SplQueue $queue; + /** @var SplQueue<array{int, string, ?CancellationInterface, bool, ContextInterface}> */ + private SplQueue $flush; + + private bool $closed = false; + + public function __construct( + LogRecordExporterInterface $exporter, + ClockInterface $clock, + int $maxQueueSize = self::DEFAULT_MAX_QUEUE_SIZE, + int $scheduledDelayMillis = self::DEFAULT_SCHEDULE_DELAY, + int $exportTimeoutMillis = self::DEFAULT_EXPORT_TIMEOUT, + int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE, + bool $autoFlush = true, + ?MeterProviderInterface $meterProvider = null + ) { + if ($maxQueueSize <= 0) { + throw new InvalidArgumentException(sprintf('Maximum queue size (%d) must be greater than zero', $maxQueueSize)); + } + if ($scheduledDelayMillis <= 0) { + throw new InvalidArgumentException(sprintf('Scheduled delay (%d) must be greater than zero', $scheduledDelayMillis)); + } + if ($exportTimeoutMillis <= 0) { + throw new InvalidArgumentException(sprintf('Export timeout (%d) must be greater than zero', $exportTimeoutMillis)); + } + if ($maxExportBatchSize <= 0) { + throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be greater than zero', $maxExportBatchSize)); + } + if ($maxExportBatchSize > $maxQueueSize) { + throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be less than or equal to maximum queue size (%d)', $maxExportBatchSize, $maxQueueSize)); + } + + $this->exporter = $exporter; + $this->clock = $clock; + $this->maxQueueSize = $maxQueueSize; + $this->scheduledDelayNanos = $scheduledDelayMillis * 1_000_000; + $this->maxExportBatchSize = $maxExportBatchSize; + $this->autoFlush = $autoFlush; + + $this->exportContext = Context::getCurrent(); + $this->queue = new SplQueue(); + $this->flush = new SplQueue(); + + if ($meterProvider === null) { + return; + } + + $meter = $meterProvider->getMeter('io.opentelemetry.sdk'); + $meter + ->createObservableUpDownCounter( + 'otel.logs.log_processor.logs', + '{logs}', + 'The number of log records received by the processor', + ) + ->observe(function (ObserverInterface $observer): void { + $queued = $this->queue->count() * $this->maxExportBatchSize + count($this->batch); + $pending = $this->queueSize - $queued; + $processed = $this->processed; + $dropped = $this->dropped; + + $observer->observe($queued, self::ATTRIBUTES_QUEUED); + $observer->observe($pending, self::ATTRIBUTES_PENDING); + $observer->observe($processed, self::ATTRIBUTES_PROCESSED); + $observer->observe($dropped, self::ATTRIBUTES_DROPPED); + }); + $meter + ->createObservableUpDownCounter( + 'otel.logs.log_processor.queue.limit', + '{logs}', + 'The queue size limit', + ) + ->observe(function (ObserverInterface $observer): void { + $observer->observe($this->maxQueueSize, self::ATTRIBUTES_PROCESSOR); + }); + $meter + ->createObservableUpDownCounter( + 'otel.logs.log_processor.queue.usage', + '{logs}', + 'The current queue usage', + ) + ->observe(function (ObserverInterface $observer): void { + $queued = $this->queue->count() * $this->maxExportBatchSize + count($this->batch); + $pending = $this->queueSize - $queued; + $free = $this->maxQueueSize - $this->queueSize; + + $observer->observe($queued, self::ATTRIBUTES_QUEUED); + $observer->observe($pending, self::ATTRIBUTES_PENDING); + $observer->observe($free, self::ATTRIBUTES_FREE); + }); + } + + public function onEmit(ReadWriteLogRecord $record, ?ContextInterface $context = null): void + { + if ($this->closed) { + return; + } + + if ($this->queueSize === $this->maxQueueSize) { + $this->dropped++; + + return; + } + + $this->queueSize++; + $this->batch[] = $record; + $this->nextScheduledRun ??= $this->clock->now() + $this->scheduledDelayNanos; + + if (count($this->batch) === $this->maxExportBatchSize) { + $this->enqueueBatch(); + } + if ($this->autoFlush) { + $this->flush(); + } + } + + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + if ($this->closed) { + return false; + } + + return $this->flush(__FUNCTION__, $cancellation); + } + + public function shutdown(?CancellationInterface $cancellation = null): bool + { + if ($this->closed) { + return false; + } + + $this->closed = true; + + return $this->flush(__FUNCTION__, $cancellation); + } + + private function flush(?string $flushMethod = null, ?CancellationInterface $cancellation = null): bool + { + if ($flushMethod !== null) { + $flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch; + $this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running, Context::getCurrent()]); + } + + if ($this->running) { + return false; + } + + $success = true; + $exception = null; + $this->running = true; + + try { + for (;;) { + while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) { + [, $flushMethod, $cancellation, $propagateResult, $context] = $this->flush->dequeue(); + $scope = $context->activate(); + + try { + $result = $this->exporter->$flushMethod($cancellation); + if ($propagateResult) { + $success = $result; + } + } catch (Throwable $e) { + if ($propagateResult) { + $exception = $e; + } else { + self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]); + } + } finally { + $scope->detach(); + } + } + + if (!$this->shouldFlush()) { + break; + } + + if ($this->queue->isEmpty()) { + $this->enqueueBatch(); + } + $batchSize = count($this->queue->bottom()); + $this->batchId++; + $scope = $this->exportContext->activate(); + + try { + $this->exporter->export($this->queue->dequeue())->await(); + } catch (Throwable $e) { + self::logError('Unhandled export error', ['exception' => $e]); + } finally { + $this->processed += $batchSize; + $this->queueSize -= $batchSize; + $scope->detach(); + } + } + } finally { + $this->running = false; + } + + if ($exception !== null) { + throw $exception; + } + + return $success; + } + + private function shouldFlush(): bool + { + return !$this->flush->isEmpty() + || $this->autoFlush && !$this->queue->isEmpty() + || $this->autoFlush && $this->nextScheduledRun !== null && $this->clock->now() > $this->nextScheduledRun; + } + + private function enqueueBatch(): void + { + assert($this->batch !== []); + + $this->queue->enqueue($this->batch); + $this->batch = []; + $this->nextScheduledRun = null; + } +} diff --git a/vendor/open-telemetry/sdk/Logs/Processor/MultiLogRecordProcessor.php b/vendor/open-telemetry/sdk/Logs/Processor/MultiLogRecordProcessor.php new file mode 100644 index 000000000..753a75df8 --- /dev/null +++ b/vendor/open-telemetry/sdk/Logs/Processor/MultiLogRecordProcessor.php @@ -0,0 +1,62 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Logs\Processor; + +use OpenTelemetry\Context\ContextInterface; +use OpenTelemetry\SDK\Common\Future\CancellationInterface; +use OpenTelemetry\SDK\Logs\LogRecordProcessorInterface; +use OpenTelemetry\SDK\Logs\ReadWriteLogRecord; + +class MultiLogRecordProcessor implements LogRecordProcessorInterface +{ + // @var LogRecordProcessorInterface[] + private array $processors = []; + + public function __construct(array $processors) + { + foreach ($processors as $processor) { + assert($processor instanceof LogRecordProcessorInterface); + $this->processors[] = $processor; + } + } + + public function onEmit(ReadWriteLogRecord $record, ?ContextInterface $context = null): void + { + foreach ($this->processors as $processor) { + $processor->onEmit($record, $context); + } + } + + /** + * Returns `true` if all processors shut down successfully, else `false` + * Subsequent calls to `shutdown` are a no-op. + */ + public function shutdown(?CancellationInterface $cancellation = null): bool + { + $result = true; + foreach ($this->processors as $processor) { + if (!$processor->shutdown($cancellation)) { + $result = false; + } + } + + return $result; + } + + /** + * Returns `true` if all processors flush successfully, else `false`. + */ + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + $result = true; + foreach ($this->processors as $processor) { + if (!$processor->forceFlush($cancellation)) { + $result = false; + } + } + + return $result; + } +} diff --git a/vendor/open-telemetry/sdk/Logs/Processor/NoopLogRecordProcessor.php b/vendor/open-telemetry/sdk/Logs/Processor/NoopLogRecordProcessor.php new file mode 100644 index 000000000..7028052e1 --- /dev/null +++ b/vendor/open-telemetry/sdk/Logs/Processor/NoopLogRecordProcessor.php @@ -0,0 +1,37 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Logs\Processor; + +use OpenTelemetry\Context\ContextInterface; +use OpenTelemetry\SDK\Common\Future\CancellationInterface; +use OpenTelemetry\SDK\Logs\LogRecordProcessorInterface; +use OpenTelemetry\SDK\Logs\ReadWriteLogRecord; + +class NoopLogRecordProcessor implements LogRecordProcessorInterface +{ + public static function getInstance(): self + { + static $instance; + + return $instance ??= new self(); + } + + /** + * @codeCoverageIgnore + */ + public function onEmit(ReadWriteLogRecord $record, ?ContextInterface $context = null): void + { + } + + public function shutdown(?CancellationInterface $cancellation = null): bool + { + return true; + } + + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + return true; + } +} diff --git a/vendor/open-telemetry/sdk/Logs/Processor/SimpleLogRecordProcessor.php b/vendor/open-telemetry/sdk/Logs/Processor/SimpleLogRecordProcessor.php new file mode 100644 index 000000000..f26f6607c --- /dev/null +++ b/vendor/open-telemetry/sdk/Logs/Processor/SimpleLogRecordProcessor.php @@ -0,0 +1,38 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Logs\Processor; + +use OpenTelemetry\Context\ContextInterface; +use OpenTelemetry\SDK\Common\Future\CancellationInterface; +use OpenTelemetry\SDK\Logs\LogRecordExporterInterface; +use OpenTelemetry\SDK\Logs\LogRecordProcessorInterface; +use OpenTelemetry\SDK\Logs\ReadWriteLogRecord; + +class SimpleLogRecordProcessor implements LogRecordProcessorInterface +{ + private LogRecordExporterInterface $exporter; + public function __construct(LogRecordExporterInterface $exporter) + { + $this->exporter = $exporter; + } + + /** + * @see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/sdk.md#onemit + */ + public function onEmit(ReadWriteLogRecord $record, ?ContextInterface $context = null): void + { + $this->exporter->export([$record]); + } + + public function shutdown(?CancellationInterface $cancellation = null): bool + { + return $this->exporter->shutdown($cancellation); + } + + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + return $this->exporter->forceFlush($cancellation); + } +} |