flink-sql读取kafka数据(json)

报错:Could not find any factories that implement ‘org.apache.flink.table.factories.DeserializationFormatFactory’ in the classpath.

at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.orgapacheapacheapacheflinktabletabletableplannercalcitecalcitecalciteFlinkPlannerImpl$rel(FlinkPlannerImpl.scala:164)atorg.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)atorg.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)atorg.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)atorg.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)atorg.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:524)atorg.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)atorg.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)atorg.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)atKafkaTest.main(KafkaTest.java:77)Causedby:org.apache.flink.table.api.ValidationException:Couldnotfindanyfactoriesthatimplement′org.apache.flink.table.factories.DeserializationFormatFactory′intheclasspath.atorg.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)atorg.apache.flink.table.factories.FactoryUtilrel(FlinkPlannerImpl.scala:164) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:524) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) at KafkaTest.main(KafkaTest.java:77) Caused by: org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath. at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229) at org.apache.flink.table.factories.FactoryUtilrel(FlinkPlannerImpl.scala:164)atorg.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)atorg.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)atorg.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)atorg.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)atorg.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:524)atorg.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)atorg.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)atorg.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)atKafkaTest.main(KafkaTest.java:77)Causedby:org.apache.flink.table.api.ValidationException:Couldnotfindanyfactoriesthatimplementorg.apache.flink.table.factories.DeserializationFormatFactoryintheclasspath.atorg.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)atorg.apache.flink.table.factories.FactoryUtilTableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538)
at org.apache.flink.table.factories.FactoryUtilTableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)atorg.apache.flink.table.factories.FactoryUtilTableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426) at org.apache.flink.table.factories.FactoryUtilTableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)atorg.apache.flink.table.factories.FactoryUtilTableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
… 20 more

原因:没有引入flink-json的依赖
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.11.2</version>
        </dependency>

参考: https://blog.csdn.net/weixin_44056920/article/details/109499099
官网:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/formats/json.html
不过官网说是内置,不清楚是什么意思
在这里插入图片描述


创建 TableEnvironment时create飘红报错

Static interface method calls are not supported at language level ‘5’

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

在这里插入图片描述

解决办法:

pom中添加:

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>


报错:No ExecutorFactory found to execute the application

Exception in thread “main” org.apache.flink.table.api.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:721)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
at KafkaTest.main(KafkaTest.java:77)
Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1801)
at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:705)
… 3 more

解决办法

pom中引入

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>



报错:SQL parse failed. Bang equal ‘!=’ is not allowed under the current SQL conformance level

SLF4J: Failed to load class “org.slf4j.impl.StaticLoggerBinder”.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread “main” org.apache.flink.table.api.SqlParserException: SQL parse failed. Bang equal ‘!=’ is not allowed under the current SQL conformance level
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at KafkaTest.main(KafkaTest.java:84)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Bang equal ‘!=’ is not allowed under the current SQL conformance level
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
… 3 more
Caused by: org.apache.calcite.runtime.CalciteException: Bang equal ‘!=’ is not allowed under the current SQL conformance level
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.calcite.runtime.ResourcesExInstWithCause.ex(Resources.java:457)atorg.apache.calcite.runtime.ResourcesExInstWithCause.ex(Resources.java:457) at org.apache.calcite.runtime.ResourcesExInstWithCause.ex(Resources.java:457)atorg.apache.calcite.runtime.ResourcesExInst.ex(Resources.java:550)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.BinaryRowOperator(FlinkSqlParserImpl.java:27424)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2(FlinkSqlParserImpl.java:17007)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression(FlinkSqlParserImpl.java:16758)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.WhereOpt(FlinkSqlParserImpl.java:12927)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlSelect(FlinkSqlParserImpl.java:6914)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQuery(FlinkSqlParserImpl.java:658)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.LeafQueryOrExpr(FlinkSqlParserImpl.java:16741)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.QueryOrExpr(FlinkSqlParserImpl.java:16204)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.OrderedQueryOrExpr(FlinkSqlParserImpl.java:532)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.RichSqlInsert(FlinkSqlParserImpl.java:5515)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3271)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
… 5 more

解决办法: 把 sql中的 != 改成 <>
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐