[Flink Extras] 21. Registering a Catalog in Flink via the SQL Client and the Table API



Table of Contents

  • I. Creating Flink tables and registering them in a catalog
    • 1. Using SQL DDL
    • 2. Maven dependencies
    • 3. Creating a Hive table with the Table API and registering it in a HiveCatalog
    • 4. Creating a Hive table with SQL and registering it in a HiveCatalog
    • 5. Verification
      • 1) Package and upload
      • 2) Submit the job
      • 3) Verify

This article demonstrates registering tables in a Flink catalog: the SQL Client is used to connect to MySQL through a JdbcCatalog, and the Table API and SQL are used to register tables in a HiveCatalog.

For more comprehensive and systematic coverage, see my Flink column.

Apart from the Maven dependencies, the environment used in this article is:

Hadoop version: 3.1.4

Hive version: 3.1.2

Flink version: 1.13.6

I. Creating Flink tables and registering them in a catalog

1. Using SQL DDL

You can use DDL to create tables in a catalog through the Table API or the SQL Client.

Note that the JdbcCatalog cannot create databases or tables (the official example is not explicit about this); the HiveCatalog can create tables.

This example is based on MySQL; the Flink version used here is 1.17.

// the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
----- JdbcCatalog cannot create tables; HiveCatalog can -----
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);

Flink SQL> SHOW TABLES;
mytable

----------------------- a concrete example follows -----------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
>     'type' = 'jdbc',
>     'default-database' = 'test?useSSL=false',
>     'username' = 'root',
>     'password' = 'root',
>     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
|    alan_catalog |
| default_catalog |
+-----------------+
2 rows in set

Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
|          azkaban |
|          cdhhive |
|           cdhhue |
......
| spring_boot_plus |
|   springbootmall |
|             test |
|           zipkin |
+------------------+
29 rows in set

Flink SQL> use test;
[INFO] Execute statement succeed.

Flink SQL> show tables;

+------------------------------+
|                   table name |
+------------------------------+
|                  permissions |
|                       person |
|                   personinfo |
......
|                         role |
|                         user |
+------------------------------+
34 rows in set

Flink SQL> select * from person;

Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.

Flink SQL> select * from person;

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 测试修改go语言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再试一试 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     测试go语言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows
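
The same catalog can also be registered programmatically instead of through SQL Client DDL. Below is a minimal Table API sketch mirroring the CREATE CATALOG statement above; it assumes a Flink 1.13.x classpath with flink-connector-jdbc and the MySQL driver available (the class name is illustrative, and the JdbcCatalog constructor signature is the one from that release):

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterJdbcCatalogDemo {
	public static void main(String[] args) {
		TableEnvironment tenv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

		// same connection parameters as the CREATE CATALOG DDL above
		JdbcCatalog jdbcCatalog = new JdbcCatalog(
				"alan_catalog",                     // catalog name
				"test?useSSL=false",                // default database
				"root",                             // username
				"root",                             // password
				"jdbc:mysql://192.168.10.44:3306"); // base url

		tenv.registerCatalog("alan_catalog", jdbcCatalog);
		tenv.useCatalog("alan_catalog");
		// from here on, queries resolve against the MySQL catalog,
		// e.g. tenv.executeSql("show databases").print();
	}
}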

2. Maven dependencies

<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.13.6</flink.version>
</properties>

<dependencies>
	<dependency>
		<groupId>jdk.tools</groupId>
		<artifactId>jdk.tools</artifactId>
		<version>1.8</version>
		<scope>system</scope>
		<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-scala_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-scala_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-bridge_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<!-- blink planner -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner-blink_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-common</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<!-- kafka connectors -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-sql-connector-kafka_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-jdbc_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-hive_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-metastore</artifactId>
		<version>2.1.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-exec</artifactId>
		<version>3.1.2</version>
		<scope>provided</scope>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-shaded-hadoop-2-uber</artifactId>
		<version>2.7.5-10.0</version>
		<scope>provided</scope>
	</dependency>

	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.38</version>
		<scope>provided</scope>
		<!--8.0.20 -->
	</dependency>

	<!-- logging -->
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
		<version>1.7.7</version>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.17</version>
		<scope>runtime</scope>
	</dependency>

	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.44</version>
	</dependency>

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.2</version>
		<scope>provided</scope>
	</dependency>
</dependencies>

3. Creating a Hive table with the Table API and registering it in a HiveCatalog

Catalog tables can also be created programmatically, in Java or Scala.

The example below uses the HiveCatalog; the HiveCatalog itself is covered in more depth in other articles of this series.

Note that to run this example you must copy /usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar from the Hadoop environment into Flink's lib directory (/usr/local/bigdata/flink-1.13.5/lib). This is only needed because my Hadoop environment is configured with LZO compression; the copy command is shown below.
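
The copy is a single command (both paths are the ones mentioned above):

cp /usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar /usr/local/bigdata/flink-1.13.5/lib/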

Hadoop version: 3.1.4

Hive version: 3.1.2

Flink version: 1.13.6

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 */
public class TestHiveCatalogDemo {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// name of the default Hive database
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// use the registered catalog
		tenv.useCatalog("alan_hive");

		// list the tables that already exist in the default database
		List<String> tables = hiveCatalog.listTables(defaultDatabase);
		for (String table : tables) {
			System.out.println("Database:testhive  tables:" + table);
		}

		// create a database
		// public CatalogDatabaseImpl(Map<String, String> properties, @Nullable String comment)
		Map<String, String> properties = new HashMap<>();
		CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
		String newDatabaseName = "alan_hivecatalog_hivedb";
		hiveCatalog.createDatabase(newDatabaseName, cd, true);

		// create a table
		String tableName = "alan_hivecatalog_hivedb_testTable";
		// public ObjectPath(String databaseName, String objectName)
		ObjectPath path = new ObjectPath(newDatabaseName, tableName);
		TableSchema schema = TableSchema.builder()
				.field("id", DataTypes.INT())
				.field("name", DataTypes.STRING())
				.field("age", DataTypes.INT())
				.build();
		// public CatalogTableImpl(TableSchema tableSchema, Map<String, String> properties, String comment)
		CatalogTable catalogTable = new CatalogTableImpl(schema, properties, "this is table comment");
		hiveCatalog.createTable(path, catalogTable, true);

		List<String> newTables = hiveCatalog.listTables(newDatabaseName);
		for (String table : newTables) {
			System.out.println("Database:alan_hivecatalog_hivedb  tables:" + table);
		}

		// insert data
		String insertSQL = "insert into " + newDatabaseName + "." + tableName + " values (1,'alan',18)";
		tenv.executeSql(insertSQL);

		// query data
		String selectSQL = "select * from " + newDatabaseName + "." + tableName;
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();
		env.execute();
	}

}
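
A side note on API evolution: TableSchema and CatalogTableImpl compile on Flink 1.13 but are deprecated in later releases. On newer versions, the schema/table block above can be expressed with the newer Schema builder and CatalogTable.of; a hedged drop-in fragment for the schema and catalogTable lines (assumes imports of org.apache.flink.table.api.Schema and java.util.Collections):

// replaces the TableSchema/CatalogTableImpl block in the example above
CatalogTable catalogTable = CatalogTable.of(
		Schema.newBuilder()
				.column("id", DataTypes.INT())
				.column("name", DataTypes.STRING())
				.column("age", DataTypes.INT())
				.build(),
		"this is table comment",   // table comment
		Collections.emptyList(),   // partition keys (none)
		properties);               // table options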

4. Creating a Hive table with SQL and registering it in a HiveCatalog

This example is functionally identical to the previous one; the difference is the implementation: the previous example creates the table through the catalog API, while this one creates it with a SQL statement.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 */
public class TestCreateHiveTable {
	public static final String tableName = "alan_hivecatalog_hivedb_testTable";
	// note: the sink.partition-commit.* properties are carried over from the original;
	// they only take effect for partitioned sinks
	public static final String hive_create_table_sql = "CREATE  TABLE  " + tableName + " (\n"
			+ "  id INT,\n"
			+ "  name STRING,\n"
			+ "  age INT" + ") "
			+ "TBLPROPERTIES (\n"
			+ "  'sink.partition-commit.delay'='5 s',\n"
			+ "  'sink.partition-commit.trigger'='partition-time',\n"
			+ "  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
		String name = "alan_hive";
		// name of the default database
		String defaultDatabase = "default";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		tenv.useCatalog("alan_hive");

		// the database was created by the previous example; to make this example
		// self-contained, create it first via hiveCatalog.createDatabase(...) as shown there
		String newDatabaseName = "alan_hivecatalog_hivedb";
		tenv.useDatabase(newDatabaseName);

		// create the table using the Hive SQL dialect
		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tenv.executeSql(hive_create_table_sql);

		// insert data
		String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
		tenv.executeSql(insertSQL);

		// query data
		String selectSQL = "select * from alan_hivecatalog_hivedb_testTable";
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();
		env.execute();
	}

}
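
One detail worth calling out: after tenv.getConfig().setSqlDialect(SqlDialect.HIVE), every subsequent statement is parsed with Hive syntax. If the same program goes on to run Flink-dialect SQL, switch back first; a one-line sketch:

// restore Flink's default dialect once the Hive-dialect DDL is done
tenv.getConfig().setSqlDialect(SqlDialect.DEFAULT);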

5. Verification

This example submits the job to the Flink cluster from the command line; submitting it through the web UI works the same way, so that is not repeated here.

Prerequisites:

1. A working Hadoop environment

2. A working Hive environment

3. A working Flink-Hive integration

4. A running Flink cluster; this article uses a yarn-session deployment (a startup sketch follows this list)
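
For reference, a yarn-session can be started roughly as follows (a sketch; the session name and memory sizes are illustrative, not taken from the original run):

# start a detached YARN session; -nm names the application, -jm/-tm set memory (MB)
yarn-session.sh -nm alan-flink-session -jm 1024 -tm 1024 -d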

1) Package and upload

Configure the packaging plugins in pom.xml:

<build>
	<sourceDirectory>src/main/java</sourceDirectory>
	<plugins>
		<!-- compiler plugin -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.5.1</version>
			<configuration>
				<source>1.8</source>
				<target>1.8</target>
				<!--<encoding>${project.build.sourceEncoding}</encoding> -->
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-surefire-plugin</artifactId>
			<version>2.18.1</version>
			<configuration>
				<useFile>false</useFile>
				<disableXmlReport>true</disableXmlReport>
				<includes>
					<include>**/*Test.*</include>
					<include>**/*Suite.*</include>
				</includes>
			</configuration>
		</plugin>
		<!-- shade plugin: build a fat jar -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.3</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<filters>
							<filter>
								<artifact>*:*</artifact>
								<excludes>
									<!-- strip signature files to avoid SecurityException -->
									<exclude>META-INF/*.SF</exclude>
									<exclude>META-INF/*.DSA</exclude>
									<exclude>META-INF/*.RSA</exclude>
								</excludes>
							</filter>
						</filters>
						<transformers>
							<transformer
								implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
								<mainClass>org.table_sql.TestCreateHiveTable</mainClass>
							</transformer>
						</transformers>
					</configuration>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

Package from the command line or from an IDE; here the command line is used:

mvn package  -Dmaven.test.skip=true

# wait until you see
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  18.304 s

Upload the packaged jar to the Flink cluster and run it.

2) Submit the job

# jar location: /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar
# if Flink's environment variables are configured, run the command below directly; otherwise switch to Flink's bin directory first
flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar  org.table_sql.TestHiveCatalogDemo

Note that the shade plugin above writes org.table_sql.TestCreateHiveTable into the jar manifest as the default entry class, and flink run treats a trailing class name as a program argument rather than an entry point; to select the entry class explicitly, use flink run -c org.table_sql.TestHiveCatalogDemo <jar>.

3) Verify

# 1. Output after submitting the job
[alanchan@server1 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar  org.table_sql.TestHiveCatalogDemo
2023-08-31 00:18:01,185 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
2023-08-31 00:18:01,185 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
Hive Session ID = 4c3ab8b5-d99e-4e2f-9362-fcbcae8047fa
Hive Session ID = d3fc6679-9b60-47a9-b9e7-d125e3240196
2023-08-31 00:18:07,578 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/usr/local/bigdata/flink-1.13.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-08-31 00:18:07,778 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:07,787 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:07,860 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 2161b431ad0310df06417a3232ca5e60
Hive Session ID = 90444eb0-7fc9-4ac9-adb1-44df145739c7
(
  `id` INT,
  `name` STRING,
  `age` INT
)
2023-08-31 00:18:17,806 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2023-08-31 00:18:17,871 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 0
2023-08-31 00:18:18,115 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:18,116 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:18,119 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 16a85c80862dac9035c62563b39a9fb7
Program execution finished
Job with JobID 16a85c80862dac9035c62563b39a9fb7 has finished.
Job Runtime: 6652 ms

# 2. Query the table and its data in the Flink SQL CLI
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.

Flink SQL> select * from alan_hivecatalog_hivedb_testtable;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                           alan |          18 |
+----+-------------+--------------------------------+-------------+
Received a total of 1 row

# verification complete
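
As an additional cross-check not shown in the run above, the table can also be inspected from the Hive side, since it lives in the Hive metastore; a sketch, assuming beeline or the hive CLI points at the same metastore:

-- run in beeline / the hive CLI
use alan_hivecatalog_hivedb;
show tables;
select * from alan_hivecatalog_hivedb_testtable;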

To recap, this article demonstrated registering tables in a Flink catalog: the SQL Client was used to connect to MySQL through a JdbcCatalog, and the Table API and SQL were used to register tables in a HiveCatalog.
