基于spark stream 实现mysql错误日志的监控

说明:很多听起来高大上的东西实际并不一定有多难,下面演示下基于spark stream的数据库监控功能,用到的环境为mysql 8,spark 1.6 ,flume 1.7 ;通过这个简单功能的扩展,就可以实现一个定制的监控系统;

1.mysql 配置

mysql错误日志的配置:

log_output=FILE 

log_error=/Volumes/ssd/hadoop/soft/mysql-8/mysql8/log/HuoSi.err    

log_error_verbosity= 3                         

在配置完成以上的参数后,基本可以记录到想要的错误信息了,下面看看具体的错误日志的格式:

error

     这里的的数字部分分布代表:

1、时间

2、同一个错误在同一个实例生命周期出现的次数,但是不完全准确(需要再深入确定)

3、错误级别,有三种:note ,warning,error

4、错误具体内容

2、Flume配置

flume 版本 1.7,下载地址:http://flume.apache.org/download.html,下载编译好的版本,可以直接运行

flume 的配置文件:

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command=tail -F /Volumes/ssd/hadoop/soft/mysql-8/mysql8/log/HuoSi.err

a1.sources.r1.channels=c1

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = localhost

a1.sinks.k1.port = 3333

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

运行使用的命令:

bin/flume-ng agent –conf conf –conf-file conf/flume.push.conf –name a1 -Dflume.root.logger=INFO,console

3、spark 设置

如果只是开发测试,那么只要在工程项目里添加了 spark的相关jar文件,即可进行测试,如果是要用来spark 任务提交的方式进行,那么就需要搭建spark运行环境,关于spark的搭建,可以参看apache的官方文档,这里就不在细说,我这里是代码样例测试,是在开发环境里直接测试的,开发工具为:IDEA 15,为何要用这个工具呢,在以后的项目架构中详述;

error3

下面看看测试代码,实际非常简单:

package HuoSi;

/**
 * Created by yc on 10/20/16.
 */

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
public class MySpark {

    public static void main(String[] args) throws Exception {

        String host ="localhost";
        int port = 3333;
        Duration batchInterval = new Duration(5000);
        SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount").setMaster("local[2]");
       // JavaSparkContext sc = new JavaSparkContext(sparkConf);
       
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createStream(ssc, host, port);
        flumeStream.count();
        flumeStream.count().map(new Function<Long, String>() {
            // @Override
           
public String call(Long in) {
                return "获思测试==收到问题日志信息: " + in + " from mysql error.";
            }
        }).print();
        ssc.start();
        ssc.awaitTermination();
    }
}

 

运行:

/Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -Didea.launcher.port=7543 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA

partitions

………………中间省略N行

——————————————-

Time: 1476940510000 ms

——————————————-

获思测试==收到问题日志信息: 0 from mysql error.

出现上述信息,说明,可以正常运行;

首先运行spark 流的代码,然后启动 flume,然后刻意产生mysql的错误日志信息,spark流中就可以显示监控的信息;

error4

至此测试完成,通过测试可以实现用spark stream 对mysql 错误日志的监控,扩展一下将收到的错误信息进行分类等操作,给维护者发送比如短信告警等通知,和具体的错误信息,就可以实现基本的监控系统,最主要的是方便实际的定制;

发表评论

电子邮件地址不会被公开。 必填项已用*标注