null,
            'batch_size' => 25,
            'pool_size' => 1,
            'autoflush' => true,
            'before' => null,
            'error' => null
        ];

        // Ensure the batch size is valid
        if ($config['batch_size'] > 25 || $config['batch_size'] < 2) {
            throw new \InvalidArgumentException('"batch_size" must be between 2 and 25.');
        }

        // Ensure the callbacks are valid
        if ($config['before'] && !is_callable($config['before'])) {
            throw new \InvalidArgumentException('"before" must be callable.');
        }
        if ($config['error'] && !is_callable($config['error'])) {
            throw new \InvalidArgumentException('"error" must be callable.');
        }

        // If autoflush is enabled, set the threshold
        if ($config['autoflush']) {
            $config['threshold'] = $config['batch_size'] * $config['pool_size'];
        }

        $this->client = $client;
        $this->config = $config;
        $this->queue = [];
    }

    /**
     * Queues a PutRequest for the given item.
     *
     * The entry is appended to the internal queue and, when autoflush is
     * enabled and the queue has reached its threshold, a single flush pass
     * is triggered.
     *
     * @param array       $item  Data for an item to put. Format:
     *                           [
     *                               'attribute1' => ['type' => 'value'],
     *                               'attribute2' => ['type' => 'value'],
     *                               ...
     *                           ]
     * @param string|null $table The name of the table. Required unless the
     *                           "table" option was provided in the config
     *                           of the WriteRequestBatch.
     *
     * @return $this
     * @throws \RuntimeException if no table name can be determined.
     */
    public function put(array $item, $table = null)
    {
        // Resolve the table first so nothing is queued when it is missing.
        $targetTable = $this->determineTable($table);

        $this->queue[] = [
            'table' => $targetTable,
            'data' => ['PutRequest' => ['Item' => $item]],
        ];

        $this->autoFlush();

        return $this;
    }

    /**
     * Adds a delete item request to the batch.
     *
     * @param array       $key   Key of an item to delete. Format:
     *                           [
     *                               'key1' => ['type' => 'value'],
     *                               ...
     *                           ]
     * @param string|null $table The name of the table. This must be specified
     *                           unless the "table" option was provided in the
     *                           config of the WriteRequestBatch.
*
     * @return $this
     */
    public function delete(array $key, $table = null)
    {
        // Resolve the table first so nothing is queued when it is missing.
        $targetTable = $this->determineTable($table);

        $this->queue[] = [
            'table' => $targetTable,
            'data' => ['DeleteRequest' => ['Key' => $key]],
        ];

        $this->autoFlush();

        return $this;
    }

    /**
     * Sends the queued put and delete requests as BatchWriteItem commands.
     *
     * Each pass drains the queue into commands and runs them through a
     * CommandPool whose concurrency is the configured "pool_size". Items
     * reported back as unprocessed, and the request items of commands
     * rejected with ProvisionedThroughputExceededException, are put back on
     * the queue. Any other AwsException is handed to the configured "error"
     * callback when one is callable; rejections that are not AwsException
     * instances are ignored.
     *
     * @param bool $untilEmpty If true, passes repeat until the queue is
     *                         empty, so re-queued items are eventually sent.
     *                         If false, exactly one pass is made (when the
     *                         queue is not empty).
     *
     * @return $this
     */
    public function flush($untilEmpty = true)
    {
        // Re-queue whatever the service reports as unprocessed.
        $onFulfilled = function (ResultInterface $result) {
            if ($result->hasKey('UnprocessedItems')) {
                $this->retryUnprocessed($result['UnprocessedItems']);
            }
        };

        $onRejected = function ($reason) {
            if (!($reason instanceof AwsException)) {
                return;
            }

            // Throttled commands are retried in full on a later pass.
            if ($reason->getAwsErrorCode() === 'ProvisionedThroughputExceededException') {
                $this->retryUnprocessed($reason->getCommand()['RequestItems']);
                return;
            }

            if (is_callable($this->config['error'])) {
                $this->config['error']($reason);
            }
        };

        $poolOptions = [
            'before' => $this->config['before'],
            'concurrency' => $this->config['pool_size'],
            'fulfilled' => $onFulfilled,
            'rejected' => $onRejected,
        ];

        // Send BatchWriteItem requests until the queue is empty
        while ($this->queue) {
            $commands = $this->prepareCommands();
            $pool = new CommandPool($this->client, $commands, $poolOptions);
            $pool->promise()->wait();

            // A single pass was requested; leave re-queued items for later.
            if (!$untilEmpty) {
                break;
            }
        }

        return $this;
    }

    /**
     * Creates BatchWriteItem commands from the items in the queue.
*
     * @return CommandInterface[]
     */
    private function prepareCommands()
    {
        // Split the queue into batches of at most "batch_size" entries and
        // empty it; anything re-queued later belongs to the next pass.
        $chunks = array_chunk($this->queue, $this->config['batch_size']);
        $this->queue = [];

        $commands = [];
        foreach ($chunks as $chunk) {
            // Group the chunk's write requests by table name.
            $requestItems = [];
            foreach ($chunk as $entry) {
                $requestItems[$entry['table']][] = $entry['data'];
            }

            $commands[] = $this->client->getCommand(
                'BatchWriteItem',
                ['RequestItems' => $requestItems]
            );
        }

        return $commands;
    }

    /**
     * Puts unprocessed write requests back on the queue.
     *
     * @param array $unprocessed Write requests keyed by table name, in the
     *                           same shape as "RequestItems".
     */
    private function retryUnprocessed(array $unprocessed)
    {
        foreach ($unprocessed as $tableName => $writeRequests) {
            foreach ($writeRequests as $writeRequest) {
                $this->queue[] = [
                    'table' => $tableName,
                    'data' => $writeRequest,
                ];
            }
        }
    }

    /**
     * Runs one flush pass when autoflush is enabled and the queue has
     * reached the configured threshold.
     */
    private function autoFlush()
    {
        if (!$this->config['autoflush']) {
            return;
        }

        if (count($this->queue) < $this->config['threshold']) {
            return;
        }

        // Flush only once. Unprocessed items are handled in a later flush.
        $this->flush(false);
    }

    /**
     * Picks the table name to use: the one passed in or, when that is
     * empty, the "table" value from the WriteRequestBatch config.
     *
     * @param string|null $table The table name.
     *
     * @return string
     * @throws \RuntimeException if there was no table specified.
     */
    private function determineTable($table)
    {
        $resolved = $table ? $table : $this->config['table'];

        if ($resolved) {
            return $resolved;
        }

        throw new \RuntimeException('There was no table specified.');
    }
}