-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Implement zlib compression #1487
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 80 commits
Commits
Show all changes
97 commits
Select commit
Hold shift + click to select a range
e6c682c
packets: implemented compression protocol
77f6792
packets: implemented compression protocol CR changes
a0cf94b
packets: implemented compression protocol: remove bytes.Reset for bac…
4cdff28
Merge branch 'master' of https://github.com/go-sql-driver/mysql
d0ea1a4
reading working
477c9f8
writerly changes
996ed2d
PR 649: adding compression (second code review)
f74faed
do not query max_allowed_packet by default (#680)
julienschmidt b3a093e
packets: do not call function on nulled value (#678)
julienschmidt 5eaa5ff
ColumnType interfaces (#667)
julienschmidt ee46028
Add Aurora errno to rejectReadOnly check (#634)
jeffcharles 93aed73
allow successful TravisCI runs in forks (#639)
jmhodges 4f10ee5
Drop support for Go 1.6 and lower (#696)
julienschmidt 59b0f90
Make gofmt happy (#704)
julienschmidt 3fbf53a
Added support for custom string types in ConvertValue. (#623)
dsmontoya f9c6a2c
Implement NamedValueChecker for mysqlConn (#690)
pushrax 6046bf0
Fix Valuers by returning driver.ErrSkip if couldn't convert type inte…
randomjunk 385673a
statement: Fix conversion of Valuer (#710)
linxGnu 9031984
Fixed imports for appengine/cloudsql (#700)
rrbrussell 6992fad
Fix tls=true didn't work with host without port (#718)
methane 386f84b
Differentiate between BINARY and CHAR (#724)
kwoodhouse93 f853432
Test with latest Go patch versions (#693)
AlekSi d1a8b86
Fix prepared statement (#734)
methane 3167920
driver.ErrBadConn when init packet read fails (#736)
fb33a2c
packets: implemented compression protocol
f174605
packets: implemented compression protocol CR changes
dbd1e2b
third code review changes
3e12e32
PR 649: minor cleanup
17a06f1
Merge branch 'master' into master
methane 60bdaec
Sort AUTHORS
methane 422ab6f
Update dsn.go
methane ee2a1c7
Merge remote-tracking branch 'upstream/master' into compression
1f38652
Please linter.
d43864e
Formatting.
1c2ac70
Unexport constructors.
944e638
Fix tests.
15017fc
Update AUTHORS.
f400590
Formatting.
084dafb
Update README feature list.
ee87a7d
Fix TLS.
b8cfe77
Formatting.
d2501ec
Tidy up.
59c3cf1
Fix compression negotiations.
d1aef08
Format README.
09a4fb8
Add usage instructions to README.
e523af2
Add minCompressLength param.
efbc53b
Fix non-compression of small packets.
ac5cb7a
Merge remote-tracking branch 'upstream/master' into compression
7610823
Rename fields for clarity.
eb449fa
Simplify compressedWriter.Write.
75c2480
Disable compression by default.
6a38735
Merge remote-tracking branch 'upstream/master' into compression
8b8b428
Fix bytes.NewBuffer usage.
bc3ad68
Revert README formatting.
b6d9883
Update README with compression usage.
850c83f
Merge remote-tracking branch 'upstream/master' into compression
methane 5ec621c
simplify
methane 3d0d418
change minCompressLength to 150
methane 31b8b38
fixup
methane 9f797b1
remove unnecessary test
methane d7ed578
code cleanup and minor improvements
methane 0f9ec9f
remove test depends on compressed output
methane a64171f
cleanup
methane d78cdf8
fix test
methane eb42024
fix sync error
methane 679cc53
fix sync error again
methane 876af07
fix todo
methane d5ad92e
merge compressedReader and compressedWriter
methane 1c05916
use sync.Pool for zlib
methane 39e52e4
cleanup
methane 0e3ace3
code cleanup
methane 750fe2a
fix typo
methane 0512769
move const flag
methane 60ce788
remove writer from compressor
methane ee70acf
remove packetWriter and simplify tests
methane 1e78561
run tests with compression
methane 77d86ec
fix tests
methane e1dc557
wip
methane 406bce2
Merge branch 'master' into compression
methane e9f5b24
fix some errors
methane 243b3df
Merge branch 'master' into compression
methane a82ca05
Merge branch 'master' into compression
methane 1fee4a0
remove traceLogger
methane 3062a2f
fix README
methane 700db22
compress: better buffer reuse
methane 3f89621
some refactoring
methane e8b96f2
move writeTimeout from mysqlConn to buffer
methane 9fad4c0
refactoring
methane 44d7dee
fix tests
methane 25cf587
sequenceNr -> sequence
methane 0bc8145
allow returning ErrBadConn on compression
methane d2ecd57
simplify
methane 39db0ba
refactor
methane 962608a
refactor
methane 59d0d57
refactor
methane d58a709
cleanup
methane 994cd82
error check
methane File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,246 @@ | ||
// Go MySQL Driver - A MySQL-Driver for Go's database/sql package | ||
// | ||
// Copyright 2024 The Go-MySQL-Driver Authors. All rights reserved. | ||
// | ||
// This Source Code Form is subject to the terms of the Mozilla Public | ||
// License, v. 2.0. If a copy of the MPL was not distributed with this file, | ||
// You can obtain one at http://mozilla.org/MPL/2.0/. | ||
|
||
package mysql | ||
|
||
import ( | ||
"bytes" | ||
"compress/zlib" | ||
"fmt" | ||
"io" | ||
"sync" | ||
) | ||
|
||
var ( | ||
zrPool *sync.Pool // Do not use directly. Use zDecompress() instead. | ||
zwPool *sync.Pool // Do not use directly. Use zCompress() instead. | ||
) | ||
|
||
func init() { | ||
zrPool = &sync.Pool{ | ||
New: func() any { return nil }, | ||
} | ||
zwPool = &sync.Pool{ | ||
New: func() any { | ||
zw, err := zlib.NewWriterLevel(new(bytes.Buffer), 2) | ||
if err != nil { | ||
panic(err) // compress/zlib return non-nil error only if level is invalid | ||
} | ||
return zw | ||
}, | ||
} | ||
} | ||
|
||
func zDecompress(src, dst []byte) (int, error) { | ||
br := bytes.NewReader(src) | ||
var zr io.ReadCloser | ||
var err error | ||
|
||
if a := zrPool.Get(); a == nil { | ||
if zr, err = zlib.NewReader(br); err != nil { | ||
return 0, err | ||
} | ||
} else { | ||
zr = a.(io.ReadCloser) | ||
if zr.(zlib.Resetter).Reset(br, nil); err != nil { | ||
return 0, err | ||
} | ||
} | ||
defer func() { | ||
zr.Close() | ||
zrPool.Put(zr) | ||
}() | ||
|
||
lenRead := 0 | ||
size := len(dst) | ||
|
||
for lenRead < size { | ||
n, err := zr.Read(dst[lenRead:]) | ||
lenRead += n | ||
|
||
if err == io.EOF { | ||
if lenRead < size { | ||
return lenRead, io.ErrUnexpectedEOF | ||
} | ||
} else if err != nil { | ||
return lenRead, err | ||
} | ||
} | ||
return lenRead, nil | ||
} | ||
|
||
func zCompress(src []byte, dst io.Writer) error { | ||
zw := zwPool.Get().(*zlib.Writer) | ||
zw.Reset(dst) | ||
if _, err := zw.Write(src); err != nil { | ||
return err | ||
} | ||
zw.Close() | ||
zwPool.Put(zw) | ||
return nil | ||
} | ||
|
||
type decompressor struct { | ||
mc *mysqlConn | ||
// read buffer (FIFO). | ||
// We can not reuse already-read buffer until dropping Go 1.20 support. | ||
// It is because of database/mysql's weired behavior. | ||
// See https://github.com/go-sql-driver/mysql/issues/1435 | ||
bytesBuf []byte | ||
} | ||
|
||
func newDecompressor(mc *mysqlConn) *decompressor { | ||
return &decompressor{ | ||
mc: mc, | ||
} | ||
} | ||
|
||
func (c *decompressor) readNext(need int) ([]byte, error) { | ||
for len(c.bytesBuf) < need { | ||
if err := c.uncompressPacket(); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
data := c.bytesBuf[:need:need] // prevent caller writes into r.bytesBuf | ||
c.bytesBuf = c.bytesBuf[need:] | ||
return data, nil | ||
} | ||
|
||
func (c *decompressor) uncompressPacket() error { | ||
header, err := c.mc.buf.readNext(7) // size of compressed header | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// compressed header structure | ||
comprLength := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) | ||
uncompressedLength := int(uint32(header[4]) | uint32(header[5])<<8 | uint32(header[6])<<16) | ||
compressionSequence := uint8(header[3]) | ||
if debugTrace { | ||
traceLogger.Printf("uncompress cmplen=%v uncomplen=%v pkt_cmp_seq=%v expected_cmp_seq=%v\n", | ||
comprLength, uncompressedLength, compressionSequence, c.mc.sequence) | ||
} | ||
if compressionSequence != c.mc.sequence { | ||
// return ErrPktSync | ||
// server may return error packet (e.g. 1153 Got a packet bigger than 'max_allowed_packet' bytes) | ||
// before receiving all packets from client. In this case, seqnr is younger than expected. | ||
if debugTrace { | ||
traceLogger.Printf("WARN: unexpected cmpress seq nr: expected %v, got %v", | ||
c.mc.sequence, compressionSequence) | ||
} | ||
// TODO(methane): report error when the packet is not an error packet. | ||
} | ||
c.mc.sequence = compressionSequence + 1 | ||
c.mc.compressSequence = c.mc.sequence | ||
|
||
comprData, err := c.mc.buf.readNext(comprLength) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// if payload is uncompressed, its length will be specified as zero, and its | ||
// true length is contained in comprLength | ||
if uncompressedLength == 0 { | ||
c.bytesBuf = append(c.bytesBuf, comprData...) | ||
return nil | ||
} | ||
|
||
// use existing capacity in bytesBuf if possible | ||
offset := len(c.bytesBuf) | ||
if cap(c.bytesBuf)-offset < uncompressedLength { | ||
old := c.bytesBuf | ||
c.bytesBuf = make([]byte, offset, offset+uncompressedLength) | ||
copy(c.bytesBuf, old) | ||
} | ||
|
||
lenRead, err := zDecompress(comprData, c.bytesBuf[offset:offset+uncompressedLength]) | ||
if err != nil { | ||
return err | ||
} | ||
if lenRead != uncompressedLength { | ||
return fmt.Errorf("invalid compressed packet: uncompressed length in header is %d, actual %d", | ||
uncompressedLength, lenRead) | ||
} | ||
c.bytesBuf = c.bytesBuf[:offset+uncompressedLength] | ||
return nil | ||
} | ||
|
||
const maxPayloadLen = maxPacketSize - 4 | ||
|
||
// writeCompressed sends one or some packets with compression. | ||
// Use this instead of mc.netConn.Write() when mc.compress is true. | ||
func (mc *mysqlConn) writeCompressed(packets []byte) (int, error) { | ||
totalBytes := len(packets) | ||
dataLen := len(packets) | ||
blankHeader := make([]byte, 7) | ||
var buf bytes.Buffer | ||
|
||
for dataLen > 0 { | ||
payloadLen := dataLen | ||
if payloadLen > maxPayloadLen { | ||
payloadLen = maxPayloadLen | ||
} | ||
payload := packets[:payloadLen] | ||
uncompressedLen := payloadLen | ||
|
||
if _, err := buf.Write(blankHeader); err != nil { | ||
return 0, err | ||
} | ||
|
||
// If payload is less than minCompressLength, don't compress. | ||
if uncompressedLen < minCompressLength { | ||
if _, err := buf.Write(payload); err != nil { | ||
return 0, err | ||
} | ||
uncompressedLen = 0 | ||
} else { | ||
zCompress(payload, &buf) | ||
} | ||
|
||
if err := mc.writeCompressedPacket(buf.Bytes(), uncompressedLen); err != nil { | ||
return 0, err | ||
} | ||
dataLen -= payloadLen | ||
packets = packets[payloadLen:] | ||
buf.Reset() | ||
} | ||
|
||
return totalBytes, nil | ||
} | ||
|
||
// writeCompressedPacket writes a compressed packet with header. | ||
// data should start with 7 size space for header followed by payload. | ||
func (mc *mysqlConn) writeCompressedPacket(data []byte, uncompressedLen int) error { | ||
comprLength := len(data) - 7 | ||
if debugTrace { | ||
traceLogger.Printf( | ||
"writeCompressedPacket: comprLength=%v, uncompressedLen=%v, seq=%v", | ||
comprLength, uncompressedLen, mc.compressSequence) | ||
} | ||
|
||
// compression header | ||
data[0] = byte(0xff & comprLength) | ||
data[1] = byte(0xff & (comprLength >> 8)) | ||
data[2] = byte(0xff & (comprLength >> 16)) | ||
|
||
data[3] = mc.compressSequence | ||
|
||
// this value is never greater than maxPayloadLength | ||
data[4] = byte(0xff & uncompressedLen) | ||
data[5] = byte(0xff & (uncompressedLen >> 8)) | ||
data[6] = byte(0xff & (uncompressedLen >> 16)) | ||
|
||
if _, err := mc.netConn.Write(data); err != nil { | ||
mc.log("writing compressed packet:", err) | ||
return err | ||
} | ||
|
||
mc.compressSequence++ | ||
return nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.