summaryrefslogtreecommitdiff
path: root/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php')
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php111
1 files changed, 111 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php b/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php
new file mode 100644
index 000000000..cb32f94df
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php
@@ -0,0 +1,111 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+use function array_search;
+use function count;
+use OpenTelemetry\SDK\Metrics\AggregationInterface;
+use OpenTelemetry\SDK\Metrics\Data\DataInterface;
+use OpenTelemetry\SDK\Metrics\Data\Exemplar;
+use OpenTelemetry\SDK\Metrics\Data\Temporality;
+
+/**
+ * @internal
+ */
+final class AsynchronousMetricStream implements MetricStreamInterface
+{
+ private AggregationInterface $aggregation;
+
+ private int $startTimestamp;
+ private Metric $metric;
+
+ /** @var array<int, Metric|null> */
+ private array $lastReads = [];
+
+ public function __construct(AggregationInterface $aggregation, int $startTimestamp)
+ {
+ $this->aggregation = $aggregation;
+ $this->startTimestamp = $startTimestamp;
+ $this->metric = new Metric([], [], $startTimestamp);
+ }
+
+ public function temporality()
+ {
+ return Temporality::CUMULATIVE;
+ }
+
+ public function timestamp(): int
+ {
+ return $this->metric->timestamp;
+ }
+
+ public function push(Metric $metric): void
+ {
+ $this->metric = $metric;
+ }
+
+ public function register($temporality): int
+ {
+ if ($temporality === Temporality::CUMULATIVE) {
+ return -1;
+ }
+
+ if (($reader = array_search(null, $this->lastReads, true)) === false) {
+ $reader = count($this->lastReads);
+ }
+
+ $this->lastReads[$reader] = $this->metric;
+
+ return $reader;
+ }
+
+ public function unregister(int $reader): void
+ {
+ if (!isset($this->lastReads[$reader])) {
+ return;
+ }
+
+ $this->lastReads[$reader] = null;
+ }
+
+ public function collect(int $reader): DataInterface
+ {
+ $metric = $this->metric;
+
+ if (($lastRead = $this->lastReads[$reader] ?? null) === null) {
+ $temporality = Temporality::CUMULATIVE;
+ $startTimestamp = $this->startTimestamp;
+ } else {
+ $temporality = Temporality::DELTA;
+ $startTimestamp = $lastRead->timestamp;
+
+ $this->lastReads[$reader] = $metric;
+ $metric = $this->diff($lastRead, $metric);
+ }
+
+ return $this->aggregation->toData(
+ $metric->attributes,
+ $metric->summaries,
+ Exemplar::groupByIndex($metric->exemplars),
+ $startTimestamp,
+ $metric->timestamp,
+ $temporality,
+ );
+ }
+
+ private function diff(Metric $lastRead, Metric $metric): Metric
+ {
+ $diff = clone $metric;
+ foreach ($metric->summaries as $k => $summary) {
+ if (!isset($lastRead->summaries[$k])) {
+ continue;
+ }
+
+ $diff->summaries[$k] = $this->aggregation->diff($lastRead->summaries[$k], $summary);
+ }
+
+ return $diff;
+ }
+}