0%

HDFS快速上手

Hadoop 分布式文件系统(HDFS)被设计成适合运行在通用硬件上的分布式文件系统。它是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。


概述

前提和设计目标

硬件错误
硬件错误是常态而不是异常。HDFS可能由成百上千的服务器所构成,每个服务器上存储着文件系统的部分数据。我们面对的现实是构成系统的组件数目是巨大的,而且任一组件都有可能失效,这意味着总是有一部分HDFS的组件是不工作的。因此错误检测和快速、自动的恢复是HDFS最核心的架构目标。

流式数据访问
运行在HDFS上的应用和普通的应用不同,需要流式访问它们的数据集。HDFS的设计中更多的考虑到了数据批处理,而不是用户交互处理。比之数据访问的低延迟问题,更关键的在于数据访问的高吞吐量。

大规模数据集
运行在HDFS上的应用具有很大的数据集。HDFS上的一个典型文件大小一般都在G字节至T字节。因此,HDFS被调节以支持大文件存储。它应该能提供整体上高的数据传输带宽,能在一个集群里扩展到数百个节点。一个单一的HDFS实例应该能支撑数以千万计的文件。

简单的一致性模型
HDFS应用需要一个“一次写入多次读取”的文件访问模型。一个文件经过创建、写入和关闭之后就不需要改变。这一假设简化了数据一致性问题,并且使高吞吐量的数据访问成为可能。Map/Reduce应用或者网络爬虫应用都非常适合这个模型。目前还有计划在将来扩充这个模型,使之支持文件的附加写操作。

“移动计算比移动数据更划算”
一个应用请求的计算,离它操作的数据越近就越高效,在数据达到海量级别的时候更是如此。因为这样就能降低网络阻塞的影响,提高系统数据的吞吐量。将计算移动到数据附近,比之将数据移动到应用所在显然更好。HDFS为应用提供了将它们自己移动到数据附近的接口。

架构与概念

HDFS架构

  • HDFS 采用 master/slave 架构。一个 HDFS 集群由一个活动的 NameNode 和**多个 DataNode **组成。
  • NameNode 是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问
  • NameNode负责操作文件元数据,DataNode 负责处理文件内容的读写请求,在 NameNode 的统一调度下进行数据块的创建、删除和复制。跟文件内容相关的数据流不经过 NameNode,只询问要与哪个 DataNode 联系。
  • Namenode 控制复本存放在哪些 DataNode 上,根据全局情况作出块放置决定。读取文件时 NameNode 尽量让 Client 就近读取复本,以降低读取网络开销和读取延时。
  • NameNode 周期性的从集群中的每个 DataNode 接收心跳信号和块状态报告,接收到心跳信号意味着 DataNode 节点工作正常,块状态报告包含了一个该 DataNode 所有的数据列表。
NameNode DataNode
存储元数据 存储文件内容
元数据保存在内存中 文件内容保存在磁盘
保存文件、block、DataNode 之间的映射关系 维护 block id 到 DataNode 本地文件的映射关系

名字空间 (namespace)

HDFS支持传统的层次型文件组织结构。用户或者应用程序可以创建目录,然后将文件保存在这些目录里。文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。当前,HDFS不支持用户磁盘配额和访问权限控制,也不支持硬链接和软链接。但是HDFS架构并不妨碍实现这些特性。

NameNode 负责维护文件系统的名字空间,任何对文件系统名字空间或属性的修改都将被 NameNode 记录下来。应用程序可以设置HDFS保存的文件的复本数目。文件复本的数目称为文件的复本系数,这个信息也是由 NameNode 保存的。

数据复制

HDFS被设计成能够在一个大集群中跨机器可靠地存储超大文件。它将每个文件存储成一系列的数据块,除了最后一个,所有的数据块都是同样大小的。为了容错,文件的所有数据块都会有复本。每个文件的数据块大小和复本系数都是可配置的。应用程序可以指定某个文件的复本数目。复本系数可以在文件创建的时候指定,也可以在之后改变。HDFS中的文件都是一次性写入的,并且严格要求在任何时候只能有一个写入者。

NameNode 全权管理数据块的复制,它周期性地从集群中的每个 DataNode 接收心跳信号和块状态报告(BlockReport)。接收到心跳信号意味着该 DataNode 节点工作正常。块状态报告包含了一个该 DataNode 上所有数据块的列表。

安全模式

NameNode 启动后会进入一个称为安全模式的特殊状态,在这种状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求。处于安全模式的 NameNode 是不会进行数据块的复制的。NameNode 从所有的 DataNode 接收心跳信号和块状态报告。块状态报告包括了某个 DataNode 所有的数据块列表。每个数据块都有一个指定的最小复本数。当 NameNode 检测确认某个数据块的复本数目达到这个最小值,那么该数据块就会被认为是复本安全(safely replicated)的;在一定百分比(这个参数可配置)的数据块被 NameNode 检测确认是安全之后(加上一个额外的30秒等待时间),NameNode 将退出安全模式状态。接下来它会确定还有哪些数据块的复本没有达到指定数目,并将这些数据块复制到其他 DataNode 上。

文件的删除和恢复

当用户或应用程序删除某个文件时,这个文件并没有立刻从 HDFS 中删除。实际上,HDFS 会将这个文件重命名转移到 /trash 目录。只要文件还在 /trash 目录中,该文件就可以被迅速地恢复。文件在 /trash 中保存的时间是可配置的,当超过这个时间时,NameNode 就会将该文件从名字空间中删除。删除文件会使得该文件相关的数据块被释放。注意,从用户删除文件到HDFS空闲空间的增加之间会有一定时间的延迟。

只要被删除的文件还在 /trash 目录中,用户就可以恢复这个文件。/trash 目录仅仅保存被删除文件的最后复本。在 /trash 目录上HDFS会应用一个特殊策略来自动删除文件,目前的默认策略是删除 /trash 中保留时间超过6小时的文件。

分块存储

HDFS 中的文件在物理上是分块(block)存储的,在 hadoop 1.x 版本中默认大小是 64M,在 hadoop 2.x 版本中默认大小是 128M。为了容错,文件的所有 block 都会有复本。block 的大小和默认复本系数可以通过hdfs-site.xml配置参数来指定:

/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml
1
2
3
4
5
6
7
8
9
<property>
<name>dfs.replication</name>
<value>3</value>
</property>

<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>

抽象成数据块有哪些好处?

  1. 抽象成数据块后,一个文件的大小,可以大于集群中任意一个硬盘的大小。一个文件的块并不需要全部存储在同一个硬盘上,可以分布在集群中任意一个硬盘上。
  2. 使用抽象块而非整个文件作为存储单元,大大简化了系统的设计。简化设计,对于故障种类繁多的分布式系统来说尤为重要。以块为单位,一方面简化存储管理,因为块大小是固定的,所以一个硬盘放多少个块是非常容易计算的;另一方面,也消除了元数据的顾虑,因为 block 仅仅是存储的一块数据,其文件的元数据(例如权限等)就不需要跟数据块一起存储,可以交由其他系统来处理。
  3. 块非常适合用于数据备份进而提供数据容错能力和可用性

块缓存
通常 DataNode 从磁盘中读取块,但对于访问频繁的文件,其对应的块可能被显式地缓存在 DataNode 的内存中,以堆外块缓存的形式存在。默认情况下,一个块仅缓存在一个 DataNode 的内存中,当然可以针对每个文件配置 DataNode 的数量。作业调度器(用于MapReduce、Spark等框架的)通过在缓存块的 DataNode 上运行任务,可以利用块缓存的优势提高读操作的性能。

例如,连接(join)操作中使用的一个小的查询表就是块缓存的一个很好的候选。通过在缓存池(cache pool)中增加一个cache directive来告诉 NameNode 需要缓存哪些文件及存多久。缓存池是一个拥有管理缓存权限和资源使用的管理性分组。

文件权限

HDFS 的文件权限机制与 Linux 系统的文件权限机制类似。HDFS 相信你告诉我你是谁,你就是谁。HDFS 文件权限的目的是防止好人做错事,而不是阻止坏人做坏事。HDFS 可以通过hdfs-site.xml配置参数来决定是否启用权限控制:

/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml
1
2
3
4
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>

HDFS文件权限.png

元数据与CheckPoint

在 Hadoop 中,所有的元数据信息都保存在了 FSImageEdits 文件当中,这两个文件就记录了所有的数据的元数据信息,元数据信息的保存路径可以通过 hdfs-site.xml 配置参数来指定:

/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml
1
2
3
4
5
6
7
8
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/namenodeDatas</value>
</property>
<property>
<name>dfs.namenode.edits.dir</name>
<value>file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/dfs/nn/edits</value>
</property>

元数据的加载流程是什么?

  1. NameNode 启动后,加载元数据存储文件 FSImage 和事务日志文件 Edits 到内存。
  2. DataNode 上报 block 信息到 NameNode,也存储在内存中。

FSImage 与 Edits 是什么?

FSImage:HDFS 文件系统元数据存储在名为 “FSImage” 的文件中。它包含:

  • 整个文件系统命名空间
  • block 到文件的映射
  • 文件系统属性

Edits:NameNode 会使用叫作 EditLog 的事务日志持续记录下发生在文件系统元数据上的每一个变化。内存中的元数据在变化前会将操作记录在 Edits 文件中,以防止数据丢失。

为什么需要 FSImage 与 Edits?

首先需要明确:我们使用的最完整的元数据信息存储在 NameNode 的内存中。由于内存数据的易失性所以需要磁盘文件作备份。

FSImage 文件是作为元数据备份存储在磁盘中,但备份到磁盘需要时间,为了避免备份时新产生的元数据信息丢失,使用 Edits 文件通过预写日志记录元数据的变化。为了避免 Edits 文件大小无限膨胀下去,利用 SecondaryNameNode 或 JournalNode 来将 Edits 文件合并到 FSImage 文件中去。

SecondaryNameNode 是如何辅助管理 FSImage 文件与 Edits 文件的?

FSImage 文件与 Edits 文件的合并过程如下:

hdfs-checkpoint.png

  1. SecondaryNameNode 通知 NameNode 切换 Edits 文件,NameNode 会生成一个新的 Edits 文件
  2. SecondaryNameNode 从 NameNode 中获得 FSImage 文件和 Edits 文件(通过 HTTP GET)
  3. SecondaryNameNode 将 FSImage 文件载入内存,然后开始合并 Edits 文件,合并之后成为新的 FSImage 文件
  4. SecondaryNameNode 将新的 FSImage 文件发回给 NameNode
  5. NameNode 用新的 FSImage 文件替换旧的 FSImage 文件

可以通过修改 core-site.xml 文件改变 CheckPoint 触发条件,但一般来说不做修改:

  • fs.checkpoint.period:表示多长时间记录一次 FSImage,默认是1小时。
  • fs.checkpoint.size:表示一次记录的大小,默认是 64M。
hadoop-2.6.0-cdh5.14.0/etc/hadoop/core-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<property><name>fs.checkpoint.period</name>
<value>3600</value>
<description>
The number of seconds between two periodic checkpoints.
</description>
</property>

<property>
<name>fs.checkpoint.size</name>
<value>67108864</value>
<description>
The size of the current edit log (in bytes) that triggers
a periodic checkpoint even if the fs.checkpoint.period hasn’t expired.
</description>
</property>

如何配置 SecondaryNameNode 实现与 NameNode 分离?

SecondaryNameNode 在合并 Edits 和 FSImage 时需要消耗的内存和 NameNode 差不多,所以一般把 NameNode 和SecondaryNameNode 放在不同的节点上。

  1. 通过创建 masters 文件指定 SecondaryNameNode 节点(可以指定多个,一行一个):
hadoop-2.6.0-cdh5.14.0/etc/hadoop/masters
1
node02

注:该配置文件的名字容易引起误会,名为masters实则用来配置 SecondaryNameNode。

  1. 修改 hdfs-site.xml 文件:
hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml
1
2
3
4
<property>
<name>dfs.namenode.http-address</name>
<value>node02:50070</value>
</property>

如何查看 FsImage 文件和 Edits 文件中的内容?

Offline Image Viewer Guide
Offline Edits Viewer Guide

读写过程

文件读取

HDFS_Client_Read_File

  1. Client 向 NameNode 发起 RPC 请求,来确定请求文件起始 block 所在的位置。
  2. NameNode 返回存有该 block 复本的 DataNode 地址,这些 DataNode 会根据集群网络拓扑结构计算出 DataNode 与 Client 的距离,然后进行排序。
  3. Client 选取排序靠前的 DataNode 来读取 block,如果该 Client 本身就是 DataNode,那么将从本地直接获取数据(短路读取)。
  4. 读取完当前 DataNode 中的所有 block 后自动寻找下一个 block 的最佳 DataNode,这些都在底层实现,在 Client 看来它一直在读取一个连续的流。
  5. 读取数据时,如果在与 DataNode 通信时出现错误,会尝试从下一个拥有该 block 复本的 DataNode 继续读取,同时记录那个故障的 DataNode,保证以后不会反复读取该节点上后续的 block;读取完 block 也会通过校验和(checksum)验证完整性,如果发现有损坏的 block,会尝试从其他 DataNode 继续读取,也会将被损坏的 block 通知给 NameNode。

文件读取在底层上是 Client 创建 DFSInputStream 对象,重复的调用 read() 方法读取 DataNode 中 block 的数据。

文件写入

HDFS_Client_Write_File

  1. Client 通过 DistributedFileSystem 对象调用 create() 来新建文件,DistributedFileSystem 对象向 NameNode 发起 RPC 请求在文件系统的命名空间新建一个文件。NameNode 检查目标文件是否已存在以及 Client 是否有权限。如果检查通过,NameNode 为创建新文件记录一条记录,DistributedFileSystem 向 Client 返回一个 FSDataOutputStream 对象,再由 FSDataOutputStream 封装一个 DFSOutPutStream 对象,该对象负责处理 DataNode 和 NameNode 之间的通信。
  2. DFSOutPutStream 将 Client 写入的数据分成一个个数据包,并写入内部的数据队列(data queue)。数据队列由 DataStreamer 处理,它挑选出适合存储数据复本的一组 DataNode,要求 NameNode 分配新的数据块,这组 DataNode 构成一个管线(pipeline)。假设复本数为 3,则管线有 3 个节点,DataStreamer 将数据包流式传输到管线中第 1 个 DataNode,该 DataNode 将数据包发送给管线中的第 2 个 DataNode,第 2 个 DataNode 再发送给第 3 个DataNode。
  3. DFSOutPutStream 还维护着一个确认队列(ack queue)来接收 DataNode 收到数据后发送的确认信息,收到管线中所有 DataNode 确认信息后,该数据包才会从确认队列中删除。
  4. 如果 DataNode 在数据写入期间发生故障,会先关闭管线,将队列中所有数据包添加回数据队列,确保故障节点下游的 DataNode 不会漏掉数据包;为存储在另一正常 DataNode 的当前数据块指定一个新的标识,并将该标识传送给 NameNode,以便故障 DataNode 在恢复后可以删除存储的部分数据库。从管线中删除故障的 DataNode,基于 2 个正常 DataNode 构建一条新管线处理后续的 block。如果复本数不足,NameNode 会在另一个 DataNode 上创建新的复本。
  5. Client 完成数据的写入后,对 FSDataOutputStream 调用 close() 方法。该操作会将剩余的所有数据包写入 DataNode 管线,并在 NameNode 告知其文件写入完成之前等待确认。

NameNode 如何选择在哪个 NameNode 存储复本?

Hadoop 在设计时考虑到数据存储的安全与高效,数据文件默认在 HDFS 上存放3份:1份存放在 Client 节点上,2份存放在另外某一个机架中的两个节点上。这样不仅能提供很好的稳定性(block 存储在两个机架中)并实现很好的负载均衡,包括写入带宽(写入操作只需经过一个交换机)、读取性能(可以从两个机架中选择读取)和集群中 block 的均匀分布(客户端只在本地机架写入一个 block)。

块在HDFS的复制过程

HDFS基准测试

测试写入速度

测试 HDFS 的文件写入性能:

1
2
3
4
5
# 向HDFS文件系统写入10个文件,每个文件10MB,文件存放到/benchmarks/TestDFSIO中
yarn jar /export/servers/hadoop-2.6.0-cdh5.14.0/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0.jar TestDFSIO -write -nrFiles 10 -fileSize 10MB

# 查看写入速度测试结果
yarn dfs -text /benchmarks/TestDFSIO/io_write/part-00000

测试读取速度

测试 HDFS 的文件读取性能:

1
2
3
4
5
# 从HDFS文件系统读取10个文件,每个文件10M
yarn jar /export/servers/hadoop-2.6.0-cdh5.14.0/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0.jar TestDFSIO -read -nrFiles 10 -fileSize 10MB

# 查看读取速度测试结果
yarn dfs -text /benchmarks/TestDFSIO/io_read/part-00000

清除测试数据

1
yarn jar /export/servers/hadoop-2.6.0-cdh5.14.0/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0.jar TestDFSIO -clean

命令行API

查看文件系统状况

运行 DFS 管理客户端,查看状况报告:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
$ hdfs dfsadmin -report
Configured Capacity: 103422541824 (96.32 GB)
Present Capacity: 78714851328 (73.31 GB)
DFS Remaining: 74144059392 (69.05 GB)
DFS Used: 4570791936 (4.26 GB)
DFS Used%: 5.81%
Replicated Blocks:
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0
Low redundancy blocks with highest priority to recover: 0
Pending deletion blocks: 0
Erasure Coded Block Groups:
Low redundancy block groups: 0
Block groups with corrupt internal blocks: 0
Missing block groups: 0
Low redundancy blocks with highest priority to recover: 0
Pending deletion blocks: 0

-------------------------------------------------
Live datanodes (3):

Name: 192.168.153.200:9866 (devcdh1.cdh.com)
Hostname: devcdh1.cdh.com
Rack: /default
Decommission Status : Normal
Configured Capacity: 34474180608 (32.11 GB)
DFS Used: 1523503104 (1.42 GB)
Non DFS Used: 15918665728 (14.83 GB)
DFS Remaining: 17032011776 (15.86 GB)
DFS Used%: 4.42%
DFS Remaining%: 49.41%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 2
Last contact: Tue Jul 14 14:13:34 CST 2020
Last Block Report: Tue Jul 14 14:02:28 CST 2020


Name: 192.168.153.201:9866 (devcdh2.cdh.com)
Hostname: devcdh2.cdh.com
Rack: /default
Decommission Status : Normal
Configured Capacity: 34474180608 (32.11 GB)
DFS Used: 1523789824 (1.42 GB)
Non DFS Used: 4306612224 (4.01 GB)
DFS Remaining: 28643778560 (26.68 GB)
DFS Used%: 4.42%
DFS Remaining%: 83.09%
Configured Cache Capacity: 432013312 (412 MB)
Cache Used: 0 (0 B)
Cache Remaining: 432013312 (412 MB)
Cache Used%: 0.00%
Cache Remaining%: 100.00%
Xceivers: 2
Last contact: Tue Jul 14 14:13:33 CST 2020
Last Block Report: Tue Jul 14 14:02:27 CST 2020


Name: 192.168.153.202:9866 (devcdh3.cdh.com)
Hostname: devcdh3.cdh.com
Rack: /default
Decommission Status : Normal
Configured Capacity: 34474180608 (32.11 GB)
DFS Used: 1523499008 (1.42 GB)
Non DFS Used: 4482412544 (4.17 GB)
DFS Remaining: 28468269056 (26.51 GB)
DFS Used%: 4.42%
DFS Remaining%: 82.58%
Configured Cache Capacity: 432013312 (412 MB)
Cache Used: 0 (0 B)
Cache Remaining: 432013312 (412 MB)
Cache Used%: 0.00%
Cache Remaining%: 100.00%
Xceivers: 2
Last contact: Tue Jul 14 14:13:36 CST 2020
Last Block Report: Tue Jul 14 14:02:27 CST 2020

进行 DFS 文件系统检查:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
$ hdfs fsck /
Connecting to namenode via http://devcdh1.cdh.com:9870/fsck?ugi=root&path=%2F
FSCK started by root (auth:SIMPLE) from /192.168.153.200 for path / at Tue Jul 14 14:17:22 CST 2020

Status: HEALTHY
Number of data-nodes: 3
Number of racks: 1
Total dirs: 953
Total symlinks: 0

Replicated Blocks:
Total size: 1511151131 B
Total files: 154
Total blocks (validated): 155 (avg. block size 9749362 B)
Minimally replicated blocks: 155 (100.0 %)
Over-replicated blocks: 0 (0.0 %)
Under-replicated blocks: 0 (0.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 3
Average block replication: 2.9741936
Missing blocks: 0
Corrupt blocks: 0
Missing replicas: 0 (0.0 %)

Erasure Coded Block Groups:
Total size: 0 B
Total files: 0
Total block groups (validated): 0
Minimally erasure-coded block groups: 0
Over-erasure-coded block groups: 0
Under-erasure-coded block groups: 0
Unsatisfactory placement block groups: 0
Average block group size: 0.0
Missing block groups: 0
Corrupt block groups: 0
Missing internal blocks: 0
FSCK ended at Tue Jul 14 14:17:22 CST 2020 in 21 milliseconds


The filesystem under path '/' is HEALTHY

查看目录和文件大小:

1
2
3
4
5
$ hdfs dfs -du -h /
562.4 M 1.6 G /hdfsdata
54 162 /hivetable
7.2 M 21.5 M /tmp
871.6 M 2.6 G /user

注意:前后两个文件大小分别是指逻辑空间和物理空间,物理空间 = 逻辑空间 * 复本数。

查看目录

ls 命令格式及示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
hdfs dfs -ls [-R] <args>
-R选项递归返回目录列表
args参数指定路径

# 如果指定路径下为目录,返回结果格式如下
permissions userid groupid modification_date modification_time dirname

$ hdfs dfs -ls /
Found 12 items
drwxr-xr-x - root supergroup 0 2018-07-31 21:02 /export
drwxr-xr-x - root supergroup 0 2018-07-18 21:37 /flink
drwxr-xr-x - SANNAHA supergroup 0 2018-07-19 17:32 /flink-checkpoint
drwxr-xr-x - SANNAHA supergroup 0 2018-07-19 20:09 /flink-checkpoint-0719
drwxr-xr-x - root supergroup 0 2018-07-18 20:26 /hbase

# 如果指定路径下为文件,返回结果格式及示例如下
permissions number_of_replicas userid groupid filesize modification_date modification_time filename

$ hdfs dfs -ls /export/servers/exporthive
-rwxr-xr-x 3 root supergroup 11 2018-07-31 21:03 /export/servers/exporthive/000000_0

创建目录

mkdir 命令格式及示例:

1
2
3
4
5
6
hdfs dfs -mkdir [-p] <paths>
-p选项与Linux相似,沿路径创建父目录
paths参数将路径uri作为参数并创建目录

$ hdfs dfs -mkdir -p /user/hadoop/dir1
$ hdfs dfs -mkdir hdfs://nn1.example.com/user/hadoop/dir hdfs://nn2.example.com/user/hadoop/dir

文件系统间复制文件

将一个或多个文件从本地上传到 HDFS:

1
2
3
4
5
6
7
# put 命令格式
hdfs dfs -put <localsrc>... <dst>

# 如果上传时保留原文件的名称,该命令可简化为 hdfs dfs -put localfile /user/hadoop
$ hdfs dfs -put localfile /user/hadoop/hadoopfile
$ hdfs dfs -put localfile1 localfile2 /user/hadoop/hadoopdir
$ hdfs dfs -put localfile hdfs://nn.example.com/hadoop/hadoopfile

将一个或多个文件从 HDFS 下载到本地:

1
2
3
4
5
6
7
# get 命令格式
hdfs dfs -get <dst>... <localsrc>

# 如果下载时保留原文件的名称,该命令可简化为 hdfs dfs -get /user/hadoop/hadoopfile ./
$ hdfs dfs -get /user/hadoop/hadoopfile localfile
$ hdfs dfs -get /user/hadoop/localfile1 /user/hadoop/localfile2 ./
$ hdfs dfs -get hdfs://nn.example.com/hadoop/hadoopfile localfile

将单个或多个文件从本地移动到 HDFS,移动后本地文件会被删除:

1
2
# moveFromLocal 命令格式
$ hdfs dfs -moveFromLocal <localsrc>... <dst>

将多个本地小文件合并成一个大文件上传到 HDFS:

1
$ hdfs dfs -appendToFile localfile1 localfile2 /mergefile

将多个 HDFS 小文件合并成一个大文件下载到本地:

1
2
$ hdfs dfs -getmerge /config/*.xml ./hello.xml
$ hdfs dfs -getmerge /config ./hello.xml

文件系统内复制和移动

mv 命令格式及示例:

1
2
3
4
5
6
# 将文件从源移动到目标。此命令允许多个源,在这种情况下,目标需要是目录。不允许跨文件系统移动文件
hdfs dfs -mv URI [URI ...] <dest>


$ hdfs dfs -mv /user/hadoop/file1 /user/hadoop/file2
$ hdfs dfs -mv hdfs://nn.example.com/file1 hdfs://nn.example.com/file2 hdfs://nn.example.com/file3 hdfs://nn.example.com/dir1

cp 命令格式及示例:

1
2
3
4
5
6
7
8
9
10
# 复制文件/文件夹,可以覆盖,可以保留原有权限信息
hdfs dfs -cp [-f] [-p | -p[topax]] URI [URI ...] <dest>
-f选项在目标已存在的情况下将覆盖目标
-p选项将保留文件属性[topx](时间戳timestamps,所有权ownership,权限permission,访问控制列表ACL,文件系统的扩展属性XAttr)
如果指定了-p且没有参数,则保留时间戳,所有权和权限
如果指定了-pa,则还保留权限,因为ACL是一组超级权限
确定是否保留原始命名空间扩展属性与-p标志无关

$ hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2
$ hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir

查看内容

cat 命令格式及示例:

1
2
3
4
hdfs dfs -cat URI [URI ...]

$ hdfs dfs -cat hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
$ hdfs dfs -cat file:///file3 /user/hadoop/file4

追加内容

appendToFile 命令格式及示例:

1
2
3
4
5
6
# 追加一个或者多个文件到hdfs指定文件中
hdfs dfs -appendToFile <localsrc> ... <dst>

$ hdfs dfs -appendToFile localfile /user/hadoop/hadoopfile
$ hdfs dfs -appendToFile localfile1 localfile2 /user/hadoop/hadoopfile
$ hdfs dfs -appendToFile localfile hdfs://nn.example.com/hadoop/hadoopfile

删除

rm 命令格式及示例:

1
2
3
4
5
6
7
8
# 删除指定路径的文件
hdfs dfs -rm [-f] [-r|-R] [-skipTrash] URI [URI ...]
-f选项在目标文件不存在的情况下不显示错误信息
-R选项以递归方式删除目录及其下的所有内容
-r选项等效于-R
-skipTrash选项将立即删除指定的文件,并绕过垃圾桶(如果已启用)

$ hdfs dfs -rm hdfs://nn.example.com/file /user/hadoop/emptydir

修改权限

chmod 命令格式及示例:

1
2
3
4
hdfs dfs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...]
-R选项将以递归方式进行更改。

$ hdfs dfs -chmod 600 /config/core-site.xml

修改所有者

chown 命令格式:

1
2
hdfs dfs -chown [-R] [OWNER][:[GROUP]] URI [URI ]
-R选项将以递归方式进行更改。

清空回收站

expunge 命令格式:

1
hdfs dfs -expunge

Java API

依赖包

由于 CDH 版本的软件涉及版权的问题,没有将所有的 jar 包托管到 Maven 仓库当中去,而是托管在了 Cloudera 自己的服务器上面,所以需要手动指定仓库地址。

使用 CDH5 Maven 仓库
适用于CDH 5.14.x版本的 Maven Artifacts
使用 CDH6 Maven 仓库
适用于CDH 6.1.x版本的 Maven Artifacts

pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-mr1-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!--<verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
<!--<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>moe.sannaha.hadoop.db.DBToHDFS</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>-->
</plugins>
</build>

HadoopURL方式获取数据

从 Hadoop 读取文件,最简单的方法是使用 java.net.URL 对象打开数据流,从中读取数据。

为了让 Java 程序识别 Hadoop 的 HDFS URL 形式,需要通过 FsUrlStreamHandlerFactory 实例调用 java.net.URL对象的setURLStreamHandlerFactory()方法。每个 Java 虚拟机只能调用一次这个方法,因此通常在静态方法中调用。如果程序的其他组件已经声明了一个 FsUrlStreamHandlerFactory实例,就无法再使用这种方法从 HDFS 读取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package moe.sannaha.hdfsapitest;

import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

public class GetFileByURL {
static {
//注册HDFS的URL,让Java代码能够识别
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) {
InputStream inputStream = null;
FileOutputStream outputStream = null;
//指定访问文件的URL地址
String url = "hdfs://node01:8020/test/input/test.txt";
try {
//打开数据流
inputStream = new URL(url).openStream();
outputStream = new FileOutputStream(new File("e:/test.txt"));
//调用 Hadoop 中简洁的 IOUtils 类中的 copyBytes() 方法,参数为:输入流,输出流,复制缓冲区大小,复制结束是否关闭数据流
//输出到控制台
IOUtils.copyBytes(inputStream,System.out,4096,false);
//输出到本地文件
IOUtils.copyBytes(inputStream,outputStream,4096,false);
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(inputStream);
IOUtils.closeStream(outputStream);
}
}
}

FileSystem方式获取数据

通过 URL 方式获取 HDFS 文件流虽然简单,但存在局限。因此需要 FileSystem API 打开文件的数据流。

获取FileSystem

在 Java 中获取 FileSystem,主要涉及以下几个类:

  • Configuration:该类的对象封装了客户端或服务器的配置
  • FileSystem:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作
  • FileSystem.get(Configuration conf):返回配置文件core-site.xml中指定的文件系统。如果配置文件中没有指定,此时使用的不再是分布式文件系统,而是本地文件系统(LocalFileSystem)
  • FileSystem.get(URI uri, Configuration conf):通过给定的 URI 方案和权限来确定要使用的文件系统,如没有指定则返回默认文件系统
  • FileSystem.get(URI uri, Configuration conf, String user):作为给定用户来访问文件系统。如果 HDFS 启用了权限控制,即 hdfs-site.xmldfs.permissions 的参数值为 true,则可能需要“伪造用户”来实现文件读写

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class GetFileSystem {
//使用默认文件系统。可以在程序中调用 Configuration 对象的 set() 方法修改默认的文件系统
@Test
public void getFileSystem1() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://node01:8020");
FileSystem fileSystem = FileSystem.get(configuration);
System.out.println(fileSystem.toString());
//DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_619133888_1, ugi=SANNAHA (auth:SIMPLE)]]
}

//使用给定的 URI 来确定要使用的文件系统
@Test
public void getFileSystem1() throws IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), configuration, "JavaProgram");
System.out.println(fileSystem.toString());
//DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_647853457_1, ugi=JavaProgram (auth:SIMPLE)]]
}

//作为给定用户来访问文件系统
@Test
public void getFileSystem3() throws IOException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), configuration, "JavaProgram");
System.out.println(fileSystem.toString());
//DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1426236787_1, ugi=JavaProgram (auth:SIMPLE)]]
}
}

获取输入流

有了 FileSystem 实例后,可以调用 open()方法获取文件的输入流,用于从 HDFS 获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//获取HDFS文件的输入流,标准输出到控制台
public class FileSystemTest {
public static void main(String[] args) throws IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), configuration);
InputStream inputStream = null;
try {
inputStream = fileSystem.open(new Path("hdfs://node01:8020/test/input/test.txt"));
IOUtils.copyBytes(inputStream, System.out, 4096, false); //hello
} finally {
IOUtils.closeStream(inputStream);
}
}
}

open()方法返回的是 FSDataInputStream对象(而非标准的java.io类对象),这个类是继承了 java.io.DataInputStream的一个特殊类,实现了Seekable接口,提供了一个设置偏移量的方法seek(),以及一个查询当前位置相对于文件起始位置偏移量的查询方法getPos(),支持随机访问,因此可以从流的任意位置开始读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//使用seek()方法设置偏移量,从指定偏移量处读取文件
public class FileSystemTest {
public static void main(String[] args) throws IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), configuration);
InputStream inputStream = null;
//创建FSDataInputStream对象
FSDataInputStream inputStream = null;
try {
inputStream = fileSystem.open(new Path("hdfs://node01:8020/test/input/test.txt"));
IOUtils.copyBytes(inputStream, System.out, 4096, false); //hello
//设置偏移量返回到距文件起始位置2个字节的位置
inputStream.seek(2);
//查询偏移量
long pos = inputStream.getPos();
System.out.println(pos); //2
IOUtils.copyBytes(inputStream, System.out, 4096, false); //llo
} finally {
IOUtils.closeStream(inputStream);
}
}
}

FSDataInputStream 类也实现了 PositionedReadable 接口,提供了read()方法,从指定偏移量处读取文件的一部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//使用read()方法设从指定偏移量处读取文件的一部分
public class FileSystemTest {
public static void main(String[] args) throws IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), configuration);
InputStream inputStream = null;
//创建FSDataInputStream对象
FSDataInputStream inputStream = null;
byte[] bytes = new byte[10];
try {
inputStream = fileSystem.open(new Path("hdfs://node01:8020/test/input/test.txt"));
IOUtils.copyBytes(inputStream, System.out, 4096, false); //hello
//从距离文件起始偏移量2字节处开始读,读取3个字节,存入缓冲区bytes的,指定偏移量1字节处
inputStream.read(2, bytes, 1, 3);
String string = new String(bytes);
System.out.printf(string); //□llo□
} finally {
IOUtils.closeStream(inputStream);
}
}
}

获取输出流

FileSystem 类有一系列新建文件的方法,最简单的方法是使用create()方法给准备建的文件指定一个 Path 对象,然后返回一个用于写入数据的输出流,用于往 HDFS 写入数据。如果路径在 HDFS 中不存在,create()方法能够自动创建父目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CopyLocalfileToHDFS {
public static void main(String[] args) throws IOException {
String localPath = "e:/workspace/test.txt";
String dstPath = "hdfs://node01:8020/test/output/test.txt";
BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(localPath));
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create(dstPath), configuration);
//获取输出流。其中Progressable用于传递回调接口,显示数据写入datanode的进度,可省
FSDataOutputStream outputStream = fileSystem.create(new Path(dstPath), new Progressable() {
@Override
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(inputStream, outputStream, 4096, true);
}
}

create()方法返回的是FSDataOutputStream对象,与FSDataInputStream对象相似,也有用于查询当前位置相对于文件起始位置偏移量的方法getPos()但由于 HDFS 只允许对在文件末尾追加数据,因此写入时定位无意义。

创建目录

FileSystem 提供了创建目录的方法 mkdirs(),和 java.io.File类的 mkdirs()方法一样,可以自动创建父目录。

1
2
3
4
5
6
@Test
public void mkdirs() throws URISyntaxException, IOException {
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
fileSystem.mkdirs(new Path("/test/createdir"));
fileSystem.close();
}

遍历所有文件

使用官方提供的 RemoteIterator 迭代器获取所有的文件或文件夹:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package moe.sannaha.hdfsapitest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class GetFileList {
@Test
public void listMyFiles() throws Exception {
//获取 FileSystem 对象
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
//调用 listFiles() 方法获取 RemoteIterator 迭代器,第一个参数指定遍历的路径,第二个参数表示是否要递归
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true);
//通过 RemoteIterator 得到所有的文件或者文件夹
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
System.out.println(next.getPath().toString());
}
fileSystem.close();
}
}

也可以通过 FileSystem 提供的listStatus()方法,自己实现递归遍历获取所有文件或文件夹:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class GetFileList2 {
@Test
public void listFile() throws Exception {
//获取 FileSystem 对象
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
//调用 listFiles() 方法获取 fileStatuses,参数指定遍历的路径
FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/"));
//遍历 fileStatuses
for (FileStatus fileStatus : fileStatuses) {
//判断 fileStatuses 是否为文件夹
if (fileStatus.isDirectory()) {
Path path = fileStatus.getPath();
listAllFiles(fileSystem, path);
} else {
System.out.println("文件路径为" + fileStatus.getPath().toString());
}
}
}

/**
* 根据给定路径和文件系统,递归遍历该路径下的所有目录,打印所有文件的路径
* @param fileSystem
* @param path
* @throws Exception
*/
public void listAllFiles(FileSystem fileSystem, Path path) throws Exception {
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
listAllFiles(fileSystem, fileStatus.getPath());
} else {
Path path1 = fileStatus.getPath();
System.out.println("文件路径为" + path1);
}
}
}
}

文件下载

1
2
3
4
5
6
@Test
public void copyToLocal() throws IOException {
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), new Configuration());
fileSystem.copyToLocalFile(new Path("hdfs://node01:8020/test/output/test.txt"),new Path("file:///e:/workspace/test.txt"));
fileSystem.close();
}

文件上传

1
2
3
4
5
6
@Test
public void copyFromLocal() throws IOException {
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), new Configuration());
fileSystem.copyFromLocalFile(new Path("file:///e:/workspace/test.txt"),new Path("hdfs://node01:8020/test/input/test.txt"));
fileSystem.close();
}

小文件合并上传

Hadoop 存储的每一个文件都对应一条元数据,如果 Hadoop 中有大量小文件,会大大增加集群管理元数据的内存压力。如果可以建议在上传时将小文件合并成大文件,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//将本地文件系统中的多个小文件,并合并成一个大文件,上传到HDFS
@Test
public void mergeFile() throws Exception {
//获取 FileSystem 对象
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
FSDataOutputStream outputStream = fileSystem.create(new Path("/hello/mydir/test/bigfile.log"));
//获取 LocalFileSystem 对象
LocalFileSystem local = FileSystem.getLocal(new Configuration());
//通过本地文件系统获取 fileStatuses
FileStatus[] fileStatuses = local.listStatus(new Path("file:///E:\\logs"));
//遍历 fileStatuses,获取每个小文件的输入流,输出到HDFS
for (FileStatus fileStatus : fileStatuses) {
FSDataInputStream inputStream = local.open(fileStatus.getPath());
IOUtils.copyBytes(inputStream, outputStream, 4096, false);
IOUtils.closeStream(inputStream);
}
//关闭连接,释放资源
IOUtils.closeStream(outputStream);
local.close();
fileSystem.close();
}

winutils ERROR问题

在 IDEA 运行 MapReduce 程序时,产生 winutils ERROR 的原因是 Windows 下缺少的 Hadoop 环境。编译 Windows 版本的 Hadoop 程序后,配置系统环境变量如下(注意路径中不能存在空格,Program Files可用PROGRA~1代替):

1
2
3
4
5
6
- JAVA_HOME=C:\Program Files\Java\jdk1.8.0_221
+ JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_221
HADOOP_HOME=D:\hadoop-2.6.0-cdh5.14.0
Path=%JAVA_HOME%\bin
Path=%HADOOP_HOME%\bin
Path=%HADOOP_HOME%\sbin

参考资料
Hadoop分布式文件系统:架构和设计
HDFS - Checkpoint
hadoop2.X如何将namenode与SecondaryNameNode分开配置

  • 本文作者: SANNAHA
  • 本文链接: https://sannaha.moe/HDFS/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!