summaryrefslogtreecommitdiff
path: root/vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php')
-rw-r--r--vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php187
1 files changed, 187 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php b/vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php
new file mode 100644
index 000000000..2c3af4c06
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/MetricFactory/StreamFactory.php
@@ -0,0 +1,187 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\MetricFactory;
+
+use function array_keys;
+use OpenTelemetry\SDK\Common\Instrumentation\InstrumentationScopeInterface;
+use OpenTelemetry\SDK\Metrics\Aggregation\ExplicitBucketHistogramAggregation;
+use OpenTelemetry\SDK\Metrics\AggregationInterface;
+use OpenTelemetry\SDK\Metrics\AttributeProcessor\FilteredAttributeProcessor;
+use OpenTelemetry\SDK\Metrics\AttributeProcessorInterface;
+use OpenTelemetry\SDK\Metrics\Exemplar\ExemplarFilterInterface;
+use OpenTelemetry\SDK\Metrics\Exemplar\ExemplarReservoirInterface;
+use OpenTelemetry\SDK\Metrics\Exemplar\FilteredReservoir;
+use OpenTelemetry\SDK\Metrics\Exemplar\FixedSizeReservoir;
+use OpenTelemetry\SDK\Metrics\Exemplar\HistogramBucketReservoir;
+use OpenTelemetry\SDK\Metrics\Instrument;
+use OpenTelemetry\SDK\Metrics\MetricFactoryInterface;
+use OpenTelemetry\SDK\Metrics\MetricRegistrationInterface;
+use OpenTelemetry\SDK\Metrics\MetricRegistry\MetricCollectorInterface;
+use OpenTelemetry\SDK\Metrics\MetricRegistry\MetricRegistryInterface;
+use OpenTelemetry\SDK\Metrics\Stream\AsynchronousMetricStream;
+use OpenTelemetry\SDK\Metrics\Stream\MetricAggregator;
+use OpenTelemetry\SDK\Metrics\Stream\MetricAggregatorFactory;
+use OpenTelemetry\SDK\Metrics\Stream\MetricStreamInterface;
+use OpenTelemetry\SDK\Metrics\Stream\SynchronousMetricStream;
+use OpenTelemetry\SDK\Metrics\ViewProjection;
+use OpenTelemetry\SDK\Resource\ResourceInfo;
+use function serialize;
+use function spl_object_id;
+use Throwable;
+
+/**
+ * @internal
+ */
+final class StreamFactory implements MetricFactoryInterface
+{
+ public function createAsynchronousObserver(
+ MetricRegistryInterface $registry,
+ ResourceInfo $resource,
+ InstrumentationScopeInterface $instrumentationScope,
+ Instrument $instrument,
+ int $timestamp,
+ iterable $views
+ ): array {
+ $streams = [];
+ $dedup = [];
+ foreach ($views as [$view, $registration]) {
+ if ($view->aggregation === null) {
+ continue;
+ }
+
+ $dedupId = $this->streamId($view->aggregation, $view->attributeKeys);
+ if (($streamId = $dedup[$dedupId] ?? null) === null) {
+ $stream = new AsynchronousMetricStream($view->aggregation, $timestamp);
+ $streamId = $registry->registerAsynchronousStream($instrument, $stream, new MetricAggregatorFactory(
+ $this->attributeProcessor($view->attributeKeys),
+ $view->aggregation,
+ ));
+
+ $streams[$streamId] = $stream;
+ $dedup[$dedupId] = $streamId;
+ }
+
+ $this->registerSource(
+ $view,
+ $instrument,
+ $instrumentationScope,
+ $resource,
+ $streams[$streamId],
+ $registry,
+ $registration,
+ $streamId,
+ );
+ }
+
+ return array_keys($streams);
+ }
+
+ public function createSynchronousWriter(
+ MetricRegistryInterface $registry,
+ ResourceInfo $resource,
+ InstrumentationScopeInterface $instrumentationScope,
+ Instrument $instrument,
+ int $timestamp,
+ iterable $views,
+ ?ExemplarFilterInterface $exemplarFilter = null
+ ): array {
+ $streams = [];
+ $dedup = [];
+ foreach ($views as [$view, $registration]) {
+ if ($view->aggregation === null) {
+ continue;
+ }
+
+ $dedupId = $this->streamId($view->aggregation, $view->attributeKeys);
+ if (($streamId = $dedup[$dedupId] ?? null) === null) {
+ $stream = new SynchronousMetricStream($view->aggregation, $timestamp);
+ $streamId = $registry->registerSynchronousStream($instrument, $stream, new MetricAggregator(
+ $this->attributeProcessor($view->attributeKeys),
+ $view->aggregation,
+ $this->createExemplarReservoir($view->aggregation, $exemplarFilter),
+ ));
+
+ $streams[$streamId] = $stream;
+ $dedup[$dedupId] = $streamId;
+ }
+
+ $this->registerSource(
+ $view,
+ $instrument,
+ $instrumentationScope,
+ $resource,
+ $streams[$streamId],
+ $registry,
+ $registration,
+ $streamId,
+ );
+ }
+
+ return array_keys($streams);
+ }
+
+ private function attributeProcessor(
+ ?array $attributeKeys
+ ): ?AttributeProcessorInterface {
+ return $attributeKeys !== null
+ ? new FilteredAttributeProcessor($attributeKeys)
+ : null;
+ }
+
+ private function createExemplarReservoir(
+ AggregationInterface $aggregation,
+ ?ExemplarFilterInterface $exemplarFilter
+ ): ?ExemplarReservoirInterface {
+ if (!$exemplarFilter) {
+ return null;
+ }
+
+ if ($aggregation instanceof ExplicitBucketHistogramAggregation && $aggregation->boundaries) {
+ $exemplarReservoir = new HistogramBucketReservoir($aggregation->boundaries);
+ } else {
+ $exemplarReservoir = new FixedSizeReservoir();
+ }
+
+ return new FilteredReservoir($exemplarReservoir, $exemplarFilter);
+ }
+
+ private function registerSource(
+ ViewProjection $view,
+ Instrument $instrument,
+ InstrumentationScopeInterface $instrumentationScope,
+ ResourceInfo $resource,
+ MetricStreamInterface $stream,
+ MetricCollectorInterface $metricCollector,
+ MetricRegistrationInterface $metricRegistration,
+ int $streamId
+ ): void {
+ $provider = new StreamMetricSourceProvider(
+ $view,
+ $instrument,
+ $instrumentationScope,
+ $resource,
+ $stream,
+ $metricCollector,
+ $streamId,
+ );
+
+ $metricRegistration->register($provider, $provider);
+ }
+
+ private function streamId(AggregationInterface $aggregation, ?array $attributeKeys): string
+ {
+ return $this->trySerialize($aggregation) . serialize($attributeKeys);
+ }
+
+ private function trySerialize(object $object)
+ {
+ try {
+ return serialize($object);
+ } catch (Throwable $e) {
+ }
+
+ return spl_object_id($object);
+ }
+}