Importing MySQL Data into Phoenix with DataX
Install Hadoop, ZooKeeper, HBase 2.x, and Phoenix 5.x
$ docker pull mysaber/hadoop:0.1.5
Configure Phoenix 5.x
# Copy the HBase configuration into Phoenix
$ cp {HBase_Home}/conf/hbase-site.xml {phoenix_home}/bin/
# Copy phoenix-<version>-server.jar and phoenix-core-<version>.jar from the
# Phoenix distribution into the HBase server's lib directory
$ cp {phoenix_home}/phoenix-<version>-server.jar {Hbase_Home}/lib/
$ cp {phoenix_home}/phoenix-core-<version>.jar {Hbase_Home}/lib/
Download the DataX source code and build it
$ git clone https://github.com/alibaba/DataX.git
Create a table in Phoenix corresponding to the MySQL table
Phoenix supports the data types listed below; convert each MySQL column to its corresponding Phoenix type. Note that NOT NULL may only be used on columns that are part of the PRIMARY KEY.
- INTEGER
- UNSIGNED_INT
- BIGINT
- UNSIGNED_LONG
- TINYINT
- UNSIGNED_TINYINT
- SMALLINT
- UNSIGNED_SMALLINT
- FLOAT
- UNSIGNED_FLOAT
- DOUBLE
- UNSIGNED_DOUBLE
- DECIMAL
- BOOLEAN
- TIME
- DATE
- TIMESTAMP
- UNSIGNED_TIME
- UNSIGNED_DATE
- UNSIGNED_TIMESTAMP
- VARCHAR
- CHAR
- BINARY
- VARBINARY
- ARRAY
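The conversion step above can be sketched as a small lookup table. The mapping below covers common cases only and is an assumption for illustration, not an official conversion table; verify each type against the Phoenix documentation for your version.

```python
# Assumed mapping of common MySQL column types to Phoenix types.
# Illustrative only -- verify against the Phoenix docs for your version.
MYSQL_TO_PHOENIX = {
    "int": "INTEGER",
    "int unsigned": "UNSIGNED_INT",
    "bigint": "BIGINT",
    "bigint unsigned": "UNSIGNED_LONG",
    "tinyint": "TINYINT",
    "smallint": "SMALLINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "decimal": "DECIMAL",
    "datetime": "TIMESTAMP",
    "timestamp": "TIMESTAMP",
    "date": "DATE",
    "time": "TIME",
    "varchar": "VARCHAR",
    "char": "CHAR",
    "blob": "VARBINARY",
}

def phoenix_type(mysql_type: str) -> str:
    """Map a MySQL column type such as 'varchar(32)' to a Phoenix type."""
    base = mysql_type.lower().split("(")[0].strip()
    # Re-append a trailing 'unsigned' qualifier, e.g. for 'int(10) unsigned'.
    if "unsigned" in mysql_type.lower() and "unsigned" not in base:
        base += " unsigned"
    return MYSQL_TO_PHOENIX.get(base, "VARCHAR")  # default fallback
```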
For example, the visit_details table from the dashboard project can be rewritten as:
create table visit_details (
    inc_id varchar(32) not null,
    area_code varchar(32),
    area_name varchar(32),
    department_code varchar(32),
    department_name varchar(32),
    visit_type tinyint default 0,
    patient_code varchar(32) not null,
    address varchar(128),
    address_split varchar(128),
    address_province varchar(32),
    address_city varchar(32),
    address_district varchar(32),
    address_town varchar(32),
    address_street varchar(128),
    gender tinyint default 0,
    age integer,
    visit_at timestamp,
    operate_at timestamp,
    created_at timestamp not null,
    updated_at timestamp not null,
    CONSTRAINT PK PRIMARY KEY (inc_id, patient_code, created_at, updated_at)
);
Example UPSERT statement:
UPSERT INTO visit_details (inc_id, area_code, area_name, department_code, department_name,
    visit_type, patient_code, address, address_split, address_province, address_city,
    address_district, address_town, address_street, gender, age, visit_at, operate_at,
    created_at, updated_at)
VALUES ('0000011ccc654b518edbce2b3e58afa2', '9', '江南分院', '0204', '江南急诊科', 1,
    '1000003834429', '重庆市万州区石峰路999号', '重庆市-万州区-石峰路999号', '重庆市', NULL,
    '万州区', NULL, '石峰路999号', 1, 3, '2019-02-10 22:43:11', '2019-02-10 20:29:19',
    '2019-08-01 00:39:43', '2019-08-01 00:39:43');
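For long column lists like this, it can be less error-prone to generate a parameterized UPSERT from the column names. A minimal sketch (`build_upsert` is a hypothetical helper, not part of Phoenix or DataX):

```python
def build_upsert(table: str, columns: list[str]) -> str:
    """Build a parameterized Phoenix UPSERT for the given column list."""
    col_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    return f"UPSERT INTO {table}({col_list}) VALUES ({placeholders})"

# Usage with a few of the visit_details columns:
sql = build_upsert("visit_details", ["inc_id", "patient_code", "created_at"])
# -> UPSERT INTO visit_details(inc_id, patient_code, created_at) VALUES (?, ?, ?)
```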
Build DataX from source (the source tree contains many plugin modules; you can comment out the modules you don't need in the top-level pom.xml):
$ cd {DataX_source_code_home}
$ mvn -U clean package assembly:assembly -Dmaven.test.skip=true
When the build finishes, the packaged DataX can be found in the target directory under the build tree.
Start the Phoenix QueryServer. Port 8765 must be open on the server; when running in Docker, expose and map port 8765.
$ cd {Phoenix_Home}/bin
$ ./queryServer.py start
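To confirm the QueryServer is actually reachable on 8765 (especially through a Docker port mapping), a plain TCP probe with the Python standard library is enough. A sketch; the host name is a placeholder for your server's address:

```python
import socket

def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example (replace "server-Ip" with your QueryServer host):
# port_open("server-Ip", 8765)
```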
Write the DataX job file: create a new JSON file named mysql2phoenix.json.
{
    "job": {
        "content": [
            {
                "reader": $Reader_Object,
                "writer": $Writer_Object
            }
        ],
        "setting": {
            "speed": {
                "byte": 1048576,
                "channel": 16,   // number of concurrent channels (processes)
                "record": 10000  // number of records per shard
            }
        }
    }
}
Reader Object
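Since `//` comments are not valid JSON, it can be safer to assemble the job file programmatically and write clean JSON. A minimal sketch in Python; the empty reader/writer parameter dicts are placeholders for the reader and writer objects described in this guide:

```python
import json

# Placeholders -- fill in with the full reader/writer configurations.
reader = {"name": "mysqlreader", "parameter": {}}
writer = {"name": "hbase20xsqlwriter", "parameter": {}}

job = {
    "job": {
        "content": [{"reader": reader, "writer": writer}],
        "setting": {
            "speed": {
                "byte": 1048576,
                "channel": 16,    # number of concurrent channels
                "record": 10000,  # number of records per shard
            }
        },
    }
}

# Write a comment-free job file that DataX can consume.
with open("mysql2phoenix.json", "w") as f:
    json.dump(job, f, indent=2)
```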
{
    // name is the plugin name under DataX's plugin directory
    "name": "mysqlreader",
    // parameter holds the plugin-specific settings; these differ per plugin
    "parameter": {
        "connection": [
            {
                // MySQL JDBC URL; multiple URLs can be configured
                "jdbcUrl": [
                    "jdbc:mysql://localhost:3306/odc_v2"
                ],
                // MySQL query; column order and count must match the writer's column list
                "querySql": [
                    "select * from visit_details;"
                ]
            }
        ],
        "password": "******",
        "username": "root"
    }
}
Writer Object
{
    // writer plugin name
    "name": "hbase20xsqlwriter",
    "parameter": {
        // number of records per commit
        "batchSize": "100",
        "column": [
            "INC_ID", "AREA_CODE", "AREA_NAME", "DEPARTMENT_CODE", "DEPARTMENT_NAME",
            "VISIT_TYPE", "PATIENT_CODE", "ADDRESS", "ADDRESS_SPLIT", "ADDRESS_PROVINCE",
            "ADDRESS_CITY", "ADDRESS_DISTRICT", "ADDRESS_TOWN", "ADDRESS_STREET", "GENDER",
            "AGE", "VISIT_AT", "OPERATE_AT", "CREATED_AT", "UPDATED_AT"
        ],
        // null-handling strategy: skip or set null
        "nullMode": "skip",
        // QueryServer address
        "queryServerAddress": "http://server-Ip:8765",
        // target table
        "table": "VISIT_DETAILS"
    }
}
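DataX maps reader columns to writer columns by position, so a mismatch in count or order fails (or writes wrong data) at run time. A small pre-flight check can catch this; a sketch where `check_columns` is a hypothetical helper, assuming the reader uses an explicit column list rather than select *:

```python
def check_columns(reader_cols, writer_cols):
    """Check that reader and writer column lists line up positionally."""
    if len(reader_cols) != len(writer_cols):
        raise ValueError(
            f"column count mismatch: reader has {len(reader_cols)}, "
            f"writer has {len(writer_cols)}"
        )
    # Names are compared case-insensitively; any differing pairs are returned.
    return [
        (r, w) for r, w in zip(reader_cols, writer_cols)
        if r.lower() != w.lower()
    ]

# check_columns(["inc_id", "area_code"], ["INC_ID", "AREA_CODE"]) -> []
```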
Run DataX
$ cd {DataX_Home}/bin
$ python datax.py {Your_Job_Path}/mysql2phoenix.json