summaryrefslogtreecommitdiff
path: root/vendor/open-telemetry/sdk/Metrics/Stream
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/open-telemetry/sdk/Metrics/Stream')
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php111
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/Delta.php33
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/DeltaStorage.php110
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/Metric.php44
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregator.php73
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorFactory.php28
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorFactoryInterface.php13
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorInterface.php12
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/MetricCollectorInterface.php13
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/MetricStreamInterface.php58
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/SynchronousMetricStream.php126
-rw-r--r--vendor/open-telemetry/sdk/Metrics/Stream/WritableMetricStreamInterface.php19
12 files changed, 640 insertions, 0 deletions
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php b/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php
new file mode 100644
index 000000000..cb32f94df
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/AsynchronousMetricStream.php
@@ -0,0 +1,111 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+use function array_search;
+use function count;
+use OpenTelemetry\SDK\Metrics\AggregationInterface;
+use OpenTelemetry\SDK\Metrics\Data\DataInterface;
+use OpenTelemetry\SDK\Metrics\Data\Exemplar;
+use OpenTelemetry\SDK\Metrics\Data\Temporality;
+
+/**
+ * @internal
+ */
+final class AsynchronousMetricStream implements MetricStreamInterface
+{
+ private AggregationInterface $aggregation;
+
+ private int $startTimestamp;
+ private Metric $metric;
+
+ /** @var array<int, Metric|null> */
+ private array $lastReads = [];
+
+ public function __construct(AggregationInterface $aggregation, int $startTimestamp)
+ {
+ $this->aggregation = $aggregation;
+ $this->startTimestamp = $startTimestamp;
+ $this->metric = new Metric([], [], $startTimestamp);
+ }
+
+ public function temporality()
+ {
+ return Temporality::CUMULATIVE;
+ }
+
+ public function timestamp(): int
+ {
+ return $this->metric->timestamp;
+ }
+
+ public function push(Metric $metric): void
+ {
+ $this->metric = $metric;
+ }
+
+ public function register($temporality): int
+ {
+ if ($temporality === Temporality::CUMULATIVE) {
+ return -1;
+ }
+
+ if (($reader = array_search(null, $this->lastReads, true)) === false) {
+ $reader = count($this->lastReads);
+ }
+
+ $this->lastReads[$reader] = $this->metric;
+
+ return $reader;
+ }
+
+ public function unregister(int $reader): void
+ {
+ if (!isset($this->lastReads[$reader])) {
+ return;
+ }
+
+ $this->lastReads[$reader] = null;
+ }
+
+ public function collect(int $reader): DataInterface
+ {
+ $metric = $this->metric;
+
+ if (($lastRead = $this->lastReads[$reader] ?? null) === null) {
+ $temporality = Temporality::CUMULATIVE;
+ $startTimestamp = $this->startTimestamp;
+ } else {
+ $temporality = Temporality::DELTA;
+ $startTimestamp = $lastRead->timestamp;
+
+ $this->lastReads[$reader] = $metric;
+ $metric = $this->diff($lastRead, $metric);
+ }
+
+ return $this->aggregation->toData(
+ $metric->attributes,
+ $metric->summaries,
+ Exemplar::groupByIndex($metric->exemplars),
+ $startTimestamp,
+ $metric->timestamp,
+ $temporality,
+ );
+ }
+
+ private function diff(Metric $lastRead, Metric $metric): Metric
+ {
+ $diff = clone $metric;
+ foreach ($metric->summaries as $k => $summary) {
+ if (!isset($lastRead->summaries[$k])) {
+ continue;
+ }
+
+ $diff->summaries[$k] = $this->aggregation->diff($lastRead->summaries[$k], $summary);
+ }
+
+ return $diff;
+ }
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/Delta.php b/vendor/open-telemetry/sdk/Metrics/Stream/Delta.php
new file mode 100644
index 000000000..a4ff56d71
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/Delta.php
@@ -0,0 +1,33 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+use GMP;
+
+/**
+ * @internal
+ */
+final class Delta
+{
+ public Metric $metric;
+ /**
+ * @psalm-suppress UndefinedDocblockClass
+ * @phan-suppress PhanUndeclaredTypeProperty
+ * @var int|GMP
+ */
+ public $readers;
+ public ?self $prev;
+ /**
+ * @psalm-suppress UndefinedDocblockClass
+ * @phan-suppress PhanUndeclaredTypeParameter
+ * @param int|GMP $readers
+ */
+ public function __construct(Metric $metric, $readers, ?self $prev = null)
+ {
+ $this->metric = $metric;
+ $this->readers = $readers;
+ $this->prev = $prev;
+ }
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/DeltaStorage.php b/vendor/open-telemetry/sdk/Metrics/Stream/DeltaStorage.php
new file mode 100644
index 000000000..b46a28d65
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/DeltaStorage.php
@@ -0,0 +1,110 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+use function assert;
+use GMP;
+use OpenTelemetry\SDK\Metrics\AggregationInterface;
+
+/**
+ * @internal
+ */
+final class DeltaStorage
+{
+ private AggregationInterface $aggregation;
+ private Delta $head;
+
+ public function __construct(AggregationInterface $aggregation)
+ {
+ $this->aggregation = $aggregation;
+ $this->head = new Delta(new Metric([], [], 0), 0);
+
+ /** @phan-suppress-next-line PhanTypeObjectUnsetDeclaredProperty */
+ unset($this->head->metric);
+ }
+
+ /**
+ * @psalm-suppress UndefinedDocblockClass
+ * @phan-suppress PhanUndeclaredTypeParameter
+ * @param int|GMP $readers
+ */
+ public function add(Metric $metric, $readers): void
+ {
+ /** @phpstan-ignore-next-line */
+ if ($readers == 0) {
+ return;
+ }
+
+ if (($this->head->prev->readers ?? null) != $readers) {
+ $this->head->prev = new Delta($metric, $readers, $this->head->prev);
+ } else {
+ assert($this->head->prev !== null);
+ $this->mergeInto($this->head->prev->metric, $metric);
+ }
+ }
+
+ public function collect(int $reader, bool $retain = false): ?Metric
+ {
+ $n = null;
+ for ($d = $this->head; $d->prev; $d = $d->prev) {
+ if (($d->prev->readers >> $reader & 1) != 0) {
+ if ($n !== null) {
+ assert($n->prev !== null);
+ $n->prev->readers ^= $d->prev->readers;
+ $this->mergeInto($d->prev->metric, $n->prev->metric);
+ $this->tryUnlink($n);
+
+ if ($n->prev === $d->prev) {
+ continue;
+ }
+ }
+
+ $n = $d;
+ }
+ }
+
+ $metric = $n->prev->metric ?? null;
+
+ if (!$retain && $n) {
+ assert($n->prev !== null);
+ $n->prev->readers ^= ($n->prev->readers & 1 | 1) << $reader;
+ $this->tryUnlink($n);
+ }
+
+ return $metric;
+ }
+
+ private function tryUnlink(Delta $n): void
+ {
+ assert($n->prev !== null);
+ /** @phpstan-ignore-next-line */
+ if ($n->prev->readers == 0) {
+ $n->prev = $n->prev->prev;
+
+ return;
+ }
+
+ for ($c = $n->prev->prev;
+ $c && ($n->prev->readers & $c->readers) == 0;
+ $c = $c->prev) {
+ }
+
+ if ($c && $n->prev->readers === $c->readers) {
+ $this->mergeInto($c->metric, $n->prev->metric);
+ $n->prev = $n->prev->prev;
+ }
+ }
+
+ private function mergeInto(Metric $into, Metric $metric): void
+ {
+ foreach ($metric->summaries as $k => $summary) {
+ $into->attributes[$k] ??= $metric->attributes[$k];
+ $into->summaries[$k] = isset($into->summaries[$k])
+ ? $this->aggregation->merge($into->summaries[$k], $summary)
+ : $summary;
+ }
+ $into->exemplars += $metric->exemplars;
+ }
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/Metric.php b/vendor/open-telemetry/sdk/Metrics/Stream/Metric.php
new file mode 100644
index 000000000..6b1db9eef
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/Metric.php
@@ -0,0 +1,44 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+use OpenTelemetry\SDK\Common\Attribute\AttributesInterface;
+use OpenTelemetry\SDK\Metrics\Data\Exemplar;
+
+/**
+ * @internal
+ *
+ * @template T
+ */
+final class Metric
+{
+
+ /**
+ * @var array<AttributesInterface>
+ */
+ public array $attributes;
+ /**
+ * @var array<T>
+ */
+ public array $summaries;
+ public int $timestamp;
+ /**
+ * @var array<Exemplar>
+ */
+ public array $exemplars;
+
+ /**
+ * @param array<AttributesInterface> $attributes
+ * @param array<T> $summaries
+ * @param array<Exemplar> $exemplars
+ */
+ public function __construct(array $attributes, array $summaries, int $timestamp, array $exemplars = [])
+ {
+ $this->attributes = $attributes;
+ $this->summaries = $summaries;
+ $this->timestamp = $timestamp;
+ $this->exemplars = $exemplars;
+ }
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregator.php b/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregator.php
new file mode 100644
index 000000000..b1328eb07
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregator.php
@@ -0,0 +1,73 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+use OpenTelemetry\Context\ContextInterface;
+use OpenTelemetry\SDK\Common\Attribute\AttributesInterface;
+use OpenTelemetry\SDK\Metrics\AggregationInterface;
+use OpenTelemetry\SDK\Metrics\AttributeProcessorInterface;
+use OpenTelemetry\SDK\Metrics\Exemplar\ExemplarReservoirInterface;
+use function serialize;
+
+/**
+ * @internal
+ */
+final class MetricAggregator implements MetricAggregatorInterface
+{
+ private ?AttributeProcessorInterface $attributeProcessor;
+ private AggregationInterface $aggregation;
+ private ?ExemplarReservoirInterface $exemplarReservoir;
+
+ /** @var array<AttributesInterface> */
+ private array $attributes = [];
+ private array $summaries = [];
+
+ public function __construct(
+ ?AttributeProcessorInterface $attributeProcessor,
+ AggregationInterface $aggregation,
+ ?ExemplarReservoirInterface $exemplarReservoir = null
+ ) {
+ $this->attributeProcessor = $attributeProcessor;
+ $this->aggregation = $aggregation;
+ $this->exemplarReservoir = $exemplarReservoir;
+ }
+
+ /**
+ * @param float|int $value
+ */
+ public function record($value, AttributesInterface $attributes, ContextInterface $context, int $timestamp): void
+ {
+ $filteredAttributes = $this->attributeProcessor !== null
+ ? $this->attributeProcessor->process($attributes, $context)
+ : $attributes;
+ $raw = $filteredAttributes->toArray();
+ $index = $raw !== [] ? serialize($raw) : 0;
+ $this->attributes[$index] ??= $filteredAttributes;
+ $this->aggregation->record(
+ $this->summaries[$index] ??= $this->aggregation->initialize(),
+ $value,
+ $attributes,
+ $context,
+ $timestamp,
+ );
+
+ if ($this->exemplarReservoir !== null) {
+ $this->exemplarReservoir->offer($index, $value, $attributes, $context, $timestamp);
+ }
+ }
+
+ public function collect(int $timestamp): Metric
+ {
+ $exemplars = $this->exemplarReservoir
+ ? $this->exemplarReservoir->collect($this->attributes)
+ : [];
+ $metric = new Metric($this->attributes, $this->summaries, $timestamp, $exemplars);
+
+ $this->attributes = [];
+ $this->summaries = [];
+
+ return $metric;
+ }
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorFactory.php b/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorFactory.php
new file mode 100644
index 000000000..5866a72b7
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorFactory.php
@@ -0,0 +1,28 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+use OpenTelemetry\SDK\Metrics\AggregationInterface;
+use OpenTelemetry\SDK\Metrics\AttributeProcessorInterface;
+
+/**
+ * @internal
+ */
+final class MetricAggregatorFactory implements MetricAggregatorFactoryInterface
+{
+ private ?AttributeProcessorInterface $attributeProcessor;
+ private AggregationInterface $aggregation;
+
+ public function __construct(?AttributeProcessorInterface $attributeProcessor, AggregationInterface $aggregation)
+ {
+ $this->attributeProcessor = $attributeProcessor;
+ $this->aggregation = $aggregation;
+ }
+
+ public function create(): MetricAggregatorInterface
+ {
+ return new MetricAggregator($this->attributeProcessor, $this->aggregation);
+ }
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorFactoryInterface.php b/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorFactoryInterface.php
new file mode 100644
index 000000000..356f682f2
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorFactoryInterface.php
@@ -0,0 +1,13 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+/**
+ * @internal
+ */
+interface MetricAggregatorFactoryInterface
+{
+ public function create(): MetricAggregatorInterface;
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorInterface.php b/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorInterface.php
new file mode 100644
index 000000000..2f5cfbf15
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/MetricAggregatorInterface.php
@@ -0,0 +1,12 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+/**
+ * @internal
+ */
+interface MetricAggregatorInterface extends WritableMetricStreamInterface, MetricCollectorInterface
+{
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/MetricCollectorInterface.php b/vendor/open-telemetry/sdk/Metrics/Stream/MetricCollectorInterface.php
new file mode 100644
index 000000000..51a728df7
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/MetricCollectorInterface.php
@@ -0,0 +1,13 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+/**
+ * @internal
+ */
+interface MetricCollectorInterface
+{
+ public function collect(int $timestamp): Metric;
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/MetricStreamInterface.php b/vendor/open-telemetry/sdk/Metrics/Stream/MetricStreamInterface.php
new file mode 100644
index 000000000..1373a1c93
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/MetricStreamInterface.php
@@ -0,0 +1,58 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+use OpenTelemetry\SDK\Metrics\Data\DataInterface;
+use OpenTelemetry\SDK\Metrics\Data\Temporality;
+
+/**
+ * @internal
+ */
+interface MetricStreamInterface
+{
+ /**
+ * Returns the internal temporality of this stream.
+ *
+ * @return string|Temporality internal temporality
+ */
+ public function temporality();
+
+ /**
+ * Returns the last metric timestamp.
+ *
+ * @return int metric timestamp
+ */
+ public function timestamp(): int;
+
+ /**
+ * Pushes metric data to the stream.
+ *
+ * @param Metric $metric metric data to push
+ */
+ public function push(Metric $metric): void;
+
+ /**
+ * Registers a new reader with the given temporality.
+ *
+ * @param string|Temporality $temporality temporality to use
+ * @return int reader id
+ */
+ public function register($temporality): int;
+
+ /**
+ * Unregisters the given reader.
+ *
+ * @param int $reader reader id
+ */
+ public function unregister(int $reader): void;
+
+ /**
+ * Collects metric data for the given reader.
+ *
+ * @param int $reader reader id
+ * @return DataInterface metric data
+ */
+ public function collect(int $reader): DataInterface;
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/SynchronousMetricStream.php b/vendor/open-telemetry/sdk/Metrics/Stream/SynchronousMetricStream.php
new file mode 100644
index 000000000..52645504c
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/SynchronousMetricStream.php
@@ -0,0 +1,126 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+use function assert;
+use const E_USER_WARNING;
+use function extension_loaded;
+use GMP;
+use function gmp_init;
+use function is_int;
+use OpenTelemetry\SDK\Metrics\AggregationInterface;
+use OpenTelemetry\SDK\Metrics\Data\DataInterface;
+use OpenTelemetry\SDK\Metrics\Data\Exemplar;
+use OpenTelemetry\SDK\Metrics\Data\Temporality;
+use const PHP_INT_SIZE;
+use function sprintf;
+use function trigger_error;
+
+/**
+ * @internal
+ */
+final class SynchronousMetricStream implements MetricStreamInterface
+{
+ private AggregationInterface $aggregation;
+
+ private int $timestamp;
+
+ private DeltaStorage $delta;
+ /**
+ * @psalm-suppress UndefinedDocblockClass
+ * @phan-suppress PhanUndeclaredTypeProperty
+ * @var int|GMP
+ */
+ private $readers = 0;
+ /**
+ * @psalm-suppress UndefinedDocblockClass
+ * @phan-suppress PhanUndeclaredTypeProperty
+ * @var int|GMP
+ */
+ private $cumulative = 0;
+
+ public function __construct(AggregationInterface $aggregation, int $startTimestamp)
+ {
+ $this->aggregation = $aggregation;
+ $this->timestamp = $startTimestamp;
+ $this->delta = new DeltaStorage($aggregation);
+ }
+
+ public function temporality()
+ {
+ return Temporality::DELTA;
+ }
+
+ public function timestamp(): int
+ {
+ return $this->timestamp;
+ }
+
+ public function push(Metric $metric): void
+ {
+ [$this->timestamp, $metric->timestamp] = [$metric->timestamp, $this->timestamp];
+ $this->delta->add($metric, $this->readers);
+ }
+
+ public function register($temporality): int
+ {
+ $reader = 0;
+ for ($r = $this->readers; ($r & 1) != 0; $r >>= 1, $reader++) {
+ }
+
+ if ($reader === (PHP_INT_SIZE << 3) - 1 && is_int($this->readers)) {
+ if (!extension_loaded('gmp')) {
+ trigger_error(sprintf('GMP extension required to register over %d readers', (PHP_INT_SIZE << 3) - 1), E_USER_WARNING);
+ $reader = PHP_INT_SIZE << 3;
+ } else {
+ assert(is_int($this->cumulative));
+ $this->readers = gmp_init($this->readers);
+ $this->cumulative = gmp_init($this->cumulative);
+ }
+ }
+
+ $readerMask = ($this->readers & 1 | 1) << $reader;
+ $this->readers ^= $readerMask;
+ if ($temporality === Temporality::CUMULATIVE) {
+ $this->cumulative ^= $readerMask;
+ }
+
+ return $reader;
+ }
+
+ public function unregister(int $reader): void
+ {
+ $readerMask = ($this->readers & 1 | 1) << $reader;
+ if (($this->readers & $readerMask) == 0) {
+ return;
+ }
+
+ $this->delta->collect($reader);
+
+ $this->readers ^= $readerMask;
+ if (($this->cumulative & $readerMask) != 0) {
+ $this->cumulative ^= $readerMask;
+ }
+ }
+
+ public function collect(int $reader): DataInterface
+ {
+ $cumulative = ($this->cumulative >> $reader & 1) != 0;
+ $metric = $this->delta->collect($reader, $cumulative) ?? new Metric([], [], $this->timestamp);
+
+ $temporality = $cumulative
+ ? Temporality::CUMULATIVE
+ : Temporality::DELTA;
+
+ return $this->aggregation->toData(
+ $metric->attributes,
+ $metric->summaries,
+ Exemplar::groupByIndex($metric->exemplars),
+ $metric->timestamp,
+ $this->timestamp,
+ $temporality,
+ );
+ }
+}
diff --git a/vendor/open-telemetry/sdk/Metrics/Stream/WritableMetricStreamInterface.php b/vendor/open-telemetry/sdk/Metrics/Stream/WritableMetricStreamInterface.php
new file mode 100644
index 000000000..9fd425a44
--- /dev/null
+++ b/vendor/open-telemetry/sdk/Metrics/Stream/WritableMetricStreamInterface.php
@@ -0,0 +1,19 @@
+<?php
+
+declare(strict_types=1);
+
+namespace OpenTelemetry\SDK\Metrics\Stream;
+
+use OpenTelemetry\Context\ContextInterface;
+use OpenTelemetry\SDK\Common\Attribute\AttributesInterface;
+
+/**
+ * @internal
+ */
+interface WritableMetricStreamInterface
+{
+ /**
+ * @param float|int $value
+ */
+ public function record($value, AttributesInterface $attributes, ContextInterface $context, int $timestamp): void;
+}