Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
except ImportError:
bigquery_magics = None

if sys.version_info < (3, 10):
if sys.version_info < (3, 10): # pragma: NO COVER
warnings.warn(
"The python-bigquery library no longer supports Python <= 3.9. "
f"Your Python version is {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}. We "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def close(self):

if self._owns_bqstorage_client:
# There is no close() on the BQ Storage client itself.
self._bqstorage_client._transport.grpc_channel.close()
self._bqstorage_client._transport.close()

for cursor_ in self._cursors_created:
if not cursor_._closed:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,4 +773,4 @@ def _close_transports(client, bqstorage_client):
"""
client.close()
if bqstorage_client is not None:
bqstorage_client._transport.grpc_channel.close()
bqstorage_client._transport.close()
Original file line number Diff line number Diff line change
Expand Up @@ -2353,7 +2353,9 @@ def to_arrow(
progress_bar.close()
finally:
if owns_bqstorage_client:
bqstorage_client._transport.grpc_channel.close() # type: ignore
# mypy: bqstorage_client is guaranteed to be not None when owns_bqstorage_client is True,
# but mypy cannot infer this correlation. We ignore the union-attr error here.
bqstorage_client._transport.close() # type: ignore[union-attr]

if record_batches and bqstorage_client is not None:
return pyarrow.Table.from_batches(record_batches)
Expand Down
24 changes: 23 additions & 1 deletion packages/google-cloud-bigquery/noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

from __future__ import absolute_import

import contextlib
from functools import wraps
import os
import pathlib
from typing import Generator
import re
import shutil
import time
Expand Down Expand Up @@ -80,6 +82,25 @@ def wrapper(*args, **kwargs):
]


@contextlib.contextmanager
def log_package_context(session: nox.Session) -> Generator[None, None, None]:
"""Logs a highly visible package context banner right before a session exits.

Ensures metadata is printed adjacent to Nox's final status log,
even if the session fails or raises an exception.
"""
# Dynamically extract current folder name (e.g., 'google-cloud-bigquery')
package_name = CURRENT_DIRECTORY.name

try:
# Hands control back to the session code block
yield
finally:
# This executes AFTER test output finishes, immediately above Nox's summary line
banner_text = f"Finished session for {package_name.lower()}"
session.log(banner_text)


def default(session, install_extras=True):
"""Default unit test session.

Expand Down Expand Up @@ -193,7 +214,8 @@ def mypy(session):
"types-setuptools",
)
session.run("python", "-m", "pip", "freeze")
session.run("mypy", "-p", "google", "--show-traceback")
with log_package_context(session):
session.run("mypy", "-p", "google", "--show-traceback")


@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS)
Expand Down
107 changes: 75 additions & 32 deletions packages/google-cloud-bigquery/tests/system/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.

import base64
import contextlib
import google.auth.transport.requests
import copy
import csv
import datetime
Expand Down Expand Up @@ -155,6 +157,25 @@ def _load_json_schema(filename="schema.json"):
return _parse_schema_resource(json.load(schema_file))


@contextlib.contextmanager
def patch_tracked_requests():
original_init = google.auth.transport.requests.Request.__init__
tracked_requests = []

def patched_init(self, session=None):
original_init(self, session=session)
tracked_requests.append(self)

google.auth.transport.requests.Request.__init__ = patched_init
try:
yield tracked_requests
finally:
google.auth.transport.requests.Request.__init__ = original_init
for req in tracked_requests:
if hasattr(req, "session") and req.session is not None:
req.session.close()


class Config(object):
"""Run-time configuration to be modified at set-up.

Expand Down Expand Up @@ -234,23 +255,34 @@ def _create_bucket(self, bucket_name, location=None):

def test_close_releases_open_sockets(self):
current_process = psutil.Process()
conn_count_start = len(current_process.net_connections())
conn_start = current_process.net_connections()
conn_count_start = len(conn_start)

with patch_tracked_requests() as tracked_requests:
client = Config.CLIENT
client.query(
"""
SELECT
source_year AS year, COUNT(is_male) AS birth_count
FROM `bigquery-public-data.samples.natality`
GROUP BY year
ORDER BY year DESC
LIMIT 15
"""
)

client = Config.CLIENT
client.query(
"""
SELECT
source_year AS year, COUNT(is_male) AS birth_count
FROM `bigquery-public-data.samples.natality`
GROUP BY year
ORDER BY year DESC
LIMIT 15
"""
)
client.close()

client.close()
import gc

conn_count_end = len(current_process.net_connections())
gc.collect()
conn_end = current_process.net_connections()
conn_count_end = len(conn_end)
if conn_count_end > conn_count_start:
print("DEBUG: test_close_releases_open_sockets failed!")
print(f"DEBUG: Start connections ({conn_count_start}): {conn_start}")
print(f"DEBUG: End connections ({conn_count_end}): {conn_end}")
print(f"DEBUG: Tracked requests: {tracked_requests}")
self.assertLessEqual(conn_count_end, conn_count_start)

def test_create_dataset(self):
Expand Down Expand Up @@ -2174,25 +2206,36 @@ def test_dbapi_dry_run_query(self):
def test_dbapi_connection_does_not_leak_sockets(self):
pytest.importorskip("google.cloud.bigquery_storage")
current_process = psutil.Process()
conn_count_start = len(current_process.net_connections())

# Provide no explicit clients, so that the connection will create and own them.
connection = dbapi.connect()
cursor = connection.cursor()

cursor.execute(
conn_start = current_process.net_connections()
conn_count_start = len(conn_start)

with patch_tracked_requests() as tracked_requests:
# Provide no explicit clients, so that the connection will create and own them.
connection = dbapi.connect()
cursor = connection.cursor()

cursor.execute(
"""
SELECT id, `by`, timestamp
FROM `bigquery-public-data.hacker_news.full`
ORDER BY `id` ASC
LIMIT 100000
"""
SELECT id, `by`, timestamp
FROM `bigquery-public-data.hacker_news.full`
ORDER BY `id` ASC
LIMIT 100000
"""
)
rows = cursor.fetchall()
self.assertEqual(len(rows), 100000)

connection.close()
conn_count_end = len(current_process.net_connections())
)
rows = cursor.fetchall()
self.assertEqual(len(rows), 100000)

connection.close()
import gc

gc.collect()
conn_end = current_process.net_connections()
conn_count_end = len(conn_end)
if conn_count_end > conn_count_start:
print("DEBUG: test_dbapi_connection_does_not_leak_sockets failed!")
print(f"DEBUG: Start connections ({conn_count_start}): {conn_start}")
print(f"DEBUG: End connections ({conn_count_end}): {conn_end}")
print(f"DEBUG: Tracked requests: {tracked_requests}")
self.assertLessEqual(conn_count_end, conn_count_start)

def _load_table_for_dml(self, rows, dataset_id, table_id):
Expand Down
72 changes: 51 additions & 21 deletions packages/google-cloud-bigquery/tests/system/test_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@

"""System tests for Jupyter/IPython connector."""

import contextlib
import re

import google.auth.transport.requests

import pytest
import psutil

Expand Down Expand Up @@ -45,30 +48,52 @@ def ipython_interactive(ipython):
yield ipython


@contextlib.contextmanager
def patch_tracked_requests():
original_init = google.auth.transport.requests.Request.__init__
tracked_requests = []

def patched_init(self, session=None):
original_init(self, session=session)
tracked_requests.append(self)

google.auth.transport.requests.Request.__init__ = patched_init
try:
yield tracked_requests
finally:
google.auth.transport.requests.Request.__init__ = original_init
for req in tracked_requests:
if hasattr(req, "session") and req.session is not None:
req.session.close()


def test_bigquery_magic(ipython_interactive):
ip = IPython.get_ipython()
current_process = psutil.Process()
conn_count_start = len(current_process.net_connections())

# Deprecated, but should still work in google-cloud-bigquery 3.x.
with pytest.warns(FutureWarning, match="bigquery_magics"):
ip.extension_manager.load_extension("google.cloud.bigquery")

sql = """
SELECT
CONCAT(
'https://stackoverflow.com/questions/',
CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10
"""
with io.capture_output() as captured:
result = ip.run_cell_magic("bigquery", "--use_rest_api", sql)

conn_count_end = len(current_process.net_connections())
conn_start = current_process.net_connections()
conn_count_start = len(conn_start)

with patch_tracked_requests() as tracked_requests:
# Deprecated, but should still work in google-cloud-bigquery 3.x.
with pytest.warns(FutureWarning, match="bigquery_magics"):
ip.extension_manager.load_extension("google.cloud.bigquery")

sql = """
SELECT
CONCAT(
'https://stackoverflow.com/questions/',
CAST(id as STRING)) as url,
view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE tags like '%google-bigquery%'
ORDER BY view_count DESC
LIMIT 10
"""
with io.capture_output() as captured:
result = ip.run_cell_magic("bigquery", "--use_rest_api", sql)

conn_end = current_process.net_connections()
conn_count_end = len(conn_end)

lines = re.split("\n|\r", captured.stdout)
# Removes blanks & terminal code (result of display clearing)
Expand All @@ -82,4 +107,9 @@ def test_bigquery_magic(ipython_interactive):
# NOTE: For some reason, the number of open sockets is sometimes one *less*
# than expected when running system tests on Kokoro, thus using the <= assertion.
# That's still fine, however, since the sockets are apparently not leaked.
if conn_count_end > conn_count_start:
print("DEBUG: test_bigquery_magic failed!")
print(f"DEBUG: Start connections ({conn_count_start}): {conn_start}")
print(f"DEBUG: End connections ({conn_count_end}): {conn_end}")
print(f"DEBUG: Tracked requests: {tracked_requests}")
assert conn_count_end <= conn_count_start # system resources are released
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _mock_bqstorage_client(self):
from google.cloud import bigquery_storage

mock_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
mock_client._transport = mock.Mock(spec=["channel"])
mock_client._transport = mock.Mock(spec=["channel", "close"])
mock_client._transport.grpc_channel = mock.Mock(spec=["close"])
return mock_client

Expand Down Expand Up @@ -176,7 +176,7 @@ def test_close_closes_all_created_bigquery_clients(self):
connection.close()

self.assertTrue(client.close.called)
self.assertTrue(bqstorage_client._transport.grpc_channel.close.called)
self.assertTrue(bqstorage_client._transport.close.called)

def test_close_does_not_close_bigquery_clients_passed_to_it(self):
pytest.importorskip("google.cloud.bigquery_storage")
Expand All @@ -187,7 +187,7 @@ def test_close_does_not_close_bigquery_clients_passed_to_it(self):
connection.close()

self.assertFalse(client.close.called)
self.assertFalse(bqstorage_client._transport.grpc_channel.close.called)
self.assertFalse(bqstorage_client._transport.close.called)

def test_close_closes_all_created_cursors(self):
connection = self._make_one(client=self._mock_client())
Expand Down
7 changes: 2 additions & 5 deletions packages/google-cloud-bigquery/tests/unit/test_magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

@pytest.fixture()
def use_local_magics_context(monkeypatch):
if magics is not None:
if magics is not None: # pragma: NO COVER
local_context = magics.Context()
local_context._project = "unit-test-project"
mock_credentials = mock.create_autospec(
Expand Down Expand Up @@ -2195,13 +2195,10 @@ def test_bigquery_magic_create_dataset_fails(monkeypatch):


@pytest.mark.usefixtures("ipython_interactive")
def test_bigquery_magic_with_location(monkeypatch):
def test_bigquery_magic_with_location(monkeypatch, use_local_magics_context):
ip = IPython.get_ipython()
monkeypatch.setattr(bigquery, "bigquery_magics", None)
bigquery.load_ipython_extension(ip)
magics.context.credentials = mock.create_autospec(
google.auth.credentials.Credentials, instance=True
)

run_query_patch = mock.patch(
"google.cloud.bigquery.magics.magics._run_query", autospec=True
Expand Down
Loading
Loading