diff options
Diffstat (limited to 'vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php')
-rw-r--r-- | vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php new file mode 100644 index 000000000..9a18d2a84 --- /dev/null +++ b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php @@ -0,0 +1,184 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Metrics\MetricRegistry; + +use function array_key_last; +use Closure; +use OpenTelemetry\Context\Context; +use OpenTelemetry\Context\ContextStorageInterface; +use OpenTelemetry\SDK\Common\Attribute\AttributesFactoryInterface; +use OpenTelemetry\SDK\Common\Time\ClockInterface; +use OpenTelemetry\SDK\Metrics\Instrument; +use OpenTelemetry\SDK\Metrics\Stream\MetricAggregatorFactoryInterface; +use OpenTelemetry\SDK\Metrics\Stream\MetricAggregatorInterface; +use OpenTelemetry\SDK\Metrics\Stream\MetricStreamInterface; +use function spl_object_id; + +/** + * @internal + */ +final class MetricRegistry implements MetricRegistryInterface, MetricWriterInterface +{ + private ?ContextStorageInterface $contextStorage; + private AttributesFactoryInterface $attributesFactory; + private ClockInterface $clock; + + /** @var array<int, MetricStreamInterface> */ + private array $streams = []; + /** @var array<int, MetricAggregatorInterface> */ + private array $synchronousAggregators = []; + /** @var array<int, MetricAggregatorFactoryInterface> */ + private array $asynchronousAggregatorFactories = []; + + /** @var array<int, array<int, int>> */ + private array $instrumentToStreams = []; + /** @var array<int, int> */ + private array $streamToInstrument = []; + /** @var array<int, array<int, int>> */ + private array $instrumentToCallbacks = []; + /** @var array<int, Closure> */ + private array $asynchronousCallbacks = []; + /** @var array<int, list<int>> */ + private array $asynchronousCallbackArguments = []; + + public function __construct( + ?ContextStorageInterface $contextStorage, + AttributesFactoryInterface $attributesFactory, + ClockInterface $clock + ) { + $this->contextStorage = $contextStorage; + $this->attributesFactory = $attributesFactory; + $this->clock = $clock; + } + + public function registerSynchronousStream(Instrument $instrument, MetricStreamInterface $stream, MetricAggregatorInterface $aggregator): int + { + $this->streams[] = $stream; + $streamId = array_key_last($this->streams); + $instrumentId = spl_object_id($instrument); + + $this->synchronousAggregators[$streamId] = $aggregator; + $this->instrumentToStreams[$instrumentId][$streamId] = $streamId; + $this->streamToInstrument[$streamId] = $instrumentId; + + return $streamId; + } + + public function registerAsynchronousStream(Instrument $instrument, MetricStreamInterface $stream, MetricAggregatorFactoryInterface $aggregatorFactory): int + { + $this->streams[] = $stream; + $streamId = array_key_last($this->streams); + $instrumentId = spl_object_id($instrument); + + $this->asynchronousAggregatorFactories[$streamId] = $aggregatorFactory; + $this->instrumentToStreams[$instrumentId][$streamId] = $streamId; + $this->streamToInstrument[$streamId] = $instrumentId; + + return $streamId; + } + + public function unregisterStream(int $streamId): void + { + $instrumentId = $this->streamToInstrument[$streamId]; + unset( + $this->streams[$streamId], + $this->synchronousAggregators[$streamId], + $this->asynchronousAggregatorFactories[$streamId], + $this->instrumentToStreams[$instrumentId][$streamId], + $this->streamToInstrument[$streamId], + ); + if (!$this->instrumentToStreams[$instrumentId]) { + unset($this->instrumentToStreams[$instrumentId]); + } + } + + public function record(Instrument $instrument, $value, iterable $attributes = [], $context = null): void + { + $context = Context::resolve($context, $this->contextStorage); + $attributes = $this->attributesFactory->builder($attributes)->build(); + $timestamp = $this->clock->now(); + $instrumentId = spl_object_id($instrument); + foreach ($this->instrumentToStreams[$instrumentId] ?? [] as $streamId) { + if ($aggregator = $this->synchronousAggregators[$streamId] ?? null) { + $aggregator->record($value, $attributes, $context, $timestamp); + } + } + } + + public function registerCallback(Closure $callback, Instrument $instrument, Instrument ...$instruments): int + { + $callbackId = array_key_last($this->asynchronousCallbacks) + 1; + $this->asynchronousCallbacks[$callbackId] = $callback; + + $instrumentId = spl_object_id($instrument); + $this->asynchronousCallbackArguments[$callbackId] = [$instrumentId]; + $this->instrumentToCallbacks[$instrumentId][$callbackId] = $callbackId; + foreach ($instruments as $instrument) { + $instrumentId = spl_object_id($instrument); + $this->asynchronousCallbackArguments[$callbackId][] = $instrumentId; + $this->instrumentToCallbacks[$instrumentId][$callbackId] = $callbackId; + } + + return $callbackId; + } + + public function unregisterCallback(int $callbackId): void + { + $instrumentIds = $this->asynchronousCallbackArguments[$callbackId]; + unset( + $this->asynchronousCallbacks[$callbackId], + $this->asynchronousCallbackArguments[$callbackId], + ); + foreach ($instrumentIds as $instrumentId) { + unset($this->instrumentToCallbacks[$instrumentId][$callbackId]); + if (!$this->instrumentToCallbacks[$instrumentId]) { + unset($this->instrumentToCallbacks[$instrumentId]); + } + } + } + + public function collectAndPush(iterable $streamIds): void + { + $timestamp = $this->clock->now(); + $aggregators = []; + $observers = []; + $callbackIds = []; + foreach ($streamIds as $streamId) { + if (!$aggregator = $this->synchronousAggregators[$streamId] ?? null) { + $aggregator = $this->asynchronousAggregatorFactories[$streamId]->create(); + + $instrumentId = $this->streamToInstrument[$streamId]; + $observers[$instrumentId] ??= new MultiObserver($this->attributesFactory, $timestamp); + $observers[$instrumentId]->writers[] = $aggregator; + foreach ($this->instrumentToCallbacks[$instrumentId] ?? [] as $callbackId) { + $callbackIds[$callbackId] = $callbackId; + } + } + + $aggregators[$streamId] = $aggregator; + } + + $noopObserver = new NoopObserver(); + $callbacks = []; + foreach ($callbackIds as $callbackId) { + $args = []; + foreach ($this->asynchronousCallbackArguments[$callbackId] as $instrumentId) { + $args[] = $observers[$instrumentId] ?? $noopObserver; + } + $callback = $this->asynchronousCallbacks[$callbackId]; + $callbacks[] = static fn () => $callback(...$args); + } + foreach ($callbacks as $callback) { + $callback(); + } + + $timestamp = $this->clock->now(); + foreach ($aggregators as $streamId => $aggregator) { + if ($stream = $this->streams[$streamId] ?? null) { + $stream->push($aggregator->collect($timestamp)); + } + } + } +} |