@@ -20,6 +20,7 @@ import (
20
20
"bytes"
21
21
"context"
22
22
"crypto/rand"
23
+ "crypto/tls"
23
24
"encoding/json"
24
25
"fmt"
25
26
"io"
@@ -31,16 +32,18 @@ import (
31
32
"reflect"
32
33
"strings"
33
34
"sync"
35
+ "sync/atomic"
34
36
"testing"
35
37
"time"
36
38
37
39
gwebsocket "github.com/gorilla/websocket"
40
+ "github.com/stretchr/testify/require"
38
41
39
42
v1 "k8s.io/api/core/v1"
40
43
apierrors "k8s.io/apimachinery/pkg/api/errors"
41
44
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42
- "k8s.io/apimachinery/pkg/util/httpstream"
43
45
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
46
+ utilnettesting "k8s.io/apimachinery/pkg/util/net/testing"
44
47
"k8s.io/apimachinery/pkg/util/remotecommand"
45
48
"k8s.io/apimachinery/pkg/util/wait"
46
49
"k8s.io/client-go/rest"
@@ -1342,38 +1345,110 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *opti
1342
1345
return wsStreams , nil
1343
1346
}
1344
1347
1345
- // See (https://github.com/kubernetes/kubernetes/issues/126134).
1346
- func TestWebSocketClient_HTTPSProxyErrorExpected (t * testing.T ) {
1347
- urlStr := "http://127.0.0.1/never-used" + "?" + "stdin=true" + "&" + "stdout=true"
1348
- websocketLocation , err := url .Parse (urlStr )
1349
- if err != nil {
1350
- t .Fatalf ("Unable to parse WebSocket server URL: %s" , urlStr )
1351
- }
1352
- // proxy url with https scheme will trigger websocket dialing error.
1353
- httpsProxyFunc := func (req * http.Request ) (* url.URL , error ) { return url .Parse ("https://127.0.0.1" ) }
1354
- exec , err := NewWebSocketExecutor (& rest.Config {Host : websocketLocation .Host , Proxy : httpsProxyFunc }, "GET" , urlStr )
1355
- if err != nil {
1356
- t .Errorf ("unexpected error creating websocket executor: %v" , err )
1357
- }
1358
- var stdout bytes.Buffer
1359
- options := & StreamOptions {
1360
- Stdout : & stdout ,
1361
- }
1362
- errorChan := make (chan error )
1363
- go func () {
1364
- // Start the streaming on the WebSocket "exec" client.
1365
- errorChan <- exec .StreamWithContext (context .Background (), * options )
1366
- }()
1348
+ func TestWebSocketClient_ProxySucceeds (t * testing.T ) {
1349
+ // Validate websocket proxy succeeds for each of the enumerated schemes.
1350
+ proxySchemes := []string {"http" , "https" }
1351
+ for _ , proxyScheme := range proxySchemes {
1352
+ // Create the proxy handler, keeping track of how many times it was called.
1353
+ var proxyCalled atomic.Int64
1354
+ proxyHandler := utilnettesting .NewHTTPProxyHandler (t , func (req * http.Request ) bool {
1355
+ proxyCalled .Add (1 )
1356
+ return true
1357
+ })
1358
+ defer proxyHandler .Wait ()
1359
+ // Create/Start the proxy server, adding TLS functionality depending on scheme.
1360
+ proxyServer := httptest .NewUnstartedServer (proxyHandler )
1361
+ if proxyScheme == "https" {
1362
+ cert , err := tls .X509KeyPair (localhostCert , localhostKey )
1363
+ if err != nil {
1364
+ t .Errorf ("https (valid hostname): proxy_test: %v" , err )
1365
+ }
1366
+ proxyServer .TLS = & tls.Config {Certificates : []tls.Certificate {cert }}
1367
+ proxyServer .StartTLS ()
1368
+ } else {
1369
+ proxyServer .Start ()
1370
+ }
1371
+ defer proxyServer .Close () //nolint:errcheck
1372
+ proxyLocation , err := url .Parse (proxyServer .URL )
1373
+ require .NoError (t , err )
1374
+ t .Logf ("Proxy URL: %s" , proxyLocation .String ())
1367
1375
1368
- select {
1369
- case <- time .After (wait .ForeverTestTimeout ):
1370
- t .Fatalf ("expect stream to be closed after connection is closed." )
1371
- case err := <- errorChan :
1372
- if err == nil {
1373
- t .Errorf ("expected error but received none" )
1376
+ // Create fake WebSocket server. Copy received STDIN data back onto STDOUT stream.
1377
+ websocketServer := httptest .NewServer (http .HandlerFunc (func (w http.ResponseWriter , req * http.Request ) {
1378
+ conns , err := webSocketServerStreams (req , w , streamOptionsFromRequest (req ))
1379
+ if err != nil {
1380
+ t .Fatalf ("error on webSocketServerStreams: %v" , err )
1381
+ }
1382
+ defer conns .conn .Close () //nolint:errcheck
1383
+ // Loopback the STDIN stream onto the STDOUT stream.
1384
+ _ , err = io .Copy (conns .stdoutStream , conns .stdinStream )
1385
+ if err != nil {
1386
+ t .Fatalf ("error copying STDIN to STDOUT: %v" , err )
1387
+ }
1388
+ }))
1389
+ defer websocketServer .Close () //nolint:errcheck
1390
+
1391
+ // Now create the WebSocket client (executor), and point it to the TLS proxy server.
1392
+ // The proxy server should open a websocket connection to the fake websocket server.
1393
+ websocketServer .URL = websocketServer .URL + "?" + "stdin=true" + "&" + "stdout=true"
1394
+ websocketLocation , err := url .Parse (websocketServer .URL )
1395
+ require .NoError (t , err )
1396
+ clientConfig := & rest.Config {
1397
+ Host : websocketLocation .Host ,
1398
+ // Unused if "http" scheme.
1399
+ TLSClientConfig : rest.TLSClientConfig {CAData : localhostCert },
1400
+ Proxy : func (req * http.Request ) (* url.URL , error ) {
1401
+ return proxyLocation , nil
1402
+ },
1403
+ }
1404
+ exec , err := NewWebSocketExecutor (clientConfig , "GET" , websocketServer .URL )
1405
+ require .NoError (t , err )
1406
+
1407
+ // Generate random data, and set it up to stream on STDIN. The data will be
1408
+ // returned on the STDOUT buffer.
1409
+ randomSize := 1024 * 1024
1410
+ randomData := make ([]byte , randomSize )
1411
+ if _ , err := rand .Read (randomData ); err != nil {
1412
+ t .Errorf ("unexpected error reading random data: %v" , err )
1413
+ }
1414
+ var stdout bytes.Buffer
1415
+ options := & StreamOptions {
1416
+ Stdin : bytes .NewReader (randomData ),
1417
+ Stdout : & stdout ,
1418
+ }
1419
+ errorChan := make (chan error )
1420
+ go func () {
1421
+ // Start the streaming on the WebSocket "exec" client.
1422
+ errorChan <- exec .StreamWithContext (context .Background (), * options )
1423
+ }()
1424
+
1425
+ select {
1426
+ case <- time .After (wait .ForeverTestTimeout ):
1427
+ t .Fatalf ("expect stream to be closed after connection is closed." )
1428
+ case err := <- errorChan :
1429
+ if err != nil {
1430
+ t .Fatalf ("unexpected error: %v" , err )
1431
+ }
1432
+ // Validate remote command v5 protocol was negotiated.
1433
+ streamExec := exec .(* wsStreamExecutor )
1434
+ if remotecommand .StreamProtocolV5Name != streamExec .negotiated {
1435
+ t .Fatalf ("expected remote command v5 protocol, got (%s)" , streamExec .negotiated )
1436
+ }
1437
+ }
1438
+ data , err := io .ReadAll (bytes .NewReader (stdout .Bytes ()))
1439
+ if err != nil {
1440
+ t .Fatalf ("error reading the stream: %v" , err )
1441
+ }
1442
+ // Check the random data sent on STDIN was the same returned on STDOUT.
1443
+ t .Logf ("comparing %d random bytes sent data versus received" , len (randomData ))
1444
+ if ! bytes .Equal (randomData , data ) {
1445
+ t .Errorf ("unexpected data received: %d sent: %d" , len (data ), len (randomData ))
1446
+ } else {
1447
+ t .Log ("success--random bytes are the same" )
1374
1448
}
1375
- if ! httpstream .IsHTTPSProxyError (err ) {
1376
- t .Errorf ("expected https proxy error, got (%s)" , err )
1449
+ // Ensure the proxy was called once
1450
+ if e , a := int64 (1 ), proxyCalled .Load (); e != a {
1451
+ t .Errorf ("expected %d proxy call, got %d" , e , a )
1377
1452
}
1378
1453
}
1379
1454
}
0 commit comments