Skip to content

Commit f072f88

Browse files
committed
Refactor Connection.add_notice_callback API.
1. Rename to `Connection.add_log_listener()` 2. Fix `Connection.reset()` to reset log listeners 3. Add a new `PostgresLogMessage` class for log messages 4. Instances of `PostgresLogMessage` are immutable 5. Regenerate `exceptions/__init__`; `PostgresWarning` is now a subclass of PostgresLogMessage.
1 parent 0453243 commit f072f88

File tree

6 files changed

+212
-118
lines changed

6 files changed

+212
-118
lines changed

asyncpg/connection.py

+48-38
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class Connection(metaclass=ConnectionMeta):
4242
'_stmt_cache', '_stmts_to_close', '_listeners',
4343
'_server_version', '_server_caps', '_intro_query',
4444
'_reset_query', '_proxy', '_stmt_exclusive_section',
45-
'_config', '_params', '_addr', '_notice_callbacks')
45+
'_config', '_params', '_addr', '_log_listeners')
4646

4747
def __init__(self, protocol, transport, loop,
4848
addr: (str, int) or str,
@@ -70,7 +70,7 @@ def __init__(self, protocol, transport, loop,
7070
self._stmts_to_close = set()
7171

7272
self._listeners = {}
73-
self._notice_callbacks = set()
73+
self._log_listeners = set()
7474

7575
settings = self._protocol.get_settings()
7676
ver_string = settings.server_version
@@ -128,25 +128,30 @@ async def remove_listener(self, channel, callback):
128128
del self._listeners[channel]
129129
await self.fetch('UNLISTEN {}'.format(channel))
130130

131-
def add_notice_callback(self, callback):
132-
"""Add a callback for Postgres notices (NOTICE, DEBUG, LOG etc.).
131+
def add_log_listener(self, callback):
132+
"""Add a listener for Postgres log messages.
133133
134134
It will be called when asyncronous NoticeResponse is received
135-
from the connection. Possible message types are: WARNING, NOTICE, DEBUG,
136-
INFO, or LOG.
135+
from the connection. Possible message types are: WARNING, NOTICE,
136+
DEBUG, INFO, or LOG.
137137
138138
:param callable callback:
139139
A callable receiving the following arguments:
140140
**connection**: a Connection the callback is registered with;
141-
**message**: the `exceptions.PostgresNotice` message.
141+
**message**: the `exceptions.PostgresLogMessage` message.
142+
143+
.. versionadded:: 0.12.0
142144
"""
143145
if self.is_closed():
144146
raise exceptions.InterfaceError('connection is closed')
145-
self._notice_callbacks.add(callback)
147+
self._log_listeners.add(callback)
148+
149+
def remove_log_listener(self, callback):
150+
"""Remove a listening callback for log messages.
146151
147-
def remove_notice_callback(self, callback):
148-
"""Remove a callback for notices."""
149-
self._notice_callbacks.discard(callback)
152+
.. versionadded:: 0.12.0
153+
"""
154+
self._log_listeners.discard(callback)
150155

151156
def get_server_pid(self):
152157
"""Return the PID of the Postgres server the connection is bound to."""
@@ -975,22 +980,23 @@ async def close(self):
975980
if self.is_closed():
976981
return
977982
self._mark_stmts_as_closed()
978-
self._listeners = {}
983+
self._listeners.clear()
984+
self._log_listeners.clear()
979985
self._aborted = True
980986
await self._protocol.close()
981-
self._notice_callbacks = set()
982987

983988
def terminate(self):
984989
"""Terminate the connection without waiting for pending data."""
985990
self._mark_stmts_as_closed()
986-
self._listeners = {}
991+
self._listeners.clear()
992+
self._log_listeners.clear()
987993
self._aborted = True
988994
self._protocol.abort()
989-
self._notice_callbacks = set()
990995

991996
async def reset(self):
992997
self._check_open()
993998
self._listeners.clear()
999+
self._log_listeners.clear()
9941000
reset_query = self._get_reset_query()
9951001
if reset_query:
9961002
await self.execute(reset_query)
@@ -1068,44 +1074,37 @@ async def cancel():
10681074

10691075
self._loop.create_task(cancel())
10701076

1071-
def _notice(self, message):
1072-
if self._proxy is None:
1073-
con_ref = self
1074-
else:
1075-
# See the comment in the `_notify` below.
1076-
con_ref = self._proxy
1077+
def _process_log_message(self, fields, last_query):
1078+
if not self._log_listeners:
1079+
return
10771080

1078-
for cb in self._notice_callbacks:
1079-
self._loop.call_soon(self._call_notice_cb, cb, con_ref, message)
1081+
message = exceptions.PostgresLogMessage.new(fields, query=last_query)
10801082

1081-
def _call_notice_cb(self, cb, con_ref, message):
1083+
con_ref = self._unwrap()
1084+
for cb in self._log_listeners:
1085+
self._loop.call_soon(
1086+
self._call_log_listener, cb, con_ref, message)
1087+
1088+
def _call_log_listener(self, cb, con_ref, message):
10821089
try:
10831090
cb(con_ref, message)
10841091
except Exception as ex:
10851092
self._loop.call_exception_handler({
1086-
'message': 'Unhandled exception in asyncpg notice message '
1087-
'callback {!r}'.format(cb),
1093+
'message': 'Unhandled exception in asyncpg log message '
1094+
'listener callback {!r}'.format(cb),
10881095
'exception': ex
10891096
})
10901097

1091-
def _notify(self, pid, channel, payload):
1098+
def _process_notification(self, pid, channel, payload):
10921099
if channel not in self._listeners:
10931100
return
10941101

1095-
if self._proxy is None:
1096-
con_ref = self
1097-
else:
1098-
# `_proxy` is not None when the connection is a member
1099-
# of a connection pool. Which means that the user is working
1100-
# with a `PoolConnectionProxy` instance, and expects to see it
1101-
# (and not the actual Connection) in their event callbacks.
1102-
con_ref = self._proxy
1103-
1102+
con_ref = self._unwrap()
11041103
for cb in self._listeners[channel]:
11051104
self._loop.call_soon(
1106-
self._call_notify_cb, cb, con_ref, pid, channel, payload)
1105+
self._call_listener, cb, con_ref, pid, channel, payload)
11071106

1108-
def _call_notify_cb(self, cb, con_ref, pid, channel, payload):
1107+
def _call_listener(self, cb, con_ref, pid, channel, payload):
11091108
try:
11101109
cb(con_ref, pid, channel, payload)
11111110
except Exception as ex:
@@ -1115,6 +1114,17 @@ def _call_notify_cb(self, cb, con_ref, pid, channel, payload):
11151114
'exception': ex
11161115
})
11171116

1117+
def _unwrap(self):
1118+
if self._proxy is None:
1119+
con_ref = self
1120+
else:
1121+
# `_proxy` is not None when the connection is a member
1122+
# of a connection pool. Which means that the user is working
1123+
# with a `PoolConnectionProxy` instance, and expects to see it
1124+
# (and not the actual Connection) in their event callbacks.
1125+
con_ref = self._proxy
1126+
return con_ref
1127+
11181128
def _get_reset_query(self):
11191129
if self._reset_query is not None:
11201130
return self._reset_query

asyncpg/exceptions/__init__.py

+21-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from . import _base
66

77

8-
class PostgresWarning(Warning, _base.PostgresMessage):
8+
class PostgresWarning(_base.PostgresLogMessage, Warning):
99
sqlstate = '01000'
1010

1111

@@ -261,6 +261,10 @@ class NumericValueOutOfRangeError(DataError):
261261
sqlstate = '22003'
262262

263263

264+
class SequenceGeneratorLimitExceededError(DataError):
265+
sqlstate = '2200H'
266+
267+
264268
class StringDataLengthMismatchError(DataError):
265269
sqlstate = '22026'
266270

@@ -608,6 +612,10 @@ class WrongObjectTypeError(SyntaxOrAccessError):
608612
sqlstate = '42809'
609613

610614

615+
class GeneratedAlwaysError(SyntaxOrAccessError):
616+
sqlstate = '428C9'
617+
618+
611619
class UndefinedColumnError(SyntaxOrAccessError):
612620
sqlstate = '42703'
613621

@@ -772,6 +780,10 @@ class LockNotAvailableError(ObjectNotInPrerequisiteStateError):
772780
sqlstate = '55P03'
773781

774782

783+
class UnsafeNewEnumValueUsageError(ObjectNotInPrerequisiteStateError):
784+
sqlstate = '55P04'
785+
786+
775787
class OperatorInterventionError(_base.PostgresError):
776788
sqlstate = '57000'
777789

@@ -1007,7 +1019,8 @@ class IndexCorruptedError(InternalServerError):
10071019
'FDWUnableToCreateExecutionError', 'FDWUnableToCreateReplyError',
10081020
'FDWUnableToEstablishConnectionError', 'FeatureNotSupportedError',
10091021
'ForeignKeyViolationError', 'FunctionExecutedNoReturnStatementError',
1010-
'GroupingError', 'HeldCursorRequiresSameIsolationLevelError',
1022+
'GeneratedAlwaysError', 'GroupingError',
1023+
'HeldCursorRequiresSameIsolationLevelError',
10111024
'IdleInTransactionSessionTimeoutError', 'ImplicitZeroBitPadding',
10121025
'InFailedSQLTransactionError',
10131026
'InappropriateAccessModeForBranchTransactionError',
@@ -1073,7 +1086,8 @@ class IndexCorruptedError(InternalServerError):
10731086
'ReadingSQLDataNotPermittedError', 'ReservedNameError',
10741087
'RestrictViolationError', 'SQLRoutineError',
10751088
'SQLStatementNotYetCompleteError', 'SavepointError',
1076-
'SchemaAndDataStatementMixingNotSupportedError', 'SerializationError',
1089+
'SchemaAndDataStatementMixingNotSupportedError',
1090+
'SequenceGeneratorLimitExceededError', 'SerializationError',
10771091
'SnapshotTooOldError', 'SrfProtocolViolatedError',
10781092
'StackedDiagnosticsAccessedWithoutActiveHandlerError',
10791093
'StatementCompletionUnknownError', 'StatementTooComplexError',
@@ -1088,8 +1102,8 @@ class IndexCorruptedError(InternalServerError):
10881102
'UndefinedColumnError', 'UndefinedFileError',
10891103
'UndefinedFunctionError', 'UndefinedObjectError',
10901104
'UndefinedParameterError', 'UndefinedTableError',
1091-
'UniqueViolationError', 'UnterminatedCStringError',
1092-
'UntranslatableCharacterError', 'WindowingError',
1093-
'WithCheckOptionViolationError', 'WrongObjectTypeError',
1094-
'ZeroLengthCharacterStringError'
1105+
'UniqueViolationError', 'UnsafeNewEnumValueUsageError',
1106+
'UnterminatedCStringError', 'UntranslatableCharacterError',
1107+
'WindowingError', 'WithCheckOptionViolationError',
1108+
'WrongObjectTypeError', 'ZeroLengthCharacterStringError'
10951109
)

asyncpg/exceptions/_base.py

+62-28
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
66

77

8+
import asyncpg
89
import sys
910

1011

1112
__all__ = ('PostgresError', 'FatalPostgresError', 'UnknownPostgresError',
12-
'InterfaceError', 'PostgresNotice')
13+
'InterfaceError', 'PostgresLogMessage')
1314

1415

1516
def _is_asyncpg_class(cls):
@@ -18,6 +19,7 @@ def _is_asyncpg_class(cls):
1819

1920

2021
class PostgresMessageMeta(type):
22+
2123
_message_map = {}
2224
_field_map = {
2325
'S': 'severity',
@@ -69,35 +71,31 @@ def get_message_class_for_sqlstate(mcls, code):
6971

7072

7173
class PostgresMessage(metaclass=PostgresMessageMeta):
72-
def __str__(self):
73-
msg = self.args[0]
74-
if self.detail:
75-
msg += '\nDETAIL: {}'.format(self.detail)
76-
if self.hint:
77-
msg += '\nHINT: {}'.format(self.hint)
7874

79-
return msg
75+
@classmethod
76+
def _get_error_class(cls, fields):
77+
sqlstate = fields.get('C')
78+
return type(cls).get_message_class_for_sqlstate(sqlstate)
8079

8180
@classmethod
82-
def _get_error_template(cls, fields, query):
83-
errcode = fields.get('C')
84-
mcls = cls.__class__
85-
exccls = mcls.get_message_class_for_sqlstate(errcode)
81+
def _get_error_dict(cls, fields, query):
8682
dct = {
8783
'query': query
8884
}
8985

86+
field_map = type(cls)._field_map
9087
for k, v in fields.items():
91-
field = mcls._field_map.get(k)
88+
field = field_map.get(k)
9289
if field:
9390
dct[field] = v
9491

95-
return exccls, dct
92+
return dct
9693

9794
@classmethod
98-
def new(cls, fields, query=None):
99-
exccls, dct = cls._get_error_template(fields, query)
95+
def _make_constructor(cls, fields, query=None):
96+
dct = cls._get_error_dict(fields, query)
10097

98+
exccls = cls._get_error_class(fields)
10199
message = dct.get('message', '')
102100

103101
# PostgreSQL will raise an exception when it detects
@@ -122,24 +120,36 @@ def new(cls, fields, query=None):
122120
message = ('cached statement plan is invalid due to a database '
123121
'schema or configuration change')
124122

125-
e = exccls(message)
126-
e.__dict__.update(dct)
127-
128-
return e
123+
return exccls, message, dct
129124

130125
def as_dict(self):
131-
message = {}
126+
dct = {}
132127
for f in type(self)._field_map.values():
133128
val = getattr(self, f)
134129
if val is not None:
135-
message[f] = val
136-
137-
return message
130+
dct[f] = val
131+
return dct
138132

139133

140134
class PostgresError(PostgresMessage, Exception):
141135
"""Base class for all Postgres errors."""
142136

137+
def __str__(self):
138+
msg = self.args[0]
139+
if self.detail:
140+
msg += '\nDETAIL: {}'.format(self.detail)
141+
if self.hint:
142+
msg += '\nHINT: {}'.format(self.hint)
143+
144+
return msg
145+
146+
@classmethod
147+
def new(cls, fields, query=None):
148+
exccls, message, dct = cls._make_constructor(fields, query)
149+
ex = exccls(message)
150+
ex.__dict__.update(dct)
151+
return ex
152+
143153

144154
class FatalPostgresError(PostgresError):
145155
"""A fatal error that should result in server disconnection."""
@@ -153,8 +163,32 @@ class InterfaceError(Exception):
153163
"""An error caused by improper use of asyncpg API."""
154164

155165

156-
class PostgresNotice(PostgresMessage):
157-
sqlstate = '00000'
166+
class PostgresLogMessage(PostgresMessage):
167+
"""A base class for non-error server messages."""
168+
169+
def __str__(self):
170+
return '{}: {}'.format(type(self).__name__, self.message)
158171

159-
def __init__(self, message):
160-
self.args = [message]
172+
def __setattr__(self, name, val):
173+
raise TypeError('instances of {} are immutable'.format(
174+
type(self).__name__))
175+
176+
@classmethod
177+
def new(cls, fields, query=None):
178+
exccls, message_text, dct = cls._make_constructor(fields, query)
179+
180+
if exccls is UnknownPostgresError:
181+
exccls = PostgresLogMessage
182+
183+
if exccls is PostgresLogMessage:
184+
severity = dct.get('severity_en') or dct.get('severity')
185+
if severity and severity.upper() == 'WARNING':
186+
exccls = asyncpg.PostgresWarning
187+
188+
if issubclass(exccls, (BaseException, Warning)):
189+
msg = exccls(message_text)
190+
else:
191+
msg = exccls()
192+
193+
msg.__dict__.update(dct)
194+
return msg

0 commit comments

Comments
 (0)