Importing MySQL into Phoenix with DataX

Install Hadoop, ZooKeeper, HBase 2.x, and Phoenix 5.x

$ docker pull mysaber/hadoop:0.1.5

Configure Phoenix 5.x

# Copy the HBase configuration into Phoenix
$ cp {HBase_Home}/conf/hbase-site.xml {Phoenix_Home}/bin/
# Copy phoenix-<version>-server.jar and phoenix-core-<version>.jar from the
# Phoenix directory into the HBase server's lib directory
$ cp {Phoenix_Home}/phoenix-<version>-server.jar {Phoenix_Home}/phoenix-core-<version>.jar {HBase_Home}/lib/

Download the DataX source code for building

$ git clone https://github.com/alibaba/DataX.git

Create a table in Phoenix that corresponds to the MySQL table

Phoenix supports the data types listed below; adjust the MySQL column types to their Phoenix counterparts. Also note that NOT NULL may only be applied to columns that are part of the primary key.

  • INTEGER
  • UNSIGNED_INT
  • BIGINT
  • UNSIGNED_LONG
  • TINYINT
  • UNSIGNED_TINYINT
  • SMALLINT
  • UNSIGNED_SMALLINT
  • FLOAT
  • UNSIGNED_FLOAT
  • DOUBLE
  • UNSIGNED_DOUBLE
  • DECIMAL
  • BOOLEAN
  • TIME
  • DATE
  • TIMESTAMP
  • UNSIGNED_TIME
  • UNSIGNED_DATE
  • UNSIGNED_TIMESTAMP
  • VARCHAR
  • CHAR
  • BINARY
  • VARBINARY
  • ARRAY
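When converting the MySQL schema, each MySQL column type has to be mapped to one of the Phoenix types above. A minimal sketch of such a mapping in Python follows; the table below covers only common cases and is an illustrative assumption, not an official conversion chart:

```python
# Hypothetical MySQL -> Phoenix type mapping for common column types.
# Illustrative only; extend it as your schema requires.
MYSQL_TO_PHOENIX = {
    "int": "INTEGER",
    "bigint": "BIGINT",
    "tinyint": "TINYINT",
    "smallint": "SMALLINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "decimal": "DECIMAL",
    "datetime": "TIMESTAMP",
    "timestamp": "TIMESTAMP",
    "date": "DATE",
    "varchar": "VARCHAR",
    "char": "CHAR",
    "blob": "VARBINARY",
}

def to_phoenix_type(mysql_type: str) -> str:
    """Map a MySQL column type (e.g. 'varchar(32)') to a Phoenix type."""
    base = mysql_type.lower().split("(")[0].strip()
    phoenix = MYSQL_TO_PHOENIX[base]
    # Preserve a length/precision suffix such as (32) when present.
    if "(" in mysql_type:
        phoenix += "(" + mysql_type.split("(", 1)[1]
    return phoenix
```

For instance, `to_phoenix_type("varchar(32)")` yields `VARCHAR(32)` and `to_phoenix_type("datetime")` yields `TIMESTAMP`.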

For example, the visit_details table from the dashboard project can be rewritten as:

create table visit_details (
    inc_id varchar(32) not null,
    area_code varchar(32),
    area_name varchar(32),
    department_code varchar(32),
    department_name varchar(32),
    visit_type tinyint default 0,
    patient_code varchar(32) not null,
    address varchar(128),
    address_split varchar(128),
    address_province varchar(32),
    address_city varchar(32),
    address_district varchar(32),
    address_town varchar(32),
    address_street varchar(128),
    gender tinyint default 0,
    age integer,
    visit_at timestamp,
    operate_at timestamp,
    created_at timestamp not null,
    updated_at timestamp not null,
    CONSTRAINT PK PRIMARY KEY (inc_id, patient_code, created_at, updated_at)
);

An example insert (Phoenix uses UPSERT instead of INSERT):

UPSERT INTO visit_details(inc_id, area_code, area_name, department_code, department_name, visit_type, patient_code, address, address_split, address_province, address_city, address_district, address_town, address_street, gender, age, visit_at, operate_at, created_at, updated_at) VALUES ('0000011ccc654b518edbce2b3e58afa2', '9', '江南分院', '0204', '江南急诊科', 1, '1000003834429', '重庆市万州区石峰路999号', '重庆市-万州区-石峰路999号', '重庆市', NULL, '万州区', NULL, '石峰路999号', 1, 3, '2019-02-10 22:43:11', '2019-02-10 20:29:19', '2019-08-01 00:39:43', '2019-08-01 00:39:43');

Build the DataX source (the source tree contains many plugins; modules you do not need can be commented out in the top-level pom.xml to speed up the build)

$ cd {DataX_source_code_home}
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true

After the build, the packaged datax distribution can be found under the target directory of the build tree.

Start the Phoenix QueryServer. Port 8765 must be open on the server; under Docker, the port must additionally be exposed and mapped.

$ cd {Phoenix_Home}/bin
$ ./queryserver.py start

Edit the DataX job file:

  1. Create a new JSON file: mysql2phoenix.json

    {
        "job": {
            "content": [
                {
                    "reader": $Reader_Object,
                    "writer": $Writer_Object
                }
            ],
            "setting": {
                "speed": {
                    "byte": 1048576,
                    // number of channels (concurrent processes) to open
                    "channel": 16,
                    // number of records per shard
                    "record": 10000
                }
            }
        }
    }
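The $Reader_Object and $Writer_Object placeholders above are filled in with the reader and writer configurations shown next. Since the annotated examples in this article contain // comments, which a strict JSON parser rejects, it can help to assemble and validate the job file programmatically. A minimal sketch (the reader/writer dicts are abbreviated placeholders, not complete configurations):

```python
import json

# Abbreviated reader/writer stand-ins; in practice these carry the full
# parameter blocks shown in this article.
reader = {"name": "mysqlreader", "parameter": {"username": "root", "password": "******"}}
writer = {"name": "hbase20xsqlwriter", "parameter": {"table": "VISIT_DETAILS"}}

job = {
    "job": {
        "content": [{"reader": reader, "writer": writer}],
        "setting": {"speed": {"byte": 1048576, "channel": 16, "record": 10000}},
    }
}

# json.dumps guarantees the emitted file is valid, comment-free JSON.
text = json.dumps(job, indent=2)
```

Writing `text` to mysql2phoenix.json produces a file DataX can consume directly.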

    Reader Object

{
    // name is the plugin name under DataX's plugin directory
    "name": "mysqlreader",
    // parameter holds the plugin-specific options, which differ per plugin
    "parameter": {
        "connection": [
            {
                // MySQL JDBC URL; multiple URLs may be configured
                "jdbcUrl": [
                    "jdbc:mysql://localhost:3306/odc_v2"
                ],
                // MySQL query; column order and count must match the writer's column list
                "querySql": [
                    "select * from visit_details;"
                ]
            }
        ],
        "password": "******",
        "username": "root"
    }
}

Writer Object

{
    // writer plugin name
    "name": "hbase20xsqlwriter",
    "parameter": {
        // number of rows per commit
        "batchSize": "100",
        "column": [
            "INC_ID",
            "AREA_CODE",
            "AREA_NAME",
            "DEPARTMENT_CODE",
            "DEPARTMENT_NAME",
            "VISIT_TYPE",
            "PATIENT_CODE",
            "ADDRESS",
            "ADDRESS_SPLIT",
            "ADDRESS_PROVINCE",
            "ADDRESS_CITY",
            "ADDRESS_DISTRICT",
            "ADDRESS_TOWN",
            "ADDRESS_STREET",
            "GENDER",
            "AGE",
            "VISIT_AT",
            "OPERATE_AT",
            "CREATED_AT",
            "UPDATED_AT"
        ],
        // null-handling strategy: skip, or set null
        "nullMode": "skip",
        // Phoenix QueryServer address
        "queryServerAddress": "http://server-Ip:8765",
        // target table
        "table": "VISIT_DETAILS"
    }
}
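The reader's querySql must yield columns in the same order and count as the writer's column list; with `select *` this depends on the table definitions, but when the reader uses an explicit column list the invariant can be checked mechanically. A hypothetical helper (not part of DataX):

```python
def check_columns(reader_columns, writer_columns):
    """Verify the reader and writer column lists line up one-to-one.

    Phoenix upper-cases unquoted identifiers, so the comparison is
    case-insensitive.  Raises ValueError on a count mismatch.
    """
    if len(reader_columns) != len(writer_columns):
        raise ValueError(
            f"column count mismatch: reader has {len(reader_columns)}, "
            f"writer has {len(writer_columns)}"
        )
    # True where the names agree positionally, ignoring case.
    return [r.upper() == w.upper() for r, w in zip(reader_columns, writer_columns)]
```

Running this over both column lists before submitting the job catches ordering mistakes that would otherwise surface as silent data corruption.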

Run DataX

$ cd {DataX_Home}/bin
$ python datax.py {Your_Job_Path}/mysql2phoenix.json