summaryrefslogtreecommitdiff
path: root/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php')
-rw-r--r--vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php184
1 files changed, 184 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php
new file mode 100644
index 000000000..9a18d2a84
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php
@@ -0,0 +1,184 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\MetricRegistry;
+
+use function array_key_last;
+use Closure;
+use OpenTelemetry\Context\Context;
+use OpenTelemetry\Context\ContextStorageInterface;
+use OpenTelemetry\SDK\Common\Attribute\AttributesFactoryInterface;
+use OpenTelemetry\SDK\Common\Time\ClockInterface;
+use OpenTelemetry\SDK\Metrics\Instrument;
+use OpenTelemetry\SDK\Metrics\Stream\MetricAggregatorFactoryInterface;
+use OpenTelemetry\SDK\Metrics\Stream\MetricAggregatorInterface;
+use OpenTelemetry\SDK\Metrics\Stream\MetricStreamInterface;
+use function spl_object_id;
+
+/**
+ * @internal
+ */
+final class MetricRegistry implements MetricRegistryInterface, MetricWriterInterface
+{
+ private ?ContextStorageInterface $contextStorage;
+ private AttributesFactoryInterface $attributesFactory;
+ private ClockInterface $clock;
+
+ /** @var array<int, MetricStreamInterface> */
+ private array $streams = [];
+ /** @var array<int, MetricAggregatorInterface> */
+ private array $synchronousAggregators = [];
+ /** @var array<int, MetricAggregatorFactoryInterface> */
+ private array $asynchronousAggregatorFactories = [];
+
+ /** @var array<int, array<int, int>> */
+ private array $instrumentToStreams = [];
+ /** @var array<int, int> */
+ private array $streamToInstrument = [];
+ /** @var array<int, array<int, int>> */
+ private array $instrumentToCallbacks = [];
+ /** @var array<int, Closure> */
+ private array $asynchronousCallbacks = [];
+ /** @var array<int, list<int>> */
+ private array $asynchronousCallbackArguments = [];
+
+ public function __construct(
+ ?ContextStorageInterface $contextStorage,
+ AttributesFactoryInterface $attributesFactory,
+ ClockInterface $clock
+ ) {
+ $this->contextStorage = $contextStorage;
+ $this->attributesFactory = $attributesFactory;
+ $this->clock = $clock;
+ }
+
+ public function registerSynchronousStream(Instrument $instrument, MetricStreamInterface $stream, MetricAggregatorInterface $aggregator): int
+ {
+ $this->streams[] = $stream;
+ $streamId = array_key_last($this->streams);
+ $instrumentId = spl_object_id($instrument);
+
+ $this->synchronousAggregators[$streamId] = $aggregator;
+ $this->instrumentToStreams[$instrumentId][$streamId] = $streamId;
+ $this->streamToInstrument[$streamId] = $instrumentId;
+
+ return $streamId;
+ }
+
+ public function registerAsynchronousStream(Instrument $instrument, MetricStreamInterface $stream, MetricAggregatorFactoryInterface $aggregatorFactory): int
+ {
+ $this->streams[] = $stream;
+ $streamId = array_key_last($this->streams);
+ $instrumentId = spl_object_id($instrument);
+
+ $this->asynchronousAggregatorFactories[$streamId] = $aggregatorFactory;
+ $this->instrumentToStreams[$instrumentId][$streamId] = $streamId;
+ $this->streamToInstrument[$streamId] = $instrumentId;
+
+ return $streamId;
+ }
+
+ public function unregisterStream(int $streamId): void
+ {
+ $instrumentId = $this->streamToInstrument[$streamId];
+ unset(
+ $this->streams[$streamId],
+ $this->synchronousAggregators[$streamId],
+ $this->asynchronousAggregatorFactories[$streamId],
+ $this->instrumentToStreams[$instrumentId][$streamId],
+ $this->streamToInstrument[$streamId],
+ );
+ if (!$this->instrumentToStreams[$instrumentId]) {
+ unset($this->instrumentToStreams[$instrumentId]);
+ }
+ }
+
+ public function record(Instrument $instrument, $value, iterable $attributes = [], $context = null): void
+ {
+ $context = Context::resolve($context, $this->contextStorage);
+ $attributes = $this->attributesFactory->builder($attributes)->build();
+ $timestamp = $this->clock->now();
+ $instrumentId = spl_object_id($instrument);
+ foreach ($this->instrumentToStreams[$instrumentId] ?? [] as $streamId) {
+ if ($aggregator = $this->synchronousAggregators[$streamId] ?? null) {
+ $aggregator->record($value, $attributes, $context, $timestamp);
+ }
+ }
+ }
+
+ public function registerCallback(Closure $callback, Instrument $instrument, Instrument ...$instruments): int
+ {
+ $callbackId = array_key_last($this->asynchronousCallbacks) + 1;
+ $this->asynchronousCallbacks[$callbackId] = $callback;
+
+ $instrumentId = spl_object_id($instrument);
+ $this->asynchronousCallbackArguments[$callbackId] = [$instrumentId];
+ $this->instrumentToCallbacks[$instrumentId][$callbackId] = $callbackId;
+ foreach ($instruments as $instrument) {
+ $instrumentId = spl_object_id($instrument);
+ $this->asynchronousCallbackArguments[$callbackId][] = $instrumentId;
+ $this->instrumentToCallbacks[$instrumentId][$callbackId] = $callbackId;
+ }
+
+ return $callbackId;
+ }
+
+ public function unregisterCallback(int $callbackId): void
+ {
+ $instrumentIds = $this->asynchronousCallbackArguments[$callbackId];
+ unset(
+ $this->asynchronousCallbacks[$callbackId],
+ $this->asynchronousCallbackArguments[$callbackId],
+ );
+ foreach ($instrumentIds as $instrumentId) {
+ unset($this->instrumentToCallbacks[$instrumentId][$callbackId]);
+ if (!$this->instrumentToCallbacks[$instrumentId]) {
+ unset($this->instrumentToCallbacks[$instrumentId]);
+ }
+ }
+ }
+
+ public function collectAndPush(iterable $streamIds): void
+ {
+ $timestamp = $this->clock->now();
+ $aggregators = [];
+ $observers = [];
+ $callbackIds = [];
+ foreach ($streamIds as $streamId) {
+ if (!$aggregator = $this->synchronousAggregators[$streamId] ?? null) {
+ $aggregator = $this->asynchronousAggregatorFactories[$streamId]->create();
+
+ $instrumentId = $this->streamToInstrument[$streamId];
+ $observers[$instrumentId] ??= new MultiObserver($this->attributesFactory, $timestamp);
+ $observers[$instrumentId]->writers[] = $aggregator;
+ foreach ($this->instrumentToCallbacks[$instrumentId] ?? [] as $callbackId) {
+ $callbackIds[$callbackId] = $callbackId;
+ }
+ }
+
+ $aggregators[$streamId] = $aggregator;
+ }
+
+ $noopObserver = new NoopObserver();
+ $callbacks = [];
+ foreach ($callbackIds as $callbackId) {
+ $args = [];
+ foreach ($this->asynchronousCallbackArguments[$callbackId] as $instrumentId) {
+ $args[] = $observers[$instrumentId] ?? $noopObserver;
+ }
+ $callback = $this->asynchronousCallbacks[$callbackId];
+ $callbacks[] = static fn () => $callback(...$args);
+ }
+ foreach ($callbacks as $callback) {
+ $callback();
+ }
+
+ $timestamp = $this->clock->now();
+ foreach ($aggregators as $streamId => $aggregator) {
+ if ($stream = $this->streams[$streamId] ?? null) {
+ $stream->push($aggregator->collect($timestamp));
+ }
+ }
+ }
+}