flinkdemo(flink的默认端口)

本篇文章给大家谈谈flinkdemo,以及flink的默认端口对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

flink中可以实现每n秒执行一个方法的定时任务吗?使用Java自己的定时操作是失效的

是的,Flink中可以使用定时器(Timer)来实现每n秒执行一个方法的定时任务。Flink的定时器分为两种配慧类型:EventTime Timer和ProcessingTime Timer。

其中,EventTime Timer是基于事件时间的定时器,可以使用在基于事件时间处理的Flink应用中,而ProcessingTime Timer是仔卖旁念橡基于处理时间的定时器,可以使用在基于处理时间处理的Flink应用中。根据需求选择对应的定时器类型即可。

下面是一个使用ProcessingTime Timer实现每n秒执行一个方法的示例代码:

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.state.ValueState;

import org.apache.flink.api.common.state.ValueStateDescriptor;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

import org.apache.flink.streaming.api.TimeCharacteristic;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

public class Main {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

env.socketTextStream("localhost", 9999)

.keyBy(0)

.process(new MyKeyedProcessFunction())

.print();

env.execute("ProcessingTime Timer Demo");

}

public static class MyKeyedProcessFunction extends KeyedProcessFunctionString, String, String {

private transient ValueStateLong lastTriggerTimeState;

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

ValueStateDescriptorLong lastTriggerTimeDescriptor = new ValueStateDescriptor("lastTriggerTime", Long.class);

lastTriggerTimeState = getRuntimeContext().getState(lastTriggerTimeDescriptor);

}

@Override

public void processElement(String value, Context ctx, CollectorString out) throws Exception {

// 每n秒触发一次定时器

long currentTime = ctx.timerService().currentProcessingTime();

long lastTriggerTime = lastTriggerTimeState.value() == null ? 0 : lastTriggerTimeState.value();

long interval = 5000; // 每5秒执行一次

if (currentTime - lastTriggerTime = interval) {

lastTriggerTimeState.update(currentTime);

out.collect("执行定时任务");

}

}

}

}

在上述代码中,我们定义了一个KeyedProcessFunction,并在其中使用了ProcessingTime Timer来实现每5秒执行一次定时任务。每次处理元素时,首先获取当前时间,然后与上次触发定时器的时间进行比较,如果时间间隔超过了设定的值,则执行定时任务,并更新上次触发定时器的时间。

需要注意的是,由于Flink是流式计算框架,定时器是基于时间的,因此需要使用TimeCharacteristic.ProcessingTime来指定使用ProcessingTime来计算定时器触发时间。另外,在使用定时器时需要考虑并发问题,例如使用ValueState来存储上次触发定时器的时间。

[img]

Flink Kafka Doris实耐世此战demo

数昌迅据:

sql:

sql :

deal :

sink-kafka数返谈据 :

pyflink消费kafka-connect-jdbc消息(带schema)

1、数据接入

通过kafka的restFul接口创建连接mysql的连接器并启动。迹山

{

    "name": "mysql_stream_test",

    "config": {

        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

        "timestamp.column.name": "",

        "incrementing.column.name": "ID",

        "connection.password": "",

        "validate.non.null": true,

        "tasks.max": 1,

        "batch.max.rows": 100,

        "table.whitelist": "baseqx.test_demo",

        "mode": "incrementing",

        "topic.prefix": "mysql_",

        "connection.user": "",

        "poll.interval.ms": 5000,

        "numeric.mapping": "best_fit",

        "connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3306/枝州则baseqx?useUnicode=truecharacterEncoding=utf8allowMultiQueries=true"

    }

}

2.kafka-connect创建主题中的猛棚默认数据格式为

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":false,"field":"NAME"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"CREATE_TIME"}],"optional":false,"name":"test_demo"},"payload":{"ID":1,"NAME":"prestoEtl","CREATE_TIME":1606902182000}}

3.使用pyflink消费带schema的消息

#!/usr/bin/python3.7

# -*- coding: UTF-8 -*-

from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

from pyflink.table import StreamTableEnvironment, TableConfig, SqlDialect

s_env = StreamExecutionEnvironment.get_execution_environment()

s_env.set_parallelism(1)

st_env = StreamTableEnvironment.create(s_env, TableConfig())

st_env.get_config().set_python_executable("python3")

st_env.use_catalog("default_catalog")

st_env.use_database("default_database")

# DML上可以固定schema为字符串, 用 ROW 函数封装 payload

ddlKafkaConn = """

create table sourceKafkaConn(

    `scheam`    STRING  comment 'kafkaConn每行模式',

    `payload`  ROW(ID BIGINT,NAME STRING,CREATE_TIME STRING)  comment '行数据'

)comment '从kafkaConnect获取带模式的数据'

with(

    'connector' = 'kafka',

    'topic' = 'mysql_test_demo',       

    'properties.bootstrap.servers' = '192.168.113.11:9092',

    'scan.startup.mode' = 'earliest-offset',

    'format' = 'json'

)

"""

# 'connector.startup-mode' = 'earliest-offset 表示读取最早的消息 | latest-offset 表示读取消息队列中最新的消息',

st_env.execute_sql(ddlKafkaConn)

sinkPrint = '''

    CREATE TABLE sinkPrint WITH ('connector' = 'print')

    LIKE sourceKafkaConn (EXCLUDING ALL)

'''

st_env.execute_sql(sinkPrint)

st_env.execute_sql("SHOW TABLES").print()

st_env.sql_query("select scheam,ROW(ID,NAME,CREATE_TIME) as payload from sourceKafkaConn") \

    .insert_into("sinkPrint")

st_env.execute("pyflink-kafka-v4")

4.执行

4.1pythonpyflink-kafka-v4.py

4.2flinkrun-mxxx.xxx.xxx.xxx:8081-pypyflink-kafka-v4.py

5.执行结果

+-----------------+|tablename|+-----------------

+|sinkPrint|

+|sourceKafkaConn|

+-----------------+

2 rowsinset

+I(null,1,prestoEtl,1606902182000)

+I(null,2,执行的非常好,1606902562000)

+I(null,3,使用flink解析topic的schema,1607070278000)

关于flinkdemo和flink的默认端口的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表