@@ -139,7 +139,7 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
139
139
* @param pluginLoadMode
140
140
* @return
141
141
*/
142
- public static boolean checkRemoteSqlPluginPath (String remoteSqlPluginPath , String deployMode , String pluginLoadMode ) {
142
+ private static boolean checkRemoteSqlPluginPath (String remoteSqlPluginPath , String deployMode , String pluginLoadMode ) {
143
143
if (StringUtils .isEmpty (remoteSqlPluginPath )) {
144
144
return StringUtils .equalsIgnoreCase (pluginLoadMode , EPluginLoadMode .SHIPFILE .name ())
145
145
|| StringUtils .equalsIgnoreCase (deployMode , ClusterMode .local .name ());
@@ -176,7 +176,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
176
176
}
177
177
178
178
179
- public static List <URL > getExternalJarUrls (String addJarListStr ) throws java .io .IOException {
179
+ private static List <URL > getExternalJarUrls (String addJarListStr ) throws java .io .IOException {
180
180
List <URL > jarUrlList = Lists .newArrayList ();
181
181
if (Strings .isNullOrEmpty (addJarListStr )) {
182
182
return jarUrlList ;
@@ -240,7 +240,7 @@ private static void sqlTranslation(String localSqlPluginPath,
240
240
}
241
241
}
242
242
243
- public static void registerUserDefinedFunction (SqlTree sqlTree , List <URL > jarUrlList , TableEnvironment tableEnv )
243
+ private static void registerUserDefinedFunction (SqlTree sqlTree , List <URL > jarUrlList , TableEnvironment tableEnv )
244
244
throws IllegalAccessException , InvocationTargetException {
245
245
// udf和tableEnv须由同一个类加载器加载
246
246
ClassLoader levelClassLoader = tableEnv .getClass ().getClassLoader ();
@@ -269,9 +269,9 @@ public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrl
269
269
* @return
270
270
* @throws Exception
271
271
*/
272
- public static Set <URL > registerTable (SqlTree sqlTree , StreamExecutionEnvironment env , StreamTableEnvironment tableEnv , String localSqlPluginPath ,
272
+ private static Set <URL > registerTable (SqlTree sqlTree , StreamExecutionEnvironment env , StreamTableEnvironment tableEnv , String localSqlPluginPath ,
273
273
String remoteSqlPluginPath , String pluginLoadMode , Map <String , SideTableInfo > sideTableMap , Map <String , Table > registerTableCache ) throws Exception {
274
- Set <URL > pluginClassPatshSets = Sets .newHashSet ();
274
+ Set <URL > pluginClassPathSets = Sets .newHashSet ();
275
275
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner ();
276
276
for (TableInfo tableInfo : sqlTree .getTableInfoMap ().values ()) {
277
277
@@ -325,26 +325,26 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
325
325
registerTableCache .put (tableInfo .getName (), regTable );
326
326
327
327
URL sourceTablePathUrl = PluginUtil .buildSourceAndSinkPathByLoadMode (tableInfo .getType (), SourceTableInfo .SOURCE_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
328
- pluginClassPatshSets .add (sourceTablePathUrl );
328
+ pluginClassPathSets .add (sourceTablePathUrl );
329
329
} else if (tableInfo instanceof TargetTableInfo ) {
330
330
331
331
TableSink tableSink = StreamSinkFactory .getTableSink ((TargetTableInfo ) tableInfo , localSqlPluginPath );
332
332
TypeInformation [] flinkTypes = FunctionManager .transformTypes (tableInfo .getFieldClasses ());
333
333
tableEnv .registerTableSink (tableInfo .getName (), tableInfo .getFields (), flinkTypes , tableSink );
334
334
335
335
URL sinkTablePathUrl = PluginUtil .buildSourceAndSinkPathByLoadMode (tableInfo .getType (), TargetTableInfo .TARGET_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
336
- pluginClassPatshSets .add (sinkTablePathUrl );
336
+ pluginClassPathSets .add (sinkTablePathUrl );
337
337
} else if (tableInfo instanceof SideTableInfo ) {
338
338
String sideOperator = ECacheType .ALL .name ().equals (((SideTableInfo ) tableInfo ).getCacheType ()) ? "all" : "async" ;
339
339
sideTableMap .put (tableInfo .getName (), (SideTableInfo ) tableInfo );
340
340
341
341
URL sideTablePathUrl = PluginUtil .buildSidePathByLoadMode (tableInfo .getType (), sideOperator , SideTableInfo .TARGET_SUFFIX , localSqlPluginPath , remoteSqlPluginPath , pluginLoadMode );
342
- pluginClassPatshSets .add (sideTablePathUrl );
342
+ pluginClassPathSets .add (sideTablePathUrl );
343
343
} else {
344
344
throw new RuntimeException ("not support table type:" + tableInfo .getType ());
345
345
}
346
346
}
347
- return pluginClassPatshSets ;
347
+ return pluginClassPathSets ;
348
348
}
349
349
350
350
/**
@@ -353,7 +353,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
353
353
* @param env
354
354
* @param classPathSet
355
355
*/
356
- public static void registerPluginUrlToCachedFile (StreamExecutionEnvironment env , Set <URL > classPathSet ) {
356
+ private static void registerPluginUrlToCachedFile (StreamExecutionEnvironment env , Set <URL > classPathSet ) {
357
357
int i = 0 ;
358
358
for (URL url : classPathSet ) {
359
359
String classFileName = String .format (CLASS_FILE_NAME_FMT , i );
@@ -362,7 +362,7 @@ public static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env,
362
362
}
363
363
}
364
364
365
- public static StreamExecutionEnvironment getStreamExeEnv (Properties confProperties , String deployMode ) throws Exception {
365
+ private static StreamExecutionEnvironment getStreamExeEnv (Properties confProperties , String deployMode ) throws Exception {
366
366
StreamExecutionEnvironment env = !ClusterMode .local .name ().equals (deployMode ) ?
367
367
StreamExecutionEnvironment .getExecutionEnvironment () :
368
368
new MyLocalStreamEnvironment ();
0 commit comments