flinkdemo(flink的默认端口)
本篇文章给大家谈谈flinkdemo,以及flink的默认端口对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、flink中可以实现每n秒执行一个方法的定时任务吗?使用Java自己的定时操作是失效的
- 2、Flink Kafka Doris实战demo
- 3、pyflink消费kafka-connect-jdbc消息(带schema)
flink中可以实现每n秒执行一个方法的定时任务吗?使用Java自己的定时操作是失效的
是的,Flink中可以使用定时器(Timer)来实现每n秒执行一个方法的定时任务。Flink的定时器分为两种配慧类型:EventTime Timer和ProcessingTime Timer。
其中,EventTime Timer是基于事件时间的定时器,可以使用在基于事件时间处理的Flink应用中,而ProcessingTime Timer是仔卖旁念橡基于处理时间的定时器,可以使用在基于处理时间处理的Flink应用中。根据需求选择对应的定时器类型即可。
下面是一个使用ProcessingTime Timer实现每n秒执行一个方法的示例代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Main {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.socketTextStream("localhost", 9999)
.keyBy(0)
.process(new MyKeyedProcessFunction())
.print();
env.execute("ProcessingTime Timer Demo");
}
public static class MyKeyedProcessFunction extends KeyedProcessFunctionString, String, String {
private transient ValueStateLong lastTriggerTimeState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptorLong lastTriggerTimeDescriptor = new ValueStateDescriptor("lastTriggerTime", Long.class);
lastTriggerTimeState = getRuntimeContext().getState(lastTriggerTimeDescriptor);
}
@Override
public void processElement(String value, Context ctx, CollectorString out) throws Exception {
// 每n秒触发一次定时器
long currentTime = ctx.timerService().currentProcessingTime();
long lastTriggerTime = lastTriggerTimeState.value() == null ? 0 : lastTriggerTimeState.value();
long interval = 5000; // 每5秒执行一次
if (currentTime - lastTriggerTime = interval) {
lastTriggerTimeState.update(currentTime);
out.collect("执行定时任务");
}
}
}
}
在上述代码中,我们定义了一个KeyedProcessFunction,并在其中使用了ProcessingTime Timer来实现每5秒执行一次定时任务。每次处理元素时,首先获取当前时间,然后与上次触发定时器的时间进行比较,如果时间间隔超过了设定的值,则执行定时任务,并更新上次触发定时器的时间。
需要注意的是,由于Flink是流式计算框架,定时器是基于时间的,因此需要使用TimeCharacteristic.ProcessingTime来指定使用ProcessingTime来计算定时器触发时间。另外,在使用定时器时需要考虑并发问题,例如使用ValueState来存储上次触发定时器的时间。
[img]Flink Kafka Doris实战demo
Flink Kafka Doris实耐世此战demo
数昌迅据:
sql:
sql :
deal :
sink-kafka数返谈据 :
pyflink消费kafka-connect-jdbc消息(带schema)
1、数据接入
通过kafka的restFul接口创建连接mysql的连接器并启动。迹山
{
"name": "mysql_stream_test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "",
"incrementing.column.name": "ID",
"connection.password": "",
"validate.non.null": true,
"tasks.max": 1,
"batch.max.rows": 100,
"table.whitelist": "baseqx.test_demo",
"mode": "incrementing",
"topic.prefix": "mysql_",
"connection.user": "",
"poll.interval.ms": 5000,
"numeric.mapping": "best_fit",
"connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/枝州则baseqx?useUnicode=truecharacterEncoding=utf8allowMultiQueries=true"
}
}
2.kafka-connect创建主题中的猛棚默认数据格式为
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}
3.使用pyflink消费带schema的消息
#!/usr/bin/python3.7
# -*- coding: UTF-8 -*-
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect
s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(1)
st_env = StreamTableEnvironment.create(s_env, TableConfig())
st_env.get_config().set_python_executable("python3")
st_env.use_catalog("default_catalog")
st_env.use_database("default_database")
# DML上可以固定schema为字符串, 用 ROW 函数封装 payload
ddlKafkaConn = """
create table sourceKafkaConn(
`scheam` STRING comment 'kafkaConn每行模式',
`payload` ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING) comment '行数据'
)comment '从kafkaConnect获取带模式的数据'
with(
'connector' = 'kafka',
'topic' = 'mysql_test_demo',
'properties.bootstrap.servers' = '192.168.113.11:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
)
"""
# 'connector.startup-mode' = 'earliest-offset 表示读取最早的消息 | latest-offset 表示读取消息队列中最新的消息',
st_env.execute_sql(ddlKafkaConn)
sinkPrint = '''
CREATE TABLE sinkPrint WITH ('connector' = 'print')
LIKE sourceKafkaConn (EXCLUDING ALL)
'''
st_env.execute_sql(sinkPrint)
st_env.execute_sql("SHOW TABLES").print()
st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \
.insert_into("sinkPrint")
st_env.execute("pyflink-kafka-v4")
4.执行
4.1pythonpyflink-kafka-v4.py
4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py
5.执行结果
+-----------------+|tablename|+-----------------
+|sinkPrint|
+|sourceKafkaConn|
+-----------------+
2 rowsinset
+I(null,1,prestoEtl,1606902182000)
+I(null,2,执行的非常好,1606902562000)
+I(null,3,使用flink解析topic的schema,1607070278000)
关于flinkdemo和flink的默认端口的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。