Preface


1. What is a Transformer?

During data synchronization and transfer, users sometimes need custom processing of the data in flight, such as pruning columns or transforming column values. This is the T (Transform) step of ETL, and in DataX it is implemented through Transformers. DataX provides full support for E (Extract), T (Transform), and L (Load).
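For simple column transformations no custom code is needed at all: DataX ships with built-in transformers such as dx_substr, dx_pad, dx_replace, dx_filter, and dx_groovy. As a purely illustrative snippet (the column index and lengths here are made up), keeping only the first five characters of column 1 would look roughly like this in the job JSON:

"transformer": [
    {
        "name": "dx_substr",
        "parameter": {
            "columnIndex": 1,
            "paras": ["0", "5"]
        }
    }
]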

Execution model (diagram omitted)


For more details, see the official DataX Transformer documentation.

2. Usage

1. Development workflow

    1. Clone the DataX project source from GitHub and locate the transformer folder in the project root;
    2. Under the com.alibaba.datax.transport.transformer path, find the existing transformers, extend Transformer, and implement the interface (use the existing transformer classes as a reference), accepting whatever parameters you need so they can be passed in from the job configuration file;
    3. Register your transformer class in the TransformerRegistry class under core\src\main\java\com\alibaba\datax\core\transport\transformer;
    4. Build and package with Maven: mvn clean package -DskipTests assembly:assembly (see the command recap right after this list).
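For reference, a minimal sketch of the full build sequence, assuming you build from the repository root (adjust paths to your environment):

git clone https://github.com/alibaba/DataX.git
cd DataX
mvn clean package -DskipTests assembly:assembly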

2. Example

Sample code:

  1. The example below uses AES encryption as a demonstration. Extend Transformer and write your own processing class:
    datax/transformer/src/main/java/com/alibaba/datax/transformer/AESTransformer.java
// Note: package and imports are shown for completeness; adjust them to wherever you place the class.
package com.alibaba.datax.transformer;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.core.transport.transformer.TransformerErrorCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

public class AESTransformer extends Transformer {

    private static final Logger LOG = LoggerFactory.getLogger(AESTransformer.class);
    // Seed key used to derive the AES key (replace with your own secret).
    public static final String ENCRYPT_KEY = "seed key";

    int columnIndex;

    public AESTransformer() {
        // Name used to reference this transformer from the job JSON.
        setTransformerName("dx_aes");
        LOG.info("Using AES preserve masker");
    }

    @Override
    public Record evaluate(Record record, Object... paras) {
        try {
            if (paras.length < 1) {
                throw new RuntimeException("dx_aes transformer is missing parameters");
            }
            // The framework passes the configured columnIndex as the first parameter.
            columnIndex = (Integer) paras[0];
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER, "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
        }
        Column column = record.getColumn(columnIndex);
        try {
            String oriValue = column.asString();
            // Nothing to encrypt for null values.
            if (oriValue == null) {
                return record;
            }
            // Only string columns are encrypted; other types pass through unchanged.
            if (column.getType() == Column.Type.STRING) {
                EncryptUtil encryptUtil = EncryptUtil.getInstance();
                String newValue = encryptUtil.AESencode(oriValue, ENCRYPT_KEY);
                record.setColumn(columnIndex, new StringColumn(newValue));
            }
        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
        }
        return record;
    }
}
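EncryptUtil above is the author's own helper class and is not shown in this post. Purely as an illustration of what such a helper might look like, here is a minimal sketch that derives an AES key from the seed string and returns Base64 output (ECB mode is used only for brevity and is not recommended for production):

// Hypothetical sketch of the EncryptUtil helper referenced above (not part of DataX).
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

public class EncryptUtil {

    private static final EncryptUtil INSTANCE = new EncryptUtil();

    public static EncryptUtil getInstance() {
        return INSTANCE;
    }

    // Encrypts the plaintext with AES and returns a Base64-encoded string.
    public String AESencode(String plainText, String seedKey) throws Exception {
        // Derive a 128-bit key from the seed string via SHA-256 (first 16 bytes).
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(seedKey.getBytes(StandardCharsets.UTF_8));
        byte[] keyBytes = new byte[16];
        System.arraycopy(digest, 0, keyBytes, 0, 16);

        SecretKeySpec key = new SecretKeySpec(keyBytes, "AES");
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
        cipher.init(Cipher.ENCRYPT_MODE, key);
        byte[] encrypted = cipher.doFinal(plainText.getBytes(StandardCharsets.UTF_8));
        return Base64.getEncoder().encodeToString(encrypted);
    }
}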
  2. Register it in TransformerRegistry (excerpt of the source)
/**
 * no comments.
 * Created by liqiang on 16/3/3.
 */
public class TransformerRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(TransformerRegistry.class);
    private static Map<String, TransformerInfo> registedTransformer = new HashMap<String, TransformerInfo>();

    static {
        /**
         * add native transformer
         * local storage and from server will be delay load.
         */

        registTransformer(new SubstrTransformer());
        registTransformer(new PadTransformer());
        registTransformer(new ReplaceTransformer());
        registTransformer(new FilterTransformer());
        registTransformer(new GroovyTransformer());
        registTransformer(new AESTransformer()); // register our custom transformer

    }
  3. Rebuild the package and redeploy
  4. Sample job JSON that uses the transformer
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "column" : [
                                       "id"                   
                                      ,"les_id"               
                                      ,"grade_id"             
                                      ,"edition_id"           
                                      ,"subject_id"           
                                      ,"course_system_first_id"
                                      ,"course_system_second_id"
                                      ,"course_system_third_id"
                                      ,"course_system_four_id"
                                      ,"custom_points"        
                                      ,"deleted"              
                                      ,"created_at"           
                                      ,"tea_id"               
                                      ,"stu_id"               
                                      ,"les_uid"              
                                      ,"updated_at"           
                                      ,"pt"
                                ],
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://xxxx:3306/test?useUnicode=true&characterEncoding=utf8"],
                                "table": ["xxx"]
                                  
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {"name":"id"        , "type":"int"},
                            {"name":"les_id"    , "type":"int"},
                            {"name":"grade_id"  , "type":"int"},
                            {"name":"edition_id", "type":"int"},
                            {"name":"subject_id", "type":"int"},
                            {"name":"course_system_first_id" , "type":"int"},
                            {"name":"course_system_second_id", "type":"int"},
                            {"name":"course_system_third_id" , "type":"int"},
                            {"name":"course_system_four_id"  , "type":"int"},
                            {"name":"custom_points", "type":"string"},
                            {"name":"deleted"      ,"type":"TINYINT"},
                            {"name":"created_at"   ,"type":"string"},
                            {"name":"tea_id"       ,"type":"int"},
                            {"name":"stu_id",       "type":"int"},
                            {"name":"les_uid"      ,"type":"string"},
                            {"name":"updated_at"   ,"type":"string"}
  
                        ],
                        "defaultFS": "hdfs://nameservice1",
                        "hadoopConfig":{
                                 "dfs.nameservices": "nameservice1",
                                 "dfs.ha.namenodes.nameservice1": "namenode286,namenode36",
                                 "dfs.namenode.rpc-address.nameservice1.namenode286": "xxxx:8020",
                                 "dfs.namenode.rpc-address.nameservice1.namenode36": "xxxx:8020",
                                 "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                         },
                         "haveKerberos": "true",
                         "kerberosKeytabFilePath": "/home/xx/kerberos/xxx.keytab",
                         "kerberosPrincipal":"xxx@FAYSON.COM",
                         "encoding": "UTF-8",
                         "fileType": "orc",
                         "fileName": "xxx",
                         "path": "/user/hive/warehouse/ods.db/xxxxx/pt=2020-01-20",
                         "writeMode": "append", // append & overwrite
                         "fieldDelimiter" :"\u0001"               
                       }
                },
                // column indexes (by position) of the fields to encrypt
                "transformer": [
                    {
                        "name": "dx_aes",
                        "parameter":
                            {
                            "columnIndex":9,
                            "paras":[""]
                            }
                    },
                     {
                        "name": "dx_aes",
                        "parameter":
                            {
                            "columnIndex":11,
                            "paras":[""]
                            }
                    }
                ]
            }
        ],
        // tuning settings; default values for now
        "setting": {
            "speed": {
                "channel": "5"
            },
             "errorLimit": {
                "record": 0
            }
        }
    }
}
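Once the rebuilt package is deployed, the job runs like any other DataX job. A typical invocation (the paths are only examples, assuming a standard DataX installation) is:

python ${DATAX_HOME}/bin/datax.py job/your_job.json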

Summary

Using a concrete example, this article covered the T (transform) part of DataX as an ETL tool. Transformers can do quite a lot: cleaning dirty data, UDF-style conversions, encrypting and decrypting data, and more, which makes them genuinely useful in practice. A later chapter will lift the "mystery" by walking through the source code; if you have better ideas, you are welcome to share them with me.
