diff options
Diffstat (limited to 'vendor/open-telemetry/sdk/Metrics/MetricRegistry')
6 files changed, 294 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricCollectorInterface.php b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricCollectorInterface.php new file mode 100644 index 000000000..4e8e91ced --- /dev/null +++ b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricCollectorInterface.php @@ -0,0 +1,13 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Metrics\MetricRegistry; + +/** + * @internal + */ +interface MetricCollectorInterface +{ + public function collectAndPush(iterable $streamIds): void; +} diff --git a/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php new file mode 100644 index 000000000..9a18d2a84 --- /dev/null +++ b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistry.php @@ -0,0 +1,184 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Metrics\MetricRegistry; + +use function array_key_last; +use Closure; +use OpenTelemetry\Context\Context; +use OpenTelemetry\Context\ContextStorageInterface; +use OpenTelemetry\SDK\Common\Attribute\AttributesFactoryInterface; +use OpenTelemetry\SDK\Common\Time\ClockInterface; +use OpenTelemetry\SDK\Metrics\Instrument; +use OpenTelemetry\SDK\Metrics\Stream\MetricAggregatorFactoryInterface; +use OpenTelemetry\SDK\Metrics\Stream\MetricAggregatorInterface; +use OpenTelemetry\SDK\Metrics\Stream\MetricStreamInterface; +use function spl_object_id; + +/** + * @internal + */ +final class MetricRegistry implements MetricRegistryInterface, MetricWriterInterface +{ + private ?ContextStorageInterface $contextStorage; + private AttributesFactoryInterface $attributesFactory; + private ClockInterface $clock; + + /** @var array<int, MetricStreamInterface> */ + private array $streams = []; + /** @var array<int, MetricAggregatorInterface> */ + private array $synchronousAggregators = []; + /** @var array<int, MetricAggregatorFactoryInterface> */ + private array $asynchronousAggregatorFactories = []; + + /** @var array<int, array<int, int>> */ + private array $instrumentToStreams = []; + /** @var array<int, int> */ + private array $streamToInstrument = []; + /** @var array<int, array<int, int>> */ + private array $instrumentToCallbacks = []; + /** @var array<int, Closure> */ + private array $asynchronousCallbacks = []; + /** @var array<int, list<int>> */ + private array $asynchronousCallbackArguments = []; + + public function __construct( + ?ContextStorageInterface $contextStorage, + AttributesFactoryInterface $attributesFactory, + ClockInterface $clock + ) { + $this->contextStorage = $contextStorage; + $this->attributesFactory = $attributesFactory; + $this->clock = $clock; + } + + public function registerSynchronousStream(Instrument $instrument, MetricStreamInterface $stream, MetricAggregatorInterface $aggregator): int + { + $this->streams[] = $stream; + $streamId = array_key_last($this->streams); + $instrumentId = spl_object_id($instrument); + + $this->synchronousAggregators[$streamId] = $aggregator; + $this->instrumentToStreams[$instrumentId][$streamId] = $streamId; + $this->streamToInstrument[$streamId] = $instrumentId; + + return $streamId; + } + + public function registerAsynchronousStream(Instrument $instrument, MetricStreamInterface $stream, MetricAggregatorFactoryInterface $aggregatorFactory): int + { + $this->streams[] = $stream; + $streamId = array_key_last($this->streams); + $instrumentId = spl_object_id($instrument); + + $this->asynchronousAggregatorFactories[$streamId] = $aggregatorFactory; + $this->instrumentToStreams[$instrumentId][$streamId] = $streamId; + $this->streamToInstrument[$streamId] = $instrumentId; + + return $streamId; + } + + public function unregisterStream(int $streamId): void + { + $instrumentId = $this->streamToInstrument[$streamId]; + unset( + $this->streams[$streamId], + $this->synchronousAggregators[$streamId], + $this->asynchronousAggregatorFactories[$streamId], + $this->instrumentToStreams[$instrumentId][$streamId], + $this->streamToInstrument[$streamId], + ); + if (!$this->instrumentToStreams[$instrumentId]) { + unset($this->instrumentToStreams[$instrumentId]); + } + } + + public function record(Instrument $instrument, $value, iterable $attributes = [], $context = null): void + { + $context = Context::resolve($context, $this->contextStorage); + $attributes = $this->attributesFactory->builder($attributes)->build(); + $timestamp = $this->clock->now(); + $instrumentId = spl_object_id($instrument); + foreach ($this->instrumentToStreams[$instrumentId] ?? [] as $streamId) { + if ($aggregator = $this->synchronousAggregators[$streamId] ?? null) { + $aggregator->record($value, $attributes, $context, $timestamp); + } + } + } + + public function registerCallback(Closure $callback, Instrument $instrument, Instrument ...$instruments): int + { + $callbackId = array_key_last($this->asynchronousCallbacks) + 1; + $this->asynchronousCallbacks[$callbackId] = $callback; + + $instrumentId = spl_object_id($instrument); + $this->asynchronousCallbackArguments[$callbackId] = [$instrumentId]; + $this->instrumentToCallbacks[$instrumentId][$callbackId] = $callbackId; + foreach ($instruments as $instrument) { + $instrumentId = spl_object_id($instrument); + $this->asynchronousCallbackArguments[$callbackId][] = $instrumentId; + $this->instrumentToCallbacks[$instrumentId][$callbackId] = $callbackId; + } + + return $callbackId; + } + + public function unregisterCallback(int $callbackId): void + { + $instrumentIds = $this->asynchronousCallbackArguments[$callbackId]; + unset( + $this->asynchronousCallbacks[$callbackId], + $this->asynchronousCallbackArguments[$callbackId], + ); + foreach ($instrumentIds as $instrumentId) { + unset($this->instrumentToCallbacks[$instrumentId][$callbackId]); + if (!$this->instrumentToCallbacks[$instrumentId]) { + unset($this->instrumentToCallbacks[$instrumentId]); + } + } + } + + public function collectAndPush(iterable $streamIds): void + { + $timestamp = $this->clock->now(); + $aggregators = []; + $observers = []; + $callbackIds = []; + foreach ($streamIds as $streamId) { + if (!$aggregator = $this->synchronousAggregators[$streamId] ?? null) { + $aggregator = $this->asynchronousAggregatorFactories[$streamId]->create(); + + $instrumentId = $this->streamToInstrument[$streamId]; + $observers[$instrumentId] ??= new MultiObserver($this->attributesFactory, $timestamp); + $observers[$instrumentId]->writers[] = $aggregator; + foreach ($this->instrumentToCallbacks[$instrumentId] ?? [] as $callbackId) { + $callbackIds[$callbackId] = $callbackId; + } + } + + $aggregators[$streamId] = $aggregator; + } + + $noopObserver = new NoopObserver(); + $callbacks = []; + foreach ($callbackIds as $callbackId) { + $args = []; + foreach ($this->asynchronousCallbackArguments[$callbackId] as $instrumentId) { + $args[] = $observers[$instrumentId] ?? $noopObserver; + } + $callback = $this->asynchronousCallbacks[$callbackId]; + $callbacks[] = static fn () => $callback(...$args); + } + foreach ($callbacks as $callback) { + $callback(); + } + + $timestamp = $this->clock->now(); + foreach ($aggregators as $streamId => $aggregator) { + if ($stream = $this->streams[$streamId] ?? null) { + $stream->push($aggregator->collect($timestamp)); + } + } + } +} diff --git a/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistryInterface.php b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistryInterface.php new file mode 100644 index 000000000..e86731138 --- /dev/null +++ b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricRegistryInterface.php @@ -0,0 +1,22 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Metrics\MetricRegistry; + +use OpenTelemetry\SDK\Metrics\Instrument; +use OpenTelemetry\SDK\Metrics\Stream\MetricAggregatorFactoryInterface; +use OpenTelemetry\SDK\Metrics\Stream\MetricAggregatorInterface; +use OpenTelemetry\SDK\Metrics\Stream\MetricStreamInterface; + +/** + * @internal + */ +interface MetricRegistryInterface extends MetricCollectorInterface +{ + public function registerSynchronousStream(Instrument $instrument, MetricStreamInterface $stream, MetricAggregatorInterface $aggregator): int; + + public function registerAsynchronousStream(Instrument $instrument, MetricStreamInterface $stream, MetricAggregatorFactoryInterface $aggregatorFactory): int; + + public function unregisterStream(int $streamId): void; +} diff --git a/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricWriterInterface.php b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricWriterInterface.php new file mode 100644 index 000000000..e5ff7eb5c --- /dev/null +++ b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MetricWriterInterface.php @@ -0,0 +1,20 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Metrics\MetricRegistry; + +use Closure; +use OpenTelemetry\SDK\Metrics\Instrument; + +/** + * @internal + */ +interface MetricWriterInterface +{ + public function record(Instrument $instrument, $value, iterable $attributes = [], $context = null): void; + + public function registerCallback(Closure $callback, Instrument $instrument, Instrument ...$instruments): int; + + public function unregisterCallback(int $callbackId): void; +} diff --git a/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MultiObserver.php b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MultiObserver.php new file mode 100644 index 000000000..f36f74a2a --- /dev/null +++ b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/MultiObserver.php @@ -0,0 +1,37 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Metrics\MetricRegistry; + +use OpenTelemetry\API\Metrics\ObserverInterface; +use OpenTelemetry\Context\Context; +use OpenTelemetry\SDK\Common\Attribute\AttributesFactoryInterface; +use OpenTelemetry\SDK\Metrics\Stream\WritableMetricStreamInterface; + +/** + * @internal + */ +final class MultiObserver implements ObserverInterface +{ + private AttributesFactoryInterface $attributesFactory; + private int $timestamp; + + /** @var list<WritableMetricStreamInterface> */ + public array $writers = []; + + public function __construct(AttributesFactoryInterface $attributesFactory, int $timestamp) + { + $this->attributesFactory = $attributesFactory; + $this->timestamp = $timestamp; + } + + public function observe($amount, iterable $attributes = []): void + { + $context = Context::getRoot(); + $attributes = $this->attributesFactory->builder($attributes)->build(); + foreach ($this->writers as $writer) { + $writer->record($amount, $attributes, $context, $this->timestamp); + } + } +} diff --git a/vendor/open-telemetry/sdk/Metrics/MetricRegistry/NoopObserver.php b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/NoopObserver.php new file mode 100644 index 000000000..efbd94dac --- /dev/null +++ b/vendor/open-telemetry/sdk/Metrics/MetricRegistry/NoopObserver.php @@ -0,0 +1,18 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Metrics\MetricRegistry; + +use OpenTelemetry\API\Metrics\ObserverInterface; + +/** + * @internal + */ +final class NoopObserver implements ObserverInterface +{ + public function observe($amount, iterable $attributes = []): void + { + // no-op + } +} |