*/
    private array $streams = [];

    /** @var array<int, MetricAggregatorInterface> aggregators of synchronous streams, keyed by stream id */
    private array $synchronousAggregators = [];

    /** @var array<int, MetricAggregatorFactoryInterface> aggregator factories of asynchronous streams, keyed by stream id */
    private array $asynchronousAggregatorFactories = [];

    /** @var array<int, array<int, int>> instrument id => set of stream ids (stream id used as key and value) */
    private array $instrumentToStreams = [];

    /** @var array<int, int> stream id => instrument id */
    private array $streamToInstrument = [];

    /** @var array<int, array<int, int>> instrument id => set of callback ids (callback id used as key and value) */
    private array $instrumentToCallbacks = [];

    /** @var array<int, Closure> callback id => callback */
    private array $asynchronousCallbacks = [];

    /** @var array<int, list<int>> callback id => instrument ids, in the order the instruments were passed to registerCallback() */
    private array $asynchronousCallbackArguments = [];

    /** @var int id returned by the next registerCallback() call; only ever incremented, so ids are never handed out twice */
    private int $nextCallbackId = 1;

    public function __construct(
        ?ContextStorageInterface $contextStorage,
        AttributesFactoryInterface $attributesFactory,
        ClockInterface $clock
    ) {
        $this->contextStorage = $contextStorage;
        $this->attributesFactory = $attributesFactory;
        $this->clock = $clock;
    }

    /**
     * Registers a stream whose aggregator is fed directly by record().
     *
     * @return int id of the stream, to be passed to unregisterStream() / collectAndPush()
     */
    public function registerSynchronousStream(Instrument $instrument, MetricStreamInterface $stream, MetricAggregatorInterface $aggregator): int
    {
        $streamId = $this->attachStream($instrument, $stream);
        $this->synchronousAggregators[$streamId] = $aggregator;

        return $streamId;
    }

    /**
     * Registers a stream that is filled by callbacks during collectAndPush(); the
     * factory is asked for a new aggregator on every collection.
     *
     * @return int id of the stream, to be passed to unregisterStream() / collectAndPush()
     */
    public function registerAsynchronousStream(Instrument $instrument, MetricStreamInterface $stream, MetricAggregatorFactoryInterface $aggregatorFactory): int
    {
        $streamId = $this->attachStream($instrument, $stream);
        $this->asynchronousAggregatorFactories[$streamId] = $aggregatorFactory;

        return $streamId;
    }

    /**
     * Removes a stream and all bookkeeping that refers to it.
     *
     * Unknown (for example already unregistered) ids are ignored, so calling this
     * twice for the same id is safe.
     */
    public function unregisterStream(int $streamId): void
    {
        $instrumentId = $this->streamToInstrument[$streamId] ?? null;
        if ($instrumentId === null) {
            return;
        }

        unset(
            $this->streams[$streamId],
            $this->synchronousAggregators[$streamId],
            $this->asynchronousAggregatorFactories[$streamId],
            $this->instrumentToStreams[$instrumentId][$streamId],
            $this->streamToInstrument[$streamId],
        );

        // Drop the per-instrument set once its last stream is gone.
        if (($this->instrumentToStreams[$instrumentId] ?? []) === []) {
            unset($this->instrumentToStreams[$instrumentId]);
        }
    }

    /**
     * Records a measurement on every synchronous stream registered for the instrument.
     *
     * @param mixed $value measurement, handed unchanged to each aggregator
     * @param iterable $attributes attributes, built once through the attributes factory
     * @param mixed $context resolved through Context::resolve() with the configured context storage
     */
    public function record(Instrument $instrument, $value, iterable $attributes = [], $context = null): void
    {
        $context = Context::resolve($context, $this->contextStorage);
        $attributes = $this->attributesFactory->builder($attributes)->build();
        $timestamp = $this->clock->now();

        $streamIds = $this->instrumentToStreams[spl_object_id($instrument)] ?? [];
        foreach ($streamIds as $streamId) {
            // Asynchronous streams have no entry here and are skipped.
            $aggregator = $this->synchronousAggregators[$streamId] ?? null;
            if ($aggregator !== null) {
                $aggregator->record($value, $attributes, $context, $timestamp);
            }
        }
    }

    /**
     * Registers a callback that is invoked by collectAndPush() with one observer
     * argument per given instrument, in the order the instruments are given here.
     *
     * @return int id of the callback, to be passed to unregisterCallback(); ids
     *             start at 1 and are never reused, even after unregistration
     */
    public function registerCallback(Closure $callback, Instrument $instrument, Instrument ...$instruments): int
    {
        // A monotonic counter instead of "last key + 1": the latter would hand out
        // the same id again after the most recent callback was unregistered.
        $callbackId = $this->nextCallbackId++;

        $instrumentIds = [spl_object_id($instrument)];
        foreach ($instruments as $additionalInstrument) {
            $instrumentIds[] = spl_object_id($additionalInstrument);
        }

        $this->asynchronousCallbacks[$callbackId] = $callback;
        $this->asynchronousCallbackArguments[$callbackId] = $instrumentIds;
        foreach ($instrumentIds as $instrumentId) {
            $this->instrumentToCallbacks[$instrumentId][$callbackId] = $callbackId;
        }

        return $callbackId;
    }

    /**
     * Removes a callback and its instrument associations.
     *
     * Unknown (for example already unregistered) ids are ignored.
     */
    public function unregisterCallback(int $callbackId): void
    {
        $instrumentIds = $this->asynchronousCallbackArguments[$callbackId] ?? null;
        if ($instrumentIds === null) {
            return;
        }

        unset(
            $this->asynchronousCallbacks[$callbackId],
            $this->asynchronousCallbackArguments[$callbackId],
        );

        foreach ($instrumentIds as $instrumentId) {
            unset($this->instrumentToCallbacks[$instrumentId][$callbackId]);
            // "?? []" because the same instrument may appear more than once in the
            // argument list, in which case its set is already gone on the second pass.
            if (($this->instrumentToCallbacks[$instrumentId] ?? []) === []) {
                unset($this->instrumentToCallbacks[$instrumentId]);
            }
        }
    }

    /**
     * Collects the given streams and pushes the collected result to each of them.
     *
     * Synchronous streams are collected from their registered aggregator. For
     * asynchronous streams a new aggregator is created, the callbacks registered
     * for the stream's instrument are invoked once each, and the new aggregator is
     * collected afterwards. Stream ids that are not registered are skipped.
     *
     * @param iterable $streamIds ids returned by registerSynchronousStream() / registerAsynchronousStream()
     */
    public function collectAndPush(iterable $streamIds): void
    {
        $timestamp = $this->clock->now();
        $aggregators = [];
        $observers = [];
        $callbackIds = [];

        foreach ($streamIds as $streamId) {
            $aggregator = $this->synchronousAggregators[$streamId] ?? null;
            if ($aggregator === null) {
                $factory = $this->asynchronousAggregatorFactories[$streamId] ?? null;
                if ($factory === null) {
                    // Neither synchronous nor asynchronous: the id is not registered.
                    continue;
                }

                $aggregator = $factory->create();
                $instrumentId = $this->streamToInstrument[$streamId];

                // One observer per instrument, writing to all of its collected streams.
                $observers[$instrumentId] ??= new MultiObserver($this->attributesFactory, $timestamp);
                $observers[$instrumentId]->writers[] = $aggregator;

                // Keyed by id so a callback shared between instruments runs only once.
                foreach ($this->instrumentToCallbacks[$instrumentId] ?? [] as $callbackId) {
                    $callbackIds[$callbackId] = $callbackId;
                }
            }

            $aggregators[$streamId] = $aggregator;
        }

        // Callbacks and their arguments are resolved completely before the first
        // one is invoked, so changes a callback makes to the registered callbacks
        // do not affect this run. Instruments that are not being collected get a
        // no-op observer in their argument position.
        $noopObserver = new NoopObserver();
        $callbacks = [];
        foreach ($callbackIds as $callbackId) {
            $args = [];
            foreach ($this->asynchronousCallbackArguments[$callbackId] as $instrumentId) {
                $args[] = $observers[$instrumentId] ?? $noopObserver;
            }
            $callback = $this->asynchronousCallbacks[$callbackId];
            $callbacks[] = static fn () => $callback(...$args);
        }
        foreach ($callbacks as $callback) {
            $callback();
        }

        // Collection uses a timestamp taken after the callbacks have run.
        $timestamp = $this->clock->now();
        foreach ($aggregators as $streamId => $aggregator) {
            // The stream may have been unregistered in the meantime.
            $stream = $this->streams[$streamId] ?? null;
            if ($stream !== null) {
                $stream->push($aggregator->collect($timestamp));
            }
        }
    }

    /**
     * Stores the stream under a new id and links it with the instrument.
     *
     * @return int the id assigned to the stream
     */
    private function attachStream(Instrument $instrument, MetricStreamInterface $stream): int
    {
        $this->streams[] = $stream;
        $streamId = array_key_last($this->streams);
        $instrumentId = spl_object_id($instrument);

        $this->instrumentToStreams[$instrumentId][$streamId] = $streamId;
        $this->streamToInstrument[$streamId] = $instrumentId;

        return $streamId;
    }
}