5
5
import com .clickhouse .client .api .data_formats .ClickHouseBinaryFormatReader ;
6
6
import com .clickhouse .client .api .data_formats .NativeFormatReader ;
7
7
import com .clickhouse .client .api .data_formats .RowBinaryFormatReader ;
8
- import com .clickhouse .client .api .data_formats .RowBinaryFormatSerializer ;
9
8
import com .clickhouse .client .api .data_formats .RowBinaryWithNamesAndTypesFormatReader ;
10
9
import com .clickhouse .client .api .data_formats .RowBinaryWithNamesFormatReader ;
11
10
import com .clickhouse .client .api .data_formats .internal .BinaryStreamReader ;
12
11
import com .clickhouse .client .api .data_formats .internal .MapBackedRecord ;
13
12
import com .clickhouse .client .api .data_formats .internal .ProcessParser ;
14
- import com .clickhouse .client .api .data_formats .internal .SerializerUtils ;
15
13
import com .clickhouse .client .api .enums .Protocol ;
16
14
import com .clickhouse .client .api .enums .ProxyType ;
17
15
import com .clickhouse .client .api .http .ClickHouseHttpProto ;
18
- import com .clickhouse .client .api .insert .DataSerializationException ;
19
16
import com .clickhouse .client .api .insert .InsertResponse ;
20
17
import com .clickhouse .client .api .insert .InsertSettings ;
21
- import com .clickhouse .client .api .insert .POJOSerializer ;
22
18
import com .clickhouse .client .api .internal .ClickHouseLZ4OutputStream ;
23
19
import com .clickhouse .client .api .internal .ClientStatisticsHolder ;
24
20
import com .clickhouse .client .api .internal .HttpAPIClientHelper ;
31
27
import com .clickhouse .client .api .metrics .ClientMetrics ;
32
28
import com .clickhouse .client .api .metrics .OperationMetrics ;
33
29
import com .clickhouse .client .api .query .GenericRecord ;
34
- import com .clickhouse .client .api .query .POJOSetter ;
35
30
import com .clickhouse .client .api .query .QueryResponse ;
36
31
import com .clickhouse .client .api .query .QuerySettings ;
37
32
import com .clickhouse .client .api .query .Records ;
33
+ import com .clickhouse .client .api .serde .DataSerializationException ;
34
+ import com .clickhouse .client .api .serde .POJOFieldDeserializer ;
35
+ import com .clickhouse .client .api .serde .POJOFieldSerializer ;
36
+ import com .clickhouse .client .api .serde .POJOSerDe ;
38
37
import com .clickhouse .client .api .transport .Endpoint ;
39
38
import com .clickhouse .client .api .transport .HttpEndpoint ;
40
39
import com .clickhouse .client .config .ClickHouseClientOption ;
54
53
import java .io .InputStream ;
55
54
import java .io .OutputStream ;
56
55
import java .lang .reflect .InvocationTargetException ;
57
- import java .lang .reflect .Method ;
58
56
import java .net .URL ;
59
57
import java .nio .charset .StandardCharsets ;
60
58
import java .time .Duration ;
116
114
*
117
115
*/
118
116
public class Client implements AutoCloseable {
117
+ private static final Logger LOG = LoggerFactory .getLogger (Client .class );
119
118
120
119
private HttpAPIClientHelper httpClientHelper = null ;
121
120
122
121
private final List <Endpoint > endpoints ;
123
-
124
122
private final Map <String , String > configuration ;
125
123
126
124
private final Map <String , String > readOnlyConfig ;
125
+
126
+ private final POJOSerDe pojoSerDe ;
127
127
128
- // POJO serializer mapping (class -> (schema -> (format -> serializer)))
129
- private final Map <Class <?>, Map <String , Map <String , POJOSerializer >>> serializers ;
130
-
131
- // POJO deserializer mapping (class -> (schema -> (format -> deserializer)))
132
- private final Map <Class <?>, Map <String , Map <String , POJOSetter >>> deserializers ;
133
-
134
- private static final Logger LOG = LoggerFactory .getLogger (Client .class );
135
128
private final ExecutorService sharedOperationExecutor ;
136
129
137
130
private final boolean isSharedOpExecutorOwned ;
138
131
139
132
private final Map <String , ClientStatisticsHolder > globalClientStats = new ConcurrentHashMap <>();
140
133
141
- private Map <String , TableSchema > tableSchemaCache = new ConcurrentHashMap <>();
142
- private Map <String , Boolean > tableSchemaHasDefaults = new ConcurrentHashMap <>();
134
+ private final Map <String , TableSchema > tableSchemaCache = new ConcurrentHashMap <>();
143
135
144
- private final ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy ;
136
+ private final Map < String , Boolean > tableSchemaHasDefaults = new ConcurrentHashMap <>() ;
145
137
146
138
// Server context
147
139
private String serverVersion ;
@@ -162,8 +154,7 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
162
154
this .metricsRegistry = metricsRegistry ;
163
155
164
156
// Serialization
165
- this .serializers = new ConcurrentHashMap <>();
166
- this .deserializers = new ConcurrentHashMap <>();
157
+ this .pojoSerDe = new POJOSerDe (columnToMethodMatchingStrategy );
167
158
168
159
// Operation Execution
169
160
boolean isAsyncEnabled = MapUtils .getFlag (this .configuration , ClientConfigProperties .ASYNC_OPERATIONS .getKey (), false );
@@ -175,9 +166,6 @@ private Client(Set<String> endpoints, Map<String,String> configuration, boolean
175
166
this .sharedOperationExecutor = sharedOperationExecutor ;
176
167
}
177
168
178
- this .columnToMethodMatchingStrategy = columnToMethodMatchingStrategy ;
179
-
180
-
181
169
// Transport
182
170
ImmutableList .Builder <Endpoint > tmpEndpoints = ImmutableList .builder ();
183
171
boolean initSslContext = false ;
@@ -1243,7 +1231,6 @@ public boolean ping(long timeout) {
1243
1231
* @param schema - correlating table schema
1244
1232
*/
1245
1233
public synchronized void register (Class <?> clazz , TableSchema schema ) {
1246
- LOG .debug ("Registering POJO: {}" , clazz .getName ());
1247
1234
String schemaKey ;
1248
1235
if (schema .getTableName () != null && schema .getQuery () == null ) {
1249
1236
schemaKey = schema .getTableName ();
@@ -1253,55 +1240,9 @@ public synchronized void register(Class<?> clazz, TableSchema schema) {
1253
1240
throw new IllegalArgumentException ("Table schema has both query and table name set. Only one is allowed." );
1254
1241
}
1255
1242
tableSchemaCache .put (schemaKey , schema );
1243
+ tableSchemaHasDefaults .put (schemaKey , schema .hasDefaults ());
1256
1244
1257
- ColumnToMethodMatchingStrategy matchingStrategy = columnToMethodMatchingStrategy ;
1258
-
1259
- //Create a new POJOSerializer with static .serialize(object, columns) methods
1260
- Map <String , Method > classGetters = new HashMap <>();
1261
- Map <String , Method > classSetters = new HashMap <>();
1262
- for (Method method : clazz .getMethods ()) {//Clean up the method names
1263
- if (matchingStrategy .isGetter (method .getName ())) {
1264
- String methodName = matchingStrategy .normalizeMethodName (method .getName ());
1265
- classGetters .put (methodName , method );
1266
- } else if (matchingStrategy .isSetter (method .getName ())) {
1267
- String methodName = matchingStrategy .normalizeMethodName (method .getName ());
1268
- classSetters .put (methodName , method );
1269
- }
1270
- }
1271
-
1272
- Map <String , POJOSerializer > schemaSerializers = new HashMap <>();
1273
- Map <String , POJOSetter > schemaDeserializers = new ConcurrentHashMap <>();
1274
- boolean defaultsSupport = schema .hasDefaults ();
1275
- tableSchemaHasDefaults .put (schemaKey , defaultsSupport );
1276
- for (ClickHouseColumn column : schema .getColumns ()) {
1277
- String propertyName = columnToMethodMatchingStrategy .normalizeColumnName (column .getColumnName ());
1278
- Method getterMethod = classGetters .get (propertyName );
1279
- if (getterMethod != null ) {
1280
- schemaSerializers .put (column .getColumnName (), (obj , stream ) -> {
1281
- Object value = getterMethod .invoke (obj );
1282
-
1283
- if (RowBinaryFormatSerializer .writeValuePreamble (stream , defaultsSupport , column , value )) {
1284
- SerializerUtils .serializeData (stream , value , column );
1285
- }
1286
- });
1287
- } else {
1288
- LOG .warn ("No getter method found for column: {}" , propertyName );
1289
- }
1290
-
1291
- // Deserialization stuff
1292
- Method setterMethod = classSetters .get (propertyName );
1293
- if (setterMethod != null ) {
1294
- schemaDeserializers .put (column .getColumnName (), SerializerUtils .compilePOJOSetter (setterMethod , column ));
1295
- } else {
1296
- LOG .warn ("No setter method found for column: {}" , propertyName );
1297
- }
1298
- }
1299
-
1300
- Map <String , Map <String , POJOSerializer >> classSerializers = serializers .computeIfAbsent (clazz , k -> new HashMap <>());
1301
- Map <String , Map <String , POJOSetter >> classDeserializers = deserializers .computeIfAbsent (clazz , k -> new HashMap <>());
1302
-
1303
- classSerializers .put (schemaKey , schemaSerializers );
1304
- classDeserializers .put (schemaKey , schemaDeserializers );
1245
+ pojoSerDe .registerClass (clazz , schema );
1305
1246
}
1306
1247
1307
1248
/**
@@ -1367,14 +1308,14 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1367
1308
throw new IllegalArgumentException ("Table schema not found for table: " + tableName + ". Did you forget to register it?" );
1368
1309
}
1369
1310
//Lookup the Serializer for the POJO
1370
- Map <String , POJOSerializer > classSerializers = serializers . getOrDefault (data .get (0 ).getClass (), Collections . emptyMap ())
1371
- . getOrDefault ( tableName , Collections . emptyMap () );
1372
- List <POJOSerializer > serializersForTable = new ArrayList <>();
1311
+ Map <String , POJOFieldSerializer > classSerializers = pojoSerDe . getFieldSerializers (data .get (0 ).getClass (),
1312
+ tableSchema );
1313
+ List <POJOFieldSerializer > serializersForTable = new ArrayList <>();
1373
1314
for (ClickHouseColumn column : tableSchema .getColumns ()) {
1374
1315
if (column .hasDefault () && column .getDefaultValue () != ClickHouseColumn .DefaultValue .DEFAULT ) {
1375
1316
continue ;
1376
1317
}
1377
- POJOSerializer serializer = classSerializers .get (column .getColumnName ());
1318
+ POJOFieldSerializer serializer = classSerializers .get (column .getColumnName ());
1378
1319
if (serializer == null ) {
1379
1320
throw new IllegalArgumentException ("No serializer found for column '" + column .getColumnName () + "'. Did you forget to register it?" );
1380
1321
}
@@ -1405,7 +1346,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
1405
1346
out .write (" \n " .getBytes ());
1406
1347
for (Object obj : data ) {
1407
1348
1408
- for (POJOSerializer serializer : serializersForTable ) {
1349
+ for (POJOFieldSerializer serializer : serializersForTable ) {
1409
1350
try {
1410
1351
serializer .serialize (obj , out );
1411
1352
} catch (InvocationTargetException | IllegalAccessException | IOException e ) {
@@ -1918,9 +1859,7 @@ public <T> List<T> queryAll(String sqlQuery, Class<T> clazz, TableSchema schema)
1918
1859
*/
1919
1860
@ SuppressWarnings ("unchecked" )
1920
1861
public <T > List <T > queryAll (String sqlQuery , Class <T > clazz , TableSchema schema , Supplier <T > allocator ) {
1921
- Map <String , POJOSetter > classDeserializers = deserializers .getOrDefault (clazz ,
1922
- Collections .emptyMap ()).getOrDefault (schema .getTableName () == null ?
1923
- schema .getQuery () : schema .getTableName (), Collections .emptyMap ());
1862
+ Map <String , POJOFieldDeserializer > classDeserializers = pojoSerDe .getFieldDeserializers (clazz , schema );
1924
1863
1925
1864
if (classDeserializers .isEmpty ()) {
1926
1865
throw new IllegalArgumentException ("No deserializers found for the query and class '" + clazz + "'. Did you forget to register it?" );
0 commit comments