0%

Sqoop快速上手

Sqoop_logo

Sqoop是一款开源的工具,主要用于在Hadoop的文件存储系统(HDFS、HIVE、HBASE)与传统的数据库(MySQL、Oracle)间进行数据的传递,可以将关系型数据库中的数据导进到Hadoop文件存储系统中,也可以将Hadoop文件存储系统中的数据导进到关系型数据库中。


概述

Sqoop 是一款开源的工具,主要用于在 Hadoop 的文件存储系统(HDFS/Hive/HBase)与传统的数据库(MySQL/Oracle)间进行数据的传递,可以将关系型数据库中的数据导进到 Hadoop 文件存储系统中,也可以将 Hadoop 文件存储系统中的数据导进到关系型数据库中。

Sqoop功能概述

架构与工作机制

Sqoop1架构
Sqoop1架构
版本号:1.4.x
架构:sqoop1 使用 sqoop 客户端直接提交的方式
访问方式:CLI 控制台方式进行访问
安全性:命令或脚本中指定用户数据库名及密码

Sqoop2架构
Sqoop2架构
版本:1.99x
架构:引入了sqoop server,对connector实现了集中的管理
访问方式:REST API、JAVA API、 WEB UI以及CLI控制台方式进行访问

Sqoop2和Sqoop1的区别是独立出了客户端和服务端,变为C/S架构,服务端的任务和Sqoop1相同,客户端被独立出来后更加灵活。实际生产环境中,更多使用Sqoop1。

Sqoop工作机制
将导入或导出命令转换成 Mapreduce 程序来实现,主要是对 Mapreduce 的 inputformat 和 outputformat 进行定制。转换的 MR 程序只有 Map 不需要 Reduce。

安装

开发环境

  1. 三台虚拟机
1
2
3
192.168.153.100 node01
192.168.153.101 node02
192.168.153.102 node03
  1. 目录结构
1
2
/export/softwares:存放安装包
/export/servers:存放安装程序
  1. 软件环境
1
2
3
4
jdk:java version "1.8.0_141"
zookeeper:zookeeper-3.4.9
hadoop:hadoop-2.6.0-cdh5.14.0
hive:hive-1.1.0-cdh5.14.0

安装

在 node03 上安装Sqoop1。

  1. 下载压缩包和依赖包到 /export/softwares

Sqoop下载地址:Sqoop1Sqoop2

  1. 解压
/export/softwares/
1
tar -zxvf sqoop-1.4.6-cdh5.14.0.tar.gz -C /export/servers/

配置

  1. 配置 sqoop-env.sh,添加 Hadoop 路径:
sqoop-1.4.6-cdh5.14.0/conf
1
2
3
4
5
$ cp sqoop-env-template.sh sqoop-env.sh
$ vim sqoop-env.sh
export HADOOP_COMMON_HOME=/export/servers/hadoop-2.6.0-cdh5.14.0
export HADOOP_MAPRED_HOME=/export/servers/hadoop-2.6.0-cdh5.14.0
export HIVE_HOME=/export/servers/hive-1.1.0-cdh5.14.0
  1. 添加 MySQL 驱动包和 java-json.jar 依赖包:
1
2
cp mysql-connector-java-5.1.40.jar /export/servers/sqoop-1.4.6-cdh5.14.0/lib/
cp java-json.jar /export/servers/sqoop-1.4.6-cdh5.14.0/lib/
  1. 连接 MySQL 数据库:
1
2
3
4
5
6
7
$ cd /export/servers/sqoop-1.4.6-cdh5.14.0/
$ bin/sqoop-version
...
19/06/12 17:03:47 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.14.0
Sqoop 1.4.6-cdh5.14.0
git commit id
Compiled by jenkins on Sat Jan 6 13:24:40 PST 2018

列出库和表

1
2
3
4
5
6
7
8
# 查看帮助
$ bin/sqoop help

# 列出MySQL中的数据库
$ bin/sqoop list-databases --connect jdbc:mysql://192.168.12.25:3306/ --username root --password root

# 列出MySQL中hive库的表
$ bin/sqoop list-tables --connect jdbc:mysql://node03:3306/hive --username root --password 123456

如果连接的数据库不允许远程访问,需要修改 mysqluser 表中的 Host 字段,从 localhost 改为 %

1
2
3
4
5
6
7
8
9
10
11
12
# 允许Sqoop远程访问MySQL数据库
$ mysql -uroot -p
mysql> use mysql;
mysql> update user set host = '%' where user = 'root';
mysql> FLUSH PRIVILEGES;
mysql> select host, user from user;
+------+------+
| host | user |
+------+------+
| % | root |
+------+------+
1 row in set (0.00 sec)

导入

导入是指从传统的数据库导入到 Hadoop 上的文件存储系统中。

./sqoop import

  • --connect <jdbc-uri>:JDBC 连接字符串
  • -m <Num>:设置 MapTask 数量
  • --table <tablename>:指定源表
  • 导入到 HDFS
  • --target-dir <dir>:导入数据到 HDFS 上的目标目录
  • --delete-target-dir:目标目录如果存在就先删掉
  • --hive-import:导入到 Hive
  • --hive-table database.table:导入数据到 Hive 上的目标表
  • --hive-overwrite:覆盖原来的数据
  • --hive-database myhive:导入到 Hive 上的目标库,根据源表结构在 Hive 中自动创建表
  • --fields-terminated-by <char>:设置字段分隔符
  • --where <"field = 'value'">:根据条件过滤数据
  • --query 'SQL':根据 SQL 过滤数据,where 子句后需追加 $CONDITIONS,此为固定写法
准备数据

在宿主机 Windows 中创建 MySQL 数据库 userdb,包含三个表:emp, emp_addemp_conn,用于 node03 上的 Sqoop 导出数据。

**emp表**
id name deg salary dept create_time update_time is_delete
1201 gopal manager 50000 TP 2018-06-11 18:54:32 2018-06-11 18:54:32 1
1202 manisha Proof reader 50000 TPP 2018-06-15 18:54:32 2018-06-12 10:48:38 0
1203 khalillskjds php dev 30000 AC 2018-06-11 18:54:32 2018-06-12 18:29:05 1
1204 prasanth php dev 30000 AC 2018-06-11 18:54:32 2018-06-11 21:05:52 0
1205 kranthixxx admin 20000 TP 2018-06-11 18:54:32 2018-06-11 10:20:33 1
**emp_add表**
id hno street city create_time update_time is_delete
1201 288A vgiri jublee 2018-06-11 18:54:34 2018-06-11 18:54:34 1
1202 108I aoc sec-bad 2018-06-11 18:54:34 2018-06-11 18:54:34 1
1203 144Z pgutta hyd 2018-06-11 18:54:34 2018-06-11 18:54:34 1
1204 78B old city sec-bad 2018-06-11 18:54:34 2018-06-11 18:54:34 1
1205 720X hitec sec-bad 2018-06-11 18:54:34 2018-06-11 18:54:34 1
**emp_conn表**
id phno email create_time update_time is_delete
1201 2356742 gopal@tp.com 2018-06-11 18:54:36 2018-06-11 18:54:36 1
1202 1661663 manisha@tp.com 2018-06-11 18:54:36 2018-06-11 18:54:36 1
1203 8887776 khalil@ac.com 2018-06-11 18:54:36 2018-06-11 18:54:36 1
1204 9988774 prasanth@ac.com 2018-06-11 18:54:36 2018-06-11 18:54:36 1
1205 1231231 kranthi@tp.com 2018-06-11 18:54:36 2018-06-11 18:54:36 1

导入到HDFS

导入工具将单个表从 MySQL 导入到 HDFS。表中的每一行被视为 HDFS 的记录,所有记录都存储为文本格式(或者 Avro、sequence 文件等二进制数据)。

导入到HDFS默认目录

需求:将 MySQL 的 emp 表中数据导入到 HDFS,HDFS 自动创建目录 /user/root/emp

sqoop-1.4.6-cdh5.14.0/
1
2
bin/sqoop import --connect jdbc:mysql://192.168.12.25:3306/userdb --password root --username root \
--table emp -m 1

查看 HDFS 中的文件:

1
hdfs dfs -ls /user/root/emp

查看文件内容(字段间默认用 , 分割):

1
hdfs dfs -text /sqoop/emp/part-m-00000

导入到HDFS指定目录

需求:在导入表数据到 HDFS 时指定目标目录

sqoop-1.4.6-cdh5.14.0/
1
2
3
bin/sqoop import --connect jdbc:mysql://192.168.12.25:3306/userdb --username root --password root \
--delete-target-dir --table emp \
--target-dir /sqoop/emp -m 1

指定字段之间的分隔符:

sqoop-1.4.6-cdh5.14.0/
1
2
3
bin/sqoop import --connect jdbc:mysql://192.168.12.25:3306/userdb --username root --password root \
--delete-target-dir --table emp --target-dir /sqoop/emp2 -m 1 \
--fields-terminated-by '\t'

查看导出的数据(字段间使用\t分割):

sqoop-1.4.6-cdh5.14.0/
1
hdfs dfs -text /sqoop/emp2/part-m-00000

WHERE子句过滤数据

使用 Sqoop 导入where子句的一个子集。它执行相应的 SQL 查询,并将结果存储在 HDFS 的目标目录。

需求:通过 --where 参数查找 emp_add 表中 city 字段值为 sec-bad 的所有数据并导入到 HDFS 中

sqoop-1.4.6-cdh5.14.0/
1
2
3
bin/sqoop import --connect jdbc:mysql://192.168.12.25:3306/userdb --username root --password root \
--delete-target-dir --table emp_add --target-dir /sqoop/emp_add -m 1 \
--where "city = 'sec-bad'"

SQL语句过滤数据

需求:通过-e(--query)参数指定 SQL 语句,通过sql语句来过滤我们的数据进行导入

sqoop-1.4.6-cdh5.14.0/
1
2
3
bin/sqoop import --connect jdbc:mysql://192.168.12.25:3306/userdb --username root --password root \
--delete-target-dir --target-dir /sqoop/emp_conn -m 1 \
--query 'select phno from emp_conn where 1=1 and $CONDITIONS'

注意事项:

  • 使用 SQL 语句来进行查找时不能加参数 --table
  • SQL 语句必须用单引号,不能用双引号
  • SQL 语句必须包含 where 条件
  • where 条件后面必须跟一个字符串 $CONDITIONS

查看 HDFS 上的数据内容:

1
hdfs dfs -text /sqoop/emp_conn/part*

增量导入

在实际工作当中,很多时候都是只需要导入增量数据即可,一般都是选用一些字段进行增量的导入。

  1. 使用 Sqoop 提供的增量导入参数
  • --incremental <mode>:增量导入类型,可选 appendlastmodified
  • --check-column <column name>:用于增量检查的列(字段)
  • --last-value <last check column value>:增量检查中的上次导入的值
    导入 emp 表中 id 大于 1202 的所有数据:
    sqoop-1.4.6-cdh5.14.0/
    1
    2
    3
    4
    5
    bin/sqoop import --connect jdbc:mysql://192.168.12.25:3306/userdb --username root --password root \
    --table emp --target-dir /sqoop/increment -m 1 \
    --incremental append \
    --check-column id \
    --last-value 1202
  1. 使用 --where 参数(推荐
    导入 emp 表中 2019-06-17 的所有数据,使用 --where 参数可以更加精准地控制数据的选取:
    sqoop-1.4.6-cdh5.14.0/
    1
    2
    3
    4
    5
    bin/sqoop import --connect jdbc:mysql://192.168.12.25:3306/userdb --username root --password root \
    --table emp --target-dir /sqoop/increment2 -m 1 \
    --incremental append \
    --check-column id \
    --where "create_time > '2018-06-11 00:00:00' and create_time < '2019-06-17 23:59:59' and is_delete='1'"

导入到HIVE

准备工作

将 MySQL 表中的数据导入到 Hive 表。需要将 Hive 的一个 jar 包拷贝到 Sqoop 的 lib 目录下:

1
cp hive-1.1.0-cdh5.14.0/lib/hive-exec-1.1.0-cdh5.14.0.jar /export/servers/sqoop-1.4.6-cdh5.14.0/lib/

启动 Hive 并创建数据库:

1
2
3
4
5
6
7
8
9
10
11
# 后台启动Hive
$ cd /export/servers/hive-1.1.0-cdh5.14.0
$ nohup bin/hive --service hiveserver2 2>&1 &

# 启动Hive客户端,创建Hive数据库
$ cd /export/servers/hive-1.1.0-cdh5.14.0
$ bin/beeline
beeline> !connect jdbc:hive2://node03:10000
Enter username for jdbc:hive2://node03:10000: root
Enter password for jdbc:hive2://node03:10000: 123456
0: jdbc:hive2://node03:10000> create database sqooptohive;

示例一、主动创建Hive表

需求:将 MySQL 的 emp 表中数据导入到 Hive 的 emp_hive 表。

  1. 在 Hive 上创建表
1
2
0: jdbc:hive2://node03:10000> use sqooptohive;
0: jdbc:hive2://node03:10000> create external table emp_hive(id int,name string,deg string,salary int ,dept string) row format delimited fields terminated by '\001';
  1. 将 MySQL 表中数据导入到指定 Hive 表
sqoop-1.4.6-cdh5.14.0/
1
2
bin/sqoop import --connect jdbc:mysql://192.168.12.25:3306/userdb --username root --password root \
--table emp --fields-terminated-by '\001' --hive-import --hive-table sqooptohive.emp_hive --hive-overwrite --delete-target-dir --m 1
  1. 查看 Hive 表数据
1
0: jdbc:hive2://node03:10000> select * from emp_hive;

示例二、自动创建Hive表

需求:将 MySQL 的 emp_conn 表导入到 Hive,Hive 表自动创建,表名和表结构与 MySQL 表相同。

sqoop-1.4.6-cdh5.14.0/
1
2
bin/sqoop import --connect jdbc:mysql://192.168.12.25:3306/userdb --username root --password root \
--table emp_conn --hive-import -m 1 --hive-database sqooptohive

导出

将数据从 HDFS 的文件导出到 MySQL 数据库中。注意:导出前,目标表必须存在于 MySQL 数据库中。

./sqoop export

  • --connect <jdbc-uri>:JDBC 连接字符串
  • --export-dir <dir>:源数据在 HDFS 上的存放目录
  • --input-fields-terminated-by <char>:设置字段分隔符
  • --table <tablename>:指定目标表
  1. 查看 HDFS 文件中的内容:

    /sqoop/emp/part-m-00000
    1
    2
    3
    4
    5
    1201,gopal,manager,50000,TP,2018-06-11 18:54:32.0,2018-06-11 18:54:32.0,1
    1202,manisha,Proof reader,50000,TPP,2018-06-15 18:54:32.0,2018-06-12 10:48:38.0,0
    1203,khalillskjds,php dev,30000,AC,2018-06-11 18:54:32.0,2018-06-12 18:29:05.0,1
    1204,prasanth,php dev,30000,AC,2018-06-11 18:54:32.0,2018-06-11 21:05:52.0,0
    1205,kranthixxx,admin,20000,TP,2018-06-11 18:54:32.0,2018-06-11 10:20:33.0,1
  2. 在 MySQL 中创建目标表 emp_out

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    CREATE TABLE `emp_out` (
    `id` INT(11) DEFAULT NULL,
    `name` VARCHAR(100) DEFAULT NULL,
    `deg` VARCHAR(100) DEFAULT NULL,
    `salary` INT(11) DEFAULT NULL,
    `dept` VARCHAR(10) DEFAULT NULL,
    `create_time` TIMESTAMP NOT NULL,
    `update_time` TIMESTAMP NOT NULL,
    `is_delete` BIGINT(20) DEFAULT '1'
    ) ENGINE=INNODB DEFAULT CHARSET=utf8;
  3. 执行导出命令:

    sqoop-1.4.6-cdh5.14.0/
    1
    2
    bin/sqoop export --connect jdbc:mysql://192.168.12.25:3306/userdb --username root --password root \
    --table emp_out --export-dir /sqoop/emp --input-fields-terminated-by ","

导入脚本示例

脚本的使用场景:

1
2
3
4
5
6
# 初次导入,需要多初始化几张表,这几张表在之后不会发生变动
$ ./mysql2hdfs.sh first
# 日常导入
$ ./mysql2hdfs.sh all
# 导入指定表的数据,可以指定日期
$ ./mysql2hdfs.sh order_info 2020-10-10

需要注意的地方:

  • 脚本接收的参数:$1 = 导入哪些数据:全部表(初次)、全部表(日常)、具体某张表,$2 = 日期,默认为前一日。
  • 脚本中通用函数 import_data() 接收的参数:$1 = 表名,$2 = 过滤条件。
  • null 的处理:Hive 底层以 \N 来存储 null,为了保证 MySQL 和 Hive 数据的一致性,导入数据时使用 --null-string--null-non-string、导出数据时使用 --input-null-string--input-null-non-string 参数对 null 进行转换。
mysql2hdfs.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#! /bin/bash
# 设置变量记录sqoop路径
sqoop=/opt/module/sqoop/bin/sqoop
# 获取昨天的date,通过+%F格式化为年-月-日
do_date=$(date -d '-1 day' +%F)

# 脚本传入的第二个参数不为空,即指定了日期,则替换do_date的值
if [[ -n "$2" ]]; then
do_date=$2
fi

# 通用函数
import_data() {
$sqoop import \
--connect jdbc:mysql://hadoop102:3306/gmall --username root --password 123456 \
# 导入到HDFS的路径,$1参数为表名
--target-dir /origin_data/gmall/db/$1/$do_date \
--delete-target-dir \
# 可以执行SQL对数据进行过滤,where子句后需要追加$CONDITIONS
--query "$2 and \$CONDITIONS" \
# map tasks的数量,默认4个
--num-mappers 1 \
# 分隔符
--fields-terminated-by '\t' \
# 压缩
--compress --compression-codec lzop \
# hive底层用\N表示null,这里将null替换为\N,否则会存储为字符串null
--null-string '\\N' \
--null-non-string '\\N'

# 为刚导入的数据创建lzo索引
hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/gmall/db/$1/$do_date
}

# 增量导入
import_order_info() {
import_data order_info "select
id,
final_total_amount,
order_status,
user_id,
out_trade_no,
create_time,
operate_time,
province_id,
benefit_reduce_amount,
original_total_amount,
feight_fee
from order_info
where (date_format(create_time,'%Y-%m-%d')='$do_date'
or date_format(operate_time,'%Y-%m-%d')='$do_date')"
}

# 全量导入
import_sku_info() {
import_data sku_info "select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
create_time
from sku_info
where 1=1"
}

# 全量导入
import_base_province() {
import_data base_province "select
id,
name,
region_id,
area_code,
iso_code
from base_province
where 1=1"
}

# 全量导入
import_base_region() {
import_data base_region "select
id,
region_name
from base_region
where 1=1"
}

case $1 in
"order_info")
import_order_info
;;
"sku_info")
import_sku_info
;;
"base_province")
import_base_province
;;
"base_region")
import_base_region
;;
# first比all多两个只需要在初始化时需要导的表
"first")
import_order_info
import_sku_info
import_base_province
import_base_region
;;
"all")
import_order_info
import_sku_info
;;
esac
  • 本文作者: SANNAHA
  • 本文链接: https://sannaha.moe/Sqoop/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!