Table of Contents

- Project in Action: Importing Hive Table Data Directly into ElasticSearch
  No code required: simple and blunt, but relatively inflexible. It runs on MapReduce underneath, so the import is comparatively slow.
- Project in Action: Writing Hive Table Data into ElasticSearch with Spark (Java)
  Requires Java code, covering similar ground: a one-stop Java solution that pulls Hive data through Spark, creates the ES index, loads the data, and uses the ES alias mechanism for seamless updates. It runs on Spark, so the import is much faster than the approach of Article 1.
- Project in Action: Validating Data Quality between ElasticSearch and the Hive Data Warehouse with DingTalk Alerts (Java)
  Picks key metrics and verifies that the data in the source (Hive) and the target (ES) are consistent.
- Project in Action: Writing Hive Table Data with Spark into an ElasticSearch That Requires Username/Password Authentication (Java)
  Covers writing Hive data via Spark into an ES cluster protected by account/password authentication.
- Project in Action (Production Deployment): Parameterized Spark Job Writing Hive Table Data into a Username/Password-Authenticated ElasticSearch (Java)
  Same ground as the previous article, but the Spark and ES index-creation parameters are externalized into configuration: syncing one more table to ES only takes one more XML config file. This is also the Java code the author runs in production, and it addresses the shortcomings readers pointed out about Article 4.

Overview:
1. If your coding ability is limited but you still want Hive data imported into ElasticSearch, consider Article 1.
2. If you can code, the author recommends combining Articles 2 and 3 (the author's pick) as the architecture for loading offline or near-line data from the Hive warehouse into ElasticSearch. The Java code shared here is the author's earliest version (v1), written to be easy to follow and to get the job done; feel free to rework it, and please don't complain that the code is rough.
3. If your ElasticSearch enforces account/password authentication (a cloud product, or a cluster you secured yourself), there is no way around it: use Article 4.
4. For production deployment, see Article 5.
- Hive version used: 2.3.5
- ES version used: 7.7.1
- Spark version used: 2.3.3
Project Tree

This project follows on from Articles 1 and 2 above. After importing Hive data into ElasticSearch you don't actually know whether the data arrived intact, so a DingTalk-alerting job is needed to validate data quality between ElasticSearch and the Hive warehouse. Note that once this project is packaged, it is best invoked as a separate process, starting about 10 minutes after the latest estimated finish time of the Article 1 or 2 job. That way, two situations can be checked:

- the Article 1 or 2 job was scheduled but produced bad data: this is caught;
- the Article 1 or 2 job never started at all (it timed out), likewise producing bad data: this is caught too!

The overall project tree is shown in Figure 1. IDE: IntelliJ IDEA 2019.3 x64, built with Maven.
Project link: Project in Action: Validating Data Quality between ElasticSearch and the Hive Data Warehouse with DingTalk Alerts (Java)
- feign: Java classes for the ES and Spark client connections;
- utils: Java classes that operate on ES and Spark;
- resources: configuration for the log output;
- pom.xml: the Maven configuration file.
Maven Configuration File pom.xml

The Maven dependencies this project uses are declared in pom.xml, as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SparkOnHiveToEs_v1</artifactId>
<version>1.0-SNAPSHOT</version>
<name>SparkOnHiveToEs_v1</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<!-- core dependency of ElasticSearch itself -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client -->
<!-- ES high-level REST API, used for the ES Client connection and other operations -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<!-- junit, used for tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<!-- lombok, auto-generates constructors, getters/setters, etc. for model classes -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<!-- jackson, used to build JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20 -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>7.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.3.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.3.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.3.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<!-- https://mvnrepository.com/artifact/net.iharder/base64 -->
<dependency>
<groupId>net.iharder</groupId>
<artifactId>base64</artifactId>
<version>2.3.9</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- when a Maven project contains both Java and Scala code, the maven-scala-plugin packages both together -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- plugin required for building the jar with Maven -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<!-- setting appendAssemblyId to false removes the "-jar-with-dependencies" suffix from e.g. MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar -->
<!--<appendAssemblyId>false</appendAssemblyId>-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>cn.focusmedia.esapp.App</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Log Configuration Files

This job is ultimately submitted through spark-submit, so the key runtime information should be emitted through the logger rather than printed to the console with System.out.println; hence the log.info("key information") calls throughout. Two log configuration files are therefore set up, which you can tune to your own needs (for example, emit INFO but suppress WARN). Both files are shown below.

log4j.properties is configured as follows:
log4j.rootLogger=INFO, stdout, R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=firestorm.log
log4j.appender.R.MaxFileSize=100KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n
log4j.logger.com.codefutures=INFO
log4j2.xml is configured as follows:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%m%n" />
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>
Spark Client

To read data from the Hive warehouse through Spark, first configure the Spark client, in the SparkClient.java file below:
package cn.focusmedia.esapp.feign;
import org.apache.spark.sql.SparkSession;
public class SparkClient
{
public static SparkSession getSpark()
{
        //enableHiveSupport() wires the session to the Hive metastore so spark.table()/spark.sql() can see Hive tables
        SparkSession spark = SparkSession.builder().appName("SparkToES").enableHiveSupport().getOrCreate();
return spark;
}
}
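As a quick sanity check, the client can be exercised like this. This is a minimal sketch (the table name dw.some_table is a placeholder, not from the original project); since no master() is configured, it assumes the job is launched through spark-submit, as this project is.

package cn.focusmedia.esapp;

import cn.focusmedia.esapp.feign.SparkClient;
import org.apache.spark.sql.SparkSession;

public class SparkClientDemo
{
    public static void main(String[] args)
    {
        SparkSession spark = SparkClient.getSpark();
        //dw.some_table is a placeholder; any table registered in the Hive metastore works
        long rows = spark.table("dw.some_table").count();
        System.out.println("row count: " + rows);
        spark.stop();
    }
}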
ElasticSearch Client

To operate on ES you first need a client connection, configured in the EsClient.java file below.
Note: the ES cluster details really belong in a configuration file that the code reads at startup, so that a cluster change only requires a config edit. They are hard-coded here purely to make the example easy to read and understand!
package cn.focusmedia.esapp.feign;
import org.apache.http.HttpHost;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
@Slf4j
public class EsClient
{
public static RestHighLevelClient getClient()
{
        //cluster IPs and ports; a real project should read these from a config file, but they are hard-coded here
        //to keep the example easy to follow (a sketch of the config-file approach follows this class)
        //create the HttpHost array
        HttpHost[] myHttpHost = new HttpHost[7];
myHttpHost[0]=new HttpHost("10.121.10.1",9200);
myHttpHost[1]=new HttpHost("10.121.10.2",9200);
myHttpHost[2]=new HttpHost("10.121.10.3",9200);
myHttpHost[3]=new HttpHost("10.121.10.4",9200);
myHttpHost[4]=new HttpHost("10.121.10.5",9200);
myHttpHost[5]=new HttpHost("10.121.10.6",9200);
myHttpHost[6]=new HttpHost("10.121.10.7",9200);
        //create the RestClientBuilder
RestClientBuilder myRestClientBuilder=RestClient.builder(myHttpHost);
        //create the RestHighLevelClient
RestHighLevelClient myclient=new RestHighLevelClient(myRestClientBuilder);
        log.info("RestClientUtil info: create rest high level client successful!");
return myclient;
}
}
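Following the note above, here is a minimal sketch of what the config-file approach could look like. The file name es.properties and the key es.hosts are assumptions for illustration, not part of the original project.

package cn.focusmedia.esapp.feign;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import java.io.InputStream;
import java.util.Properties;

public class EsClientFromConfig
{
    //es.properties on the classpath is assumed to contain, e.g.:
    //es.hosts=10.121.10.1:9200,10.121.10.2:9200,10.121.10.3:9200
    public static RestHighLevelClient getClient() throws Exception
    {
        Properties props = new Properties();
        try (InputStream in = EsClientFromConfig.class.getClassLoader().getResourceAsStream("es.properties"))
        {
            props.load(in); //assumes the file exists; add a null check in real code
        }
        String[] addresses = props.getProperty("es.hosts").split(",");
        HttpHost[] hosts = new HttpHost[addresses.length];
        for (int i = 0; i < addresses.length; i++)
        {
            String[] hostAndPort = addresses[i].split(":");
            hosts[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
        }
        return new RestHighLevelClient(RestClient.builder(hosts));
    }
}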
Validation Utilities for the Hive Table and ElasticSearch

utils/EsQueryTest.java

There are two validation utility classes. The ES-side checks live in the utils/EsQueryTest.java file, with the following contents:
package cn.focusmedia.esapp.utils;
import cn.focusmedia.esapp.feign.EsClient;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.aggregations.metrics.ValueCount;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
@Slf4j
public class EsQueryTest
{
    static RestHighLevelClient myClient = EsClient.getClient(); //obtain the ES client
    //fetch the data-update date (event_day) from ES
public static String getEventDay(String index,String fieldname) throws IOException
{
        //1. Create the request object; queries generally go through a SearchRequest
SearchRequest request = new SearchRequest(index);
        //2. Specify the query conditions via a SearchSourceBuilder
        //match_all takes no conditions, but the query type still has to be set
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
        builder.size(1); //ES returns only 10 hits by default (size=10); a single document is enough here
request.source(builder);
        //3. Execute the query
SearchResponse response = myClient.search(request, RequestOptions.DEFAULT);
        //4. Extract the data from _source
        //take the first hit
SearchHit[] hits = response.getHits().getHits();
String event_day = hits[0].getSourceAsMap().get(fieldname).toString();
log.info("the event_day date is :"+event_day);
return event_day;
}
    //count the documents of an index
public static long getESCount( String index,String fieldname) throws IOException {
        //create the SearchRequest
SearchRequest mySearchRequest =new SearchRequest(index);
        //specify the aggregation to run
SearchSourceBuilder mySearchSourceBuilder=new SearchSourceBuilder();
mySearchSourceBuilder.aggregation(AggregationBuilders.count("agg").field(fieldname));
mySearchRequest.source(mySearchSourceBuilder);
        //execute the query
SearchResponse mySearchResponse= myClient.search(mySearchRequest, RequestOptions.DEFAULT);
        //extract the result
ValueCount agg=mySearchResponse.getAggregations().get("agg");
long value =agg.getValue();
log.info("the index: "+index+" document count is "+value+" .");
return value;
}
    //sum a given field of an index, i.e. the key business metrics; which metrics to check depends on your requirements
public static long getESSumField(String index,String fieldname) throws IOException {
        //create the SearchRequest
SearchRequest mySearchRequest =new SearchRequest(index);
        //specify the aggregation to run
SearchSourceBuilder mySearchSourceBuilder=new SearchSourceBuilder();
        mySearchSourceBuilder.aggregation(AggregationBuilders.sum("agg").field(fieldname));
mySearchRequest.source(mySearchSourceBuilder);
        //execute the query
SearchResponse mySearchResponse= myClient.search(mySearchRequest, RequestOptions.DEFAULT);
        //extract the result
Sum agg=mySearchResponse.getAggregations().get("agg");
long value =(long)agg.getValue();
log.info("the index: "+index+" fieldname: "+fieldname+" summary is "+value+" .");
return value;
}
}
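As a side note, the 7.x high-level client also exposes the _count API directly, which can serve as a cross-check on the value_count aggregation above. Keep in mind that _count counts every document, while value_count only counts documents where the field is present; the two agree when the field is always populated. A minimal sketch:

package cn.focusmedia.esapp.utils;

import cn.focusmedia.esapp.feign.EsClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import java.io.IOException;

public class EsCountCheck
{
    static RestHighLevelClient myClient = EsClient.getClient();

    //counts all documents in the index via the _count endpoint
    public static long getESCountViaCountApi(String index) throws IOException
    {
        CountRequest countRequest = new CountRequest(index);
        CountResponse response = myClient.count(countRequest, RequestOptions.DEFAULT);
        return response.getCount();
    }
}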
utils/EsUtils.java

The Spark-side checks against Hive live in the utils/EsUtils.java file, with the following contents:
package cn.focusmedia.esapp.utils;
import cn.focusmedia.esapp.feign.EsClient;
import cn.focusmedia.esapp.feign.SparkClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.elasticsearch.client.RestHighLevelClient;
import org.apache.spark.sql.Row;
import java.util.List;
@Slf4j
public class EsUtils
{
    static RestHighLevelClient myClient = EsClient.getClient(); //obtain the ES client
static SparkSession spark = SparkClient.getSpark();
public static long getHiveTableCount(String tablename)
{
Dataset<Row> table = spark.table(tablename).repartition(60);
long count = table.count();
log.info("the hive table :"+tablename+" count is "+count+" .");
return count;
}
public static long getHiveSumFieled(String tablename,String columnName)
{
Dataset<Row> table = spark.table(tablename).repartition(60);
        //start of the anonymous-inner-class implementation
Dataset<Row> sum = table.groupBy().sum(columnName).as(columnName);
JavaRDD<Long> map = sum.javaRDD().map(new Function<Row, Long>() {
@Override
public Long call(Row row) throws Exception {
return Long.valueOf(row.get(0).toString());
}
});
List<Long> collect = map.collect();
        Long sumC = 0L;
for (Long aLong : collect) {
sumC+=aLong;
}
        //end of the anonymous-inner-class implementation
        /*
        //alternatively, cast the DataFrame's collect() result to Row[] and read individual cells, for example:
        Row[] collect = (Row[]) df.collect(); //a two-dimensional structure: an array of rows, each with columns
        System.out.println(collect[0].get(2).toString());
        */
return sumC;
}
}
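The anonymous inner class above can also be collapsed into a single Spark SQL statement. This equivalent sketch (not the author's original code) assumes the summed column is numeric and the table is non-empty:

package cn.focusmedia.esapp.utils;

import cn.focusmedia.esapp.feign.SparkClient;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HiveSumCheck
{
    static SparkSession spark = SparkClient.getSpark();

    public static long getHiveSumViaSql(String tablename, String columnName)
    {
        //CAST keeps the result a BIGINT even if SUM returns a decimal type;
        //first().getLong(0) assumes a non-null result, i.e. a non-empty table
        Row first = spark.sql("SELECT CAST(SUM(" + columnName + ") AS BIGINT) FROM " + tablename).first();
        return first.getLong(0);
    }
}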
DingTalk Alert Utility

The DingTalk alert utility lives in the utils/SendDingTalkRisk.java file. The scheduling server may fail to reach the public internet, while DingTalk (an Alibaba service) requires internet access for notifications to arrive, so the alert has to be routed through a proxy server on the scheduling server's behalf, and as an https proxy request at that. The code is below.
Note: the alert here is sent by a robot in a DingTalk group (3 or more members) and can @ the relevant people. For setting up a DingTalk group robot, see the blog post "Triggering DingTalk Alerts on Scheduled Job Errors or Exceptions (Python 3.x)".
package cn.focusmedia.esapp.utils;
import cn.hutool.json.JSONObject;
import org.apache.http.HttpHost;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.DefaultProxyRoutePlanner;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.Arrays;
public class SendDingTalkRisk
{
public static HttpClientBuilder proxy(String hostOrIP, int port) {
        // proxy host, proxy port, and protocol, in that order
HttpHost proxy = new HttpHost(hostOrIP, port, "http");
DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy);
return HttpClients.custom().setRoutePlanner(routePlanner);
}
    /**
     * Bypass certificate verification
     *
     * @return an SSLContext that trusts all certificates
     * @throws NoSuchAlgorithmException
     * @throws KeyManagementException
     */
public static SSLContext createIgnoreVerifySSL() throws NoSuchAlgorithmException, KeyManagementException {
SSLContext sc = SSLContext.getInstance("SSLv3");
        // An X509TrustManager that bypasses verification; the empty method bodies are intentional
X509TrustManager trustManager = new X509TrustManager() {
@Override
public void checkClientTrusted(
java.security.cert.X509Certificate[] paramArrayOfX509Certificate,
String paramString) throws CertificateException {
}
@Override
public void checkServerTrusted(
java.security.cert.X509Certificate[] paramArrayOfX509Certificate,
String paramString) throws CertificateException {
}
@Override
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}
};
sc.init(null, new TrustManager[]{
trustManager}, null);
return sc;
}
    public static void sendDingTalkRisk(String sendMessage, boolean isAtAll, String[] phone) throws IOException, KeyManagementException, NoSuchAlgorithmException
{
        SSLContext sslcontext = createIgnoreVerifySSL(); //bypass certificate verification so the https request goes through
        // register the socket factories that handle http and https connections
Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.INSTANCE)
.register("https", new SSLConnectionSocketFactory(sslcontext))
.build();
PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry);
        //route the request through the proxy server
        CloseableHttpClient httpClient = proxy("10.210.78.25", 3128).setConnectionManager(connManager).build();
        //the webhook URL of the DingTalk group robot
HttpPost httpPost=new HttpPost("https://oapi.dingtalk.com/robot/send?access_token=9dd43c7484e9eddf929390bsq67c1bb68dcaee1asq634a923b0c7ab8bf69927c08");
//HttpPost httpPost=new HttpPost("https://oapi.dingtalk.com/robot/send?access_token=b6d438cb3af76732b7f3de51693c7996a50b4e51f5e24f1cf99cacc03f5e2ef0");
httpPost.addHeader("Content-Type", "application/json; charset=utf-8");
JSONObject bodys = new JSONObject();
bodys.put("msgtype","text");
JSONObject text=new JSONObject();
text.put("content",sendMessage);
bodys.put("text",text);
JSONObject at=new JSONObject();
at.put("isAtAll",isAtAll);
at.put("atMobiles", Arrays.asList(phone));
bodys.put("at",at);
StringEntity se = new StringEntity(bodys.toJSONString(4), "utf-8");
httpPost.setEntity(se);
httpClient.execute(httpPost);
}
public static void main(String[] args) throws IOException, NoSuchAlgorithmException, KeyManagementException
{
        String sendMessage = "DW bot: test text message!";
boolean isAtAll=false;
        //the phone numbers of the people to @ in the group
SendDingTalkRisk.sendDingTalkRisk(sendMessage,isAtAll, new String[]{
"130****896"});
}
}
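For reference, the request body that sendDingTalkRisk assembles follows DingTalk's text-message robot format, roughly:

{
  "msgtype": "text",
  "text": { "content": "DW bot: test text message!" },
  "at": { "isAtAll": false, "atMobiles": ["130****896"] }
}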
Main Function: Calling the Utilities to Deliver the Complete Feature

The main function implements the following steps, in order:

- use Spark to compute the key metric statistics from Hive and store them in variables;
- use the Java High Level REST Client against ES to compute the same key metrics and store them in variables;
- check that the ES data's update date matches expectations, and compare each key metric from step 1 with its counterpart from step 2; if everything checks out, DingTalk reports the validation results as good news, otherwise it reports the bad news so the person on duty can hurry up and deal with it!

The code is in the App.java file below.
Note: for convenience, the required parameters are defined directly in the main class here. In a real project it is best to pull these variables out into an XML configuration file and have the main class read it (a hypothetical sketch follows the listing); then each new Hive table to be extracted into ES only needs its own XML config file, which is far more convenient. The focus here is on a working, readable implementation; the icing on the cake is up to you!
package cn.focusmedia.esapp;
import cn.focusmedia.esapp.utils.EsQueryTest;
import cn.focusmedia.esapp.utils.EsUtils;
import cn.focusmedia.esapp.utils.SendDingTalkRisk;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.Calendar;
/**
 * Data-quality check between the Hive data warehouse and ElasticSearch
 *
 */
@Slf4j
public class App
{
public void dataCheck() throws Exception {
String index="media_all_rs_v0";
String hiveTable="dw.app_rs_media_galaxy_entrance_key";
String col_entrance_key="entrance_key";
String col_lcd_amount="lcd_amount";
String col_smart_amount="smart_amount";
String col_frame_amount="frame_amount";
String col_event_day="event_day";
//hive table columns DQ
long sparkCount= EsUtils.getHiveTableCount(hiveTable);
long sparkSmartAmount =EsUtils.getHiveSumFieled(hiveTable,col_smart_amount);
long sparkFrameAmount =EsUtils.getHiveSumFieled(hiveTable,col_frame_amount);
long sparkLcdAmount =EsUtils.getHiveSumFieled(hiveTable,col_lcd_amount);
//ES index field DQ
long esCount = EsQueryTest.getESCount(index, col_entrance_key);
long esSmartAmount = EsQueryTest.getESSumField(index, col_smart_amount);
long esFrameAmount = EsQueryTest.getESSumField(index, col_frame_amount);
long esLcdAmount = EsQueryTest.getESSumField(index, col_lcd_amount);
String esEventDay = EsQueryTest.getEventDay(index,col_event_day);
        int flat = 0; //number of failed checks; stays 0 when everything matches
String dqCount="";
if(sparkCount==esCount)
{
            dqCount = "Warehouse entrance count: " + sparkCount + ";\nES entrance count: " + esCount + "; consistent!";
flat+=0;
}
else {
            dqCount = "Warehouse entrance count: " + sparkCount + ";\nES entrance count: " + esCount + "; inconsistent!";
flat+=1;
}
String smartAmount;
if(sparkSmartAmount==esSmartAmount) {
            smartAmount = "Warehouse smart-screen count: " + sparkSmartAmount + ";\nES smart-screen count: " + esSmartAmount + "; consistent!";
flat+=0;
}
else {
            smartAmount = "Warehouse smart-screen count: " + sparkSmartAmount + ";\nES smart-screen count: " + esSmartAmount + "; inconsistent!";
flat+=1;
}
String frameAmount;
if(sparkFrameAmount==esFrameAmount) {
            frameAmount = "Warehouse frame count: " + sparkFrameAmount + ";\nES frame count: " + esFrameAmount + "; consistent!";
flat+=0;
}
else {
            frameAmount = "Warehouse frame count: " + sparkFrameAmount + ";\nES frame count: " + esFrameAmount + "; inconsistent!";
flat+=1;
}
String lcdAmount;
if(sparkLcdAmount==esLcdAmount) {
            lcdAmount = "Warehouse LCD count: " + sparkLcdAmount + ";\nES LCD count: " + esLcdAmount + "; consistent!";
flat+=0;
}
else {
            lcdAmount = "Warehouse LCD count: " + sparkLcdAmount + ";\nES LCD count: " + esLcdAmount + "; inconsistent!";
flat+=1;
}
        //the expected data date is yesterday, formatted yyyyMMdd
        Calendar calendar = Calendar.getInstance();
SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd");
calendar.add(Calendar.DAY_OF_MONTH,-1);
String date = fmt.format(calendar.getTime());
String eventDay;
if(date.equals(esEventDay))
{
            eventDay = "ES index: " + index + "; data date: " + esEventDay + ";";
flat+=0;
}
else {
            eventDay = "ES index: " + index + "; data date: " + esEventDay + " (expected " + date + ")!";
flat+=1;
}
String dataCheck="";
if(flat==0)
{
            dataCheck = "Data refresh succeeded!";
}
else {
            dataCheck = "Data refresh has errors, please handle promptly!";
}
// log.info(dqCount+"");
// log.info(smartAmount+"");
// log.info(frameAmount+"");
// log.info(lcdAmount+"");
// log.info(eventDay+"");
// log.info(dataCheck);
        String dingContext = "DW bot:\r\n" + dqCount + "\r\n" + smartAmount + "\r\n" + frameAmount + "\r\n" + lcdAmount + "\r\n" + eventDay + "\r\n" + dataCheck + "\r\n";
log.info(dingContext);
String[] phone={
"130****0896","136****8552"};
SendDingTalkRisk.sendDingTalkRisk(dingContext, false, phone);
}
public static void main( String[] args ) throws Exception {
App app = new App();
app.dataCheck();
}
}
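As noted before the listing, the hard-coded index, table, and column names would ideally come from an XML config file, one per table-to-index mapping. A purely illustrative sketch (the element names are hypothetical; Article 5 develops this idea properly):

<?xml version="1.0" encoding="UTF-8"?>
<!-- hypothetical dq-check config; one file per Hive-table-to-ES-index mapping -->
<dqCheck>
    <esIndex>media_all_rs_v0</esIndex>
    <hiveTable>dw.app_rs_media_galaxy_entrance_key</hiveTable>
    <countField>entrance_key</countField>
    <sumFields>
        <field>lcd_amount</field>
        <field>smart_amount</field>
        <field>frame_amount</field>
    </sumFields>
    <eventDayField>event_day</eventDayField>
</dqCheck>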
Building the Jar and Deploying

Package the debugged project into a jar. If you don't know how to build a jar yet, see the blog post on packaging code into a jar with IntelliJ IDEA; the jar here is named DQ_hive_and_es_galaxy.jar.
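If you prefer the command line to the IDE, the assembly plugin configured in the pom should produce the fat jar with a standard Maven invocation (output name inferred from the pom above):

mvn clean package
# expected output: target/SparkOnHiveToEs_v1-1.0-SNAPSHOT-jar-with-dependencies.jar,
# which can then be renamed to DQ_hive_and_es_galaxy.jar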
Upload DQ_hive_and_es_galaxy.jar to HDFS under the path /app/hive_to_es_galaxy/dq_check_jar/DQ_hive_and_es_galaxy.jar, then write a shell script, DQ_hive_and_es_galaxy.sh, that invokes it via spark-submit, as follows:
#!/bin/bash
cur_dir=`pwd`
spark-submit --master yarn --deploy-mode cluster --executor-memory 8G --executor-cores 5 --num-executors 4 --queue etl --conf spark.kryoserializer.buffer.max=256m --conf spark.kryoserializer.buffer=64m --class cn.focusmedia.esapp.App hdfs:///app/hive_to_es_galaxy/dq_check_jar/DQ_hive_and_es_galaxy.jar
dq_check_flag=$?
if [ $dq_check_flag -eq 0 ];then
echo "data quality check run succeed!"
else
echo "data quality check run failed!"
## the lines below trigger our own DingTalk failure alert; replace them with your own alerting measures
cd ${cur_dir}/../src/ding_talk_warning_report_py/main/
python3 ding_talk_with_agency.py 235
exit 3
fi
Scheduling the Shell Script

Finally, schedule the DQ_hive_and_es_galaxy.sh script, for example with Azkaban (a minimal job-file sketch follows), at whatever frequency you need.
As noted earlier, once packaged this project is best invoked as a separate process, starting about 10 minutes after the latest estimated finish time of the Article 1 or 2 job, so that two situations are covered:
1. the Article 1 or 2 job was scheduled but produced bad data: caught;
2. the Article 1 or 2 job never started at all (it timed out), likewise producing bad data: also caught!
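For Azkaban, a minimal job definition could look like this (hypothetical file name; it assumes the script sits in the job's working directory, so adjust the path to your deployment):

# dq_hive_and_es_galaxy.job: hypothetical Azkaban job definition
type=command
command=sh DQ_hive_and_es_galaxy.sh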
Result

The effect is shown in Figure 2.
Summary

With this project (DingTalk-alerted data-quality validation between ElasticSearch and the Hive data warehouse, Java edition), the ETL of Articles 1 and 2 can finally be called complete and safe; without it, something always feels missing. If the data ever turns out wrong, your stakeholders will drag you into the little dark room for a lesson. Solid data validation is the way to go!
Reference: https://blog.csdn.net/LXWalaz1s1s/article/details/109266385