diff options
Diffstat (limited to 'vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php')
-rw-r--r-- | vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php b/vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php new file mode 100644 index 000000000..2c3af4c06 --- /dev/null +++ b/vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php @@ -0,0 +1,187 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Metrics\MetricFactory; + +use function array_keys; +use OpenTelemetry\SDK\Common\Instrumentation\InstrumentationScopeInterface; +use OpenTelemetry\SDK\Metrics\Aggregation\ExplicitBucketHistogramAggregation; +use OpenTelemetry\SDK\Metrics\AggregationInterface; +use OpenTelemetry\SDK\Metrics\AttributeProcessor\FilteredAttributeProcessor; +use OpenTelemetry\SDK\Metrics\AttributeProcessorInterface; +use OpenTelemetry\SDK\Metrics\Exemplar\ExemplarFilterInterface; +use OpenTelemetry\SDK\Metrics\Exemplar\ExemplarReservoirInterface; +use OpenTelemetry\SDK\Metrics\Exemplar\FilteredReservoir; +use OpenTelemetry\SDK\Metrics\Exemplar\FixedSizeReservoir; +use OpenTelemetry\SDK\Metrics\Exemplar\HistogramBucketReservoir; +use OpenTelemetry\SDK\Metrics\Instrument; +use OpenTelemetry\SDK\Metrics\MetricFactoryInterface; +use OpenTelemetry\SDK\Metrics\MetricRegistrationInterface; +use OpenTelemetry\SDK\Metrics\MetricRegistry\MetricCollectorInterface; +use OpenTelemetry\SDK\Metrics\MetricRegistry\MetricRegistryInterface; +use OpenTelemetry\SDK\Metrics\Stream\AsynchronousMetricStream; +use OpenTelemetry\SDK\Metrics\Stream\MetricAggregator; +use OpenTelemetry\SDK\Metrics\Stream\MetricAggregatorFactory; +use OpenTelemetry\SDK\Metrics\Stream\MetricStreamInterface; +use OpenTelemetry\SDK\Metrics\Stream\SynchronousMetricStream; +use OpenTelemetry\SDK\Metrics\ViewProjection; +use OpenTelemetry\SDK\Resource\ResourceInfo; +use function serialize; +use function spl_object_id; +use Throwable; + +/** + * @internal + */ +final class StreamFactory implements MetricFactoryInterface +{ + public function createAsynchronousObserver( + MetricRegistryInterface $registry, + ResourceInfo $resource, + InstrumentationScopeInterface $instrumentationScope, + Instrument $instrument, + int $timestamp, + iterable $views + ): array { + $streams = []; + $dedup = []; + foreach ($views as [$view, $registration]) { + if ($view->aggregation === null) { + continue; + } + + $dedupId = $this->streamId($view->aggregation, $view->attributeKeys); + if (($streamId = $dedup[$dedupId] ?? null) === null) { + $stream = new AsynchronousMetricStream($view->aggregation, $timestamp); + $streamId = $registry->registerAsynchronousStream($instrument, $stream, new MetricAggregatorFactory( + $this->attributeProcessor($view->attributeKeys), + $view->aggregation, + )); + + $streams[$streamId] = $stream; + $dedup[$dedupId] = $streamId; + } + + $this->registerSource( + $view, + $instrument, + $instrumentationScope, + $resource, + $streams[$streamId], + $registry, + $registration, + $streamId, + ); + } + + return array_keys($streams); + } + + public function createSynchronousWriter( + MetricRegistryInterface $registry, + ResourceInfo $resource, + InstrumentationScopeInterface $instrumentationScope, + Instrument $instrument, + int $timestamp, + iterable $views, + ?ExemplarFilterInterface $exemplarFilter = null + ): array { + $streams = []; + $dedup = []; + foreach ($views as [$view, $registration]) { + if ($view->aggregation === null) { + continue; + } + + $dedupId = $this->streamId($view->aggregation, $view->attributeKeys); + if (($streamId = $dedup[$dedupId] ?? null) === null) { + $stream = new SynchronousMetricStream($view->aggregation, $timestamp); + $streamId = $registry->registerSynchronousStream($instrument, $stream, new MetricAggregator( + $this->attributeProcessor($view->attributeKeys), + $view->aggregation, + $this->createExemplarReservoir($view->aggregation, $exemplarFilter), + )); + + $streams[$streamId] = $stream; + $dedup[$dedupId] = $streamId; + } + + $this->registerSource( + $view, + $instrument, + $instrumentationScope, + $resource, + $streams[$streamId], + $registry, + $registration, + $streamId, + ); + } + + return array_keys($streams); + } + + private function attributeProcessor( + ?array $attributeKeys + ): ?AttributeProcessorInterface { + return $attributeKeys !== null + ? new FilteredAttributeProcessor($attributeKeys) + : null; + } + + private function createExemplarReservoir( + AggregationInterface $aggregation, + ?ExemplarFilterInterface $exemplarFilter + ): ?ExemplarReservoirInterface { + if (!$exemplarFilter) { + return null; + } + + if ($aggregation instanceof ExplicitBucketHistogramAggregation && $aggregation->boundaries) { + $exemplarReservoir = new HistogramBucketReservoir($aggregation->boundaries); + } else { + $exemplarReservoir = new FixedSizeReservoir(); + } + + return new FilteredReservoir($exemplarReservoir, $exemplarFilter); + } + + private function registerSource( + ViewProjection $view, + Instrument $instrument, + InstrumentationScopeInterface $instrumentationScope, + ResourceInfo $resource, + MetricStreamInterface $stream, + MetricCollectorInterface $metricCollector, + MetricRegistrationInterface $metricRegistration, + int $streamId + ): void { + $provider = new StreamMetricSourceProvider( + $view, + $instrument, + $instrumentationScope, + $resource, + $stream, + $metricCollector, + $streamId, + ); + + $metricRegistration->register($provider, $provider); + } + + private function streamId(AggregationInterface $aggregation, ?array $attributeKeys): string + { + return $this->trySerialize($aggregation) . serialize($attributeKeys); + } + + private function trySerialize(object $object) + { + try { + return serialize($object); + } catch (Throwable $e) { + } + + return spl_object_id($object); + } +} |