summaryrefslogtreecommitdiff
path: root/vendor/aws/aws-sdk-php/src/Multipart/AbstractUploader.php
blob: 75e6794660a2a99d84976878e5852cff99211d02 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
<?php
namespace Aws\Multipart;

use Aws\AwsClientInterface as Client;
use Aws\Exception\AwsException;
use GuzzleHttp\Psr7;
use InvalidArgumentException as IAE;
use Psr\Http\Message\StreamInterface as Stream;

/**
 * Base class for multipart uploaders whose payload is read from a stream.
 *
 * The caller-supplied source is normalized into a PSR-7 stream and then walked
 * one part at a time; an upload command is yielded for every part that the
 * upload state does not already report as uploaded.
 */
abstract class AbstractUploader extends AbstractUploadManager
{
    /** @var Stream Source of the data to be uploaded. */
    protected $source;

    /**
     * @param Client $client Client handed to the parent upload manager.
     * @param mixed  $source A string (treated as a filename) or any value
     *                       accepted by `Psr7\Utils::streamFor()`.
     * @param array  $config Configuration handed to the parent upload manager.
     *
     * @throws IAE If the resulting source stream is not readable.
     */
    public function __construct(Client $client, $source, array $config = [])
    {
        // The source is resolved before the parent constructor runs.
        // NOTE(review): presumably the parent constructor needs the source
        // (e.g. to size parts) — confirm before reordering.
        $this->source = $this->determineSource($source);
        parent::__construct($client, $config);
    }

    /**
     * Create a stream for a part that starts at the current position and
     * has a length of the upload part size (or less with the final part).
     *
     * Note that the offset is taken from the position of `$this->source`,
     * not from the position of the `$stream` argument.
     *
     * @param Stream $stream
     *
     * @return Psr7\LimitStream
     */
    protected function limitPartStream(Stream $stream)
    {
        // Limit what is read from the stream to the part size.
        return new Psr7\LimitStream(
            $stream,
            $this->state->getPartSize(),
            $this->source->tell()
        );
    }

    /**
     * Lazily yields one upload command per part that still has to be sent.
     *
     * Parts the state already reports as uploaded are skipped by advancing
     * the source past them, so that subsequent parts stay aligned with their
     * part numbers.
     *
     * @param callable $resultHandler Handler appended to the sign step of
     *                                every yielded command under the name
     *                                'mup'.
     *
     * @return \Generator Yields the commands created by the client.
     *
     * @throws \Exception An instance of the configured 'exception_class',
     *                    wrapping an AwsException, when the current part
     *                    number exceeds the part count derived from the
     *                    source size.
     */
    protected function getUploadCommands(callable $resultHandler)
    {
        // Determine if the source can be seeked.
        $seekable = $this->source->isSeekable()
            && $this->source->getMetadata('wrapper_type') === 'plainfile';

        // isEof() returns true while data remains, hence its use as the
        // loop-continue condition.
        for ($partNumber = 1; $this->isEof($seekable); $partNumber++) {
            // If we haven't already uploaded this part, yield a new part.
            if (!$this->state->hasPartBeenUploaded($partNumber)) {
                $partStartPos = $this->source->tell();
                if (!($data = $this->createPart($seekable, $partNumber))) {
                    break;
                }
                $command = $this->client->getCommand(
                    $this->info['command']['upload'],
                    $data + $this->state->getId()
                );
                $command->getHandlerList()->appendSign($resultHandler, 'mup');

                // The check happens after the command is built because the
                // command is attached to the exception for context.
                $numberOfParts = $this->getNumberOfParts($this->state->getPartSize());
                if (isset($numberOfParts) && $partNumber > $numberOfParts) {
                    throw new $this->config['exception_class'](
                        $this->state,
                        new AwsException(
                            "Maximum part number for this job exceeded, file has likely been corrupted." .
                            "  Please restart this upload.",
                            $command
                        )
                    );
                }

                yield $command;

                // If building the part already moved the source forward,
                // there is nothing left to skip.
                if ($this->source->tell() > $partStartPos) {
                    continue;
                }
            }

            // Advance the source's offset if not already advanced.
            $this->skipPart($seekable);
        }
    }

    /**
     * Generates the parameters for an upload part by analyzing a range of the
     * source starting from the current offset up to the part size.
     *
     * @param bool $seekable
     * @param int  $number
     *
     * @return array|null
     */
    abstract protected function createPart($seekable, $number);

    /**
     * Checks whether the source still has data left to read.
     *
     * Despite its name, this returns true while the source is NOT at EOF and
     * false once the end has been reached.
     *
     * @param bool $seekable
     *
     * @return bool True if more data remains, false at end of source.
     */
    private function isEof($seekable)
    {
        return $seekable
            ? $this->source->tell() < $this->source->getSize()
            : !$this->source->eof();
    }

    /**
     * Moves the source forward by one part size without producing a part.
     *
     * Seekable sources are repositioned directly (clamped to the source
     * size). Non-seekable sources are drained by reading; since a single
     * read() call may return fewer bytes than requested, reading is repeated
     * until the whole part size has been consumed, the source reports EOF,
     * or a read yields no data.
     *
     * @param bool $seekable
     */
    private function skipPart($seekable)
    {
        $partSize = $this->state->getPartSize();

        if ($seekable) {
            $this->source->seek(min(
                $this->source->tell() + $partSize,
                $this->source->getSize()
            ));
            return;
        }

        $remaining = $partSize;
        while ($remaining > 0 && !$this->source->eof()) {
            $consumed = strlen((string) $this->source->read($remaining));
            if ($consumed === 0) {
                // No progress was made; stop rather than spin forever.
                break;
            }
            $remaining -= $consumed;
        }
    }

    /**
     * Turns the provided source into a stream and stores it.
     *
     * If a string is provided, it is assumed to be a filename, otherwise, it
     * passes the value directly to `Psr7\Utils::streamFor()`.
     *
     * @param mixed $source
     *
     * @return Stream
     *
     * @throws IAE If the resulting stream is not readable.
     */
    private function determineSource($source)
    {
        // Use the contents of a file as the data source.
        if (is_string($source)) {
            $source = Psr7\Utils::tryFopen($source, 'r');
        }

        // Create a source stream.
        $stream = Psr7\Utils::streamFor($source);
        if (!$stream->isReadable()) {
            throw new IAE('Source stream must be readable.');
        }

        return $stream;
    }

    /**
     * Computes how many parts of the given size are needed for the source.
     *
     * @param int $partSize Size of a single part.
     *
     * @return float|null The part count as returned by `ceil()`, or null when
     *                    the source size is unknown or zero.
     */
    protected function getNumberOfParts($partSize)
    {
        if ($sourceSize = $this->source->getSize()) {
            return ceil($sourceSize/$partSize);
        }
        return null;
    }
}