目 录

  1. 项目实战——将Hive表的数据直接导入ElasticSearch
      此篇文章不用写代码,简单粗暴,但是相对没有那么灵活;底层采用MapReduce计算框架,导入速度相对较慢!

  2. 项目实战——Spark将Hive表的数据写入ElasticSearch(Java版本)
      此篇文章需要Java代码,实现功能和篇幅类似,直接Java一站式解决Hive内用Spark取数,新建ES索引,灌入数据,并且采用ES别名机制,实现ES数据更新的无缝更新,底层采用Spark计算框架,导入速度相对文章1的做法较快的多!;

  3. 项目实战——钉钉报警验证ElasticSearch和Hive数据仓库内的数据质量(Java版本)
      此篇文章主要选取关键性指标,数据校验数据源Hive和目标ES内的数据是否一致;

  4. 项目实战——Spark将Hive表的数据写入需要用户名密码认证的ElasticSearch(Java版本)
      此篇文章主要讲述如何通过spark将hive数据写入带账号密码权限认证的ElasticSearch 内;

  5. 项目实战(生产环境部署上线)——参数配置化Spark将Hive表的数据写入需要用户名密码认证的ElasticSearch(Java版本))
      此篇文章主要讲述如何通过spark将hive数据写入带账号密码权限认证的ElasticSearch 内,同时而是,spark,es建索引参数配置化,每次新增一张表同步到es只需要新增一个xml配置文件即可,也是博主生产环境运用的java代码,弥补下很多老铁吐槽方法4的不足。

  综述:
  1.如果感觉编码能力有限,又想用到Hive数据导入ElasticSearch,可以考虑文章1;
  2.如果有编码能力,个人建议采用文章2和文章3的组合情况(博主推荐),作为离线或者近线数据从数据仓库Hive导入ElasticSearch的架构方案,并且此次分享的Java代码为博主最早实现的版本1,主要在于易懂,实现功能,学者们可以二次加工,请不要抱怨代码写的烂;
  3.如果是elasticsearch是自带账号密码权限认证的,如云产品或者自己设置了账号密码认证的,那么办法,只能用文章4了;
  4.如果部署上线,还是要看文章5。

  • 本人Hive版本:2.3.5

  • 本人ES版本:7.7.1

  • 本人Spark版本:2.3.3

    背 景

      ElasticSearch像是可以配置用户名,密码认证的,特别是云产品,公司如果买的ElasticSearch的云服务,那必然是带用户名密码认真的,即,当你访问你的ES时,默认一般是9200端口时会弹出如图1的提示,需要你填写用户名密码;

在这里插入图片描述

图1 访问ES时提示需要用户名密码

解决方案

ping通ES的机器

  在你要访问的源机器ping通需要目标端的es机器ip,ping不通,找网管;

telnet通ES的机器的端口

  在你要访问的源机器telnet通需要目标端的es机器ip和端口,telnet不通,找网管;

拿到用户名和密码

  既然是用户名和密码认证,当然需要向管理员拿到账号和密码,拿到用户名和密码后,先去测试下该用户名能否登陆es,并且能否读写权限,读写,创建index(非必要),可以在kibana上验证,认证访问,最好在你跑程序的地方,跑一下RESTFul风格的代码,如下(linux环境shell命令行内直接跑);

# 用户名密码有转移字符,记得前面加\转移,如abc!123,写成abc\!123 
# 用户名密码有转移字符,记得前面加\转移,如abc!123,写成abc\!123 
# 用户名密码有转移字符,记得前面加\转移,如abc!123,写成abc\!123 
curl -k -u user:password -XGET http://es-ip:9200/your_index/_search 

  windows cmd下:

# 注意用户名密码后面是@符号,用户名密码有转译字符可不转译,别乱搞 
# 注意用户名密码后面是@符号,用户名密码有转译字符可不转译,别乱搞 
# 注意用户名密码后面是@符号,用户名密码有转译字符可不转译,别乱搞 
curl "http://user:password@es-ip:9200/your_index/_search" 

  如果能获取到数据,说明网络,账号一切都Ok,加上kibana能读写index,说明权限Ok,否则,哪一环出了问题去找到相关的人员解决,准备工作都Ok了,再去写代码,不然代码一直报错,让你怀疑人生;

代码调整

  基础代码和项目实战——Spark将Hive表的数据写入ElasticSearch(Java版本)差不多,这里就不多做累赘,就重点讲讲需要调整的地方;

  • 回顾下不需要账号密码认证的 RestHighLevelClient
 
   public static RestHighLevelClient getClient() 
  {
    
      final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); 
      credentialsProvider.setCredentials(AuthScope.ANY, 
              new UsernamePasswordCredentials("elastic", "FZ_elastic")); 
      //配置集群连接的IP和端口,正式项目是要走配置文件的,这里偷懒下,就写死吧,也方便说明问题,不要骂我代码太烂就行 
      //创建HttpHost对象 
      HttpHost[] myHttpHost = new HttpHost[1]; 
      myHttpHost[0]=new HttpHost("10.156.10.49",9200); 
//        HttpHost[] myHttpHost = new HttpHost[7]; 
//        myHttpHost[0]=new HttpHost("10.156.10.14",9200); 
//        myHttpHost[1]=new HttpHost("10.156.10.12",9200); 
//        myHttpHost[2]=new HttpHost("10.156.10.13",9200); 
//        myHttpHost[3]=new HttpHost("10.156.10.13",9200); 
//        myHttpHost[4]=new HttpHost("10.156.10.10",9200); 
//        myHttpHost[5]=new HttpHost("10.156.10.11",9200); 
//        myHttpHost[6]=new HttpHost("10.156.10.17",9200); 
 
      //创建RestClientBuilder对象 
      RestClientBuilder myRestClientBuilder=RestClient.builder(myHttpHost); 
 
      //创建RestHighLevelClient对象 
      RestHighLevelClient myclient=new RestHighLevelClient(myRestClientBuilder); 
 
      log.info("RestClientUtil intfo create rest high level client successful!"); 
 
      return myclient; 
  } 
  • 需要账号密码认证的 RestHighLevelClient
    public static RestHighLevelClient getClient() 
   {
    
       final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); 
       credentialsProvider.setCredentials(AuthScope.ANY, 
               new UsernamePasswordCredentials("user", "password"));  //es账号密码 
 
       //如果是多台,请仿照不需要账号密码的方式生成列表对象,这里就写一台为例 
       RestClientBuilder builder = RestClient.builder( 
               new HttpHost("10.156.10.49", 9200)) 
               .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    
                   @Override 
                   public HttpAsyncClientBuilder customizeHttpClient( 
                           HttpAsyncClientBuilder httpClientBuilder) {
    
                       httpClientBuilder.disableAuthCaching(); 
                       return httpClientBuilder 
                               .setDefaultCredentialsProvider(credentialsProvider); 
                   } 
               }); 
 
      //创建RestHighLevelClient对象 
      RestHighLevelClient myclient=new RestHighLevelClient(builder); 
 
      log.info("RestClientUtil intfo create rest high level client successful!"); 
 
      return myclient; 
 
   } 
  • 单元测试有用户名和密码认证是否能登陆到es,看看能否获取到es上自创建的index:dw_book的id=1的数据,能拉去下来,说明认证通过,否则,认证失败;
    @Test 
    public  void getClient1() throws IOException {
    
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); 
        credentialsProvider.setCredentials(AuthScope.ANY, 
                new UsernamePasswordCredentials("test_user", "testpaswd1234"));  //es账号密码 
 
        RestClientBuilder builder = RestClient.builder( 
                new HttpHost("es-cn-m7r1vo8m8000ox0sn.elasticsearch.aliyuncs.com", 9200)) 
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    
                    @Override 
                    public HttpAsyncClientBuilder customizeHttpClient( 
                            HttpAsyncClientBuilder httpClientBuilder) {
    
                        httpClientBuilder.disableAuthCaching(); 
                        return httpClientBuilder 
                                .setDefaultCredentialsProvider(credentialsProvider); 
                    } 
                }); 
 
        //创建RestHighLevelClient对象 
        RestHighLevelClient myclient=new RestHighLevelClient(builder); 
 
        log.info("RestClientUtil intfo create rest high level client successful!"); 
 
        System.out.println(myclient); 
 
        String index="dw_book"; 
 
 
 
        //1.创建GetRequest 
        GetRequest getRequest = new GetRequest(index,"1"); 
 
        //2.执行查询 
        GetResponse response = myclient.get(getRequest, RequestOptions.DEFAULT); 
 
 
        //3.输出结果 
        System.out.println(response.getSourceAsMap()); 
 
         
    } 

  通过单元测试,看看是否能把测试的ES index dw_book(自己先在Kibana上建好,然后随便添加点数据进去)数据search出来,能拉取到数据,Ok,证明认证通过了,能准确的返回clinet
  注意:如果你认为这就结束了吗?其实一开始本人也是这么认为的,实际上还不够,如果这样,会出现一个经典的bug,百度,必应了好久才解决……

EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens  
if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance  
without the proper setting 'es.nodes.wan.only 

  原因是原来spark导入ES的配置conf也要做相应的调整;

Spark的conf调整

  回顾下不用认证的spark对象初始化代码;

package cn.focusmedia.esapp.feign; 
 
import org.apache.spark.sql.SparkSession; 
 
public class SparkClient 
{
    
  public static SparkSession getSpark() 
  {
    
      SparkSession spark=SparkSession.builder().appName("SparkToES").enableHiveSupport().getOrCreate(); 
      return spark; 
  } 
 
} 
 
  //数据写入ES 
  public static void tableToEs(String index,String index_auto_create,String es_mapping_id,String table_name,String es_nodes) 
  {
    
      SparkSession spark = SparkClient.getSpark(); 
      Dataset<Row> table = spark.table(table_name).repartition(60); 
      JavaEsSparkSQL.saveToEs(table,index, ImmutableMap.of("es.index.auto.create", index_auto_create,"es.resource", index, "es.mapping.id" ,es_mapping_id,"es.nodes" ,es_nodes)); 
      log.info("Spark data from hive to ES index: "+index+" is over,go to alias index! "); 
      spark.stop(); 
  } 

  这样就可以获取到spark对象去交互hive内的表了,但是需要认证的spark,预加载的配置需要修改下,具体如下;

    //数据写入ES 
 public static void tableToEs(String index,String index_auto_create,String table_name,String es_nodes,String wan_only) 
 {
    
     SparkConf conf=new SparkConf().setMaster("yarn").setAppName("SparkToES"); 
     conf.set("es.nodes",es_nodes); 
     conf.set("es.net.http.auth.user" ,"test_user"); 
     conf.set("es.net.http.auth.pass","testpaswd1234"); 
     conf.set("es.nodes.wan.only",wan_only); 
     conf.set("es.nodes.discovery","false"); 
     conf.set("es.index.auto.create",index_auto_create); 
     conf.set("es.resource",index); 
 
     SparkSession spark = SparkSession 
             .builder() 
             .config(conf) 
             .appName("SparkToES") 
             .enableHiveSupport() 
             .config("spark.sql.hive.convertMetastoreParquet", false) 
             .getOrCreate(); 
 
     Dataset<Row> table = spark.table(table_name).repartition(60); 
     JavaEsSparkSQL.saveToEs(table,index); 
 
//        JavaEsSparkSQL.saveToEs(table,index, ImmutableMap.of("es.index.auto.create", index_auto_create,"es.resource", index, "es.mapping.id" 
//               ,es_mapping_id,"es.nodes" ,es_nodes,"es.nodes.wan.only",wan_only)); 
 
     log.info("Spark data from hive to ES index: "+index+" is over,go to alias index! "); 
     spark.stop(); 
 } 
 

  通过这些配置后,其他的操作和   基础代码和项目实战——Spark将Hive表的数据写入ElasticSearch(Java版本)一样就行了,数据就能完美的写入有账号密码认证的ES内了,大功告成;


本文参考链接:https://blog.csdn.net/LXWalaz1s1s/article/details/110178172
评论关闭
IT虾米网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!