Importing MySQL into Phoenix with DataX
Install Hadoop, ZooKeeper, HBase 2.x, and Phoenix 5.x
$ docker pull mysaber/hadoop:0.1.5
Configure Phoenix 5.x
# copy the HBase config into Phoenix
$ cp {HBase_Home}/conf/hbase-site.xml {phoenix_home}/bin/
# copy phoenix-<version>-server.jar and phoenix-core-<version>.jar from the Phoenix directory into {HBase_Home}/lib on the HBase server
Download the DataX source for building
$ git clone https://github.com/alibaba/DataX.git
Create a table in Phoenix matching the MySQL table
Phoenix's data types are listed below. Map each MySQL column type to the corresponding Phoenix type. Note that NOT NULL can only be used on columns that are part of the PRIMARY KEY:
- INTEGER
- UNSIGNED_INT
- BIGINT
- UNSIGNED_LONG
- TINYINT
- UNSIGNED_TINYINT
- SMALLINT
- UNSIGNED_SMALLINT
- FLOAT
- UNSIGNED_FLOAT
- DOUBLE
- UNSIGNED_DOUBLE
- DECIMAL
- BOOLEAN
- TIME
- DATE
- TIMESTAMP
- UNSIGNED_TIME
- UNSIGNED_DATE
- UNSIGNED_TIMESTAMP
- VARCHAR
- CHAR
- BINARY
- VARBINARY
- ARRAY
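As an illustration of the conversion step, here is a minimal sketch of a MySQL-to-Phoenix type mapping. The helper and the mapping table are assumptions based on the type list above, not part of DataX or Phoenix:

```python
# Hypothetical mapping of common MySQL column types to Phoenix types,
# based on the Phoenix type list above.
MYSQL_TO_PHOENIX = {
    "int": "INTEGER",
    "bigint": "BIGINT",
    "tinyint": "TINYINT",
    "smallint": "SMALLINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "decimal": "DECIMAL",
    "datetime": "TIMESTAMP",
    "timestamp": "TIMESTAMP",
    "date": "DATE",
    "varchar": "VARCHAR",
    "char": "CHAR",
    "blob": "VARBINARY",
}

def phoenix_type(mysql_type: str) -> str:
    """Return the Phoenix type for a MySQL type, e.g. varchar(32) -> VARCHAR(32)."""
    base = mysql_type.lower().split("(")[0].strip()
    # preserve a length suffix such as (32) when present
    suffix = mysql_type[mysql_type.index("("):] if "(" in mysql_type else ""
    return MYSQL_TO_PHOENIX[base] + suffix
```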
For example, the visit_details table from the dashboard can be rewritten as:
create table visit_details (
inc_id varchar(32) not null,
area_code varchar(32) ,
area_name varchar(32) ,
department_code varchar(32) ,
department_name varchar(32) ,
visit_type tinyint default 0 ,
patient_code varchar(32) not null ,
address varchar(128) ,
address_split varchar(128) ,
address_province varchar(32) ,
address_city varchar(32) ,
address_district varchar(32) ,
address_town varchar(32) ,
address_street varchar(128) ,
gender tinyint default 0 ,
age integer ,
visit_at timestamp ,
operate_at timestamp ,
created_at timestamp not null ,
updated_at timestamp not null,
CONSTRAINT PK PRIMARY KEY (inc_id, patient_code, created_at, updated_at)
);
Example insert statement (Phoenix uses UPSERT):
UPSERT INTO visit_details(inc_id, area_code, area_name, department_code, department_name, visit_type, patient_code, address, address_split, address_province, address_city, address_district, address_town, address_street, gender, age, visit_at, operate_at, created_at, updated_at) VALUES ('0000011ccc654b518edbce2b3e58afa2', '9', '江南分院', '0204', '江南急诊科', 1, '1000003834429', '重庆市万州区石峰路999号', '重庆市-万州区-石峰路999号', '重庆市', NULL, '万州区', NULL, '石峰路999号', 1, 3, '2019-02-10 22:43:11', '2019-02-10 20:29:19', '2019-08-01 00:39:43', '2019-08-01 00:39:43');
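A long UPSERT like the one above is easy to get wrong by hand. A hypothetical helper (not part of Phoenix or DataX) that generates a parameterized statement from a column list:

```python
def build_upsert(table: str, columns: list[str]) -> str:
    """Build a parameterized Phoenix UPSERT statement for the given columns."""
    cols = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    return f"UPSERT INTO {table}({cols}) VALUES ({placeholders})"

# Example with the first few visit_details columns:
sql = build_upsert("visit_details", ["inc_id", "area_code", "area_name"])
# sql == "UPSERT INTO visit_details(inc_id, area_code, area_name) VALUES (?, ?, ?)"
```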
Build the DataX source (the source tree has many plugin modules; comment out the ones you don't need in the top-level pom.xml)
$ cd {DataX_source_code_home}
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
After the build finishes, the packaged datax can be found under the target directory.
Start the Phoenix QueryServer. The server must open port 8765; with Docker, expose and map port 8765.
$ cd {Phoenix_Home}/bin
$ ./queryServer.py start
Edit the DataX job file:
Create a new JSON file, mysql2phoenix.json:
{
    "job":{
        "content":[
            {
                "reader":$Reader_Object,
                "writer":$Writer_Object
            }
        ],
        "setting":{
            "speed":{
                "byte":1048576,
                //number of concurrent channels
                "channel":16,
                //number of records per batch
                "record":10000
            }
        }
    }
}
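The skeleton above can also be assembled programmatically. A sketch using only the standard library; the `build_job` helper is hypothetical, and the reader/writer dicts are stand-ins for the objects defined next:

```python
import json

def build_job(reader: dict, writer: dict) -> dict:
    """Assemble the DataX job skeleton: one content entry plus speed settings."""
    return {
        "job": {
            "content": [{"reader": reader, "writer": writer}],
            "setting": {
                "speed": {
                    "byte": 1048576,   # byte limit
                    "channel": 16,     # concurrent channels
                    "record": 10000,   # record limit
                }
            },
        }
    }

# Write the job file; the real reader/writer objects are shown below.
job = build_job({"name": "mysqlreader"}, {"name": "hbase20xsqlwriter"})
with open("mysql2phoenix.json", "w") as f:
    json.dump(job, f, indent=4)
```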
Reader Object
{
//name is the plugin name under DataX's plugin directory
"name":"mysqlreader",
//parameter holds the plugin-specific settings; each plugin differs
"parameter":{
"connection":[
{
//MySQL JDBC connection URL; multiple URLs may be configured
"jdbcUrl":[
"jdbc:mysql://localhost:3306/odc_v2"
],
//MySQL query; the column order and count must match the writer's column list
"querySql":[
"select * from visit_details;"
]
}
],
"password":"******",
"username":"root"
}
}
Writer_Object
{
//the writer plugin name
"name":"hbase20xsqlwriter",
"parameter":{
//number of records per commit
"batchSize":"100",
"column":[
"INC_ID",
"AREA_CODE",
"AREA_NAME",
"DEPARTMENT_CODE",
"DEPARTMENT_NAME",
"VISIT_TYPE",
"PATIENT_CODE",
"ADDRESS",
"ADDRESS_SPLIT",
"ADDRESS_PROVINCE",
"ADDRESS_CITY",
"ADDRESS_DISTRICT",
"ADDRESS_TOWN",
"ADDRESS_STREET",
"GENDER",
"AGE",
"VISIT_AT",
"OPERATE_AT",
"CREATED_AT",
"UPDATED_AT"
],
//null-value handling strategy: skip or set null
"nullMode":"skip",
//QueryServer address
"queryServerAddress":"http://server-Ip:8765",
//target table
"table":"VISIT_DETAILS"
}
}
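Since the reader's SELECT must line up with the writer's column list, and Phoenix upper-cases unquoted identifiers, a small sanity check can catch mismatches before running the job. The helper and column lists here are hypothetical, for illustration only:

```python
def columns_match(reader_cols: list[str], writer_cols: list[str]) -> bool:
    """Check that reader and writer column lists agree in count and order,
    comparing case-insensitively (Phoenix upper-cases unquoted names)."""
    if len(reader_cols) != len(writer_cols):
        return False
    return all(r.upper() == w.upper() for r, w in zip(reader_cols, writer_cols))

# Example with the first few visit_details columns:
reader_cols = ["inc_id", "area_code", "area_name"]
writer_cols = ["INC_ID", "AREA_CODE", "AREA_NAME"]
assert columns_match(reader_cols, writer_cols)
```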
Run DataX
$ cd {DataX_Home}/bin
$ python datax.py {Your_Job_Path}/mysql2phoenix.json