Table of Contents

  1. Project in practice: importing Hive table data directly into ElasticSearch
      That article requires no coding at all — simple and crude, but relatively inflexible; it runs on the MapReduce engine underneath, so the import is comparatively slow.

  2. Project in practice: writing Hive table data into ElasticSearch with Spark (Java edition)
      That article does require Java code of similar length, but it is a one-stop Java solution: pull the data from Hive with Spark, create the ES index, load the data, and use the ES alias mechanism so index updates switch over seamlessly. It runs on the Spark engine, so the import is far faster than the approach in article 1.

  3. Project in practice: DingTalk alerts to verify data quality between ElasticSearch and the Hive data warehouse (Java edition)
      That article selects key metrics and checks whether the data in the Hive source matches the data in the target ES index.

  4. Project in practice: writing Hive table data with Spark into an ElasticSearch cluster that requires username/password authentication (Java edition)
      That article explains how to write Hive data via Spark into an ElasticSearch cluster protected by account/password authentication.

  5. Project in practice (production deployment): configuration-driven loading of Hive table data with Spark into an ElasticSearch cluster that requires username/password authentication (Java edition)
      That article also writes Hive data via Spark into a password-protected ElasticSearch cluster, but the Spark and ES index-creation parameters are externalized into configuration: syncing one more table to ES only requires adding one XML config file. It is the Java code the author runs in production, and it addresses the shortcomings readers pointed out in approach 4.

  Overview:
  1. If your coding skills are limited but you still want to load Hive data into ElasticSearch, consider article 1.
  2. If you can code, the author recommends the combination of articles 2 and 3 as the architecture for offline or near-line loading from the Hive data warehouse into ElasticSearch. The Java code shared there is the author's earliest version 1, written above all to be easy to understand and to get the job done — feel free to rework it rather than complain about its quality.
  3. If your ElasticSearch requires account/password authentication — a cloud product, or a cluster you secured yourself — then article 4 is the only way to go.
  4. For production deployment, see article 5.

  • Hive version: 2.3.5
  • ES version: 7.7.1
  • Spark version: 2.3.3

Project Tree

  This project is the follow-up to article 1 or 2 above: after loading Hive data into ElasticSearch you cannot be sure the figures are correct, so a DingTalk alert job is needed to verify data quality between ElasticSearch and the Hive data warehouse. Note that, once packaged, this job is best launched as a separate process, scheduled to start about 10 minutes after the latest estimated finish time of the job from article 1 or 2. That way it covers two failure scenarios:

  1. The job from article 1 or 2 was scheduled but produced bad data — this is caught;
  2. The job from article 1 or 2 never started at all (i.e. it timed out), which also leaves the data wrong — this is caught as well.

  The overall project tree is shown in Figure 1. IDE: IntelliJ IDEA 2019.3 x64, built with Maven.
  Project link: Project in practice — DingTalk alerts to verify data quality between ElasticSearch and the Hive data warehouse (Java edition)

  • feign: Java classes for the ES and Spark client connections;
  • utils: Java classes that operate on ES and Spark;
  • resources: logging configuration files;
  • pom.xml: Maven configuration file;
Figure 1. Project tree

Maven Configuration File pom.xml

  The Maven dependencies used by this project are declared in pom.xml, shown in full below.

<?xml version="1.0" encoding="UTF-8"?> 
 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
  <modelVersion>4.0.0</modelVersion> 
 
  <groupId>org.example</groupId> 
  <artifactId>SparkOnHiveToEs_v1</artifactId> 
  <version>1.0-SNAPSHOT</version> 
 
  <name>SparkOnHiveToEs_v1</name> 
  <!-- FIXME change it to the project's website --> 
  <url>http://www.example.com</url> 
 
  <properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <maven.compiler.source>1.7</maven.compiler.source> 
    <maven.compiler.target>1.7</maven.compiler.target> 
  </properties> 
 
  <dependencies> 
    <dependency> 
      <groupId>junit</groupId> 
      <artifactId>junit</artifactId> 
      <version>4.11</version> 
      <scope>test</scope> 
    </dependency> 
 
    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch --> 
    <!-- ElasticSearch core dependency --> 
    <dependency> 
      <groupId>org.elasticsearch</groupId> 
      <artifactId>elasticsearch</artifactId> 
      <version>7.7.1</version> 
    </dependency> 
 
    <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client --> 
    <!-- ES high-level REST API, used to build the ES client, etc. --> 
    <dependency> 
      <groupId>org.elasticsearch.client</groupId> 
      <artifactId>elasticsearch-rest-high-level-client</artifactId> 
      <version>7.7.1</version> 
    </dependency> 
 
    <!-- https://mvnrepository.com/artifact/junit/junit --> 
    <!-- JUnit, for tests --> 
    <dependency> 
      <groupId>junit</groupId> 
      <artifactId>junit</artifactId> 
      <version>4.12</version> 
      <scope>test</scope> 
    </dependency> 
 
 
    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> 
    <!-- Lombok, generates constructors, getters/setters, etc. automatically --> 
    <dependency> 
      <groupId>org.projectlombok</groupId> 
      <artifactId>lombok</artifactId> 
      <version>1.18.12</version> 
      <scope>provided</scope> 
    </dependency> 
    <dependency> 
      <groupId>org.testng</groupId> 
      <artifactId>testng</artifactId> 
      <version>RELEASE</version> 
      <scope>compile</scope> 
    </dependency> 
 
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> 
    <!-- Jackson, for JSON serialization --> 
    <dependency> 
      <groupId>com.fasterxml.jackson.core</groupId> 
      <artifactId>jackson-databind</artifactId> 
      <version>2.11.0</version> 
    </dependency> 
 
    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20 --> 
    <dependency> 
      <groupId>org.elasticsearch</groupId> 
      <artifactId>elasticsearch-spark-20_2.11</artifactId> 
      <version>7.7.1</version> 
    </dependency> 
 
 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> 
    <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>2.3.3</version> 
    </dependency> 
 
    <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql_2.11</artifactId> 
      <version>2.3.3</version> 
    </dependency> 
 
    <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-hive_2.11</artifactId> 
      <version>2.3.3</version> 
      <scope>compile</scope> 
    </dependency> 
 
    <dependency> 
      <groupId>junit</groupId> 
      <artifactId>junit</artifactId> 
      <version>4.12</version> 
      <scope>compile</scope> 
    </dependency> 
 
    <dependency> 
      <groupId>org.apache.logging.log4j</groupId> 
      <artifactId>log4j-core</artifactId> 
      <version>2.9.1</version> 
    </dependency> 
 
    <dependency> 
      <groupId>org.apache.logging.log4j</groupId> 
      <artifactId>log4j-api</artifactId> 
      <version>2.9.1</version> 
    </dependency> 
 
    <dependency> 
      <groupId>cn.hutool</groupId> 
      <artifactId>hutool-all</artifactId> 
      <version>4.3.1</version> 
      <scope>compile</scope> 
    </dependency> 
    <dependency> 
      <groupId>com.google.guava</groupId> 
      <artifactId>guava</artifactId> 
      <version>20.0</version> 
      <scope>compile</scope> 
    </dependency> 
    <dependency> 
      <groupId>org.projectlombok</groupId> 
      <artifactId>lombok</artifactId> 
      <version>1.18.8</version> 
      <scope>provided</scope> 
    </dependency> 
    <dependency> 
      <groupId>com.alibaba</groupId> 
      <artifactId>fastjson</artifactId> 
      <version>1.2.51</version> 
    </dependency> 
 
    <!-- https://mvnrepository.com/artifact/net.iharder/base64 --> 
    <dependency> 
      <groupId>net.iharder</groupId> 
      <artifactId>base64</artifactId> 
      <version>2.3.9</version> 
    </dependency> 
  </dependencies> 
 
 
  <build> 
    <plugins> 
      <!-- maven-scala-plugin: when the project mixes Java and Scala code, this plugin compiles and packages both together --> 
      <plugin> 
        <groupId>org.scala-tools</groupId> 
        <artifactId>maven-scala-plugin</artifactId> 
        <version>2.15.2</version> 
        <executions> 
          <execution> 
            <goals> 
              <goal>compile</goal> 
              <goal>testCompile</goal> 
            </goals> 
          </execution> 
        </executions> 
      </plugin> 
 
      <!-- Plugin required to build the fat jar --> 
      <plugin> 
        <artifactId>maven-assembly-plugin</artifactId> 
        <version>2.4</version> 
        <configuration> 
          <!-- Setting this to false drops the "-jar-with-dependencies" suffix from MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar --> 
          <!--<appendAssemblyId>false</appendAssemblyId>--> 
          <descriptorRefs> 
            <descriptorRef>jar-with-dependencies</descriptorRef> 
          </descriptorRefs> 
          <archive> 
            <manifest> 
              <mainClass>com.bjsxt.scalaspark.core.examples.ExecuteLinuxShell</mainClass> 
            </manifest> 
          </archive> 
        </configuration> 
        <executions> 
          <execution> 
 
            <id>make-assembly</id> 
            <phase>package</phase> 
            <goals> 
              <goal>assembly</goal> 
            </goals> 
          </execution> 
        </executions> 
      </plugin> 
    </plugins> 
  </build> 
</project> 
 

Logging Configuration Files

  This job is ultimately launched via spark-submit, so the key runtime information should be emitted through the logging framework — log.info("key message") — rather than printed to the console with System.out.println. Two logging configuration files are therefore provided; tune them to your needs (for example, which log levels to emit and which to suppress). Their contents are shown below.
  log4j.properties:

log4j.rootLogger=INFO, stdout, R 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%5p - %m%n 
log4j.appender.R=org.apache.log4j.RollingFileAppender 
log4j.appender.R.File=firestorm.log 
log4j.appender.R.MaxFileSize=100KB 
log4j.appender.R.MaxBackupIndex=1 
log4j.appender.R.layout=org.apache.log4j.PatternLayout 
log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n 
log4j.logger.com.codefutures=INFO 

  log4j2.xml:

<?xml version="1.0" encoding="UTF-8"?> 
 
<Configuration status="warn"> 
    <Appenders> 
        <Console name="Console" target="SYSTEM_OUT"> 
            <PatternLayout pattern="%m%n" /> 
        </Console> 
    </Appenders> 
    <Loggers> 
        <Root level="INFO"> 
            <AppenderRef ref="Console" /> 
        </Root> 
    </Loggers> 
</Configuration> 

Spark Client

  To read data from Hive through Spark, first set up the Spark client (a SparkSession). The code is in SparkClient.java:

package cn.focusmedia.esapp.feign; 
 
import org.apache.spark.sql.SparkSession; 
 
public class SparkClient 
{
    
    public static SparkSession getSpark() 
    {
    
        SparkSession spark=SparkSession.builder().appName("SparkToES").enableHiveSupport().getOrCreate(); 
        return spark; 
    } 
 
} 
 

ElasticSearch Client

  To operate on ES, first set up the ES client. The code is in EsClient.java below.
   Note: in a real deployment the ES cluster information is best kept in a configuration file that the code reads at startup, so a cluster change only means editing that file; it is hardcoded here purely to keep the example easy to read. A minimal sketch of the config-file approach follows the listing.

package cn.focusmedia.esapp.feign; 
 
import org.apache.http.HttpHost; 
import lombok.extern.slf4j.Slf4j; 
import org.elasticsearch.client.RestClient; 
import org.elasticsearch.client.RestClientBuilder; 
import org.elasticsearch.client.RestHighLevelClient; 
 
@Slf4j 
public class EsClient 
{
    
    public static RestHighLevelClient getClient() 
    {
    
        // Cluster IPs and ports; a real project should read these from a config file. They are hardcoded here to keep the example simple.
        // Build the HttpHost array
        HttpHost[] myHttpHost = new HttpHost[7]; 
        myHttpHost[0]=new HttpHost("10.121.10.1",9200); 
        myHttpHost[1]=new HttpHost("10.121.10.2",9200); 
        myHttpHost[2]=new HttpHost("10.121.10.3",9200); 
        myHttpHost[3]=new HttpHost("10.121.10.4",9200); 
        myHttpHost[4]=new HttpHost("10.121.10.5",9200); 
        myHttpHost[5]=new HttpHost("10.121.10.6",9200); 
        myHttpHost[6]=new HttpHost("10.121.10.7",9200); 
 
        // Build the RestClientBuilder
        RestClientBuilder myRestClientBuilder=RestClient.builder(myHttpHost); 
 
        // Build the RestHighLevelClient
        RestHighLevelClient myclient=new RestHighLevelClient(myRestClientBuilder); 
 
        log.info("RestClientUtil intfo create rest high level client successful!"); 
 
        return myclient; 
    } 
} 
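
  As noted above, a production build would load the cluster addresses from a configuration file instead of hardcoding them. Below is a minimal sketch of that idea; the file name es-cluster.properties and the key es.hosts are illustrative assumptions, not part of the original project.

package cn.focusmedia.esapp.feign;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Sketch only: reads "es.hosts=10.121.10.1:9200,10.121.10.2:9200,..." from a
// hypothetical es-cluster.properties on the classpath and builds the client from it.
public class EsClientFromConfig
{
    public static RestHighLevelClient getClient() throws IOException
    {
        Properties props = new Properties();
        try (InputStream in = EsClientFromConfig.class.getClassLoader()
                .getResourceAsStream("es-cluster.properties")) {
            props.load(in);   // throws if the file is missing; acceptable for a sketch
        }

        String[] nodes = props.getProperty("es.hosts").split(",");
        HttpHost[] hosts = new HttpHost[nodes.length];
        for (int i = 0; i < nodes.length; i++) {
            String[] parts = nodes[i].trim().split(":");
            hosts[i] = new HttpHost(parts[0], Integer.parseInt(parts[1]));
        }

        return new RestHighLevelClient(RestClient.builder(hosts));
    }
}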
 

Utility Classes for Checking Hive Tables Against ElasticSearch

utils/EsQueryTest.java

  There are two utility classes for checking the Hive tables against ElasticSearch. The ES-side checks live in utils/EsQueryTest.java, shown below.

package cn.focusmedia.esapp.utils; 
 
import cn.focusmedia.esapp.feign.EsClient; 
import lombok.extern.slf4j.Slf4j; 
import org.elasticsearch.action.search.SearchRequest; 
import org.elasticsearch.action.search.SearchResponse; 
import org.elasticsearch.client.RequestOptions; 
import org.elasticsearch.client.RestHighLevelClient; 
import org.elasticsearch.index.query.QueryBuilders; 
import org.elasticsearch.search.SearchHit; 
import org.elasticsearch.search.aggregations.AggregationBuilders; 
import org.elasticsearch.search.aggregations.metrics.Sum; 
import org.elasticsearch.search.aggregations.metrics.ValueCount; 
import org.elasticsearch.search.builder.SearchSourceBuilder; 
import org.junit.Test; 
 
import java.io.IOException; 
 
@Slf4j 
 public  class EsQueryTest 
{
    
    static RestHighLevelClient myClient = EsClient.getClient();  // obtain the ES client

    // Get the data-update date (event_day) stored in ES
    @Test 
    public static String getEventDay(String index,String fieldname) throws IOException 
    {
    
 
        // 1. Create the request object; queries generally use a SearchRequest
        SearchRequest request = new SearchRequest(index); 
 
        // 2. Specify the query conditions via a SearchSourceBuilder
        // match_all has no conditions, but the query type still has to be set explicitly
        SearchSourceBuilder builder = new SearchSourceBuilder(); 
        builder.query(QueryBuilders.matchAllQuery()); 
        builder.size(1);   // ES returns only 10 hits by default (size=10); raise size to fetch more
        request.source(builder); 
 
        // 3. Execute the query
        SearchResponse response = myClient.search(request, RequestOptions.DEFAULT); 
 
        // 4. Read the data from _source
        // take the field value from the first returned hit
        SearchHit[] hits = response.getHits().getHits(); 
        String event_day = hits[0].getSourceAsMap().get(fieldname).toString(); 
        log.info("the event_day date is :"+event_day); 
        return event_day; 
 
    } 
     
    // Count the documents in an index
    public static long getESCount( String index,String fieldname) throws IOException {
    
        // Create the SearchRequest
        SearchRequest mySearchRequest =new SearchRequest(index); 
 
        // Specify the aggregation (value_count on the field)
        SearchSourceBuilder mySearchSourceBuilder=new SearchSourceBuilder(); 
        mySearchSourceBuilder.aggregation(AggregationBuilders.count("agg").field(fieldname)); 
 
        mySearchRequest.source(mySearchSourceBuilder); 
 
        // Execute the query
        SearchResponse mySearchResponse= myClient.search(mySearchRequest, RequestOptions.DEFAULT); 
 
        // Read the aggregation result
        ValueCount agg=mySearchResponse.getAggregations().get("agg"); 
        long value =agg.getValue(); 
        log.info("the index: "+index+" document count is "+value+" ."); 
        return value; 
    } 
 
    // Sum a numeric field of an index, i.e. the total of a key metric; which metrics are key depends on the business
    public static long getESSumField(String index,String fieldname) throws IOException {
    
        // Create the SearchRequest
        SearchRequest mySearchRequest =new SearchRequest(index); 
 
        // Specify the aggregation (sum on the field)
        SearchSourceBuilder mySearchSourceBuilder=new SearchSourceBuilder(); 
        mySearchSourceBuilder.aggregation(AggregationBuilders.sum("agg").field(fieldname.toString())); 
 
        mySearchRequest.source(mySearchSourceBuilder); 
 
        // Execute the query
        SearchResponse mySearchResponse= myClient.search(mySearchRequest, RequestOptions.DEFAULT); 
 
        // Read the aggregation result
        Sum agg=mySearchResponse.getAggregations().get("agg"); 
        long value =(long)agg.getValue(); 
        log.info("the index: "+index+" fieldname: "+fieldname+" summary is "+value+" ."); 
        return value; 
    } 
 
    } 
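
  One caveat about getEventDay above: a match_all query with size(1) returns an arbitrary document, which is fine when every document carries the same event_day but not otherwise. If that assumption may not hold, sorting on the field makes "the latest update date" explicit. Below is only a sketch, under the assumption that event_day is mapped as a sortable type (date or keyword); it is not part of the original project.

import java.io.IOException;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;

// Sketch: return the largest event_day by sorting descending, instead of relying on
// whichever document a plain match_all happens to return first.
public class LatestEventDaySketch
{
    public static String getLatestEventDay(RestHighLevelClient client, String index, String fieldname) throws IOException
    {
        SearchSourceBuilder builder = new SearchSourceBuilder()
                .query(QueryBuilders.matchAllQuery())
                .sort(fieldname, SortOrder.DESC)   // requires a sortable mapping (date/keyword)
                .size(1);
        SearchRequest request = new SearchRequest(index).source(builder);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        return response.getHits().getHits()[0].getSourceAsMap().get(fieldname).toString();
    }
}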
 

utils/EsUtils.java

  The second of the two check utilities, the Spark-side check against Hive, lives in utils/EsUtils.java, shown below.

package cn.focusmedia.esapp.utils; 
 
import cn.focusmedia.esapp.feign.EsClient; 
import cn.focusmedia.esapp.feign.SparkClient; 
import lombok.extern.slf4j.Slf4j; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.sql.*; 
 
import org.elasticsearch.client.RestHighLevelClient; 
import org.apache.spark.sql.Row; 
import org.junit.Test; 
 
import java.util.List; 
 
 
@Slf4j 
public class EsUtils 
{
    
    static RestHighLevelClient myClient = EsClient.getClient();  // obtain the ES client
    static SparkSession spark = SparkClient.getSpark(); 
 
 
    public static long getHiveTableCount(String tablename) 
    {
    
        Dataset<Row> table = spark.table(tablename).repartition(60); 
        long count = table.count(); 
        log.info("the hive table :"+tablename+" count is "+count+" ."); 
        return count; 
    } 
 
    @Test 
    public static long getHiveSumFieled(String tablename,String columnName) 
    {
    
        Dataset<Row> table = spark.table(tablename).repartition(60); 
         
        // anonymous inner class implementation: start
        Dataset<Row> sum = table.groupBy().sum(columnName).as(columnName); 
        JavaRDD<Long> map = sum.javaRDD().map(new Function<Row, Long>() {
    
            @Override 
            public Long call(Row row) throws Exception {
    
                return Long.valueOf(row.get(0).toString()); 
            } 
        }); 
        List<Long> collect = map.collect(); 
        Long sumC = 0L; 
        for (Long aLong : collect) {
    
            sumC += aLong; 
        } 
        // anonymous inner class implementation: end
 
       /* 
        // Alternatively, the DataFrame can be collected into Row[] and a cell read directly:
        Row[] collect = (Row[]) df.collect();   // collect() returns all rows as an array
        System.out.println(collect[0].get(2).toString()); 
       */ 
        return sumC; 
 
 
    } 
 
} 
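
  For the sum check, the anonymous-inner-class detour through an RDD can be avoided: the aggregated value can be read straight off the Dataset API. The variant below is only a sketch of that alternative, not the project's code; it assumes the column is numeric.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.sum;

// Sketch: same result as getHiveSumFieled, using Dataset.agg(sum(...)) directly.
public class HiveSumSketch
{
    public static long getHiveSumField(SparkSession spark, String tablename, String columnName)
    {
        Dataset<Row> agg = spark.table(tablename).agg(sum(columnName));
        Row first = agg.first();          // exactly one row with one cell: the total
        Object total = first.get(0);
        return total == null ? 0L : ((Number) total).longValue();
    }
}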
 

DingTalk Alert Utility

  The DingTalk alert utility is implemented in utils/SendDingTalkRisk.java. The scheduling server may not be able to reach the public internet, while DingTalk (an Alibaba SDK/service) requires internet access for the notification to arrive, so the request is relayed through a proxy server — and it has to be an HTTPS proxy request. The code is below.
  Note: the alert is posted by a robot in a DingTalk group (three or more members) and can @-mention specific people. For how to set up the DingTalk robot, see the post on triggering DingTalk alerts when a scheduled job fails or errors out (Python 3.x edition).

package cn.focusmedia.esapp.utils; 
 
import cn.hutool.json.JSONObject; 
import org.apache.http.HttpHost; 
import org.apache.http.client.methods.HttpPost; 
import org.apache.http.config.Registry; 
import org.apache.http.config.RegistryBuilder; 
import org.apache.http.conn.socket.ConnectionSocketFactory; 
import org.apache.http.conn.socket.PlainConnectionSocketFactory; 
import org.apache.http.conn.ssl.SSLConnectionSocketFactory; 
import org.apache.http.entity.StringEntity; 
import org.apache.http.impl.client.CloseableHttpClient; 
import org.apache.http.impl.client.HttpClientBuilder; 
import org.apache.http.impl.client.HttpClients; 
import org.apache.http.impl.conn.DefaultProxyRoutePlanner; 
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; 
 
import javax.net.ssl.SSLContext; 
import javax.net.ssl.TrustManager; 
import javax.net.ssl.X509TrustManager; 
import java.io.IOException; 
import java.security.KeyManagementException; 
import java.security.NoSuchAlgorithmException; 
import java.security.cert.CertificateException; 
import java.util.Arrays; 
 
public class SendDingTalkRisk 
{
    
    public static HttpClientBuilder proxy(String hostOrIP, int port) {
    
        // proxy host, proxy port, and protocol, in that order
        HttpHost proxy = new HttpHost(hostOrIP, port, "http"); 
        DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(proxy); 
        return HttpClients.custom().setRoutePlanner(routePlanner); 
    } 
 
    /** 
     * Bypass SSL certificate verification 
     * 
     * @return 
     * @throws NoSuchAlgorithmException 
     * @throws KeyManagementException 
     */ 
    public static SSLContext createIgnoreVerifySSL() throws NoSuchAlgorithmException, KeyManagementException {
    
        SSLContext sc = SSLContext.getInstance("SSLv3"); 
 
        // An X509TrustManager that trusts everything; the overridden methods are intentionally left empty
        X509TrustManager trustManager = new X509TrustManager() {
    
            @Override 
            public void checkClientTrusted( 
                    java.security.cert.X509Certificate[] paramArrayOfX509Certificate, 
                    String paramString) throws CertificateException {
    
            } 
            @Override 
            public void checkServerTrusted( 
                    java.security.cert.X509Certificate[] paramArrayOfX509Certificate, 
                    String paramString) throws CertificateException {
    
            } 
            @Override 
            public java.security.cert.X509Certificate[] getAcceptedIssuers() {
    
                return null; 
            } 
        }; 
        sc.init(null, new TrustManager[]{trustManager}, null); 
        return sc; 
    } 
 
    public static void sendDingTalkRisk(String sendMessage,boolean isAtAll,String[] phone) throws IOException, IOException, KeyManagementException, NoSuchAlgorithmException 
    {
    
        SSLContext sslcontext = createIgnoreVerifySSL();  // bypass certificate verification for the https request
        // register the socket factories used for the http and https protocols
        Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create() 
         .register("http", PlainConnectionSocketFactory.INSTANCE) 
         .register("https", new SSLConnectionSocketFactory(sslcontext)) 
         .build(); 
        PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(socketFactoryRegistry); 
        HttpClients.custom().setConnectionManager(connManager); 
        // route the request through the proxy server
        CloseableHttpClient httpClient= proxy("10.210.78.25", 3128).setConnectionManager(connManager).build(); 
        // the webhook URL of the DingTalk group robot
         
        HttpPost httpPost=new HttpPost("https://oapi.dingtalk.com/robot/send?access_token=9dd43c7484e9eddf929390bsq67c1bb68dcaee1asq634a923b0c7ab8bf69927c08"); 
        //HttpPost httpPost=new HttpPost("https://oapi.dingtalk.com/robot/send?access_token=b6d438cb3af76732b7f3de51693c7996a50b4e51f5e24f1cf99cacc03f5e2ef0"); 
 
        httpPost.addHeader("Content-Type", "application/json; charset=utf-8"); 
        JSONObject bodys = new JSONObject(); 
        bodys.put("msgtype","text"); 
        JSONObject text=new JSONObject(); 
 
        text.put("content",sendMessage); 
        bodys.put("text",text); 
        JSONObject at=new JSONObject(); 
        at.put("isAtAll",isAtAll); 
        at.put("atMobiles", Arrays.asList(phone)); 
        bodys.put("at",at); 
 
        StringEntity se = new StringEntity(bodys.toJSONString(4), "utf-8"); 
        httpPost.setEntity(se); 
        httpClient.execute(httpPost); 
    } 
 
    public static void main(String[] args) throws IOException, NoSuchAlgorithmException, KeyManagementException 
    {
    
        String sendMessage ="数仓小D:测试文本消息!"; 
        boolean isAtAll=false; 
        // the trailing numbers are the mobile phones of the people to @-mention
        SendDingTalkRisk.sendDingTalkRisk(sendMessage, isAtAll, new String[]{"130****896"}); 
    } 
} 
 

Main Class: Wiring the Utilities Together

  The main method performs the following steps, in order:

  1. Use Spark to compute the key metric statistics from Hive and store them in variables;
  2. Use the Java High Level REST Client to compute the same key metrics from ES and store them in variables;
  3. Check that the ES data's update date is as expected, and compare each key metric from step 1 with its counterpart from step 2. If everything matches, send a DingTalk message reporting the check results as a success; otherwise send the results as a failure and notify the responsible people to hurry up and fix it.

  The full code is in App.java:

   Note: for convenience, the required parameters are defined directly inside the main class. In a real project they are best pulled out into an XML (or properties) configuration file that the main program reads, so that syncing another Hive table to ES only requires editing configuration — see the sketch after the listing. The focus here is on a working, readable example; the polish is left to you.

package cn.focusmedia.esapp; 
 
import cn.focusmedia.esapp.utils.EsQueryTest; 
import cn.focusmedia.esapp.utils.EsUtils; 
import cn.focusmedia.esapp.utils.SendDingTalkRisk; 
import lombok.extern.slf4j.Slf4j; 
 
import java.text.SimpleDateFormat; 
import java.util.Calendar; 
 
/** 
 * Hello world! 
 * 
 */ 
@Slf4j 
public class App  
{
    
    public void dataCheck() throws Exception {
    
        String index="media_all_rs_v0"; 
        String hiveTable="dw.app_rs_media_galaxy_entrance_key"; 
 
        String col_entrance_key="entrance_key"; 
        String col_lcd_amount="lcd_amount"; 
        String col_smart_amount="smart_amount"; 
        String col_frame_amount="frame_amount"; 
        String col_event_day="event_day"; 
 
        //hive table columns DQ 
        long sparkCount= EsUtils.getHiveTableCount(hiveTable); 
        long sparkSmartAmount =EsUtils.getHiveSumFieled(hiveTable,col_smart_amount); 
        long sparkFrameAmount =EsUtils.getHiveSumFieled(hiveTable,col_frame_amount); 
        long sparkLcdAmount =EsUtils.getHiveSumFieled(hiveTable,col_lcd_amount); 
 
        //ES index field DQ 
        long esCount = EsQueryTest.getESCount(index, col_entrance_key); 
        long esSmartAmount = EsQueryTest.getESSumField(index, col_smart_amount); 
        long esFrameAmount = EsQueryTest.getESSumField(index, col_frame_amount); 
        long esLcdAmount = EsQueryTest.getESSumField(index, col_lcd_amount); 
        String esEventDay = EsQueryTest.getEventDay(index,col_event_day); 
 
        int flat=0; 
        String dqCount=""; 
        if(sparkCount==esCount) 
        {
    
            dqCount = "数仓门洞数:" + sparkCount + ";\nES门洞数:" + esCount + ";一致!"; 
            flat+=0; 
        } 
        else {
    
            dqCount = "数仓门洞数:" + sparkCount + ";\nES门洞数:" + esCount + ";不一致!"; 
            flat+=1; 
        } 
 
        String smartAmount; 
        if(sparkSmartAmount==esSmartAmount) {
    
            smartAmount = "数仓智能屏数:" + sparkSmartAmount + ";\nES智能屏数:" + esSmartAmount + ";一致!"; 
            flat+=0; 
        } 
        else {
    
            smartAmount  = "数仓智能屏数:" + sparkSmartAmount + ";\nES智能屏数:" + esSmartAmount + ";不一致!"; 
            flat+=1; 
        } 
 
 
        String frameAmount; 
        if(sparkFrameAmount==esFrameAmount) {
    
            frameAmount = "数仓框架数:" + sparkFrameAmount + ";\nES框架数:" + esFrameAmount + ";一致!"; 
            flat+=0; 
        } 
        else {
    
            frameAmount = "数仓框架数:" + sparkFrameAmount + ";\nES框架数:" + esFrameAmount + ";不一致!"; 
            flat+=1; 
        } 
 
 
        String lcdAmount; 
        if(sparkLcdAmount==esLcdAmount) {
    
            lcdAmount = "数仓液晶数:" + sparkLcdAmount + ";\nES液晶数:" + esLcdAmount + ";一致 !"; 
            flat+=0; 
        } 
        else {
    
            lcdAmount = "数仓液晶数:" + sparkLcdAmount + ";\nES液晶数:" + esLcdAmount + ";不一致 !"; 
            flat+=1; 
        } 
 
        Calendar calendar = Calendar.getInstance(); 
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd"); 
        calendar.add(Calendar.DAY_OF_MONTH,-1); 
        String date = fmt.format(calendar.getTime()); 
 
        String eventDay; 
 
        if(date.equals(esEventDay)) 
        {
    
            eventDay = "ES索引:" + index + ";数据日期:" + esEventDay + ";"; 
            flat+=0; 
        } 
        else {
    
            eventDay = "ES索引:" + index + ";数据日期:" + esEventDay + ";"; 
            flat+=1; 
        } 
 
        String dataCheck=""; 
        if(flat==0) 
        {
    
            dataCheck="数据更新成功!"; 
        } 
        else {
    
            dataCheck="数据更新有误,请及时处理!"; 
 
        } 
 
//        log.info(dqCount+""); 
//        log.info(smartAmount+""); 
//        log.info(frameAmount+""); 
//        log.info(lcdAmount+""); 
//        log.info(eventDay+""); 
//        log.info(dataCheck); 
 
        String dingContext="数仓小D:\r\n"+dqCount+"\r\n"+smartAmount+"\r\n"+frameAmount+"\r\n"+lcdAmount+"\r\n"+eventDay+"\r\n"+dataCheck+"\r\n"; 
        log.info(dingContext); 
 
 
        String[] phone = {"130****0896", "136****8552"}; 
        SendDingTalkRisk.sendDingTalkRisk(dingContext, false, phone); 
 
    } 
 
    public static void main( String[] args ) throws Exception {
    
        App app = new App(); 
        app.dataCheck(); 
 
    } 
} 
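
  As noted before the listing, the hardcoded index, table and column names are best externalized. Below is a minimal sketch of that idea using a properties file; the file name dq-check.properties and its keys are illustrative assumptions (the configuration-driven version proper is the subject of article 5).

package cn.focusmedia.esapp;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Sketch only: loads the check parameters from a hypothetical dq-check.properties, e.g.
//   es.index=media_all_rs_v0
//   hive.table=dw.app_rs_media_galaxy_entrance_key
//   check.sum.columns=lcd_amount,smart_amount,frame_amount
public class DqCheckConfig
{
    private final Properties props = new Properties();

    public DqCheckConfig(String resourceName) throws IOException
    {
        try (InputStream in = DqCheckConfig.class.getClassLoader().getResourceAsStream(resourceName)) {
            props.load(in);
        }
    }

    public String esIndex()   { return props.getProperty("es.index"); }
    public String hiveTable() { return props.getProperty("hive.table"); }

    public List<String> sumColumns()
    {
        return Arrays.asList(props.getProperty("check.sum.columns").split(","));
    }
}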
 

Building the Jar and Deploying

  Package the debugged project into a jar (if you have not done this before, see the post on building a Jar from code in IntelliJ IDEA). Here the jar is named DQ_hive_and_es_galaxy.jar.
  Upload DQ_hive_and_es_galaxy.jar to HDFS at /app/hive_to_es_galaxy/dq_check_jar/DQ_hive_and_es_galaxy.jar, then write a shell script, DQ_hive_and_es_galaxy.sh, that calls spark-submit:

#!/bin/bash 
 
cur_dir=`pwd` 
 
spark-submit --master yarn --deploy-mode cluster --executor-memory 8G --executor-cores 5 --num-executors 4 --queue etl --conf spark.kryoserializer.buffer.max=256m --conf spark.kryoserializer.buffer=64m  --class cn.focusmedia.esapp.App  hdfs:///app/hive_to_es_galaxy/dq_check_jar/DQ_hive_and_es_galaxy.jar 
dq_check_flag=$? 
if [ $dq_check_flag -eq 0 ];then 
    echo "data quality check run succeed!" 
 
else 
    echo "data quality check run failed!" 
    ## The lines below are our own DingTalk alert for failures; replace them with your own alerting
    cd ${cur_dir}/../src/ding_talk_warning_report_py/main/ 
    python3 ding_talk_with_agency.py 235 
    exit 3 
fi 
 
 
 
 

Scheduling the Shell Script

  Finally, schedule DQ_hive_and_es_galaxy.sh — for example with Azkaban — at whatever frequency you need.
  Again: once packaged, this job is best launched as a separate process, starting about 10 minutes after the latest estimated finish time of the job from article 1 or 2, so that it covers two scenarios:

  1. The job from article 1 or 2 was scheduled but produced bad data — this is caught;
  2. The job from article 1 or 2 never started at all (i.e. it timed out), which also leaves the data wrong — this is caught as well.

Result

  The result is shown in Figure 2.

Figure 2. DingTalk data-check alert

Summary

  With this DingTalk-alert check of data quality between ElasticSearch and the Hive data warehouse in place, the ETL from articles 1 and 2 is finally complete and safe — otherwise something is always missing, and if the data ever turns out wrong your stakeholders will haul you in for a talking-to. Doing the data validation properly is what counts.


Source: https://blog.csdn.net/LXWalaz1s1s/article/details/109266385