summaryrefslogtreecommitdiff
path: root/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php
blob: cb32f94df91a6196ef63c023a7fee7a363313768 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
<?php

declare(strict_types=1);

namespace OpenTelemetry\SDK\Metrics\Stream;

use function array_search;
use function count;
use OpenTelemetry\SDK\Metrics\AggregationInterface;
use OpenTelemetry\SDK\Metrics\Data\DataInterface;
use OpenTelemetry\SDK\Metrics\Data\Exemplar;
use OpenTelemetry\SDK\Metrics\Data\Temporality;

/**
 * Metric stream that keeps only the most recently pushed Metric snapshot and
 * hands it to readers either unchanged (cumulative) or as the difference
 * against the snapshot that reader saw on its previous collection (delta).
 *
 * @internal
 */
final class AsynchronousMetricStream implements MetricStreamInterface
{
    private AggregationInterface $aggregation;

    private int $startTimestamp;
    private Metric $metric;

    /**
     * Baseline snapshot per delta reader, keyed by reader id. A null entry
     * marks a slot that was released by unregister() and may be reused.
     *
     * @var array<int, Metric|null>
     */
    private array $lastReads = [];

    /**
     * @param AggregationInterface $aggregation used to diff summaries and to build the exported data
     * @param int $startTimestamp start timestamp reported for cumulative collections; also the
     *        timestamp of the initial, empty snapshot
     */
    public function __construct(AggregationInterface $aggregation, int $startTimestamp)
    {
        $this->metric = new Metric([], [], $startTimestamp);
        $this->startTimestamp = $startTimestamp;
        $this->aggregation = $aggregation;
    }

    /**
     * Temporality of the stream itself; always cumulative.
     */
    public function temporality()
    {
        return Temporality::CUMULATIVE;
    }

    /**
     * Timestamp of the snapshot currently held by the stream.
     */
    public function timestamp(): int
    {
        return $this->metric->timestamp;
    }

    /**
     * Replaces the current snapshot with the given one.
     */
    public function push(Metric $metric): void
    {
        $this->metric = $metric;
    }

    /**
     * Registers a reader and returns the id to pass to collect()/unregister().
     *
     * Cumulative readers need no per-reader state and all receive -1. Any
     * other temporality gets a slot in the baseline table, seeded with the
     * current snapshot so the first delta only covers changes made after
     * registration.
     *
     * @return int -1 for cumulative readers, otherwise the allocated slot index
     */
    public function register($temporality): int
    {
        if ($temporality === Temporality::CUMULATIVE) {
            return -1;
        }

        // Prefer a slot freed by unregister(); append a new one otherwise.
        $slot = array_search(null, $this->lastReads, true);
        if ($slot === false) {
            $slot = count($this->lastReads);
        }

        $this->lastReads[$slot] = $this->metric;

        return $slot;
    }

    /**
     * Releases the slot of a delta reader. Ids without an active slot
     * (including -1) are ignored.
     */
    public function unregister(int $reader): void
    {
        if (isset($this->lastReads[$reader])) {
            // Keep the key and null the value so register() can reuse the slot.
            $this->lastReads[$reader] = null;
        }
    }

    /**
     * Builds the data to export for the given reader.
     *
     * Readers without a baseline (id -1, unknown or unregistered ids) receive
     * the current snapshot as cumulative data starting at the stream's start
     * timestamp. Readers with a baseline receive the delta since that
     * baseline, and the baseline is advanced to the current snapshot.
     */
    public function collect(int $reader): DataInterface
    {
        $current = $this->metric;
        $previous = $this->lastReads[$reader] ?? null;

        if ($previous === null) {
            return $this->aggregation->toData(
                $current->attributes,
                $current->summaries,
                Exemplar::groupByIndex($current->exemplars),
                $this->startTimestamp,
                $current->timestamp,
                Temporality::CUMULATIVE,
            );
        }

        // Advance the baseline before diffing; the next delta starts here.
        $this->lastReads[$reader] = $current;
        $delta = $this->diff($previous, $current);

        return $this->aggregation->toData(
            $delta->attributes,
            $delta->summaries,
            Exemplar::groupByIndex($delta->exemplars),
            $previous->timestamp,
            $delta->timestamp,
            Temporality::DELTA,
        );
    }

    /**
     * Returns a copy of $metric in which every summary that also exists in
     * $lastRead (same index) is replaced by the aggregation's diff of the two.
     * Summaries without a counterpart in $lastRead are kept unchanged.
     */
    private function diff(Metric $lastRead, Metric $metric): Metric
    {
        $result = clone $metric;

        foreach ($metric->summaries as $index => $currentSummary) {
            if (isset($lastRead->summaries[$index])) {
                $result->summaries[$index] = $this->aggregation->diff(
                    $lastRead->summaries[$index],
                    $currentSummary
                );
            }
        }

        return $result;
    }
}