1
1
package com .clickhouse .client .api ;
2
2
3
- import com .clickhouse .client .ClickHouseNode ;
4
3
import com .clickhouse .client .api .command .CommandResponse ;
5
4
import com .clickhouse .client .api .command .CommandSettings ;
6
5
import com .clickhouse .client .api .data_formats .ClickHouseBinaryFormatReader ;
36
35
import com .clickhouse .client .api .query .QueryResponse ;
37
36
import com .clickhouse .client .api .query .QuerySettings ;
38
37
import com .clickhouse .client .api .query .Records ;
38
+ import com .clickhouse .client .api .transport .Endpoint ;
39
+ import com .clickhouse .client .api .transport .HttpEndpoint ;
39
40
import com .clickhouse .client .config .ClickHouseClientOption ;
40
41
import com .clickhouse .data .ClickHouseColumn ;
41
42
import com .clickhouse .data .ClickHouseFormat ;
43
+ import com .google .common .collect .ImmutableList ;
42
44
import net .jpountz .lz4 .LZ4Factory ;
43
45
import org .apache .hc .core5 .concurrent .DefaultThreadFactory ;
44
46
import org .apache .hc .core5 .http .ClassicHttpResponse ;
78
80
import java .util .concurrent .TimeUnit ;
79
81
import java .util .concurrent .TimeoutException ;
80
82
import java .util .function .Supplier ;
83
+ import java .util .stream .Collectors ;
81
84
82
85
import static java .time .temporal .ChronoUnit .MILLIS ;
83
86
import static java .time .temporal .ChronoUnit .SECONDS ;
@@ -116,13 +119,12 @@ public class Client implements AutoCloseable {
116
119
117
120
private HttpAPIClientHelper httpClientHelper = null ;
118
121
119
- private final Set <String > endpoints ;
122
+ private final List <Endpoint > endpoints ;
123
+
120
124
private final Map <String , String > configuration ;
121
125
122
126
private final Map <String , String > readOnlyConfig ;
123
127
124
- private final List <ClickHouseNode > serverNodes = new ArrayList <>();
125
-
126
128
// POJO serializer mapping (class -> (schema -> (format -> serializer)))
127
129
private final Map <Class <?>, Map <String , Map <String , POJOSerializer >>> serializers ;
128
130
@@ -154,16 +156,16 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
154
156
155
157
private Client (Set <String > endpoints , Map <String ,String > configuration , boolean useNewImplementation ,
156
158
ExecutorService sharedOperationExecutor , ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy , Object metricsRegistry ) {
157
- this . endpoints = endpoints ;
159
+ // Simple initialization
158
160
this .configuration = configuration ;
159
161
this .readOnlyConfig = Collections .unmodifiableMap (this .configuration );
160
- this .endpoints .forEach (endpoint -> {
161
- this .serverNodes .add (ClickHouseNode .of (endpoint , this .configuration ));
162
- });
163
162
this .metricsRegistry = metricsRegistry ;
163
+
164
+ // Serialization
164
165
this .serializers = new ConcurrentHashMap <>();
165
166
this .deserializers = new ConcurrentHashMap <>();
166
167
168
+ // Operation Execution
167
169
boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey (), false );
168
170
if (isAsyncEnabled && sharedOperationExecutor == null ) {
169
171
this .isSharedOpExecutorOwned = true ;
@@ -172,10 +174,29 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
172
174
this .isSharedOpExecutorOwned = false ;
173
175
this .sharedOperationExecutor = sharedOperationExecutor ;
174
176
}
175
- boolean initSslContext = getEndpoints ().stream ().anyMatch (s -> s .toLowerCase ().contains ("https://" ));
176
- this .httpClientHelper = new HttpAPIClientHelper (configuration , metricsRegistry , initSslContext );
177
+
177
178
this .columnToMethodMatchingStrategy = columnToMethodMatchingStrategy ;
178
179
180
+
181
+ // Transport
182
+ ImmutableList .Builder <Endpoint > tmpEndpoints = ImmutableList .builder ();
183
+ boolean initSslContext = false ;
184
+ for (String ep : endpoints ) {
185
+ try {
186
+ HttpEndpoint endpoint = new HttpEndpoint (ep );
187
+ if (endpoint .isSecure ()) {
188
+ initSslContext = true ;
189
+ }
190
+ LOG .debug ("Adding endpoint: {}" , endpoint );
191
+ tmpEndpoints .add (endpoint );
192
+ } catch (Exception e ) {
193
+ throw new ClientException ("Failed to add endpoint " + ep , e );
194
+ }
195
+ }
196
+
197
+ this .endpoints = tmpEndpoints .build ();
198
+ this .httpClientHelper = new HttpAPIClientHelper (configuration , metricsRegistry , initSslContext );
199
+
179
200
String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
180
201
this .retries = retry == null ? 0 : Integer .parseInt (retry );
181
202
boolean useNativeCompression = !MapUtils .getFlag (configuration , ClientConfigProperties .DISABLE_NATIVE_COMPRESSION .getKey (), false );
@@ -1185,11 +1206,6 @@ private void setDefaults() {
1185
1206
}
1186
1207
}
1187
1208
1188
- private ClickHouseNode getServerNode () {
1189
- // TODO: implement load balancing using existing logic
1190
- return this .serverNodes .get (0 );
1191
- }
1192
-
1193
1209
/**
1194
1210
* Pings the server to check if it is alive
1195
1211
* @return true if the server is alive, false otherwise
@@ -1374,13 +1390,13 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1374
1390
Supplier <InsertResponse > supplier = () -> {
1375
1391
long startTime = System .nanoTime ();
1376
1392
// Selecting some node
1377
- ClickHouseNode selectedNode = getNextAliveNode ();
1393
+ Endpoint selectedEndpoint = getNextAliveNode ();
1378
1394
1379
1395
RuntimeException lastException = null ;
1380
1396
for (int i = 0 ; i <= maxRetries ; i ++) {
1381
1397
// Execute request
1382
1398
try (ClassicHttpResponse httpResponse =
1383
- httpClientHelper .executeRequest (selectedNode , finalSettings .getAllSettings (), lz4Factory ,
1399
+ httpClientHelper .executeRequest (selectedEndpoint , finalSettings .getAllSettings (), lz4Factory ,
1384
1400
out -> {
1385
1401
out .write ("INSERT INTO " .getBytes ());
1386
1402
out .write (tableName .getBytes ());
@@ -1404,7 +1420,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1404
1420
// Check response
1405
1421
if (httpResponse .getCode () == HttpStatus .SC_SERVICE_UNAVAILABLE ) {
1406
1422
LOG .warn ("Failed to get response. Server returned {}. Retrying. (Duration: {})" , httpResponse .getCode (), System .nanoTime () - startTime );
1407
- selectedNode = getNextAliveNode ();
1423
+ selectedEndpoint = getNextAliveNode ();
1408
1424
continue ;
1409
1425
}
1410
1426
@@ -1421,7 +1437,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1421
1437
(i + 1 ), (maxRetries + 1 ), System .nanoTime () - startTime ), e );
1422
1438
if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1423
1439
LOG .warn ("Retrying." , e );
1424
- selectedNode = getNextAliveNode ();
1440
+ selectedEndpoint = getNextAliveNode ();
1425
1441
} else {
1426
1442
throw lastException ;
1427
1443
}
@@ -1591,13 +1607,13 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1591
1607
responseSupplier = () -> {
1592
1608
long startTime = System .nanoTime ();
1593
1609
// Selecting some node
1594
- ClickHouseNode selectedNode = getNextAliveNode ();
1610
+ Endpoint selectedEndpoint = getNextAliveNode ();
1595
1611
1596
1612
RuntimeException lastException = null ;
1597
1613
for (int i = 0 ; i <= retries ; i ++) {
1598
1614
// Execute request
1599
1615
try (ClassicHttpResponse httpResponse =
1600
- httpClientHelper .executeRequest (selectedNode , finalSettings .getAllSettings (), lz4Factory ,
1616
+ httpClientHelper .executeRequest (selectedEndpoint , finalSettings .getAllSettings (), lz4Factory ,
1601
1617
out -> {
1602
1618
writer .onOutput (out );
1603
1619
out .close ();
@@ -1607,7 +1623,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1607
1623
// Check response
1608
1624
if (httpResponse .getCode () == HttpStatus .SC_SERVICE_UNAVAILABLE ) {
1609
1625
LOG .warn ("Failed to get response. Server returned {}. Retrying. (Duration: {})" , System .nanoTime () - startTime , httpResponse .getCode ());
1610
- selectedNode = getNextAliveNode ();
1626
+ selectedEndpoint = getNextAliveNode ();
1611
1627
continue ;
1612
1628
}
1613
1629
@@ -1623,7 +1639,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
1623
1639
(i + 1 ), (retries + 1 ), System .nanoTime () - startTime ), e );
1624
1640
if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1625
1641
LOG .warn ("Retrying." , e );
1626
- selectedNode = getNextAliveNode ();
1642
+ selectedEndpoint = getNextAliveNode ();
1627
1643
} else {
1628
1644
throw lastException ;
1629
1645
}
@@ -1715,20 +1731,20 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
1715
1731
responseSupplier = () -> {
1716
1732
long startTime = System .nanoTime ();
1717
1733
// Selecting some node
1718
- ClickHouseNode selectedNode = getNextAliveNode ();
1734
+ Endpoint selectedEndpoint = getNextAliveNode ();
1719
1735
RuntimeException lastException = null ;
1720
1736
for (int i = 0 ; i <= retries ; i ++) {
1721
1737
try {
1722
1738
ClassicHttpResponse httpResponse =
1723
- httpClientHelper .executeRequest (selectedNode , finalSettings .getAllSettings (), lz4Factory , output -> {
1739
+ httpClientHelper .executeRequest (selectedEndpoint , finalSettings .getAllSettings (), lz4Factory , output -> {
1724
1740
output .write (sqlQuery .getBytes (StandardCharsets .UTF_8 ));
1725
1741
output .close ();
1726
1742
});
1727
1743
1728
1744
// Check response
1729
1745
if (httpResponse .getCode () == HttpStatus .SC_SERVICE_UNAVAILABLE ) {
1730
1746
LOG .warn ("Failed to get response. Server returned {}. Retrying. (Duration: {})" , System .nanoTime () - startTime , httpResponse .getCode ());
1731
- selectedNode = getNextAliveNode ();
1747
+ selectedEndpoint = getNextAliveNode ();
1732
1748
continue ;
1733
1749
}
1734
1750
@@ -1753,7 +1769,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
1753
1769
(i + 1 ), (retries + 1 ), System .nanoTime () - startTime ), e );
1754
1770
if (httpClientHelper .shouldRetry (e , finalSettings .getAllSettings ())) {
1755
1771
LOG .warn ("Retrying." , e );
1756
- selectedNode = getNextAliveNode ();
1772
+ selectedEndpoint = getNextAliveNode ();
1757
1773
} else {
1758
1774
throw lastException ;
1759
1775
}
@@ -1898,8 +1914,9 @@ public <T> List<T> queryAll(String sqlQuery, Class<T> clazz, TableSchema schema)
1898
1914
* @param allocator - optional supplier to create new instances of the DTO.
1899
1915
* @throws IllegalArgumentException when class is not registered or no setters found
1900
1916
* @return List of POJOs filled with data
1901
- * @param <T>
1917
+ * @param <T> type of POJO
1902
1918
*/
1919
+ @ SuppressWarnings ("unchecked" )
1903
1920
public <T > List <T > queryAll (String sqlQuery , Class <T > clazz , TableSchema schema , Supplier <T > allocator ) {
1904
1921
Map <String , POJOSetter > classDeserializers = deserializers .getOrDefault (clazz ,
1905
1922
Collections .emptyMap ()).getOrDefault (schema .getTableName () == null ?
@@ -2180,9 +2197,10 @@ protected int getOperationTimeout() {
2180
2197
/**
2181
2198
* Returns unmodifiable set of endpoints.
2182
2199
* @return - set of endpoints
2200
+ * @deprecated
2183
2201
*/
2184
2202
public Set <String > getEndpoints () {
2185
- return Collections . unmodifiableSet ( endpoints );
2203
+ return endpoints . stream (). map ( Endpoint :: getBaseURL ). collect ( Collectors . toSet () );
2186
2204
}
2187
2205
2188
2206
public String getUser () {
@@ -2236,8 +2254,8 @@ public void updateBearerToken(String bearer) {
2236
2254
this .configuration .put (ClientConfigProperties .httpHeader (HttpHeaders .AUTHORIZATION ), "Bearer " + bearer );
2237
2255
}
2238
2256
2239
- private ClickHouseNode getNextAliveNode () {
2240
- return serverNodes .get (0 );
2257
+ private Endpoint getNextAliveNode () {
2258
+ return endpoints .get (0 );
2241
2259
}
2242
2260
2243
2261
public static final String VALUES_LIST_DELIMITER = "," ;
0 commit comments