Importing MySQL Data into Phoenix with DataX

Install Hadoop, ZooKeeper, HBase 2.x, and Phoenix 5.x

$ docker pull mysaber/hadoop:0.1.5

Configure Phoenix 5.x

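// Copy the HBase configuration into Phoenix
$ cp {HBase_Home}/conf/hbase-site.xml {phoenix_home}/bin/
// Copy phoenix-<version>-server.jar and phoenix-core-<version>.jar from the Phoenix directory into the HBase server's lib directory, {Hbase_Home}/lib, then restart HBase so that the jars are loaded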

Download the DataX source code for compilation

$ git clone https://github.com/alibaba/DataX.git

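Create a table in Phoenix that corresponds to the MySQL table

Phoenix supports the data types listed below. Change each column of the MySQL table to the corresponding Phoenix type. Also note that NOT NULL can only be used on columns that are part of the primary key.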

  • INTEGER
  • UNSIGNED_INT
  • BIGINT
  • UNSIGNED_LONG
  • TINYINT
  • UNSIGNED_TINYINT
  • SMALLINT
  • UNSIGNED_SMALLINT
  • FLOAT
  • UNSIGNED_FLOAT
  • DOUBLE
  • UNSIGNED_DOUBLE
  • DECIMAL
  • BOOLEAN
  • TIME
  • DATE
  • TIMESTAMP
  • UNSIGNED_TIME
  • UNSIGNED_DATE
  • UNSIGNED_TIMESTAMP
  • VARCHAR
  • CHAR
  • BINARY
  • VARBINARY
  • ARRAY
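
As a rough aid for the type mapping described above, the sketch below translates a few common MySQL column types into Phoenix types. The mapping table and the name to_phoenix_type are illustrative assumptions, not part of DataX or Phoenix. Unsigned MySQL types are deliberately not handled, because their value ranges need a case-by-case decision.

```python
import re

# Hypothetical MySQL -> Phoenix type mapping (an assumption for illustration;
# verify value ranges and precision against your own schema).
MYSQL_TO_PHOENIX = {
    "tinyint": "TINYINT",
    "smallint": "SMALLINT",
    "int": "INTEGER",
    "integer": "INTEGER",
    "bigint": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "decimal": "DECIMAL",
    "date": "DATE",
    "time": "TIME",
    "datetime": "TIMESTAMP",
    "timestamp": "TIMESTAMP",
    "char": "CHAR",
    "varchar": "VARCHAR",
    "text": "VARCHAR",
    "binary": "BINARY",
    "varbinary": "VARBINARY",
    "blob": "VARBINARY",
}

# Phoenix types for which a length/precision suffix such as "(32)" is kept.
KEEP_SUFFIX = {"CHAR", "VARCHAR", "DECIMAL", "BINARY"}


def to_phoenix_type(mysql_type: str) -> str:
    """Translate a MySQL column type such as 'varchar(32)' to a Phoenix type."""
    match = re.fullmatch(r"\s*(\w+)\s*(\([\d\s,]+\))?\s*", mysql_type.lower())
    if match is None:
        raise ValueError(f"unrecognized type: {mysql_type!r}")
    base, suffix = match.group(1), match.group(2) or ""
    phoenix = MYSQL_TO_PHOENIX.get(base)
    if phoenix is None:
        raise ValueError(f"no mapping for MySQL type: {base!r}")
    # Display widths like int(11) have no meaning in Phoenix and are dropped.
    return phoenix + suffix if phoenix in KEEP_SUFFIX else phoenix
```

For instance, to_phoenix_type("datetime") yields TIMESTAMP, which matches the visit_at and created_at columns in the example table.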

For example, the visit_details table from the dashboard (cockpit) project can be rewritten as:

create table visit_details (
        inc_id varchar(32) not null,
        area_code varchar(32)  ,
        area_name varchar(32)  ,
        department_code varchar(32)  ,
        department_name varchar(32)  ,
        visit_type tinyint default 0  ,
        patient_code varchar(32) not null  ,
        address varchar(128)   ,
        address_split varchar(128) ,
        address_province varchar(32) ,
        address_city varchar(32)   ,
        address_district varchar(32)  ,
        address_town varchar(32)  ,
        address_street varchar(128) ,
        gender tinyint default 0 ,
        age integer ,
        visit_at timestamp ,
        operate_at timestamp ,
        created_at timestamp not null ,
        updated_at timestamp not null 
  CONSTRAINT PK PRIMARY KEY (inc_id, patient_code, created_at, updated_at) 
);

Example insert statement (Phoenix uses UPSERT instead of INSERT):

UPSERT INTO visit_details(inc_id, area_code, area_name, department_code, department_name, visit_type, patient_code, address, address_split, address_province, address_city, address_district, address_town, address_street, gender, age, visit_at, operate_at, created_at, updated_at) VALUES ('0000011ccc654b518edbce2b3e58afa2', '9', '江南分院', '0204', '江南急诊科', 1, '1000003834429', '重庆市万州区石峰路999号', '重庆市-万州区-石峰路999号', '重庆市', NULL, '万州区', NULL, '石峰路999号', 1, 3, '2019-02-10 22:43:11', '2019-02-10 20:29:19', '2019-08-01 00:39:43', '2019-08-01 00:39:43');
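
When the statement has to be produced for many tables, it may be easier to generate it than to type it. The sketch below builds a parameterized UPSERT with JDBC-style ? placeholders. The function name build_upsert is a hypothetical helper, and the column names are assumed to be trusted identifiers (they are not escaped).

```python
from typing import Sequence


def build_upsert(table: str, columns: Sequence[str]) -> str:
    """Return a Phoenix UPSERT statement with one '?' placeholder per column.

    Table and column names are inserted verbatim, so they must come from a
    trusted source such as the table definition itself.
    """
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    return f"UPSERT INTO {table} ({column_list}) VALUES ({placeholders})"
```

The row values are then passed separately as bind parameters, which avoids quoting problems with values such as addresses.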

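Compile the DataX source code (the source tree contains many plugins; you can comment out the modules you do not need in the pom.xml file in the root directory)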

$ cd  {DataX_source_code_home}
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true

After the build finishes, the compiled DataX can be found in the target directory under the build directory.

Start the Phoenix Query Server. Port 8765 must be open on the server; when running in Docker, port 8765 must be exposed and mapped.

$ cd {Phoenix_Home}/bin
$ ./queryserver.py start
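
Before running a job it can help to confirm that port 8765 is actually reachable from the machine that runs DataX. The following is a minimal sketch using only the standard library; the function name port_is_open and the 3-second timeout are assumptions chosen for illustration.

```python
import socket


def port_is_open(host: str, port: int = 8765, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False
```

Calling port_is_open with the Query Server's host name or IP should return True once the server is started and, for Docker, the port mapping is in place. A successful TCP connection only shows that something is listening; it does not prove that the listener is the Query Server.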

Edit the DataX job file:

  1. Create a new JSON file: mysql2phoenix.json

    {
      "job":{
        "content":[
            {
                "reader":$Reader_Object,
                "writer":$Writer_Object
            }
        ],
        "setting":{
            "speed":{
                "byte":1048576,
                //number of concurrent channels
                "channel":16,
                //record rate limit (records per second)
                "record":10000
            }
        }
      }
    }
    

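    Reader_Object

    {
        //name is the plugin name under DataX's plugin directory
        "name":"mysqlreader",
        //parameter holds the arguments the plugin needs; they differ from plugin to plugin
        "parameter":{
            "connection":[
                {
                    //MySQL connection URL; more than one may be configured
                    "jdbcUrl":[
                        "jdbc:mysql://localhost:3306/odc_v2"
                    ],
                    //MySQL query; the column order and column count must match those in the writer
                    "querySql":[
                        "select * from visit_details;"
                    ]
                }
            ],
            "password":"******",
            "username":"root"
        }
    }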

    Writer_Object

    {
        //name of the writer plugin
        "name":"hbase20xsqlwriter",
        "parameter":{
            //number of rows per commit
            "batchSize":"100",
            "column":[
                "INC_ID",
                "AREA_CODE",
                "AREA_NAME",
                "DEPARTMENT_CODE",
                "DEPARTMENT_NAME",
                "VISIT_TYPE",
                "PATIENT_CODE",
                "ADDRESS",
                "ADDRESS_SPLIT",
                "ADDRESS_PROVINCE",
                "ADDRESS_CITY",
                "ADDRESS_DISTRICT",
                "ADDRESS_TOWN",
                "ADDRESS_STREET",
                "GENDER",
                "AGE",
                "VISIT_AT",
                "OPERATE_AT",
                "CREATED_AT",
                "UPDATED_AT"
            ],
            //null-handling strategy: skip or set null
            "nullMode":"skip",
            //Query Server address
            "queryServerAddress":"http://server-Ip:8765",
            //target table
            "table":"VISIT_DETAILS"
        }
    }
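
Hand-editing nested JSON makes it easy to drop a brace or a quote. One option is to assemble the job in Python and let the json module write the file, as in the sketch below. The function names build_job and write_job are hypothetical helpers, the reader and writer are abridged versions of the objects above, and the output contains no comments because the standard JSON format does not allow them.

```python
import json

# Abridged reader and writer objects; the values mirror the job fragments above.
READER = {
    "name": "mysqlreader",
    "parameter": {
        "connection": [{
            "jdbcUrl": ["jdbc:mysql://localhost:3306/odc_v2"],
            "querySql": ["select * from visit_details;"],
        }],
        "username": "root",
        "password": "******",
    },
}
WRITER = {
    "name": "hbase20xsqlwriter",
    "parameter": {
        "batchSize": "100",
        "column": ["INC_ID", "AREA_CODE", "AREA_NAME"],  # abridged column list
        "nullMode": "skip",
        "queryServerAddress": "http://server-Ip:8765",
        "table": "VISIT_DETAILS",
    },
}


def build_job(reader: dict, writer: dict, channel: int = 16) -> dict:
    """Combine one reader and one writer object into a DataX job definition."""
    return {
        "job": {
            "content": [{"reader": reader, "writer": writer}],
            "setting": {
                "speed": {"byte": 1048576, "channel": channel, "record": 10000}
            },
        }
    }


def write_job(job: dict, path: str) -> None:
    """Serialize the job to path as plain JSON."""
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(job, fh, indent=2, ensure_ascii=False)
```

Calling write_job(build_job(READER, WRITER), "mysql2phoenix.json") would produce the job file; the column list must of course be extended to all 20 columns before a real run.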

Run DataX

$ cd {DataX_Home}/bin
$ python datax.py {Your_Job_Path}/mysql2phoenix.json
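
If the import is to be scheduled or chained with other steps, the same command can be launched from a script. This is a minimal sketch: datax_command and run_datax are hypothetical helper names, the interpreter name "python" simply mirrors the command above, and treating a non-zero exit code as failure is an assumption about datax.py.

```python
import os
import subprocess


def datax_command(datax_home: str, job_path: str, python: str = "python") -> list:
    """Build the argument list equivalent to 'python datax.py <job file>'."""
    return [python, os.path.join(datax_home, "bin", "datax.py"), job_path]


def run_datax(datax_home: str, job_path: str) -> int:
    """Run DataX and return its exit code (assumed to be 0 on success)."""
    result = subprocess.run(datax_command(datax_home, job_path), check=False)
    return result.returncode
```

Passing the arguments as a list rather than a shell string avoids quoting problems when the job path contains spaces.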