How to extract SQL statements from the binlog when MySQL runs in ROW mode


This article from 丸趣 TV shows how to extract SQL statements from the binlog when MySQL runs in ROW mode. The approach, a worked example and the full script follow.

Linux
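Generate rollback statements for DML (INSERT/UPDATE/DELETE) from a ROW-format binlog:
Parse the binlog with mysqlbinlog -v to produce a readable SQL file
Extract the SQL that needs to be processed
  These are the lines that start with ###. If the start-position you pass falls in the middle of an event group, parsing fails with an "unrecognized event" error

Invert each INSERT/UPDATE/DELETE statement; every complete statement must occupy exactly one line
  INSERT: INSERT INTO => DELETE FROM, SET => WHERE
  UPDATE: WHERE => SET, SET => WHERE
  DELETE: DELETE FROM => INSERT INTO, WHERE => SET
Replace the positional placeholders @{1,2,3} with column names
  Get the column order and the matching column names with desc table
  Values of special column types need extra handling
Reverse the order of the statements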
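
The inversion and column-name steps above can be sketched in Python. This is only an illustration under stated assumptions, not the Perl script's actual code: the name rollback_statements and the columns mapping (a stand-in for the column order that desc table returns) are hypothetical, special column types such as timestamp are not handled, and the statements come back in binlog order, so the final reversal step still has to be applied.

```python
import re

# Reversal rules: the statement head flips, and each clause keyword is swapped
# together with the separator its items need in the new clause.
HEAD = {"INSERT INTO": "DELETE FROM", "DELETE FROM": "INSERT INTO", "UPDATE": "UPDATE"}
CLAUSE = {"SET": ("WHERE", " AND "), "WHERE": ("SET", ", ")}
STATEMENT = re.compile(r"(INSERT INTO|DELETE FROM|UPDATE)\s+(\S+)$")


def rollback_statements(binlog_lines, columns):
    """Turn `mysqlbinlog -v` pseudo-SQL into rollback statements (binlog order).

    columns is a hypothetical mapping such as {"`db`.`tb`": ["col1", "col2"]},
    listing each table's columns in the order reported by `desc`.
    """
    statements, parts = [], []
    cols, joiner, first = [], "", True
    for raw in binlog_lines:
        if not raw.startswith("###"):
            continue  # only the commented pseudo-SQL lines matter
        line = raw[3:].strip()
        head = STATEMENT.match(line)
        if head:
            if parts:  # a new statement starts, so close the previous one
                statements.append("".join(parts) + ";")
            cols = columns[head.group(2)]
            parts = [f"{HEAD[head.group(1)]} {head.group(2)}"]
        elif line in CLAUSE and parts:
            keyword, joiner = CLAUSE[line]
            parts.append(f" {keyword} ")
            first = True
        elif line.startswith("@") and parts:
            pos, value = line[1:].split("=", 1)
            name = cols[int(pos) - 1]  # @n is 1-based
            if value == "NULL" and joiner == " AND ":
                expr = f"`{name}` IS NULL"  # "= NULL" never matches in a WHERE clause
            else:
                expr = f"`{name}`={value}"
            parts.append(("" if first else joiner) + expr)
            first = False
    if parts:
        statements.append("".join(parts) + ";")
    return statements


sample = [
    "# at 120",
    "### DELETE FROM `yoon`.`yoon`",
    "### WHERE",
    "###   @1=1",
    "###   @2='HANK'",
]
for statement in rollback_statements(sample, {"`yoon`.`yoon`": ["actor_id", "first_name"]}):
    print(statement)
```

Processing the lines as a stream works because the binlog already lists WHERE before SET for an UPDATE, so swapping the keywords in place yields a valid UPDATE ... SET ... WHERE ... statement.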

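Note:
  The table structure at the time the binlog was written must be identical to the current table structure [important]
  Because ROW-mode events are idempotent and the recovery is a one-off operation, only the SQL is extracted; BEGIN/COMMIT are not
  Only INSERT/UPDATE/DELETE can be processed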
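
These restrictions are why the script's check_binlog step looks for DDL before doing anything else: a CREATE, ALTER, DROP or RENAME in the binlog means the column positions may no longer match the current table. A minimal Python sketch of that check, assuming the mysqlbinlog output is already available as text lines; find_ddl is a hypothetical name, and the word boundary in the pattern is an addition that the script's grep does not have.

```python
import re

# Same keywords as the script's grep, anchored at the start of the line and
# matched case-insensitively.
DDL_PATTERN = re.compile(r"^(CREATE|ALTER|DROP|RENAME)\b", re.IGNORECASE)


def find_ddl(binlog_lines):
    """Return the lines of mysqlbinlog output that look like DDL statements."""
    return [line.rstrip("\n") for line in binlog_lines if DDL_PATTERN.match(line)]


output = [
    "BEGIN\n",
    "alter table yoon add column note varchar(20)\n",
    "### DELETE FROM `yoon`.`yoon`\n",
]
ddl = find_ddl(output)
if ddl:
    print(f"binlog contains {len(ddl)} DDL statement(s); rollback may be unsafe")
```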

mysql> select * from yoon;
+----------+------------+-----------+---------------------+
| actor_id | first_name | last_name | last_update         |
+----------+------------+-----------+---------------------+
|        1 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        2 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        3 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        4 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        5 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        6 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        7 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        8 | HANK       | YOON      | 2006-02-15 04:34:33 |
|        9 | HANK       | YOON      | 2006-02-15 04:34:33 |
|       10 | HANK       | YOON      | 2006-02-15 04:34:33 |
|       11 | HANK       | YOON      | 2006-02-15 04:34:33 |
+----------+------------+-----------+---------------------+
11 rows in set (0.00 sec)

mysql> delete from yoon;
Query OK, 11 rows affected (1.03 sec)

mysql> select * from yoon;
Empty set (0.00 sec)

Pay close attention to the spaces between the command-line arguments; otherwise the SQL statements cannot be extracted:
[root@hank-yoon data]# perl binlog-rollback.pl -f mysql-bin.000001 -o /export/data/mysql/data/yoon.sql -u root -p yoon
Warning: Using a password on the command line interface can be insecure.
[root@hank-yoon data]# ls
auto.cnf            hank     ibdata2      ib_logfile1  modify.pl  mysql-bin.000001  performance_schema  test  yoon.sql
binlog-rollback.pl  ibdata1  ib_logfile0  ib_logfile2  mysql      mysql-bin.index   sakila              yoon
[root@hank-yoon data]# cat yoon.sql 
INSERT INTO `yoon`.`yoon` SET `actor_id`=11, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=10, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=9, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=8, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=7, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=6, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=5, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=4, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=3, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=2, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=1, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);

mysql> INSERT INTO `yoon`.`yoon` SET `actor_id`=11, `first_name`='HANK', `last_name`='YOON', `last_update`=from_unixtime(1139949273);
Query OK, 1 row affected (0.01 sec)

mysql> select * from yoon;
+----------+------------+-----------+---------------------+
| actor_id | first_name | last_name | last_update         |
+----------+------------+-----------+---------------------+
|       11 | HANK       | YOON      | 2006-02-15 04:34:33 |
+----------+------------+-----------+---------------------+
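
The rollback file writes last_update as from_unixtime(1139949273) because, as the script's own comment puts it, a timestamp column is recorded as seconds. The restored row shows 2006-02-15 04:34:33, which is what that epoch value gives in a UTC+8 session. The Python sketch below reproduces the conversion; the UTC+8 offset is an assumption inferred from the output above, and the helper name mirrors the MySQL function only for readability.

```python
from datetime import datetime, timedelta, timezone


def from_unixtime(seconds, utc_offset_hours=8):
    """Format epoch seconds as 'YYYY-MM-DD HH:MM:SS' for a fixed UTC offset."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    return datetime.fromtimestamp(seconds, tz).strftime("%Y-%m-%d %H:%M:%S")


print(from_unixtime(1139949273))     # 2006-02-15 04:34:33 (assumed UTC+8 session)
print(from_unixtime(1139949273, 0))  # 2006-02-14 20:34:33 (UTC)
```

Replaying the file in a session with a different time zone still restores the same instant, but the displayed value shifts accordingly.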


#!/usr/bin/perl -w
use strict;
use warnings;

use Class::Struct;
use Getopt::Long qw(:config no_ignore_case); # GetOptions

# register a handler for system signals
use sigtrap 'handler', \&sig_int, 'normal-signals';

# catch signal
sub sig_int {
    my ($signals) = @_;
    print STDERR "# Caught SIG$signals.\n";
    exit 1;
}

my %opt;
my $srcfile;
my $host = '127.0.0.1';
my $port = 3306;
my ($user, $pwd);
my ($MYSQL, $MYSQLBINLOG, $ROLLBACK_DML);
my $outfile = '/dev/null';
my (%do_dbs, %do_tbs);

# tbname => tbcol, tbcol: @n => colname,type
my %tbcol_pos;

my $SPLITER_COL   = ',';
my $SQLTYPE_IST   = 'INSERT';
my $SQLTYPE_UPD   = 'UPDATE';
my $SQLTYPE_DEL   = 'DELETE';
my $SQLAREA_WHERE = 'WHERE';
my $SQLAREA_SET   = 'SET';

my $PRE_FUNCT = '========================== ';

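# =========================================================
# Generate rollback statements for DML (insert/update/delete)
# from a ROW-format binlog.
# Parse the binlog with mysqlbinlog -v to produce a readable SQL file.
# Extract the SQL that needs to be processed:
#   the lines that start with ###. If the start-position passed in falls in
#   the middle of an event group, parsing fails with an "unrecognized event" error.
#
# Invert each INSERT/UPDATE/DELETE; every complete statement occupies exactly one line.
#   INSERT: INSERT INTO => DELETE FROM, SET => WHERE
#   UPDATE: WHERE => SET, SET => WHERE
#   DELETE: DELETE FROM => INSERT INTO, WHERE => SET
# Replace the positions @{1,2,3} with column names:
#   get the column order and the matching column names with desc table;
#   values of special column types need extra handling.
# Reverse the order of the statements.
#
# Note:
#   The table structure must be identical to the current table structure [important].
#   ROW-mode events are idempotent and the recovery is a one-off operation,
#   so only the SQL is extracted, not BEGIN/COMMIT.
#   Only INSERT/UPDATE/DELETE can be processed.
# ========================================================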

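sub main{
    # parse the command-line options
    &get_options();
    # load the column order of every table found in the binlog
    &init_tbcol();
    # write the rollback statements
    &do_binlog_rollback();
}

&main();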

# ----------------------------------------------------------------------------------------
# Func : get options and set option flag
# ----------------------------------------------------------------------------------------
sub get_options{
    # Get options info
    GetOptions(\%opt,
        'help',              # OUT : print help info
        'f|srcfile=s',       # IN  : binlog file
        'o|outfile=s',       # OUT : output sql file
        'h|host=s',          # IN  : host
        'u|user=s',          # IN  : user
        'p|password=s',      # IN  : password
        'P|port=i',          # IN  : port
        'start-datetime=s',  # IN  : start datetime
        'stop-datetime=s',   # IN  : stop datetime
        'start-position=i',  # IN  : start position
        'stop-position=i',   # IN  : stop position
        'd|database=s',      # IN  : database, split comma
        'T|table=s',         # IN  : table, split comma
        'i|ignore',          # IN  : ignore binlog check ddl and so on
        'debug',             # IN  : print debug information
    ) or &print_usage();

    if (!scalar(%opt)) {
        &print_usage();
    }

    # Handle for options
    if ($opt{'f'}){
        $srcfile = $opt{'f'};
    }else{
        &merror("please input binlog file");
    }

    $opt{'h'} and $host = $opt{'h'};
    $opt{'u'} and $user = $opt{'u'};
    $opt{'p'} and $pwd  = $opt{'p'};
    $opt{'P'} and $port = $opt{'P'};
    if ($opt{'o'}) {
        $outfile = $opt{'o'};
        # truncate outfile
        open(my $fh, '>', $outfile) or die "Can't truncate $outfile: $!";
        close($fh);
    }

    # mysql client command, used to read the table structure
    $MYSQL = qq{mysql -h$host -u$user -p'$pwd' -P$port};
    &mdebug("get_options::MYSQL\n\t$MYSQL");

    # Dump the binlog; column definitions are not needed, so use -v rather than -vv
    $MYSQLBINLOG = qq{mysqlbinlog -v};
    $MYSQLBINLOG .= " --start-position=".$opt{'start-position'} if $opt{'start-position'};
    # the option key must be spelled stop-position, or the option is silently dropped
    $MYSQLBINLOG .= " --stop-position=".$opt{'stop-position'} if $opt{'stop-position'};
    $MYSQLBINLOG .= " --start-datetime='".$opt{'start-datetime'}."'" if $opt{'start-datetime'};
    $MYSQLBINLOG .= " --stop-datetime='".$opt{'stop-datetime'}."'" if $opt{'stop-datetime'};
    $MYSQLBINLOG .= " $srcfile";
    &mdebug("get_options::MYSQLBINLOG\n\t$MYSQLBINLOG");

    # Check whether the binlog contains DDL: CREATE|ALTER|DROP|RENAME
    &check_binlog() unless ($opt{'i'});

    # Filtering is not delegated to mysqlbinlog: its "USE dbname;" based
    # filter can miss statements, so databases and tables are filtered here.
    # Selected databases
    if ($opt{'d'}){
        my @dbs = split(/,/, $opt{'d'});
        foreach my $db (@dbs){
            $do_dbs{$db} = 1;
        }
    }

    # Selected tables
    if ($opt{'T'}){
        my @tbs = split(/,/, $opt{'T'});
        foreach my $tb (@tbs){
            $do_tbs{$tb} = 1;
        }
    }

    # Extract the DML pseudo-SQL
    $ROLLBACK_DML = $MYSQLBINLOG." | grep '^### '";
    # Strip the leading "### " marker and any trailing whitespace
    $ROLLBACK_DML .= " | sed 's/^###\\s*//;s/\\s*\$//'";
    &mdebug("rollback dml\n\t$ROLLBACK_DML");

    # Make sure there is something to process
    my $cmd = "$ROLLBACK_DML | wc -l";
    &mdebug("check contain dml sql\n\t$cmd");
    my $size = `$cmd`;
    chomp($size);
    unless ($size > 0){
        &merror("binlog DML is empty:$ROLLBACK_DML");
    }
}

# ----------------------------------------------------------------------------------------
# Func : check binlog contain DDL
# ----------------------------------------------------------------------------------------
sub check_binlog{
    &mdebug("$PRE_FUNCT check_binlog");
    my $cmd = "$MYSQLBINLOG | grep -E -i '^(CREATE|ALTER|DROP|RENAME)'";
    &mdebug("check binlog has DDL cmd\n\t$cmd");
    my $ddlcnt = `$cmd`;
    chomp($ddlcnt);

    my $ddlnum = `$cmd | wc -l`;
    chomp($ddlnum);
    my $res = 0;
    if ($ddlnum > 0){
        # prefix every DDL statement with <DDL>
        $ddlcnt =~ s/^/<DDL>/mg;
        &merror("binlog contain $ddlnum DDL:$MYSQLBINLOG. ddl sql:\n$ddlcnt");
    }
    return $res;
}

# ----------------------------------------------------------------------------------------
# Func : init all table column order
# if input --database --table params, only get set table column order
# ----------------------------------------------------------------------------------------
sub init_tbcol{
    &mdebug("$PRE_FUNCT init_tbcol");
    # Keep only the DML header lines
    my $cmd = "$ROLLBACK_DML | grep -E '^(INSERT|UPDATE|DELETE)'";
    # Extract the table names and de-duplicate them
    $cmd .= " | awk '{if (\$1 ~ \"^UPDATE\") {print \$2} else {print \$3}}' | sort | uniq";
    &mdebug("get table name cmd\n\t$cmd");
    open ALLTABLE, "$cmd |" or die "can't open file:$cmd\n";
    while (my $tbname = <ALLTABLE>){
        chomp($tbname);
        &init_one_tbcol($tbname) unless (&ignore_tb($tbname));
    }
    close ALLTABLE or die "can't close file:$cmd\n";

    # print the collected column positions when --debug is set
    foreach my $tb (keys %tbcol_pos){
        &mdebug("tbname->$tb");
        my %colpos = %{$tbcol_pos{$tb}};
        foreach my $pos (keys %colpos){
            my $col = $colpos{$pos};
            # limit 2: a type such as decimal(10,2) contains the separator itself
            my ($cname, $ctype) = split(/$SPLITER_COL/, $col, 2);
            &mdebug("\tpos->$pos,cname->$cname,ctype->$ctype");
        }
    }
}

# ----------------------------------------------------------------------------------------
# Func : init one table column order
# ----------------------------------------------------------------------------------------
sub init_one_tbcol{
    my $tbname = shift;
    &mdebug("$PRE_FUNCT init_one_tbcol");
    # Read the table structure and column order
    my $cmd = $MYSQL." --skip-column-names --silent -e 'desc $tbname'";
    # Emit one line per column: position,`name`,type
    $cmd .= " | awk -F'\\t' '{print NR\"$SPLITER_COL`\"\$1\"`$SPLITER_COL\"\$2}'";
    &mdebug("get table column infor cmd\n\t$cmd");
    open TBCOL, "$cmd |" or die "can't open desc $tbname";

    my %colpos;
    while (my $line = <TBCOL>){
        chomp($line);
        # limit 3: a type such as decimal(10,2) contains the separator itself
        my ($pos, $col, $coltype) = split(/$SPLITER_COL/, $line, 3);
        &mdebug("linesss=$line\n\t\tpos=$pos\n\t\tcol=$col\n\t\ttype=$coltype");
        $colpos{$pos} = $col.$SPLITER_COL.$coltype;
    }
    close TBCOL or die "can't close desc $tbname";

    $tbcol_pos{$tbname} = \%colpos;
}

# ----------------------------------------------------------------------------------------
# Func : rollback sql: INSERT/UPDATE/DELETE
# ----------------------------------------------------------------------------------------
sub do_binlog_rollback{
    my $binlogfile = "$ROLLBACK_DML ";
    &mdebug("$PRE_FUNCT do_binlog_rollback");

    # INSERT|UPDATE|DELETE
    my $sqltype;
    # WHERE|SET
    my $sqlarea;

    my ($tbname, $sqlstr) = ('', '');
    my ($notignore, $isareabegin) = (0, 0);

    # output sql file
    open SQLFILE, ">> $outfile" or die "Can't open sql file:$outfile";

    # binlog pseudo-SQL, read through a pipe
    open BINLOG, "$binlogfile |" or die "Can't open file: $binlogfile";
    while (my $line = <BINLOG>){
        chomp($line);
        if ($line =~ /^(INSERT|UPDATE|DELETE)/){
            # a new statement starts: flush the previous one
            if ($sqlstr ne ''){
                $sqlstr .= ";\n";
                print SQLFILE $sqlstr;
                &mdebug("export sql\n\t".$sqlstr);
                $sqlstr = '';
            }

            # the table name is the 3rd word for INSERT INTO / DELETE FROM, the 2nd for UPDATE
            if ($line =~ /^INSERT/){
                $sqltype = $SQLTYPE_IST;
                $tbname = (split(' ', $line))[2];
                $sqlstr = qq{DELETE FROM $tbname};
            }elsif ($line =~ /^UPDATE/){
                $sqltype = $SQLTYPE_UPD;
                $tbname = (split(' ', $line))[1];
                $sqlstr = qq{UPDATE $tbname};
            }elsif ($line =~ /^DELETE/){
                $sqltype = $SQLTYPE_DEL;
                $tbname = (split(' ', $line))[2];
                $sqlstr = qq{INSERT INTO $tbname};
            }

            # check ignore table
            if (&ignore_tb($tbname)){
                $notignore = 0;
                &mdebug("#IGNORE#:line: ".$line);
                $sqlstr = '';
            }else{
                $notignore = 1;
                &mdebug("#DO#:line: ".$line);
            }
        }else{
            if ($notignore){
                &merror("can't get tbname") unless (defined($tbname));
                if ($line =~ /^WHERE/){
                    $sqlarea = $SQLAREA_WHERE;
                    $sqlstr .= qq{ SET};
                    $isareabegin = 1;
                }elsif ($line =~ /^SET/){
                    $sqlarea = $SQLAREA_SET;
                    $sqlstr .= qq{ WHERE};
                    $isareabegin = 1;
                }elsif ($line =~ /^\@/){
                    $sqlstr .= &deal_col_value($tbname, $sqltype, $sqlarea, $isareabegin, $line);
                    $isareabegin = 0;
                }else{
                    &mdebug("::unknown sql: ".$line);
                }
            }
        }
    }

    # export last sql
    if ($sqlstr ne ''){
        $sqlstr .= ";\n";
        print SQLFILE $sqlstr;
        &mdebug("export sql\n\t".$sqlstr);
    }

    close BINLOG or die "Can't close binlog file: $binlogfile";
    close SQLFILE or die "Can't close out sql file: $outfile";

    # Reverse the order of the lines in the output file
    # 1!G: on every line except the first, append the hold space to the pattern space
    # h:   copy the pattern space to the hold space
    # $!d: delete every line except the last
    my $invert = "sed -i '1!G;h;\$!d' $outfile";
    my $res = `$invert`;
    &mdebug("inverter order sqlfile :$invert");
}

# ----------------------------------------------------------------------------------------
# Func : transfer column pos to name
# deal column value
#
# &deal_col_value($tbname, $sqltype, $sqlarea, $isareabegin, $line);
# ----------------------------------------------------------------------------------------
sub deal_col_value($$$$$){
    my ($tbname, $sqltype, $sqlarea, $isareabegin, $line) = @_;
    &mdebug("$PRE_FUNCT deal_col_value");
    &mdebug("input:tbname->$tbname,type->$sqltype,area->$sqlarea,areabegin->$isareabegin,line->$line");
    # $line looks like "@3='YOON'": split it into the position and the value
    my @vals = split(/=/, $line);
    my $pos = substr($vals[0], 1);
    my $valstartpos = length($pos) + 2;
    my $val = substr($line, $valstartpos);
    my %tbcol = %{$tbcol_pos{$tbname}};
    &merror("can't get $tbname column \@$pos") unless (defined($tbcol{$pos}));
    my ($cname, $ctype) = split(/$SPLITER_COL/, $tbcol{$pos}, 2);
    &merror("can't get $tbname column \@$pos name or type") unless (defined($cname) && defined($ctype));
    &mdebug("column infor:cname->$cname,type->$ctype");

    # join str
    my $joinstr;
    if ($isareabegin){
        $joinstr = ' ';
    }else{
        # WHERE was replaced by SET: join the items with ","
        if ($sqlarea eq $SQLAREA_WHERE){
            $joinstr = ', ';
        # SET was replaced by WHERE: join the items with AND
        }elsif ($sqlarea eq $SQLAREA_SET){
            $joinstr = ' AND ';
        }else{
            &merror("!!!!!!The scripts error");
        }
    }

    my $newline = $joinstr;

    # NULL value: inside the new WHERE clause it must be tested with IS NULL
    if (($val eq 'NULL') && ($sqlarea eq $SQLAREA_SET)){
        $newline .= qq{$cname IS NULL};
    }else{
        # timestamp: the binlog records seconds
        if ($ctype eq 'timestamp'){
            $newline .= qq{$cname=from_unixtime($val)};
        # datetime: @n=yyyy-mm-dd hh:ii:ss
        }elsif ($ctype eq 'datetime'){
            $newline .= qq{$cname='$val'};
        }else{
            $newline .= qq{$cname=$val};
        }
    }
    &mdebug("\told>$line\n\tnew>$newline");

    return $newline;
}

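# ----------------------------------------------------------------------------------------
# Func : check is ignore table
# params: IN table full name #  format:`dbname`.`tbname`
# RETURN:
#   0 not ignore
#   1 ignore
# ----------------------------------------------------------------------------------------
sub ignore_tb($){
    my $fullname = shift;
    # strip the backticks
    $fullname =~ s/`//g;
    my ($dbname, $tbname) = split(/\./, $fullname);
    my $res = 0;

    # databases were selected with -d
    if ($opt{'d'}){
        # the database is one of the selected ones
        if ($do_dbs{$dbname}){
            # tables were selected with -T
            if ($opt{'T'}){
                # the table is not one of the selected ones
                unless ($do_tbs{$tbname}){
                    $res = 1;
                }
            }
        # the database is not one of the selected ones
        }else{
            $res = 1;
        }
    }
    # &mdebug("Table check ignore:$fullname->$res");
    return $res;
}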

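# ----------------------------------------------------------------------------------------
# Func : print debug msg
# ----------------------------------------------------------------------------------------
sub mdebug{
    my (@msg) = @_;
    print "<DEBUG>@msg\n" if ($opt{'debug'});
}

# ----------------------------------------------------------------------------------------
# Func : print error msg and exit
# ----------------------------------------------------------------------------------------
sub merror{
    my (@msg) = @_;
    print "<Error>:@msg\n";
    &print_usage();
    exit(1);
}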

# ----------------------------------------------------------------------------------------
# Func : print usage
# ----------------------------------------------------------------------------------------
sub print_usage{
    print <<EOF;
==========================================================================================
Command line options :
    --help              # OUT : print help info
    -f, --srcfile       # IN  : binlog file. [required]
    -o, --outfile       # OUT : output sql file. [required]
    -h, --host          # IN  : host. default 127.0.0.1
    -u, --user          # IN  : user. [required]
    -p, --password      # IN  : password. [required]
    -P, --port          # IN  : port. default 3306
    --start-datetime    # IN  : start datetime
    --stop-datetime     # IN  : stop datetime
    --start-position    # IN  : start position
    --stop-position     # IN  : stop position
    -d, --database      # IN  : database, split comma
    -T, --table         # IN  : table, split comma. [required] set -d
    -i, --ignore        # IN  : ignore binlog check contain DDL(CREATE|ALTER|DROP|RENAME)
    --debug             # IN  : print debug information

Sample :
    shell> perl binlog-rollback.pl -f mysql-bin.000001 -o /tmp/t.sql -u user -p pwd
    shell> perl binlog-rollback.pl -f mysql-bin.000001 -o /tmp/t.sql -u user -p pwd -i
    shell> perl binlog-rollback.pl -f mysql-bin.000001 -o /tmp/t.sql -u user -p pwd --debug
    shell> perl binlog-rollback.pl -f mysql-bin.000001 -o /tmp/t.sql -h 192.168.1.2 -u user -p pwd -P 3307
    shell> perl binlog-rollback.pl -f mysql-bin.000001 -o /tmp/t.sql -u user -p pwd --start-position=107
    shell> perl binlog-rollback.pl -f mysql-bin.000001 -o /tmp/t.sql -u user -p pwd --start-position=107 --stop-position=10000
    shell> perl binlog-rollback.pl -f mysql-bin.000001 -o /tmp/t.sql -u user -p pwd -d db1,db2
    shell> perl binlog-rollback.pl -f 'mysql-bin.0000*' -o /tmp/t.sql -u user -p pwd -d db1,db2 -T tb1,tb2
==========================================================================================
EOF
    exit;
}

1;
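
The -d and -T filtering in ignore_tb is easy to misread, so here is the same decision restated as a small Python sketch. The function name ignore_table and its keyword parameters are hypothetical stand-ins for the script's %do_dbs and %do_tbs hashes. Note the consequence of the nesting: -T only takes effect when -d is also given, which matches the "[required] set -d" remark in the usage text.

```python
def ignore_table(full_name, databases=(), tables=()):
    """Return True when `db`.`tb` falls outside the -d / -T selection."""
    db, _, tb = full_name.replace("`", "").partition(".")
    if not databases:
        return False  # no -d given: nothing is filtered, even if -T is set
    if db not in databases:
        return True   # a database other than the selected ones
    # inside a selected database, -T narrows the selection further
    return bool(tables) and tb not in tables


print(ignore_table("`db1`.`tb1`", databases={"db1"}, tables={"tb1"}))  # False
print(ignore_table("`db1`.`tb2`", databases={"db1"}, tables={"tb1"}))  # True
```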

That covers how to extract SQL statements from the binlog when MySQL runs in ROW mode. For more on related topics, follow the 丸趣 TV industry news channel. Thanks for reading!

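Copyright notice: original article of this site, published by 丸趣 on 2023-07-27, 15518 characters in total.
Repost notice: unless otherwise stated, all articles on this site other than technical ones are collected from the web; please credit the source when reposting.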