tonglin0325's Personal Homepage

MySQL Study Notes: binlog

1. Deploying MySQL with Docker

amd64 machines can use the CentOS-based MySQL 5.7 image: https://hub.docker.com/r/centos/mysql-57-centos7/

Both arm64 and amd64 machines can also use the MySQL 8.0 image: https://hub.docker.com/layers/library/mysql/8.0.29/images/sha256-44f98f4dd825a945d2a6a4b7b2f14127b5d07c5aaa07d9d232c2b58936fb76dc

Start a MySQL 5.7 container

docker run --name mysqltest -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.44

Start a MySQL 8.0 container

docker run --name mysqltest -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0.29

If you want to specify the MySQL config and data mount paths, first enter the container and copy the MySQL configuration files out.

Enter the container and look up MySQL's configuration file paths:

sh-4.4# mysql --help | grep my.cnf
order of preference, my.cnf, $MYSQL_TCP_PORT,
/etc/my.cnf /etc/mysql/my.cnf /usr/etc/my.cnf ~/.my.cnf

Reference: Installing MySQL with Docker, mounting the data and config files, and enabling remote access

Copy the /etc/my.cnf configuration to the host:

docker cp mysqltest:/etc/my.cnf /Users/lintong/Downloads/mysql8.0/config/

Start Docker MySQL with the config and data mount paths specified

mysql8.0

docker run --name mysqltest \
-v /Users/lintong/Downloads/mysql8.0/config/my.cnf:/etc/my.cnf \
-v /Users/lintong/Downloads/mysql8.0/data:/var/lib/mysql \
-p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0.29

mysql5.7

docker run --name mysqltest \
-v /Users/lintong/Downloads/mysql5.7/config/my.cnf:/etc/my.cnf \
-v /Users/lintong/Downloads/mysql5.7/data:/var/lib/mysql \
-p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7.44

2. Enabling binlog

Check whether binlog is enabled; in MySQL 8.0 it is enabled by default:

mysql> show variables like '%log_bin%';
+---------------------------------+--------------------------------+
| Variable_name                   | Value                          |
+---------------------------------+--------------------------------+
| log_bin                         | ON                             |
| log_bin_basename                | /var/lib/mysql/mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF                            |
| log_bin_use_v1_row_events       | OFF                            |
| sql_log_bin                     | ON                             |
+---------------------------------+--------------------------------+

In earlier versions it is disabled by default.

With MySQL 5.7, you need to add the following to the my.cnf configuration. Reference: Enabling binlog in MySQL

[mysqld]
log-bin=mysql-bin
server-id=1

Other options

# binlog format
binlog_format = mixed
# purge binlogs older than this many days
expire_logs_days = 5
# maximum size of each binlog file
max_binlog_size = 50m
# binlog cache size
binlog_cache_size = 4m
# maximum binlog cache size
max_binlog_cache_size = 512m
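Note that `expire_logs_days` is deprecated in MySQL 8.0 in favor of `binlog_expire_logs_seconds`; an 8.0 equivalent of the 5-day retention above would be:

```ini
[mysqld]
# 5 days expressed in seconds (the 8.0 replacement for expire_logs_days)
binlog_expire_logs_seconds = 432000
```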

Reference: Enabling binlog for MySQL inside Docker

The generated binlog files can now be seen under the data directory.

List the binlogs with the following command:

mysql> show binary logs;
+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.000001 |       177 |
| mysql-bin.000002 |   2947794 |
| mysql-bin.000003 |       154 |
+------------------+-----------+
3 rows in set (0.01 sec)

Check the file name and offset of the binlog currently being written:

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000004 |      154 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

You can inspect binlogs with the mysqlbinlog command. If you want the image to ship with mysqlbinlog, use a Debian-based image, for example:

docker run --name mysqltest \
-v /Users/lintong/Downloads/mysql5.7/config/my.cnf:/etc/my.cnf \
-v /Users/lintong/Downloads/mysql5.7/data:/var/lib/mysql \
-p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7-debian

View the contents of a specific binlog file; here you can see that a test database was created first, and then a table named user:

mysql> show binlog events in 'mysql-bin.000005';
+------------------+-----+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
+------------------+-----+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| mysql-bin.000005 | 4 | Format_desc | 1 | 123 | Server ver: 5.7.42-log, Binlog ver: 4 |
| mysql-bin.000005 | 123 | Previous_gtids | 1 | 154 | |
| mysql-bin.000005 | 154 | Anonymous_Gtid | 1 | 219 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000005 | 219 | Query | 1 | 313 | create database test |
| mysql-bin.000005 | 313 | Anonymous_Gtid | 1 | 378 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |
| mysql-bin.000005 | 378 | Query | 1 | 675 | use `test`; create table user
(
id bigint unsigned auto_increment comment ''
primary key,
username varchar(128) not null comment '',
email varchar(128) not null comment ''
)
comment '' charset = utf8mb4 |
+------------------+-----+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
6 rows in set (0.00 sec)

Check the binlog format; there are three formats: STATEMENT, ROW, and MIXED.

mysql> SHOW VARIABLES LIKE '%binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.01 sec)
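The format can also be switched at runtime without editing my.cnf (a sketch; this requires sufficient privileges, and a global change only affects sessions opened afterwards):

```sql
-- current session only
SET SESSION binlog_format = 'STATEMENT';
-- server-wide, for new sessions
SET GLOBAL binlog_format = 'ROW';
```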

You can use the mysqlbinlog command to dump a binlog to a SQL file:

mysqlbinlog --no-defaults --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000005 > /tmp/binlog005.sql

You can also export a binlog for a given start/stop time range, or for a given position range:

mysqlbinlog --no-defaults --base64-output=decode-rows -v --start-datetime='2024-08-16 00:00:00' --stop-datetime='2024-08-16 23:00:00' /var/lib/mysql/mysql-bin.000005 > /tmp/binlog005.sql
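Exporting by position works the same way; the offsets below are illustrative (they match the event start and End_log_pos values in the `show binlog events` output earlier):

```
mysqlbinlog --no-defaults --base64-output=decode-rows -v \
  --start-position=154 --stop-position=675 \
  /var/lib/mysql/mysql-bin.000005 > /tmp/binlog005.sql
```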

Check MySQL's time zone; you can see it uses UTC:

mysql> show variables like '%time_zone%';
+------------------+--------+
| Variable_name    | Value  |
+------------------+--------+
| system_time_zone | UTC    |
| time_zone        | SYSTEM |
+------------------+--------+
2 rows in set (0.02 sec)

mysql> select now();
+---------------------+
| now()               |
+---------------------+
| 2024-08-17 15:29:48 |
+---------------------+
1 row in set (0.00 sec)
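If local time is preferred, the server time zone can be changed (a sketch; '+8:00' is an example offset, and named zones such as 'Asia/Shanghai' additionally require the time zone tables to be loaded):

```sql
SET GLOBAL time_zone = '+8:00';
SET SESSION time_zone = '+8:00';
```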


Flink Study Notes: Memory Tuning

Flink memory layout

task manager

Reference: Flink key points and difficulties: comprehensive job tuning (checkpoints / backpressure / memory)

1. Insufficient off-heap memory: java.lang.OutOfMemoryError: Direct buffer memory

The error is as follows:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak.
The direct memory can be allocated by user code or some of its dependencies.
In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication.
The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink.
In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
at java.nio.Bits.reserveMemory(Bits.java:695)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:247)
at sun.nio.ch.IOUtil.write(IOUtil.java:58)
at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
at java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
at java.nio.channels.Channels.writeFully(Channels.java:101)
at java.nio.channels.Channels.access$000(Channels.java:61)
at java.nio.channels.Channels$1.write(Channels.java:174)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.security.DigestOutputStream.write(DigestOutputStream.java:145)
at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.write(MultipartUploadOutputStream.java:172)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.lambda$write$0(SizeAwareFSDataOutputStream.java:58)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:106)
at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.java:124)
at org.apache.hudi.common.fs.SizeAwareFSDataOutputStream.write(SizeAwareFSDataOutputStream.java:55)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:63)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:175)
at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:404)
at org.apache.hudi.io.HoodieAppendHandle.close(HoodieAppendHandle.java:439)
at org.apache.hudi.io.FlinkAppendHandle.close(FlinkAppendHandle.java:99)
at org.apache.hudi.execution.ExplicitWriteHandler.closeOpenHandle(ExplicitWriteHandler.java:62)
at org.apache.hudi.execution.ExplicitWriteHandler.finish(ExplicitWriteHandler.java:52)
at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:41)
at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

The TaskManager memory parameters that likely need adjusting are taskmanager.memory.task.off-heap.size or taskmanager.memory.framework.off-heap.size; add the following configuration when starting the Flink session cluster.

Note that the -D options must be passed when the session cluster is started; adding these memory parameters at flink run time has no effect.

/usr/lib/flink/bin/yarn-session.sh -s 1 -jm 51200 -tm 51200 -qu data -D taskmanager.memory.task.off-heap.size=4G -D taskmanager.memory.framework.off-heap.size=4G --detached

Open the Task Manager page to confirm that the configured 4G of memory has taken effect.

This is because Flink's default off-heap size is only 128MB, so it needs to be increased.
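Equivalently, instead of passing -D flags, the limits can be set in flink-conf.yaml (the 4g values mirror the command above and are examples, not recommendations):

```yaml
taskmanager.memory.task.off-heap.size: 4g
taskmanager.memory.framework.off-heap.size: 4g
```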

Reference: Flink runtime error java.lang.OutOfMemoryError: Direct buffer memory

Other tuning references: Flink performance tuning

When writing to Hudi with Flink CDC, it is recommended to replace the default FLINK_STATE index type with the BUCKET index type, because the FLINK_STATE index is in-memory and consumes a great deal of memory.
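A sketch of what this looks like in Flink SQL (the table definition is hypothetical; 'index.type' and 'hoodie.bucket.index.num.buckets' are Hudi's Flink options, and the bucket count must be chosen per table since it is fixed at creation):

```sql
CREATE TABLE user_hudi (
  id BIGINT,
  username STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/user_hudi',
  'table.type' = 'MERGE_ON_READ',
  'index.type' = 'BUCKET',                  -- instead of the default FLINK_STATE
  'hoodie.bucket.index.num.buckets' = '4'   -- example value, fixed at table creation
);
```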

Reference: Trying out the new BUCKET index on Flink feature in Hudi 0.11.0


Handling time with joda-time

Add the joda-time dependency

<!-- joda-time -->
<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.10</version>
</dependency>

1. Converting a string to a joda-time DateTime

Parsing a date

DateTimeFormatter fmt = DateTimeFormat.forPattern("yyyy-MM-dd");
DateTime dateTime = DateTime.parse(date, fmt);

Parsing a datetime string

DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
DateTime dt = DateTime.parse(dateStr, format);

Parsing with a specified locale

DateTimeFormatter format = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss yyyy").withLocale(Locale.ENGLISH);
DateTime dt = DateTime.parse(dateStr, format);

2. Converting a joda-time DateTime to a string

String date = dt.toString("yyyy-MM-dd");
String datetime = dt.toString("yyyy-MM-dd HH:mm:ss");
String hourAligned = dt.toString("yyyy-MM-dd HH:00:00"); // truncated to the top of the hour

3. Getting the current time

DateTime now = new DateTime();

4. Comparing the difference between two DateTimes

Days.daysBetween(dateTime, now).getDays() > 7  // true when the two instants are more than 7 days apart

5. Creating a joda-time DateTime with a time zone

DateTime now = new DateTime(DateTimeZone.UTC);
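For comparison, the JDK's java.time package (JSR-310, designed as Joda-Time's successor) covers the same operations with the standard library alone; a minimal sketch, with arbitrary example dates:

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class JavaTimeSketch {

    // parse "yyyy-MM-dd HH:mm:ss", mirroring DateTime.parse(dateStr, format)
    static LocalDateTime parse(String s) {
        return LocalDateTime.parse(s, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    }

    // format back to a date string, mirroring dt.toString("yyyy-MM-dd")
    static String toDateString(LocalDateTime dt) {
        return dt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
    }

    // day difference, mirroring Days.daysBetween(a, b).getDays()
    static long daysBetween(LocalDate a, LocalDate b) {
        return ChronoUnit.DAYS.between(a, b);
    }

    public static void main(String[] args) {
        LocalDateTime dt = parse("2024-08-17 15:29:48");
        System.out.println(toDateString(dt)); // 2024-08-17
        System.out.println(daysBetween(LocalDate.parse("2024-08-10"), dt.toLocalDate())); // 7
        // current time in UTC, mirroring new DateTime(DateTimeZone.UTC)
        ZonedDateTime nowUtc = ZonedDateTime.now(ZoneOffset.UTC);
        System.out.println(nowUtc.getZone()); // Z
    }
}
```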

Java Generics

Generics means that the concrete types of a class's fields are not specified when the class is defined; instead, the type is specified externally when the object is declared and instantiated.

A generic type parameter cannot be a primitive type; it must be a class, so to store a number you have to use a wrapper class.

class Point<T> {        // T can be any identifier; T is short for "type"
    private T var;      // the type of this field is decided by the caller

    public T getVar() { // return type decided by the caller
        return var;
    }

    public void setVar(T var) { // parameter type decided by the caller
        this.var = var;
    }
}

public class Generics_demo {

    public static void main(String[] args) {
        // Point<Integer> p = new Point<Integer>(); // var is an Integer here
        // p.setVar(30);                            // set a number, autoboxed
        // System.out.println(p.getVar() * 2);      // read it back as a number
        Point<String> p = new Point<String>(); // var is a String here
        p.setVar("张三");                       // set a string
        System.out.println(p.getVar().length()); // read it back and take its length
    }

}
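The commented-out Integer branch above works thanks to autoboxing. As a further sketch (an extension of the example, not part of the original; `NumericPoint` is a hypothetical class), a bounded type parameter `<T extends Number>` restricts callers to numeric wrapper types:

```java
// Same shape as the Point class above.
class Point<T> {
    private T var;
    public T getVar() { return var; }
    public void setVar(T var) { this.var = var; }
}

// Bounded type parameter: only Number subclasses (wrapper types) are allowed.
class NumericPoint<T extends Number> {
    private T var;
    public void setVar(T var) { this.var = var; }
    // the bound lets us call Number methods on var
    public double doubled() { return var.doubleValue() * 2; }
}

public class GenericsBoundsDemo {
    public static void main(String[] args) {
        Point<Integer> p = new Point<>();
        p.setVar(30);                        // autoboxing: int -> Integer
        System.out.println(p.getVar() * 2);  // prints 60

        NumericPoint<Integer> n = new NumericPoint<>();
        n.setVar(30);
        System.out.println(n.doubled());     // prints 60.0
        // NumericPoint<String> s = new NumericPoint<>(); // compile error: String is not a Number
    }
}
```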


Java synchronization with synchronized, and deadlock

When multiple threads operate on the same resource, resource synchronization problems can arise.

Synchronization means that, within a given period, only one thread may perform an operation; the other threads must wait for it to finish before they can proceed.

Synchronized access to shared resources can be implemented in two ways: synchronized code blocks and synchronized methods.
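A minimal sketch of both approaches (class and method names are illustrative): two threads increment a shared counter, one through a synchronized method and one through a synchronized block, and no updates are lost:

```java
public class SyncDemo {
    private int count = 0;

    // synchronized method: the lock on `this` is held for the whole method
    public synchronized void incrementMethod() {
        count++;
    }

    // synchronized block: the lock is held only for the critical section
    public void incrementBlock() {
        synchronized (this) {
            count++;
        }
    }

    public int getCount() {
        return count;
    }

    // run two threads that each increment `perThread` times
    public static int run(int perThread) throws InterruptedException {
        SyncDemo demo = new SyncDemo();
        Thread t1 = new Thread(() -> { for (int i = 0; i < perThread; i++) demo.incrementMethod(); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < perThread; i++) demo.incrementBlock(); });
        t1.start(); t2.start();
        t1.join(); t2.join();
        return demo.getCount(); // always 2 * perThread thanks to synchronization
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(10000)); // prints 20000
    }
}
```

Without the synchronized keyword (or block), the two `count++` updates can interleave and the final count would often be less than 20000.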


CDH Study Notes: Role Groups

1. Machines with different hardware can be grouped and managed uniformly using role groups

For example, for the HDFS component, some machine types have 12 disks and others have 16; a role group lets you put machines with identical configuration together.

Under the HDFS component, select Instances.

全文 >>