summaryrefslogtreecommitdiff
path: root/vendor/open-telemetry/sdk/Trace/SpanProcessor/SimpleSpanProcessor.php
blob: 4e86e79ab23d2465aa9da935dbbb1f80d950cb3b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
<?php

declare(strict_types=1);

namespace OpenTelemetry\SDK\Trace\SpanProcessor;

use Closure;
use OpenTelemetry\API\Behavior\LogsMessagesTrait;
use OpenTelemetry\Context\Context;
use OpenTelemetry\Context\ContextInterface;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Trace\ReadableSpanInterface;
use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface;
use OpenTelemetry\SDK\Trace\SpanExporterInterface;
use OpenTelemetry\SDK\Trace\SpanProcessorInterface;
use SplQueue;
use function sprintf;
use Throwable;

/**
 * Span processor that forwards every sampled span to the exporter as soon as
 * the span ends, one span per export call.
 *
 * All exporter interaction (export, forceFlush, shutdown) is funnelled through
 * a single internal task queue. A task submitted while another one is still
 * being executed is only queued; the loop that is already draining the queue
 * picks it up afterwards.
 */
class SimpleSpanProcessor implements SpanProcessorInterface
{
    use LogsMessagesTrait;

    /** Exporter receiving ended spans as well as forceFlush/shutdown requests. */
    private SpanExporterInterface $exporter;
    /** Context that was current when the processor was constructed; activated around every export task. */
    private ContextInterface $exportContext;

    /** True while flush() is draining the queue; used to detect re-entrant submissions. */
    private bool $running = false;
    /**
     * Pending tasks, each stored as [task, task name, propagate result, context to activate].
     *
     * @var SplQueue<array{Closure, string, bool, ContextInterface}>
     */
    private SplQueue $queue;

    /** Set once shutdown() has been called; later onEnd/forceFlush/shutdown calls become no-ops. */
    private bool $closed = false;

    /**
     * @param SpanExporterInterface $exporter exporter that ended spans are handed to
     */
    public function __construct(SpanExporterInterface $exporter)
    {
        $this->queue = new SplQueue();
        $this->exportContext = Context::getCurrent();

        $this->exporter = $exporter;
    }

    /**
     * Intentionally empty: this processor does nothing when a span starts.
     */
    public function onStart(ReadWriteSpanInterface $span, ContextInterface $parentContext): void
    {
    }

    /**
     * Exports the ended span on its own, unless the processor has been shut
     * down or the span is not sampled.
     *
     * The export runs with the context captured at construction time active.
     * Its result is not propagated; an exception thrown by the export task is
     * logged by flush() instead of being rethrown.
     */
    public function onEnd(ReadableSpanInterface $span): void
    {
        if ($this->closed || !$span->getContext()->isSampled()) {
            return;
        }

        $snapshot = $span->toSpanData();
        $exportTask = function () use ($snapshot) {
            return $this->exporter->export([$snapshot])->await();
        };

        $this->flush($exportTask, 'export', false, $this->exportContext);
    }

    /**
     * Asks the exporter to flush, after any tasks queued ahead of this one.
     *
     * @return bool false when the processor is already shut down or when the
     *              call happens re-entrantly (the request is then only queued);
     *              otherwise the value returned by the exporter's forceFlush()
     */
    public function forceFlush(?CancellationInterface $cancellation = null): bool
    {
        if ($this->closed) {
            return false;
        }

        $flushExporter = function () use ($cancellation): bool {
            return $this->exporter->forceFlush($cancellation);
        };

        return $this->flush($flushExporter, __FUNCTION__, true, Context::getCurrent());
    }

    /**
     * Marks the processor as closed and asks the exporter to shut down.
     *
     * @return bool false when shutdown() was already called or when the call
     *              happens re-entrantly (the request is then only queued);
     *              otherwise the value returned by the exporter's shutdown()
     */
    public function shutdown(?CancellationInterface $cancellation = null): bool
    {
        if ($this->closed) {
            return false;
        }

        // Flip the flag before delegating so nothing new is accepted from here on.
        $this->closed = true;

        $shutdownExporter = function () use ($cancellation): bool {
            return $this->exporter->shutdown($cancellation);
        };

        return $this->flush($shutdownExporter, __FUNCTION__, true, Context::getCurrent());
    }

    /**
     * Queues the task and, unless a drain is already in progress, runs every
     * queued task in FIFO order with its associated context activated.
     *
     * @param Closure          $task            work to execute
     * @param string           $taskName        label used in the error log message
     * @param bool             $propagateResult whether the task's return value / exception is
     *                                          reported back to the caller of this method
     * @param ContextInterface $context         context to activate while the task runs
     *
     * @throws Throwable the exception raised by a result-propagating task, rethrown
     *                   once the queue has been drained
     *
     * @return bool false for a re-entrant call (task only queued); otherwise the result of the
     *              result-propagating task, or true when no such task ran
     */
    private function flush(Closure $task, string $taskName, bool $propagateResult, ContextInterface $context): bool
    {
        $reentrant = $this->running;

        // A re-entrant submission can never report its outcome, because this call returns
        // before the task is executed; therefore its propagate flag is forced to false.
        $this->queue->enqueue([$task, $taskName, $propagateResult && !$reentrant, $context]);

        if ($reentrant) {
            return false;
        }

        $exception = null;
        $success = true;
        $this->running = true;

        try {
            while (!$this->queue->isEmpty()) {
                [$job, $label, $reportsOutcome, $jobContext] = $this->queue->dequeue();
                $scope = $jobContext->activate();

                try {
                    $outcome = $job();
                    if ($reportsOutcome) {
                        $success = $outcome;
                    }
                } catch (Throwable $error) {
                    if (!$reportsOutcome) {
                        self::logError(sprintf('Unhandled %s error', $label), ['exception' => $error]);
                    } else {
                        // Keep draining the queue; the failure is rethrown afterwards.
                        $exception = $error;
                    }
                } finally {
                    $scope->detach();
                }
            }
        } finally {
            $this->running = false;
        }

        if ($exception !== null) {
            throw $exception;
        }

        return $success;
    }
}