aggregation === null) {
                continue;
            }

            $dedupId = $this->streamId($view->aggregation, $view->attributeKeys);
            if (($streamId = $dedup[$dedupId] ?? null) === null) {
                $stream = new AsynchronousMetricStream($view->aggregation, $timestamp);
                $streamId = $registry->registerAsynchronousStream($instrument, $stream, new MetricAggregatorFactory(
                    $this->attributeProcessor($view->attributeKeys),
                    $view->aggregation,
                ));

                $streams[$streamId] = $stream;
                $dedup[$dedupId] = $streamId;
            }

            $this->registerSource(
                $view,
                $instrument,
                $instrumentationScope,
                $resource,
                $streams[$streamId],
                $registry,
                $registration,
                $streamId,
            );
        }

        return array_keys($streams);
    }

    /**
     * Creates the synchronous metric streams required by the given views and
     * registers a metric source for every view.
     *
     * Views whose aggregation is null are skipped. Views that share the same
     * aggregation and attribute keys (see streamId()) reuse a single stream,
     * so the registry receives one stream per distinct combination.
     *
     * @param iterable<array{ViewProjection, MetricRegistrationInterface}> $views
     *        pairs of a view and the registration that receives its source
     * @param ExemplarFilterInterface|null $exemplarFilter
     *        when null, aggregators are created without an exemplar reservoir
     *
     * @return list<int> ids of the streams registered with $registry
     */
    public function createSynchronousWriter(
        MetricRegistryInterface $registry,
        ResourceInfo $resource,
        InstrumentationScopeInterface $instrumentationScope,
        Instrument $instrument,
        int $timestamp,
        iterable $views,
        ?ExemplarFilterInterface $exemplarFilter = null
    ): array {
        $streams = [];
        $dedup = [];

        foreach ($views as [$view, $registration]) {
            if ($view->aggregation === null) {
                continue;
            }

            $dedupId = $this->streamId($view->aggregation, $view->attributeKeys);
            $streamId = $dedup[$dedupId] ?? null;

            if ($streamId === null) {
                // First view with this aggregation/attribute-key combination:
                // create the stream and remember its id for later views.
                $stream = new SynchronousMetricStream($view->aggregation, $timestamp);
                $aggregator = new MetricAggregator(
                    $this->attributeProcessor($view->attributeKeys),
                    $view->aggregation,
                    $this->createExemplarReservoir($view->aggregation, $exemplarFilter),
                );
                $streamId = $registry->registerSynchronousStream($instrument, $stream, $aggregator);

                $streams[$streamId] = $stream;
                $dedup[$dedupId] = $streamId;
            }

            $this->registerSource(
                $view,
                $instrument,
                $instrumentationScope,
                $resource,
                $streams[$streamId],
                $registry,
                $registration,
                $streamId,
            );
        }

        return array_keys($streams);
    }

    /**
     * Returns a processor restricting attributes to $attributeKeys, or null
     * when no key list is given.
     *
     * @param array|null $attributeKeys
     */
    private function attributeProcessor(
        ?array $attributeKeys
    ): ?AttributeProcessorInterface {
        if ($attributeKeys === null) {
            return null;
        }

        return new FilteredAttributeProcessor($attributeKeys);
    }

    /**
     * Builds the exemplar reservoir for an aggregation.
     *
     * Returns null when no exemplar filter is supplied. Otherwise a
     * HistogramBucketReservoir is used for an explicit bucket histogram
     * aggregation with non-empty boundaries and a FixedSizeReservoir for
     * everything else; either one is wrapped in a FilteredReservoir that
     * applies $exemplarFilter.
     */
    private function createExemplarReservoir(
        AggregationInterface $aggregation,
        ?ExemplarFilterInterface $exemplarFilter
    ): ?ExemplarReservoirInterface {
        if ($exemplarFilter === null) {
            return null;
        }

        $exemplarReservoir = $aggregation instanceof ExplicitBucketHistogramAggregation && $aggregation->boundaries
            ? new HistogramBucketReservoir($aggregation->boundaries)
            : new FixedSizeReservoir();

        return new FilteredReservoir($exemplarReservoir, $exemplarFilter);
    }

    /**
     * Wraps the stream in a StreamMetricSourceProvider and hands it to the
     * given metric registration.
     */
    private function registerSource(
        ViewProjection $view,
        Instrument $instrument,
        InstrumentationScopeInterface $instrumentationScope,
        ResourceInfo $resource,
        MetricStreamInterface $stream,
        MetricCollectorInterface $metricCollector,
        MetricRegistrationInterface $metricRegistration,
        int $streamId
    ): void {
        $provider = new StreamMetricSourceProvider(
            $view,
            $instrument,
            $instrumentationScope,
            $resource,
            $stream,
            $metricCollector,
            $streamId,
        );

        // The same object is passed for both arguments; presumably it fills
        // both roles register() expects - confirm against
        // MetricRegistrationInterface.
        $metricRegistration->register($provider, $provider);
    }

    /**
     * Computes the key used to deduplicate streams: views with an equal key
     * share one stream.
     *
     * @param array|null $attributeKeys
     */
    private function streamId(AggregationInterface $aggregation, ?array $attributeKeys): string
    {
        return $this->trySerialize($aggregation) . serialize($attributeKeys);
    }

    /**
     * Returns a string identifying $object: its serialized form when it can
     * be serialized, otherwise its object id rendered as a string.
     */
    private function trySerialize(object $object): string
    {
        try {
            return serialize($object);
        } catch (Throwable $e) {
            // Deliberate best-effort: an object that cannot be serialized is
            // identified by its object id below instead of failing.
        }

        return (string) spl_object_id($object);
    }
}