diff options
author | Andrew Dolgov <[email protected]> | 2023-10-20 17:12:29 +0300 |
---|---|---|
committer | Andrew Dolgov <[email protected]> | 2023-10-20 21:13:39 +0300 |
commit | cdd7ad020e165fe680703b6d3319b908b682fb7a (patch) | |
tree | b51eb09b7b4587e8fbc5624ac8d88d28cfcd0b04 /vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php | |
parent | 45a9ff0c88cbd33892ff16ab837e9059937d656e (diff) |
jaeger-client -> opentelemetry
Diffstat (limited to 'vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php')
-rw-r--r-- | vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php | 111 |
1 files changed, 111 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php b/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php new file mode 100644 index 000000000..cb32f94df --- /dev/null +++ b/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php @@ -0,0 +1,111 @@ +<?php + +declare(strict_types=1); + +namespace OpenTelemetry\SDK\Metrics\Stream; + +use function array_search; +use function count; +use OpenTelemetry\SDK\Metrics\AggregationInterface; +use OpenTelemetry\SDK\Metrics\Data\DataInterface; +use OpenTelemetry\SDK\Metrics\Data\Exemplar; +use OpenTelemetry\SDK\Metrics\Data\Temporality; + +/** + * @internal + */ +final class AsynchronousMetricStream implements MetricStreamInterface +{ + private AggregationInterface $aggregation; + + private int $startTimestamp; + private Metric $metric; + + /** @var array<int, Metric|null> */ + private array $lastReads = []; + + public function __construct(AggregationInterface $aggregation, int $startTimestamp) + { + $this->aggregation = $aggregation; + $this->startTimestamp = $startTimestamp; + $this->metric = new Metric([], [], $startTimestamp); + } + + public function temporality() + { + return Temporality::CUMULATIVE; + } + + public function timestamp(): int + { + return $this->metric->timestamp; + } + + public function push(Metric $metric): void + { + $this->metric = $metric; + } + + public function register($temporality): int + { + if ($temporality === Temporality::CUMULATIVE) { + return -1; + } + + if (($reader = array_search(null, $this->lastReads, true)) === false) { + $reader = count($this->lastReads); + } + + $this->lastReads[$reader] = $this->metric; + + return $reader; + } + + public function unregister(int $reader): void + { + if (!isset($this->lastReads[$reader])) { + return; + } + + $this->lastReads[$reader] = null; + } + + public function collect(int $reader): DataInterface + { + $metric = $this->metric; + + if (($lastRead = $this->lastReads[$reader] ?? null) === null) { + $temporality = Temporality::CUMULATIVE; + $startTimestamp = $this->startTimestamp; + } else { + $temporality = Temporality::DELTA; + $startTimestamp = $lastRead->timestamp; + + $this->lastReads[$reader] = $metric; + $metric = $this->diff($lastRead, $metric); + } + + return $this->aggregation->toData( + $metric->attributes, + $metric->summaries, + Exemplar::groupByIndex($metric->exemplars), + $startTimestamp, + $metric->timestamp, + $temporality, + ); + } + + private function diff(Metric $lastRead, Metric $metric): Metric + { + $diff = clone $metric; + foreach ($metric->summaries as $k => $summary) { + if (!isset($lastRead->summaries[$k])) { + continue; + } + + $diff->summaries[$k] = $this->aggregation->diff($lastRead->summaries[$k], $summary); + } + + return $diff; + } +} |