DataX Secondary Development: Adding a HiveReader Plugin


1. Development Background

2. Introduction to the HiveReader Plugin

    The hivereader plugin is fairly simple: three classes plus two configuration files.

  • HiveReader: implements the core methods of the DataX framework and carries the actual reading logic.
  • HiveReaderErrorCode: implements the DataX framework's ErrorCode interface and is passed to DataXException for unified error handling; here it adds a single new enum value.
  • HiveConnByKerberos: a utility class that performs Kerberos authentication when the Hive source is detected to require it.
  • plugin.json: the standard DataX plugin descriptor, which declares the plugin's entry class.
  • plugin_job_template.json: a custom plugin should document how it is used; this JSON file describes how to configure the HiveReader plugin in a job.

2.1 The HiveReader Class

    First, the HiveReader class. Note that several constants and enum values must be added by hand: in the DataBaseType enum, add a Hive entry carrying the full class path of the Hive JDBC driver (see the comment in the code). Also note the three Kerberos-related settings: the keytab path, the krb5.conf path, and the principal value.
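    Before this class compiles, the referenced constants must exist. The sketch below shows what the additions might look like; the first three key names mirror the job-template keys in section 2.5, while the KRB5_CONF_FILE_PATH value and the exact class locations are assumptions about this fork's layout.

// Sketch only -- exact enum/class locations depend on your DataX fork.
// 1) New entry in the DataBaseType enum (JDBC prefix plus driver class):
Hive("hive2", "org.apache.hive.jdbc.HiveDriver"),

// 2) New parameter-name constants in com.alibaba.datax.common.base.Key:
public static final String HAVE_KERBEROS = "haveKerberos";
public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
public static final String KRB5_CONF_FILE_PATH = "krb5ConfFilePath"; // assumed key name

    With those in place, the HiveReader class itself: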

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.rdbms.util.DataBaseType;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.security.authentication.util.KerberosName;

import java.lang.reflect.Field;
import java.util.List;

import static com.alibaba.datax.common.base.Constant.DEFAULT_FETCH_SIZE; // 2048; adjust to suit
import static com.alibaba.datax.common.base.Key.FETCH_SIZE;              // parameter name: "fetchSize"

@Slf4j
public class HiveReader extends Reader {

    // Requires a new Hive entry in the DataBaseType enum:
    // Hive("hive2", "org.apache.hive.jdbc.HiveDriver")
    private static final DataBaseType DATABASE_TYPE = DataBaseType.Hive;

    public static class Job extends Reader.Job {

        private Configuration originalConfig = null;
        private CommonRdbmsReader.Job commonRdbmsReaderJob;

        @Override
        public void init() {
            this.originalConfig = getPluginJobConf();
            Boolean haveKerberos = this.originalConfig.getBool(Key.HAVE_KERBEROS, false);
            if (haveKerberos) {
                log.info("Kerberos authentication detected; logging in");
                org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
                String kerberosKeytabFilePath = this.originalConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
                String kerberosPrincipal = this.originalConfig.getString(Key.KERBEROS_PRINCIPAL);
                String krb5Path = this.originalConfig.getString(Key.KRB5_CONF_FILE_PATH);
                hadoopConf.set("hadoop.security.authentication", "kerberos");
                hadoopConf.set("hive.security.authentication", "kerberos");
                hadoopConf.set("hadoop.security.authorization", "true");
                System.setProperty("java.security.krb5.conf", krb5Path);
                refreshConfig();
                HiveConnByKerberos.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf, krb5Path);
            }
            this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
            this.originalConfig = commonRdbmsReaderJob.init(originalConfig);
        }

        @Override
        public void preCheck() {
            this.commonRdbmsReaderJob.preCheck(originalConfig, DATABASE_TYPE);
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            return this.commonRdbmsReaderJob.split(originalConfig, adviceNumber);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderJob.post(originalConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderJob.destroy(originalConfig);
        }
    }

    public static class Task extends Reader.Task {

        private Configuration readerSliceConfig;
        private CommonRdbmsReader.Task commonRdbmsReaderTask;

        @Override
        public void init() {
            this.readerSliceConfig = getPluginJobConf();
            this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId());
            this.commonRdbmsReaderTask.init(this.readerSliceConfig);
        }

        @Override
        public void startRead(RecordSender recordSender) {
            int fetchSize = this.readerSliceConfig.getInt(FETCH_SIZE, DEFAULT_FETCH_SIZE);
            this.commonRdbmsReaderTask.startRead(readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize);
        }

        @Override
        public void post() {
            this.commonRdbmsReaderTask.post(readerSliceConfig);
        }

        @Override
        public void destroy() {
            this.commonRdbmsReaderTask.destroy(readerSliceConfig);
        }
    }

    /**
     * Refresh the JVM's cached krb5 settings. KerberosName caches the default
     * realm in a static field, so it is reset via reflection after
     * java.security.krb5.conf has been changed.
     */
    public static void refreshConfig() {
        try {
            sun.security.krb5.Config.refresh();
            Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm");
            defaultRealmField.setAccessible(true);
            defaultRealmField.set(null,
                    org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm());
            // Reload java.security.auth.login.config as well.
            javax.security.auth.login.Configuration.setConfiguration(null);
        } catch (Exception e) {
            log.warn("resetting default realm failed, current default realm will still be used.", e);
        }
    }
}

2.2 The HiveConnByKerberos Class

    The HiveConnByKerberos class is straightforward: a general-purpose Kerberos authentication helper.

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.exception.DataXException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;

@Slf4j
public class HiveConnByKerberos {

    public static void kerberosAuthentication(String kerberosPrincipal,
                                              String kerberosKeytabFilePath,
                                              org.apache.hadoop.conf.Configuration hadoopConf,
                                              String krb5conf) {
        System.setProperty("java.security.krb5.conf", krb5conf);
        if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
            UserGroupInformation.setConfiguration(hadoopConf);
            try {
                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
            } catch (Exception e) {
                String message = String.format("Kerberos authentication failed, please check "
                                + "kerberosKeytabFilePath[%s] and kerberosPrincipal[%s]",
                        kerberosKeytabFilePath, kerberosPrincipal);
                log.error(message, e);
                throw DataXException.asDataXException(HiveReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
            }
        }
    }
}
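    To verify a keytab and principal outside of DataX, a minimal standalone check such as the following can help separate authentication problems from plugin problems (the krb5.conf path, principal, and keytab path are placeholders):

package com.alibaba.datax.plugin.reader.hivereader;

import org.apache.hadoop.security.UserGroupInformation;

public class KerberosLoginSmokeTest {
    public static void main(String[] args) throws Exception {
        // Placeholder values -- substitute your own realm, principal, and paths.
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(
                "hive@EXAMPLE.COM",
                "/etc/security/keytabs/hive.headless.keytab");
        System.out.println("Logged in as: " + UserGroupInformation.getLoginUser());
    }
}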

 

2.3 The HiveReaderErrorCode Class

package com.alibaba.datax.plugin.reader.hivereader;

import com.alibaba.datax.common.spi.ErrorCode;

public enum HiveReaderErrorCode
        implements ErrorCode
{
    KERBEROS_LOGIN_ERROR("HiveReader-13", "Kerberos authentication failed");

    private final String code;
    private final String description;

    HiveReaderErrorCode(String code, String description)
    {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode()
    {
        return this.code;
    }

    @Override
    public String getDescription()
    {
        return this.description;
    }

    @Override
    public String toString()
    {
        return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
    }
}

2.4 The plugin.json File

{
  "name": "hivereader",
  "class": "com.alibaba.datax.plugin.reader.hivereader.HiveReader",
  "description": "Retrieve data from Hive via jdbc",
  "developer": "wxm"
}
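
    Once packaged, the reader goes under DataX's plugin directory so the framework can discover it through this descriptor. The layout below follows the usual DataX plugin convention; the jar name is illustrative:

datax/plugin/reader/hivereader/
├── plugin.json
├── plugin_job_template.json
├── hivereader-1.0.0.jar     (illustrative name)
└── libs/                    (third-party dependencies)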

2.5 The plugin_job_template.json File

    One caveat here: the JDBC URL of a Kerberos-secured Hive can be written in two ways, direct HiveServer2 or ZooKeeper-based service discovery. With the ZooKeeper form, the node running DataX must have network connectivity to the ZooKeeper nodes; in either form, do not forget to include the concrete Hive database name in the URL.
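    Concretely, the two URL forms look like this (host names are illustrative; hiveserver2 is the common default ZooKeeper namespace):

# Direct HiveServer2 connection:
jdbc:hive2://hs2-host:10000/default;principal=hive/_HOST@EXAMPLE.COM

# ZooKeeper service discovery:
jdbc:hive2://zk1:2181,zk2:2181,zk3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;principal=hive/_HOST@EXAMPLE.COM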

{ "name": "hivereader", "parameter": { "column": [ "*" ], "username": "hive", "password": "",
"preSql":"show databases;", "connection": [ { "jdbcUrl": [ "jdbc:hive2://localhost:10000/default;principal=hive/_HOST@EXAMPLE.COM" ], "table": [ "hive_reader" ] } ], "where": "logdate='20211013'" , "haveKerberos": true, "kerberosKeytabFilePath": "/etc/security/keytabs/hive.headless.keytab", "kerberosPrincipal": "hive@EXAMPLE.COM" } }

 
