diff --git a/cloudinary/uploader.py b/cloudinary/uploader.py index 57efe5f..50b86cc 100644 --- a/cloudinary/uploader.py +++ b/cloudinary/uploader.py @@ -11,7 +11,7 @@ from cloudinary import utils from cloudinary.api_client.execute_request import EXCEPTION_CODES from cloudinary.cache.responsive_breakpoints_cache import instance as responsive_breakpoints_cache_instance -from cloudinary.exceptions import Error +from cloudinary.exceptions import Error, AuthorizationRequired from cloudinary.utils import build_eager try: @@ -262,6 +262,32 @@ def upload_resource(file, **options): ) +def _upload_large_part_with_auth_retry(file, http_headers, options): + """ + Uploads a single chunk, recovering once from an expired OAuth token via the + optional oauth_token_refresh_callback config hook. Retries the same chunk + (same http_headers, so same X-Unique-Upload-Id) to resume the upload. + + :param file: The chunk to upload. + :param http_headers: Per-chunk headers, reused on retry. + :param options: Upload options (must not contain an oauth_token key). + :return: The result of the chunk upload API call. + :rtype: dict + """ + # Pin the token so the value handed to the callback is the one actually sent. + token = cloudinary.config().oauth_token + pinned = dict(options, oauth_token=token) if token else options + try: + return upload_large_part(file, http_headers=http_headers, **pinned) + except AuthorizationRequired: + callback = cloudinary.config().oauth_token_refresh_callback + if not callback: + raise + callback(token) + # Retry with the original options so call_api re-reads the refreshed token from config. + return upload_large_part(file, http_headers=http_headers, **options) + + def upload_large(file, **options): """ Uploads a large file (in chunks) to Cloudinary. @@ -308,7 +334,9 @@ def upload_large(file, **options): "X-Unique-Upload-Id": upload_id } - upload_result = upload_large_part((file_name, chunk), http_headers=http_headers, **options) + upload_result = _upload_large_part_with_auth_retry( + (file_name, chunk), http_headers, options + ) options["public_id"] = upload_result.get("public_id") diff --git a/test/test_uploader.py b/test/test_uploader.py index d05d9ee..6a7cc60 100644 --- a/test/test_uploader.py +++ b/test/test_uploader.py @@ -18,7 +18,7 @@ from test.helper_test import uploader_response_mock, SUFFIX, TEST_IMAGE, get_params, get_headers, TEST_ICON, TEST_DOC, \ REMOTE_TEST_IMAGE, UTC, populate_large_file, TEST_UNICODE_IMAGE, get_uri, get_method, get_param, \ cleanup_test_resources_by_tag, cleanup_test_transformation, cleanup_test_resources, EVAL_STR, ON_SUCCESS_STR, \ - URLLIB3_REQUEST, patch, retry_assertion, CldTestCase + URLLIB3_REQUEST, patch, retry_assertion, CldTestCase, http_response_mock from test.test_utils import TEST_ID, TEST_FOLDER MOCK_RESPONSE = uploader_response_mock() @@ -837,6 +837,190 @@ def test_upload_large_file_io(self): self.assertEqual(resource["width"], LARGE_FILE_WIDTH) self.assertEqual(resource["height"], LARGE_FILE_HEIGHT) + _OAUTH_CHUNK_SIZE = 4096 + _OAUTH_FILE_SIZE = _OAUTH_CHUNK_SIZE * 3 + + @staticmethod + def _oauth_part_response(public_id="test_public_id"): + return {"public_id": public_id, "done": True} + + def _run_upload_large_with_oauth(self, side_effect, chunk_size=None): + """Runs upload_large over a 3-chunk in-memory file with upload_large_part mocked""" + chunk_size = chunk_size or self._OAUTH_CHUNK_SIZE + with io.BytesIO() as temp_file: + populate_large_file(temp_file, self._OAUTH_FILE_SIZE) + with patch("cloudinary.uploader.upload_large_part") as part_mock: + part_mock.side_effect = side_effect + result = uploader.upload_large(temp_file, chunk_size=chunk_size, + tags=[UNIQUE_TAG], resource_type="image") + return result, part_mock + + def test_upload_large_oauth_resume_on_mid_stream_401(self): + """Should refresh once and resume the same upload when a chunk gets a 401""" + cloudinary.config(oauth_token="expired-token") + + rejected_tokens = [] + + def refresh(rejected): + rejected_tokens.append(rejected) + cloudinary.config(oauth_token="fresh-token") + + cloudinary.config().oauth_token_refresh_callback = refresh + + calls = {"n": 0} + + def side_effect(file, http_headers=None, **options): + calls["n"] += 1 + if calls["n"] == 2: + raise exceptions.AuthorizationRequired("Server returned unexpected status code - 401") + return self._oauth_part_response() + + result, part_mock = self._run_upload_large_with_oauth(side_effect) + + self.assertEqual(rejected_tokens, ["expired-token"]) + self.assertEqual(part_mock.call_count, 4) + + # The retry must reuse the failed chunk's X-Unique-Upload-Id and Content-Range to resume. + upload_ids = [c[1]["http_headers"]["X-Unique-Upload-Id"] for c in part_mock.call_args_list] + self.assertEqual(len(set(upload_ids)), 1) + ranges = [c[1]["http_headers"]["Content-Range"] for c in part_mock.call_args_list] + self.assertEqual(ranges[1], ranges[2]) + + self.assertEqual(result, self._oauth_part_response()) + + def test_upload_large_oauth_resume_on_first_chunk_401(self): + """Should refresh and resume when the first chunk (no public_id yet) gets a 401""" + cloudinary.config(oauth_token="expired-token") + + refresh_calls = {"n": 0} + + def refresh(rejected): + refresh_calls["n"] += 1 + cloudinary.config(oauth_token="fresh-token") + + cloudinary.config().oauth_token_refresh_callback = refresh + + calls = {"n": 0} + + def side_effect(file, http_headers=None, **options): + calls["n"] += 1 + if calls["n"] == 1: + self.assertIsNone(options.get("public_id")) + raise exceptions.AuthorizationRequired("Server returned unexpected status code - 401") + return self._oauth_part_response() + + result, part_mock = self._run_upload_large_with_oauth(side_effect) + + self.assertEqual(refresh_calls["n"], 1) + self.assertEqual(part_mock.call_count, 4) + upload_ids = [c[1]["http_headers"]["X-Unique-Upload-Id"] for c in part_mock.call_args_list] + self.assertEqual(len(set(upload_ids)), 1) + self.assertEqual(result, self._oauth_part_response()) + + def test_upload_large_oauth_single_retry_then_propagate(self): + """Should retry once then propagate when the token stays rejected""" + cloudinary.config(oauth_token="expired-token") + + refresh_calls = {"n": 0} + + def refresh(rejected): + refresh_calls["n"] += 1 + cloudinary.config(oauth_token="still-bad-token") + + cloudinary.config().oauth_token_refresh_callback = refresh + + def side_effect(file, http_headers=None, **options): + raise exceptions.AuthorizationRequired("Server returned unexpected status code - 401") + + with self.assertRaises(exceptions.AuthorizationRequired): + self._run_upload_large_with_oauth(side_effect) + + self.assertEqual(refresh_calls["n"], 1) + + def test_upload_large_oauth_no_hook_propagates(self): + """Should propagate the first 401 with no retry when no callback is registered""" + cloudinary.config(oauth_token="expired-token") + + calls = {"n": 0} + + def side_effect(file, http_headers=None, **options): + calls["n"] += 1 + raise exceptions.AuthorizationRequired("Server returned unexpected status code - 401") + + with self.assertRaises(exceptions.AuthorizationRequired): + self._run_upload_large_with_oauth(side_effect) + + self.assertEqual(calls["n"], 1) + + def test_upload_large_oauth_non_auth_error_not_retried(self): + """Should not retry non-auth errors even when a callback is registered""" + cloudinary.config(oauth_token="some-token") + + refresh_calls = {"n": 0} + + def refresh(rejected): + refresh_calls["n"] += 1 + + cloudinary.config().oauth_token_refresh_callback = refresh + + for error in (exceptions.BadRequest("bad request"), + exceptions.Error("Socket error: some transient failure")): + refresh_calls["n"] = 0 + calls = {"n": 0} + + def side_effect(file, http_headers=None, _err=error, **options): + calls["n"] += 1 + raise _err + + with self.assertRaises(type(error)): + self._run_upload_large_with_oauth(side_effect) + + self.assertEqual(calls["n"], 1) + self.assertEqual(refresh_calls["n"], 0) + + @patch(URLLIB3_REQUEST) + def test_upload_large_oauth_rejected_token_is_the_one_sent(self, request_mock): + """Should pass the token actually sent (pinned) to the callback when it rotates per read""" + token_counter = {"n": 0} + + class RotatingConfig(cloudinary.Config): + @property + def oauth_token(self): + token_counter["n"] += 1 + return "token-{}".format(token_counter["n"]) + + @oauth_token.setter + def oauth_token(self, value): + pass + + rotating = RotatingConfig() + rejected_seen = {} + + def refresh(rejected): + rejected_seen["token"] = rejected + request_mock.return_value = uploader_response_mock() + + rotating.oauth_token_refresh_callback = refresh + + responses = {"first": True} + + def request_side_effect(*args, **kwargs): + if responses["first"]: + responses["first"] = False + rejected_seen["sent"] = kwargs["headers"].get("authorization", "").replace("Bearer ", "") + return http_response_mock('{"error":{"message":"401"}}', status=401) + return uploader_response_mock() + + request_mock.side_effect = request_side_effect + + with patch("cloudinary.config", return_value=rotating): + with io.BytesIO() as temp_file: + populate_large_file(temp_file, self._OAUTH_CHUNK_SIZE) + uploader.upload_large(temp_file, chunk_size=self._OAUTH_CHUNK_SIZE, + tags=[UNIQUE_TAG], resource_type="image") + + self.assertEqual(rejected_seen["token"], rejected_seen["sent"]) + @patch(URLLIB3_REQUEST) @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret") def test_upload_preset(self, mocker):