Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions cloudinary/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from cloudinary import utils
from cloudinary.api_client.execute_request import EXCEPTION_CODES
from cloudinary.cache.responsive_breakpoints_cache import instance as responsive_breakpoints_cache_instance
from cloudinary.exceptions import Error
from cloudinary.exceptions import Error, AuthorizationRequired
from cloudinary.utils import build_eager

try:
Expand Down Expand Up @@ -262,6 +262,32 @@ def upload_resource(file, **options):
)


def _upload_large_part_with_auth_retry(file, http_headers, options):
"""
Uploads a single chunk, recovering once from an expired OAuth token via the
optional oauth_token_refresh_callback config hook. Retries the same chunk
(same http_headers, so same X-Unique-Upload-Id) to resume the upload.

:param file: The chunk to upload.
:param http_headers: Per-chunk headers, reused on retry.
:param options: Upload options (must not contain an oauth_token key).
:return: The result of the chunk upload API call.
:rtype: dict
"""
# Pin the token so the value handed to the callback is the one actually sent.
token = cloudinary.config().oauth_token
pinned = dict(options, oauth_token=token) if token else options
try:
return upload_large_part(file, http_headers=http_headers, **pinned)
except AuthorizationRequired:
callback = cloudinary.config().oauth_token_refresh_callback
if not callback:
raise
callback(token)
# Retry with the original options so call_api re-reads the refreshed token from config.
return upload_large_part(file, http_headers=http_headers, **options)


def upload_large(file, **options):
"""
Uploads a large file (in chunks) to Cloudinary.
Expand Down Expand Up @@ -308,7 +334,9 @@ def upload_large(file, **options):
"X-Unique-Upload-Id": upload_id
}

upload_result = upload_large_part((file_name, chunk), http_headers=http_headers, **options)
upload_result = _upload_large_part_with_auth_retry(
(file_name, chunk), http_headers, options
)

options["public_id"] = upload_result.get("public_id")

Expand Down
186 changes: 185 additions & 1 deletion test/test_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from test.helper_test import uploader_response_mock, SUFFIX, TEST_IMAGE, get_params, get_headers, TEST_ICON, TEST_DOC, \
REMOTE_TEST_IMAGE, UTC, populate_large_file, TEST_UNICODE_IMAGE, get_uri, get_method, get_param, \
cleanup_test_resources_by_tag, cleanup_test_transformation, cleanup_test_resources, EVAL_STR, ON_SUCCESS_STR, \
URLLIB3_REQUEST, patch, retry_assertion, CldTestCase
URLLIB3_REQUEST, patch, retry_assertion, CldTestCase, http_response_mock
from test.test_utils import TEST_ID, TEST_FOLDER

MOCK_RESPONSE = uploader_response_mock()
Expand Down Expand Up @@ -837,6 +837,190 @@ def test_upload_large_file_io(self):
self.assertEqual(resource["width"], LARGE_FILE_WIDTH)
self.assertEqual(resource["height"], LARGE_FILE_HEIGHT)

_OAUTH_CHUNK_SIZE = 4096
_OAUTH_FILE_SIZE = _OAUTH_CHUNK_SIZE * 3

@staticmethod
def _oauth_part_response(public_id="test_public_id"):
return {"public_id": public_id, "done": True}

def _run_upload_large_with_oauth(self, side_effect, chunk_size=None):
"""Runs upload_large over a 3-chunk in-memory file with upload_large_part mocked"""
chunk_size = chunk_size or self._OAUTH_CHUNK_SIZE
with io.BytesIO() as temp_file:
populate_large_file(temp_file, self._OAUTH_FILE_SIZE)
with patch("cloudinary.uploader.upload_large_part") as part_mock:
part_mock.side_effect = side_effect
result = uploader.upload_large(temp_file, chunk_size=chunk_size,
tags=[UNIQUE_TAG], resource_type="image")
return result, part_mock

def test_upload_large_oauth_resume_on_mid_stream_401(self):
"""Should refresh once and resume the same upload when a chunk gets a 401"""
cloudinary.config(oauth_token="expired-token")

rejected_tokens = []

def refresh(rejected):
rejected_tokens.append(rejected)
cloudinary.config(oauth_token="fresh-token")

cloudinary.config().oauth_token_refresh_callback = refresh

calls = {"n": 0}

def side_effect(file, http_headers=None, **options):
calls["n"] += 1
if calls["n"] == 2:
raise exceptions.AuthorizationRequired("Server returned unexpected status code - 401")
return self._oauth_part_response()

result, part_mock = self._run_upload_large_with_oauth(side_effect)

self.assertEqual(rejected_tokens, ["expired-token"])
self.assertEqual(part_mock.call_count, 4)

# The retry must reuse the failed chunk's X-Unique-Upload-Id and Content-Range to resume.
upload_ids = [c[1]["http_headers"]["X-Unique-Upload-Id"] for c in part_mock.call_args_list]
self.assertEqual(len(set(upload_ids)), 1)
ranges = [c[1]["http_headers"]["Content-Range"] for c in part_mock.call_args_list]
self.assertEqual(ranges[1], ranges[2])

self.assertEqual(result, self._oauth_part_response())

def test_upload_large_oauth_resume_on_first_chunk_401(self):
"""Should refresh and resume when the first chunk (no public_id yet) gets a 401"""
cloudinary.config(oauth_token="expired-token")

refresh_calls = {"n": 0}

def refresh(rejected):
refresh_calls["n"] += 1
cloudinary.config(oauth_token="fresh-token")

cloudinary.config().oauth_token_refresh_callback = refresh

calls = {"n": 0}

def side_effect(file, http_headers=None, **options):
calls["n"] += 1
if calls["n"] == 1:
self.assertIsNone(options.get("public_id"))
raise exceptions.AuthorizationRequired("Server returned unexpected status code - 401")
return self._oauth_part_response()

result, part_mock = self._run_upload_large_with_oauth(side_effect)

self.assertEqual(refresh_calls["n"], 1)
self.assertEqual(part_mock.call_count, 4)
upload_ids = [c[1]["http_headers"]["X-Unique-Upload-Id"] for c in part_mock.call_args_list]
self.assertEqual(len(set(upload_ids)), 1)
self.assertEqual(result, self._oauth_part_response())

def test_upload_large_oauth_single_retry_then_propagate(self):
"""Should retry once then propagate when the token stays rejected"""
cloudinary.config(oauth_token="expired-token")

refresh_calls = {"n": 0}

def refresh(rejected):
refresh_calls["n"] += 1
cloudinary.config(oauth_token="still-bad-token")

cloudinary.config().oauth_token_refresh_callback = refresh

def side_effect(file, http_headers=None, **options):
raise exceptions.AuthorizationRequired("Server returned unexpected status code - 401")

with self.assertRaises(exceptions.AuthorizationRequired):
self._run_upload_large_with_oauth(side_effect)

self.assertEqual(refresh_calls["n"], 1)

def test_upload_large_oauth_no_hook_propagates(self):
"""Should propagate the first 401 with no retry when no callback is registered"""
cloudinary.config(oauth_token="expired-token")

calls = {"n": 0}

def side_effect(file, http_headers=None, **options):
calls["n"] += 1
raise exceptions.AuthorizationRequired("Server returned unexpected status code - 401")

with self.assertRaises(exceptions.AuthorizationRequired):
self._run_upload_large_with_oauth(side_effect)

self.assertEqual(calls["n"], 1)

def test_upload_large_oauth_non_auth_error_not_retried(self):
"""Should not retry non-auth errors even when a callback is registered"""
cloudinary.config(oauth_token="some-token")

refresh_calls = {"n": 0}

def refresh(rejected):
refresh_calls["n"] += 1

cloudinary.config().oauth_token_refresh_callback = refresh

for error in (exceptions.BadRequest("bad request"),
exceptions.Error("Socket error: some transient failure")):
refresh_calls["n"] = 0
calls = {"n": 0}

def side_effect(file, http_headers=None, _err=error, **options):
calls["n"] += 1
raise _err

with self.assertRaises(type(error)):
self._run_upload_large_with_oauth(side_effect)

self.assertEqual(calls["n"], 1)
self.assertEqual(refresh_calls["n"], 0)

@patch(URLLIB3_REQUEST)
def test_upload_large_oauth_rejected_token_is_the_one_sent(self, request_mock):
"""Should pass the token actually sent (pinned) to the callback when it rotates per read"""
token_counter = {"n": 0}

class RotatingConfig(cloudinary.Config):
@property
def oauth_token(self):
token_counter["n"] += 1
return "token-{}".format(token_counter["n"])

@oauth_token.setter
def oauth_token(self, value):
pass

rotating = RotatingConfig()
rejected_seen = {}

def refresh(rejected):
rejected_seen["token"] = rejected
request_mock.return_value = uploader_response_mock()

rotating.oauth_token_refresh_callback = refresh

responses = {"first": True}

def request_side_effect(*args, **kwargs):
if responses["first"]:
responses["first"] = False
rejected_seen["sent"] = kwargs["headers"].get("authorization", "").replace("Bearer ", "")
return http_response_mock('{"error":{"message":"401"}}', status=401)
return uploader_response_mock()

request_mock.side_effect = request_side_effect

with patch("cloudinary.config", return_value=rotating):
with io.BytesIO() as temp_file:
populate_large_file(temp_file, self._OAUTH_CHUNK_SIZE)
uploader.upload_large(temp_file, chunk_size=self._OAUTH_CHUNK_SIZE,
tags=[UNIQUE_TAG], resource_type="image")

self.assertEqual(rejected_seen["token"], rejected_seen["sent"])

@patch(URLLIB3_REQUEST)
@unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
def test_upload_preset(self, mocker):
Expand Down
Loading