Skip to content

Commit e11fd56

Browse files
committed
Implement support for multirange types
1 parent a8fc21e commit e11fd56

File tree

8 files changed

+207
-29
lines changed

8 files changed

+207
-29
lines changed

asyncpg/introspection.py

+6-1
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,9 @@
3939
END) AS basetype,
4040
t.typelem AS elemtype,
4141
elem_t.typdelim AS elemdelim,
42-
range_t.rngsubtype AS range_subtype,
42+
COALESCE(
43+
range_t.rngsubtype,
44+
multirange_t.rngsubtype) AS range_subtype,
4345
(CASE WHEN t.typtype = 'c' THEN
4446
(SELECT
4547
array_agg(ia.atttypid ORDER BY ia.attnum)
@@ -78,6 +80,9 @@
7880
LEFT JOIN pg_range range_t ON (
7981
t.oid = range_t.rngtypid
8082
)
83+
LEFT JOIN pg_range multirange_t ON (
84+
t.oid = multirange_t.rngmultitypid
85+
)
8186
)
8287
'''
8388

asyncpg/protocol/codecs/array.pyx

-12
Original file line number | Diff line number | Diff line change
@@ -858,19 +858,7 @@ cdef arraytext_decode(ConnectionSettings settings, FRBuffer *buf):
858858
return array_decode(settings, buf, <decode_func_ex>&text_decode_ex, NULL)
859859

860860

861-
cdef anyarray_decode(ConnectionSettings settings, FRBuffer *buf):
862-
# Instances of anyarray (or any other polymorphic pseudotype) are
863-
# never supposed to be returned from actual queries.
864-
raise exceptions.ProtocolError(
865-
'unexpected instance of \'anyarray\' type')
866-
867-
868861
cdef init_array_codecs():
869-
register_core_codec(ANYARRAYOID,
870-
NULL,
871-
<decode_func>&anyarray_decode,
872-
PG_FORMAT_BINARY)
873-
874862
# oid[] and text[] are registered as core codecs
875863
# to make type introspection query work
876864
#

asyncpg/protocol/codecs/base.pxd

+18-6
Original file line number | Diff line number | Diff line change
@@ -23,12 +23,13 @@ ctypedef object (*codec_decode_func)(Codec codec,
2323

2424

2525
cdef enum CodecType:
26-
CODEC_UNDEFINED = 0
27-
CODEC_C = 1
28-
CODEC_PY = 2
29-
CODEC_ARRAY = 3
30-
CODEC_COMPOSITE = 4
31-
CODEC_RANGE = 5
26+
CODEC_UNDEFINED = 0
27+
CODEC_C = 1
28+
CODEC_PY = 2
29+
CODEC_ARRAY = 3
30+
CODEC_COMPOSITE = 4
31+
CODEC_RANGE = 5
32+
CODEC_MULTIRANGE = 6
3233

3334

3435
cdef enum ServerDataFormat:
@@ -95,6 +96,9 @@ cdef class Codec:
9596
cdef encode_range(self, ConnectionSettings settings, WriteBuffer buf,
9697
object obj)
9798

99+
cdef encode_multirange(self, ConnectionSettings settings, WriteBuffer buf,
100+
object obj)
101+
98102
cdef encode_composite(self, ConnectionSettings settings, WriteBuffer buf,
99103
object obj)
100104

@@ -109,6 +113,8 @@ cdef class Codec:
109113

110114
cdef decode_range(self, ConnectionSettings settings, FRBuffer *buf)
111115

116+
cdef decode_multirange(self, ConnectionSettings settings, FRBuffer *buf)
117+
112118
cdef decode_composite(self, ConnectionSettings settings, FRBuffer *buf)
113119

114120
cdef decode_in_python(self, ConnectionSettings settings, FRBuffer *buf)
@@ -139,6 +145,12 @@ cdef class Codec:
139145
str schema,
140146
Codec element_codec)
141147

148+
@staticmethod
149+
cdef Codec new_multirange_codec(uint32_t oid,
150+
str name,
151+
str schema,
152+
Codec element_codec)
153+
142154
@staticmethod
143155
cdef Codec new_composite_codec(uint32_t oid,
144156
str name,

asyncpg/protocol/codecs/base.pyx

+54-2
Original file line number | Diff line number | Diff line change
@@ -71,6 +71,13 @@ cdef class Codec:
7171
'range types is not supported'.format(schema, name))
7272
self.encoder = <codec_encode_func>&self.encode_range
7373
self.decoder = <codec_decode_func>&self.decode_range
74+
elif type == CODEC_MULTIRANGE:
75+
if format != PG_FORMAT_BINARY:
76+
raise exceptions.UnsupportedClientFeatureError(
77+
'cannot decode type "{}"."{}": text encoding of '
78+
'range types is not supported'.format(schema, name))
79+
self.encoder = <codec_encode_func>&self.encode_multirange
80+
self.decoder = <codec_decode_func>&self.decode_multirange
7481
elif type == CODEC_COMPOSITE:
7582
if format != PG_FORMAT_BINARY:
7683
raise exceptions.UnsupportedClientFeatureError(
@@ -122,6 +129,12 @@ cdef class Codec:
122129
codec_encode_func_ex,
123130
<void*>(<cpython.PyObject>self.element_codec))
124131

132+
cdef encode_multirange(self, ConnectionSettings settings, WriteBuffer buf,
133+
object obj):
134+
multirange_encode(settings, buf, obj, self.element_codec.oid,
135+
codec_encode_func_ex,
136+
<void*>(<cpython.PyObject>self.element_codec))
137+
125138
cdef encode_composite(self, ConnectionSettings settings, WriteBuffer buf,
126139
object obj):
127140
cdef:
@@ -209,6 +222,10 @@ cdef class Codec:
209222
return range_decode(settings, buf, codec_decode_func_ex,
210223
<void*>(<cpython.PyObject>self.element_codec))
211224

225+
cdef decode_multirange(self, ConnectionSettings settings, FRBuffer *buf):
226+
return multirange_decode(settings, buf, codec_decode_func_ex,
227+
<void*>(<cpython.PyObject>self.element_codec))
228+
212229
cdef decode_composite(self, ConnectionSettings settings,
213230
FRBuffer *buf):
214231
cdef:
@@ -294,7 +311,11 @@ cdef class Codec:
294311
if self.c_encoder is not NULL or self.py_encoder is not None:
295312
return True
296313

297-
elif self.type == CODEC_ARRAY or self.type == CODEC_RANGE:
314+
elif (
315+
self.type == CODEC_ARRAY
316+
or self.type == CODEC_RANGE
317+
or self.type == CODEC_MULTIRANGE
318+
):
298319
return self.element_codec.has_encoder()
299320

300321
elif self.type == CODEC_COMPOSITE:
@@ -312,7 +333,11 @@ cdef class Codec:
312333
if self.c_decoder is not NULL or self.py_decoder is not None:
313334
return True
314335

315-
elif self.type == CODEC_ARRAY or self.type == CODEC_RANGE:
336+
elif (
337+
self.type == CODEC_ARRAY
338+
or self.type == CODEC_RANGE
339+
or self.type == CODEC_MULTIRANGE
340+
):
316341
return self.element_codec.has_decoder()
317342

318343
elif self.type == CODEC_COMPOSITE:
@@ -358,6 +383,18 @@ cdef class Codec:
358383
None, None, None, 0)
359384
return codec
360385

386+
@staticmethod
387+
cdef Codec new_multirange_codec(uint32_t oid,
388+
str name,
389+
str schema,
390+
Codec element_codec):
391+
cdef Codec codec
392+
codec = Codec(oid)
393+
codec.init(name, schema, 'multirange', CODEC_MULTIRANGE,
394+
element_codec.format, PG_XFORMAT_OBJECT, NULL, NULL,
395+
None, None, element_codec, None, None, None, 0)
396+
return codec
397+
361398
@staticmethod
362399
cdef Codec new_composite_codec(uint32_t oid,
363400
str name,
@@ -536,6 +573,21 @@ cdef class DataCodecConfig:
536573
self._derived_type_codecs[oid, elem_codec.format] = \
537574
Codec.new_range_codec(oid, name, schema, elem_codec)
538575

576+
elif ti['kind'] == b'm':
577+
# Multirange type
578+
579+
if not range_subtype_oid:
580+
raise exceptions.InternalClientError(
581+
f'type record missing base type for multirange {oid}')
582+
583+
elem_codec = self.get_codec(range_subtype_oid, PG_FORMAT_ANY)
584+
if elem_codec is None:
585+
elem_codec = self.declare_fallback_codec(
586+
range_subtype_oid, ti['range_subtype_name'], schema)
587+
588+
self._derived_type_codecs[oid, elem_codec.format] = \
589+
Codec.new_multirange_codec(oid, name, schema, elem_codec)
590+
539591
elif ti['kind'] == b'e':
540592
# Enum types are essentially text
541593
self._set_builtin_type_codec(oid, name, schema, 'scalar',

asyncpg/protocol/codecs/pgproto.pyx

+16-2
Original file line number | Diff line number | Diff line change
@@ -273,8 +273,9 @@ cdef init_pseudo_codecs():
273273
FDW_HANDLEROID, TSM_HANDLEROID, INTERNALOID, OPAQUEOID,
274274
ANYELEMENTOID, ANYNONARRAYOID, ANYCOMPATIBLEOID,
275275
ANYCOMPATIBLEARRAYOID, ANYCOMPATIBLENONARRAYOID,
276-
ANYCOMPATIBLERANGEOID, PG_DDL_COMMANDOID, INDEX_AM_HANDLEROID,
277-
TABLE_AM_HANDLEROID,
276+
ANYCOMPATIBLERANGEOID, ANYCOMPATIBLEMULTIRANGEOID,
277+
ANYRANGEOID, ANYMULTIRANGEOID, ANYARRAYOID,
278+
PG_DDL_COMMANDOID, INDEX_AM_HANDLEROID, TABLE_AM_HANDLEROID,
278279
]
279280

280281
register_core_codec(ANYENUMOID,
@@ -330,6 +331,19 @@ cdef init_pseudo_codecs():
330331
<decode_func>pgproto.bytea_decode,
331332
PG_FORMAT_BINARY)
332333

334+
# These two are internal to BRIN index support and are unlikely
335+
# to be sent, but since I/O functions for these exist, add decoders
336+
# nonetheless.
337+
register_core_codec(PG_BRIN_BLOOM_SUMMARYOID,
338+
NULL,
339+
<decode_func>pgproto.bytea_decode,
340+
PG_FORMAT_BINARY)
341+
342+
register_core_codec(PG_BRIN_MINMAX_MULTI_SUMMARYOID,
343+
NULL,
344+
<decode_func>pgproto.bytea_decode,
345+
PG_FORMAT_BINARY)
346+
333347

334348
cdef init_text_codecs():
335349
textoids = [

asyncpg/protocol/codecs/range.pyx

+52-6
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,8 @@
77

88
from asyncpg import types as apg_types
99

10+
from collections.abc import Sequence as SequenceABC
11+
1012
# defined in postgresql/src/include/utils/rangetypes.h
1113
DEF RANGE_EMPTY = 0x01 # range is empty
1214
DEF RANGE_LB_INC = 0x02 # lower bound is inclusive
@@ -139,11 +141,55 @@ cdef range_decode(ConnectionSettings settings, FRBuffer *buf,
139141
empty=(flags & RANGE_EMPTY) != 0)
140142

141143

142-
cdef init_range_codecs():
143-
register_core_codec(ANYRANGEOID,
144-
NULL,
145-
<decode_func>pgproto.text_decode,
146-
PG_FORMAT_TEXT)
144+
cdef multirange_encode(ConnectionSettings settings, WriteBuffer buf,
145+
object obj, uint32_t elem_oid,
146+
encode_func_ex encoder, const void *encoder_arg):
147+
cdef:
148+
WriteBuffer elem_data
149+
150+
if not isinstance(obj, SequenceABC):
151+
raise TypeError(
152+
'expected a sequence (got type {!r})'.format(type(obj).__name__)
153+
)
154+
155+
elem_data = WriteBuffer.new()
156+
157+
for elem in obj:
158+
range_encode(settings, elem_data, elem, elem_oid, encoder, encoder_arg)
159+
160+
# Datum length
161+
buf.write_int32(4 + elem_data.len())
162+
# Number of elements in multirange
163+
buf.write_int32(len(obj))
164+
buf.write_buffer(elem_data)
165+
147166

167+
cdef multirange_decode(ConnectionSettings settings, FRBuffer *buf,
168+
decode_func_ex decoder, const void *decoder_arg):
169+
cdef:
170+
int32_t nelems = hton.unpack_int32(frb_read(buf, 4))
171+
FRBuffer elem_buf
172+
int32_t elem_len
173+
int i
174+
list result
175+
176+
if nelems == 0:
177+
return []
178+
179+
if nelems < 0:
180+
raise exceptions.ProtocolError(
181+
'unexpected multirange size value: {}'.format(nelems))
182+
183+
result = cpython.PyList_New(nelems)
184+
for i in range(nelems):
185+
elem_len = hton.unpack_int32(frb_read(buf, 4))
186+
if elem_len == -1:
187+
raise exceptions.ProtocolError(
188+
'unexpected NULL element in multirange value')
189+
else:
190+
frb_slice_from(&elem_buf, buf, elem_len)
191+
elem = range_decode(settings, &elem_buf, decoder, decoder_arg)
192+
cpython.Py_INCREF(elem)
193+
cpython.PyList_SET_ITEM(result, i, elem)
148194

149-
init_range_codecs()
195+
return result

asyncpg/protocol/pgtypes.pxi

+8
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,10 @@ DEF JSONPATHOID = 4072
101101
DEF REGNAMESPACEOID = 4089
102102
DEF REGROLEOID = 4096
103103
DEF REGCOLLATIONOID = 4191
104+
DEF ANYMULTIRANGEOID = 4537
105+
DEF ANYCOMPATIBLEMULTIRANGEOID = 4538
106+
DEF PG_BRIN_BLOOM_SUMMARYOID = 4600
107+
DEF PG_BRIN_MINMAX_MULTI_SUMMARYOID = 4601
104108
DEF PG_MCV_LISTOID = 5017
105109
DEF PG_SNAPSHOTOID = 5038
106110
DEF XID8OID = 5069
@@ -116,11 +120,13 @@ BUILTIN_TYPE_OID_MAP = {
116120
ACLITEMOID: 'aclitem',
117121
ANYARRAYOID: 'anyarray',
118122
ANYCOMPATIBLEARRAYOID: 'anycompatiblearray',
123+
ANYCOMPATIBLEMULTIRANGEOID: 'anycompatiblemultirange',
119124
ANYCOMPATIBLENONARRAYOID: 'anycompatiblenonarray',
120125
ANYCOMPATIBLEOID: 'anycompatible',
121126
ANYCOMPATIBLERANGEOID: 'anycompatiblerange',
122127
ANYELEMENTOID: 'anyelement',
123128
ANYENUMOID: 'anyenum',
129+
ANYMULTIRANGEOID: 'anymultirange',
124130
ANYNONARRAYOID: 'anynonarray',
125131
ANYOID: 'any',
126132
ANYRANGEOID: 'anyrange',
@@ -161,6 +167,8 @@ BUILTIN_TYPE_OID_MAP = {
161167
OIDOID: 'oid',
162168
OPAQUEOID: 'opaque',
163169
PATHOID: 'path',
170+
PG_BRIN_BLOOM_SUMMARYOID: 'pg_brin_bloom_summary',
171+
PG_BRIN_MINMAX_MULTI_SUMMARYOID: 'pg_brin_minmax_multi_summary',
164172
PG_DDL_COMMANDOID: 'pg_ddl_command',
165173
PG_DEPENDENCIESOID: 'pg_dependencies',
166174
PG_LSNOID: 'pg_lsn',

tests/test_codecs.py

+53
Original file line number | Diff line number | Diff line change
@@ -1042,6 +1042,59 @@ async def test_range_types(self):
10421042
dic = {obj_a: 1, obj_b: 2}
10431043
self.assertEqual(len(dic), count)
10441044

1045+
async def test_multirange_types(self):
1046+
"""Test encoding/decoding of multirange types."""
1047+
1048+
if self.server_version < (14, 0):
1049+
self.skipTest("this server does not support multirange types")
1050+
1051+
cases = [
1052+
('int4multirange', [
1053+
[
1054+
[],
1055+
[]
1056+
],
1057+
[
1058+
[()],
1059+
[]
1060+
],
1061+
[
1062+
[asyncpg.Range(empty=True)],
1063+
[]
1064+
],
1065+
[
1066+
[asyncpg.Range(0, 9, lower_inc=False, upper_inc=True)],
1067+
[asyncpg.Range(1, 10)]
1068+
],
1069+
[
1070+
[(1, 9), (9, 11)],
1071+
[asyncpg.Range(1, 12)]
1072+
],
1073+
[
1074+
[(1, 9), (20, 30)],
1075+
[asyncpg.Range(1, 10), asyncpg.Range(20, 31)]
1076+
],
1077+
[
1078+
[(None, 2)],
1079+
[asyncpg.Range(None, 3)],
1080+
]
1081+
])
1082+
]
1083+
1084+
for (typname, sample_data) in cases:
1085+
st = await self.con.prepare(
1086+
"SELECT $1::" + typname
1087+
)
1088+
1089+
for sample, expected in sample_data:
1090+
with self.subTest(sample=sample, typname=typname):
1091+
result = await st.fetchval(sample)
1092+
self.assertEqual(result, expected)
1093+
1094+
with self.assertRaisesRegex(
1095+
asyncpg.DataError, 'expected a sequence'):
1096+
await self.con.fetch("SELECT $1::int4multirange", 1)
1097+
10451098
async def test_extra_codec_alias(self):
10461099
"""Test encoding/decoding of a builtin non-pg_catalog codec."""
10471100
await self.con.execute('''

0 commit comments

Comments
 (0)