diff options
Diffstat (limited to 'vendor/open-telemetry/sdk/Metrics/MetricReader/ExportingReader.php')
-rw-r--r-- | vendor/open-telemetry/sdk/Metrics/MetricReader/ExportingReader.php | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Metrics/MetricReader/ExportingReader.php b/vendor/open-telemetry/sdk/Metrics/MetricReader/ExportingReader.php new file mode 100644 index 000000000..3c2eff9f1 --- /dev/null +++ b/vendor/open-telemetry/sdk/Metrics/MetricReader/ExportingReader.php @@ -0,0 +1,156 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Metrics\MetricReader; + +use function array_keys; +use OpenTelemetry\SDK\Metrics\AggregationInterface; +use OpenTelemetry\SDK\Metrics\AggregationTemporalitySelectorInterface; +use OpenTelemetry\SDK\Metrics\DefaultAggregationProviderInterface; +use OpenTelemetry\SDK\Metrics\DefaultAggregationProviderTrait; +use OpenTelemetry\SDK\Metrics\MetricExporterInterface; +use OpenTelemetry\SDK\Metrics\MetricFactory\StreamMetricSourceProvider; +use OpenTelemetry\SDK\Metrics\MetricMetadataInterface; +use OpenTelemetry\SDK\Metrics\MetricReaderInterface; +use OpenTelemetry\SDK\Metrics\MetricRegistry\MetricCollectorInterface; +use OpenTelemetry\SDK\Metrics\MetricSourceInterface; +use OpenTelemetry\SDK\Metrics\MetricSourceProviderInterface; +use OpenTelemetry\SDK\Metrics\MetricSourceRegistryInterface; +use OpenTelemetry\SDK\Metrics\PushMetricExporterInterface; +use OpenTelemetry\SDK\Metrics\StalenessHandlerInterface; +use function spl_object_id; + +final class ExportingReader implements MetricReaderInterface, MetricSourceRegistryInterface, DefaultAggregationProviderInterface +{ + use DefaultAggregationProviderTrait { defaultAggregation as private _defaultAggregation; } + + private MetricExporterInterface $exporter; + /** @var array<int, MetricSourceInterface> */ + private array $sources = []; + + /** @var array<int, MetricCollectorInterface> */ + private array $registries = []; + /** @var array<int, array<int, int>> */ + private array $streamIds = []; + + private bool $closed = false; + + public function __construct(MetricExporterInterface $exporter) + { + $this->exporter = $exporter; + } + + public function defaultAggregation($instrumentType): ?AggregationInterface + { + if ($this->exporter instanceof DefaultAggregationProviderInterface) { + return $this->exporter->defaultAggregation($instrumentType); + } + + return $this->_defaultAggregation($instrumentType); + } + + public function add(MetricSourceProviderInterface $provider, MetricMetadataInterface $metadata, StalenessHandlerInterface $stalenessHandler): void + { + if ($this->closed) { + return; + } + if (!$this->exporter instanceof AggregationTemporalitySelectorInterface) { + return; + } + if (!$temporality = $this->exporter->temporality($metadata)) { + return; + } + + $source = $provider->create($temporality); + $sourceId = spl_object_id($source); + + $this->sources[$sourceId] = $source; + $stalenessHandler->onStale(function () use ($sourceId): void { + unset($this->sources[$sourceId]); + }); + + if (!$provider instanceof StreamMetricSourceProvider) { + return; + } + + $streamId = $provider->streamId; + $registry = $provider->metricCollector; + $registryId = spl_object_id($registry); + + $this->registries[$registryId] = $registry; + $this->streamIds[$registryId][$streamId] ??= 0; + $this->streamIds[$registryId][$streamId]++; + + $stalenessHandler->onStale(function () use ($streamId, $registryId): void { + if (!--$this->streamIds[$registryId][$streamId]) { + unset($this->streamIds[$registryId][$streamId]); + if (!$this->streamIds[$registryId]) { + unset( + $this->registries[$registryId], + $this->streamIds[$registryId], + ); + } + } + }); + } + + private function doCollect(): bool + { + foreach ($this->registries as $registryId => $registry) { + $streamIds = $this->streamIds[$registryId] ?? []; + $registry->collectAndPush(array_keys($streamIds)); + } + + $metrics = []; + foreach ($this->sources as $source) { + $metrics[] = $source->collect(); + } + + if ($metrics === []) { + return true; + } + + return $this->exporter->export($metrics); + } + + public function collect(): bool + { + if ($this->closed) { + return false; + } + + return $this->doCollect(); + } + + public function shutdown(): bool + { + if ($this->closed) { + return false; + } + + $this->closed = true; + + $collect = $this->doCollect(); + $shutdown = $this->exporter->shutdown(); + + $this->sources = []; + + return $collect && $shutdown; + } + + public function forceFlush(): bool + { + if ($this->closed) { + return false; + } + if ($this->exporter instanceof PushMetricExporterInterface) { + $collect = $this->doCollect(); + $forceFlush = $this->exporter->forceFlush(); + + return $collect && $forceFlush; + } + + return true; + } +} |