0%

HDFS快速上手

Hadoop 分布式文件系统(HDFS)被设计成适合运行在通用硬件上的分布式文件系统。它是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。


概述

前提和设计目标

硬件错误
硬件错误是常态而不是异常。HDFS可能由成百上千的服务器所构成,每个服务器上存储着文件系统的部分数据。我们面对的现实是构成系统的组件数目是巨大的,而且任一组件都有可能失效,这意味着总是有一部分HDFS的组件是不工作的。因此错误检测和快速、自动的恢复是HDFS最核心的架构目标。

流式数据访问
运行在HDFS上的应用和普通的应用不同,需要流式访问它们的数据集。HDFS的设计中更多的考虑到了数据批处理,而不是用户交互处理。比之数据访问的低延迟问题,更关键的在于数据访问的高吞吐量。

大规模数据集
运行在HDFS上的应用具有很大的数据集。HDFS上的一个典型文件大小一般都在G字节至T字节。因此,HDFS被调节以支持大文件存储。它应该能提供整体上高的数据传输带宽,能在一个集群里扩展到数百个节点。一个单一的HDFS实例应该能支撑数以千万计的文件。

简单的一致性模型
HDFS应用需要一个“一次写入多次读取”的文件访问模型。一个文件经过创建、写入和关闭之后就不需要改变。这一假设简化了数据一致性问题,并且使高吞吐量的数据访问成为可能。Map/Reduce应用或者网络爬虫应用都非常适合这个模型。目前还有计划在将来扩充这个模型,使之支持文件的附加写操作。

“移动计算比移动数据更划算”
一个应用请求的计算,离它操作的数据越近就越高效,在数据达到海量级别的时候更是如此。因为这样就能降低网络阻塞的影响,提高系统数据的吞吐量。将计算移动到数据附近,比之将数据移动到应用所在显然更好。HDFS为应用提供了将它们自己移动到数据附近的接口。

架构与概念

HDFS架构

  • HDFS 采用 master/slave 架构。一个 HDFS 集群由一个活动的 NameNode 和**多个 DataNode **组成。
  • NameNode 是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问
  • NameNode负责操作文件元数据,DataNode 负责处理文件内容的读写请求,在 NameNode 的统一调度下进行数据块的创建、删除和复制。跟文件内容相关的数据流不经过 NameNode,只询问要与哪个 DataNode 联系。
  • Namenode 控制复本存放在哪些 DataNode 上,根据全局情况作出块放置决定。读取文件时 NameNode 尽量让 Client 就近读取复本,以降低读取网络开销和读取延时。
  • NameNode 周期性的从集群中的每个 DataNode 接收心跳信号和块状态报告,接收到心跳信号意味着 DataNode 节点工作正常,块状态报告包含了一个该 DataNode 所有的数据列表。
NameNode DataNode
存储元数据 存储文件内容
元数据保存在内存中 文件内容保存在磁盘
保存文件、block、DataNode 之间的映射关系 维护 block id 到 DataNode 本地文件的映射关系

名字空间 (namespace)

HDFS支持传统的层次型文件组织结构。用户或者应用程序可以创建目录,然后将文件保存在这些目录里。文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。当前,HDFS不支持用户磁盘配额和访问权限控制,也不支持硬链接和软链接。但是HDFS架构并不妨碍实现这些特性。

NameNode 负责维护文件系统的名字空间,任何对文件系统名字空间或属性的修改都将被 NameNode 记录下来。应用程序可以设置HDFS保存的文件的复本数目。文件复本的数目称为文件的复本系数,这个信息也是由 NameNode 保存的。

数据复制

HDFS被设计成能够在一个大集群中跨机器可靠地存储超大文件。它将每个文件存储成一系列的数据块,除了最后一个,所有的数据块都是同样大小的。为了容错,文件的所有数据块都会有复本。每个文件的数据块大小和复本系数都是可配置的。应用程序可以指定某个文件的复本数目。复本系数可以在文件创建的时候指定,也可以在之后改变。HDFS中的文件都是一次性写入的,并且严格要求在任何时候只能有一个写入者。

NameNode 全权管理数据块的复制,它周期性地从集群中的每个 DataNode 接收心跳信号和块状态报告(BlockReport)。接收到心跳信号意味着该 DataNode 节点工作正常。块状态报告包含了一个该 DataNode 上所有数据块的列表。

安全模式

NameNode 启动后会进入一个称为安全模式的特殊状态,在这种状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求。处于安全模式的 NameNode 是不会进行数据块的复制的。NameNode 从所有的 DataNode 接收心跳信号和块状态报告。块状态报告包括了某个 DataNode 所有的数据块列表。每个数据块都有一个指定的最小复本数。当 NameNode 检测确认某个数据块的复本数目达到这个最小值,那么该数据块就会被认为是复本安全(safely replicated)的;在一定百分比(这个参数可配置)的数据块被 NameNode 检测确认是安全之后(加上一个额外的30秒等待时间),NameNode 将退出安全模式状态。接下来它会确定还有哪些数据块的复本没有达到指定数目,并将这些数据块复制到其他 DataNode 上。

文件的删除和恢复

当用户或应用程序删除某个文件时,这个文件并没有立刻从 HDFS 中删除。实际上,HDFS 会将这个文件重命名转移到 /trash 目录。只要文件还在 /trash 目录中,该文件就可以被迅速地恢复。文件在 /trash 中保存的时间是可配置的,当超过这个时间时,NameNode 就会将该文件从名字空间中删除。删除文件会使得该文件相关的数据块被释放。注意,从用户删除文件到HDFS空闲空间的增加之间会有一定时间的延迟。

只要被删除的文件还在 /trash 目录中,用户就可以恢复这个文件。/trash 目录仅仅保存被删除文件的最后复本。在 /trash 目录上HDFS会应用一个特殊策略来自动删除文件,目前的默认策略是删除 /trash 中保留时间超过6小时的文件。

分块存储

HDFS 中的文件在物理上是分块(block)存储的,在 hadoop 1.x 版本中默认大小是 64M,在 hadoop 2.x 版本中默认大小是 128M。为了容错,文件的所有 block 都会有复本。block 的大小和默认复本系数可以通过hdfs-site.xml配置参数来指定:

/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml
1
2
3
4
5
6
7
8
9
<property>
<name>dfs.replication</name>
<value>3</value>
</property>

<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>

抽象成数据块有哪些好处?

  1. 抽象成数据块后,一个文件的大小,可以大于集群中任意一个硬盘的大小。一个文件的块并不需要全部存储在同一个硬盘上,可以分布在集群中任意一个硬盘上。
  2. 使用抽象块而非整个文件作为存储单元,大大简化了系统的设计。简化设计,对于故障种类繁多的分布式系统来说尤为重要。以块为单位,一方面简化存储管理,因为块大小是固定的,所以一个硬盘放多少个块是非常容易计算的;另一方面,也消除了元数据的顾虑,因为 block 仅仅是存储的一块数据,其文件的元数据(例如权限等)就不需要跟数据块一起存储,可以交由其他系统来处理。
  3. 块非常适合用于数据备份进而提供数据容错能力和可用性

块缓存
通常 DataNode 从磁盘中读取块,但对于访问频繁的文件,其对应的块可能被显式地缓存在 DataNode 的内存中,以堆外块缓存的形式存在。默认情况下,一个块仅缓存在一个 DataNode 的内存中,当然可以针对每个文件配置 DataNode 的数量。作业调度器(用于MapReduce、Spark等框架的)通过在缓存块的 DataNode 上运行任务,可以利用块缓存的优势提高读操作的性能。

例如,连接(join)操作中使用的一个小的查询表就是块缓存的一个很好的候选。通过在缓存池(cache pool)中增加一个cache directive来告诉 NameNode 需要缓存哪些文件及存多久。缓存池是一个拥有管理缓存权限和资源使用的管理性分组。

文件权限

HDFS 的文件权限机制与 Linux 系统的文件权限机制类似。HDFS 相信你告诉我你是谁,你就是谁。HDFS 文件权限的目的是防止好人做错事,而不是阻止坏人做坏事。HDFS 可以通过hdfs-site.xml配置参数来决定是否启用权限控制:

/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml
1
2
3
4
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>

HDFS文件权限.png

元数据与CheckPoint

在 Hadoop 中,所有的元数据信息都保存在了 FSImageEdits 文件当中,这两个文件就记录了所有的数据的元数据信息,元数据信息的保存路径可以通过 hdfs-site.xml 配置参数来指定:

/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml
1
2
3
4
5
6
7
8
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/namenodeDatas</value>
</property>
<property>
<name>dfs.namenode.edits.dir</name>
<value>file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/dfs/nn/edits</value>
</property>

元数据的加载流程是什么?

  1. NameNode 启动后,加载元数据存储文件 FSImage 和事务日志文件 Edits 到内存。
  2. DataNode 上报 block 信息到 NameNode,也存储在内存中。

FSImage 与 Edits 是什么?

FSImage:HDFS 文件系统元数据存储在名为 “FSImage” 的文件中。它包含:

  • 整个文件系统命名空间
  • block 到文件的映射
  • 文件系统属性

Edits:NameNode 会使用叫作 EditLog 的事务日志持续记录下发生在文件系统元数据上的每一个变化。内存中的元数据在变化前会将操作记录在 Edits 文件中,以防止数据丢失。

为什么需要 FSImage 与 Edits?

首先需要明确:我们使用的最完整的元数据信息存储在 NameNode 的内存中。由于内存数据的易失性所以需要磁盘文件作备份。

FSImage 文件是作为元数据备份存储在磁盘中,但备份到磁盘需要时间,为了避免备份时新产生的元数据信息丢失,使用 Edits 文件通过预写日志记录元数据的变化。为了避免 Edits 文件大小无限膨胀下去,利用 SecondaryNameNode 或 JournalNode 来将 Edits 文件合并到 FSImage 文件中去。

SecondaryNameNode 是如何辅助管理 FSImage 文件与 Edits 文件的?

FSImage 文件与 Edits 文件的合并过程如下:

hdfs-checkpoint.png

  1. SecondaryNameNode 通知 NameNode 切换 Edits 文件,NameNode 会生成一个新的 Edits 文件
  2. SecondaryNameNode 从 NameNode 中获得 FSImage 文件和 Edits 文件(通过 HTTP GET)
  3. SecondaryNameNode 将 FSImage 文件载入内存,然后开始合并 Edits 文件,合并之后成为新的 FSImage 文件
  4. SecondaryNameNode 将新的 FSImage 文件发回给 NameNode
  5. NameNode 用新的 FSImage 文件替换旧的 FSImage 文件

可以通过修改 core-site.xml 文件改变 CheckPoint 触发条件,但一般来说不做修改:

  • fs.checkpoint.period:表示多长时间记录一次 FSImage,默认是1小时。
  • fs.checkpoint.size:表示一次记录的大小,默认是 64M。
hadoop-2.6.0-cdh5.14.0/etc/hadoop/core-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<property><name>fs.checkpoint.period</name>
<value>3600</value>
<description>
The number of seconds between two periodic checkpoints.
</description>
</property>

<property>
<name>fs.checkpoint.size</name>
<value>67108864</value>
<description>
The size of the current edit log (in bytes) that triggers
a periodic checkpoint even if the fs.checkpoint.period hasn’t expired.
</description>
</property>

如何配置 SecondaryNameNode 实现与 NameNode 分离?

SecondaryNameNode 在合并 Edits 和 FSImage 时需要消耗的内存和 NameNode 差不多,所以一般把 NameNode 和SecondaryNameNode 放在不同的节点上。

  1. 通过创建 masters 文件指定 SecondaryNameNode 节点(可以指定多个,一行一个):
hadoop-2.6.0-cdh5.14.0/etc/hadoop/masters
1
node02

注:该配置文件的名字容易引起误会,名为masters实则用来配置 SecondaryNameNode。

  1. 修改 hdfs-site.xml 文件:
hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml
1
2
3
4
<property>
<name>dfs.namenode.http-address</name>
<value>node02:50070</value>
</property>

如何查看 FsImage 文件和 Edits 文件中的内容?

Offline Image Viewer Guide
Offline Edits Viewer Guide

读写过程

文件读取

HDFS 文件读取过程如下:

HDFS_Client_Read_File

文件读取在底层上的过程如下:

  1. Client 通过调用 DistributedFileSystem 对象的 open() 方法来打开 HDFS 文件;
  2. DistributedFileSystem 通过远程过程调用(RPC)来调用 NameNode 以确定文件起始 block 的位置。对于每一个 block,NameNode 返回存有该 block 复本的 DataNode 地址,并根据集群的网络拓扑结构计算出这些 DataNode 与 Client 的距离,然后进行排序。DistributedFileSystem 类返回一个 FSDataInputStream 对象给 Client 用于读取数据,继承自 FSDataInputStream 的 DFSInputStream 对象管理着 NN 和 DN 的 I/O ;
  3. Client 对 DFSInputStream 调用 read() 方法;
  4. DFSInputStream 与存储着文件起始 block 且距离最近的 DataNode 建立连接,将数据从 DataNode 传输给 Client。
  5. 读到 block 末端时,DFSInputStream 关闭与该 DataNode 的连接,然后寻找存储着下一个 block 的最佳 DataNode。这一过程对于 Client 来说是透明的,在 Client 看来它一直在读取一个连续的流;
  6. Client 读取完所有的数据,FSDataInputStream 调用 close() 方法关闭输入流。

读取数据时,如果 DFSInputStream 在与 DataNode 通信时出现错误,会尝试从另一个拥有该 block 复本的距离最近的 DataNode 继续读取,同时记录那个故障的 DataNode,保证以后不会反复读取该节点上后续的 block;DFSInputStream 还会通过校验和(checksum)验证数据的完整性,如果发现有损坏的 block,也会尝试从其他 DataNode 读取复本,同时将被损坏的 block 通知给 NameNode。

文件写入

HDFS_Client_Write_File

  1. Client 通过 DistributedFileSystem 对象调用 create() 来新建文件;
  2. DistributedFileSystem 对象向 NameNode 发起 RPC 调用在文件系统的命名空间新建一个文件。NameNode 检查目标文件是否已存在以及 Client 是否有权限。如果检查通过,NameNode 为创建新文件记下一条记录,DistributedFileSystem 向 Client 返回一个 FSDataOutputStream 对象,再由 FSDataOutputStream 封装一个 DFSOutPutStream 对象,由该对象负责处理 DataNode 和 NameNode 之间的通信;
  3. DFSOutPutStream 将 Client 写入的数据分成一个个数据包(Packet,默认为 64 KB),并写入内部的数据队列(data queue);
  4. 数据队列由 DFSOutputStream 对象中的 DataStreamer 线程处理,它挑选出适合存储数据复本的一组 DataNode,要求 NameNode 分配新的数据块,这组 DataNode 构成一个管线(pipeline)。假设复本数为 3,则管线有 3 个节点,DataStreamer 将数据包流式传输到管线中第 1 个 DataNode,该 DataNode 将数据包发送给管线中的第 2 个 DataNode,第 2 个 DataNode 再发送给第 3 个DataNode;
  5. DFSOutPutStream 还维护着一个确认队列(ack queue),数据包被发送到管线中的第一个 DataNode 后,会被移动到确认队列。DataNode 收到数据包后会返回一个确认回执,当 DataStreamer 收到管线中所有 DataNode 的确认信息后,才会将该数据包从确认队列中删除;
  6. Client 完成数据的写入后,对 FSDataOutputStream 调用 close() 方法。该操作会将剩余的所有数据包写入 DataNode 管线,收到确认回执后向 NameNode 告知文件写入完成。

如果 DataNode 在数据写入期间发生故障,会先关闭管线,将确认队列中所有数据包添加回数据队列的最前端,以确保故障节点下游的 DataNode 不会漏掉数据包;为正常存储在另一 DataNode 的当前数据块指定一个新的标识,并将该标识传送给 NameNode,以便故障 DataNode 在恢复后可以删除存储的部分数据块。从管线中删除故障的 DataNode,基于 2 个正常 DataNode 构建一条新管线用于写入剩下的 block。如果复本数不足,NameNode 会在另一个 DataNode 上创建新的复本。

NameNode 如何选择在哪个 DateNode 存储复本?

Hadoop 在设计时考虑到数据存储的安全与高效,数据文件默认在 HDFS 上存放3份:1 份存放在 Client 节点上,2 份存放在另外某一个机架中的两个节点上。这样不仅能提供很好的稳定性(block 存储在两个机架中)并实现很好的负载均衡,包括写入带宽(写入操作只需经过一个交换机)、读取性能(可以从两个机架中选择读取)和集群中 block 的均匀分布(客户端只在本地机架写入一个 block)。

块在HDFS的复制过程

HDFS基准测试

测试写入速度

测试 HDFS 的文件写入性能:

1
2
3
4
5
# 向HDFS文件系统写入10个文件,每个文件10MB,文件存放到/benchmarks/TestDFSIO中
yarn jar /export/servers/hadoop-2.6.0-cdh5.14.0/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0.jar TestDFSIO -write -nrFiles 10 -fileSize 10MB

# 查看写入速度测试结果
yarn dfs -text /benchmarks/TestDFSIO/io_write/part-00000

测试读取速度

测试 HDFS 的文件读取性能:

1
2
3
4
5
# 从HDFS文件系统读取10个文件,每个文件10M
yarn jar /export/servers/hadoop-2.6.0-cdh5.14.0/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0.jar TestDFSIO -read -nrFiles 10 -fileSize 10MB

# 查看读取速度测试结果
yarn dfs -text /benchmarks/TestDFSIO/io_read/part-00000

清除测试数据

1
yarn jar /export/servers/hadoop-2.6.0-cdh5.14.0/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0.jar TestDFSIO -clean

命令行API

查看文件系统状况

运行 DFS 管理客户端,查看状况报告:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
$ hdfs dfsadmin -report
Configured Capacity: 103422541824 (96.32 GB)
Present Capacity: 78714851328 (73.31 GB)
DFS Remaining: 74144059392 (69.05 GB)
DFS Used: 4570791936 (4.26 GB)
DFS Used%: 5.81%
Replicated Blocks:
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0
Low redundancy blocks with highest priority to recover: 0
Pending deletion blocks: 0
Erasure Coded Block Groups:
Low redundancy block groups: 0
Block groups with corrupt internal blocks: 0
Missing block groups: 0
Low redundancy blocks with highest priority to recover: 0
Pending deletion blocks: 0

-------------------------------------------------
Live datanodes (3):

Name: 192.168.153.200:9866 (devcdh1.cdh.com)
Hostname: devcdh1.cdh.com
Rack: /default
Decommission Status : Normal
Configured Capacity: 34474180608 (32.11 GB)
DFS Used: 1523503104 (1.42 GB)
Non DFS Used: 15918665728 (14.83 GB)
DFS Remaining: 17032011776 (15.86 GB)
DFS Used%: 4.42%
DFS Remaining%: 49.41%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 2
Last contact: Tue Jul 14 14:13:34 CST 2020
Last Block Report: Tue Jul 14 14:02:28 CST 2020


Name: 192.168.153.201:9866 (devcdh2.cdh.com)
Hostname: devcdh2.cdh.com
Rack: /default
Decommission Status : Normal
Configured Capacity: 34474180608 (32.11 GB)
DFS Used: 1523789824 (1.42 GB)
Non DFS Used: 4306612224 (4.01 GB)
DFS Remaining: 28643778560 (26.68 GB)
DFS Used%: 4.42%
DFS Remaining%: 83.09%
Configured Cache Capacity: 432013312 (412 MB)
Cache Used: 0 (0 B)
Cache Remaining: 432013312 (412 MB)
Cache Used%: 0.00%
Cache Remaining%: 100.00%
Xceivers: 2
Last contact: Tue Jul 14 14:13:33 CST 2020
Last Block Report: Tue Jul 14 14:02:27 CST 2020


Name: 192.168.153.202:9866 (devcdh3.cdh.com)
Hostname: devcdh3.cdh.com
Rack: /default
Decommission Status : Normal
Configured Capacity: 34474180608 (32.11 GB)
DFS Used: 1523499008 (1.42 GB)
Non DFS Used: 4482412544 (4.17 GB)
DFS Remaining: 28468269056 (26.51 GB)
DFS Used%: 4.42%
DFS Remaining%: 82.58%
Configured Cache Capacity: 432013312 (412 MB)
Cache Used: 0 (0 B)
Cache Remaining: 432013312 (412 MB)
Cache Used%: 0.00%
Cache Remaining%: 100.00%
Xceivers: 2
Last contact: Tue Jul 14 14:13:36 CST 2020
Last Block Report: Tue Jul 14 14:02:27 CST 2020

进行 DFS 文件系统检查:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
$ hdfs fsck /
Connecting to namenode via http://devcdh1.cdh.com:9870/fsck?ugi=root&path=%2F
FSCK started by root (auth:SIMPLE) from /192.168.153.200 for path / at Tue Jul 14 14:17:22 CST 2020

Status: HEALTHY
Number of data-nodes: 3
Number of racks: 1
Total dirs: 953
Total symlinks: 0

Replicated Blocks:
Total size: 1511151131 B
Total files: 154
Total blocks (validated): 155 (avg. block size 9749362 B)
Minimally replicated blocks: 155 (100.0 %)
Over-replicated blocks: 0 (0.0 %)
Under-replicated blocks: 0 (0.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 3
Average block replication: 2.9741936
Missing blocks: 0
Corrupt blocks: 0
Missing replicas: 0 (0.0 %)

Erasure Coded Block Groups:
Total size: 0 B
Total files: 0
Total block groups (validated): 0
Minimally erasure-coded block groups: 0
Over-erasure-coded block groups: 0
Under-erasure-coded block groups: 0
Unsatisfactory placement block groups: 0
Average block group size: 0.0
Missing block groups: 0
Corrupt block groups: 0
Missing internal blocks: 0
FSCK ended at Tue Jul 14 14:17:22 CST 2020 in 21 milliseconds


The filesystem under path '/' is HEALTHY

查看目录和文件大小:

1
2
3
4
5
$ hdfs dfs -du -h /
562.4 M 1.6 G /hdfsdata
54 162 /hivetable
7.2 M 21.5 M /tmp
871.6 M 2.6 G /user

注意:前后两个文件大小分别是指逻辑空间和物理空间,物理空间 = 逻辑空间 * 复本数。

查看目录

ls 命令格式及示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
hdfs dfs -ls [-R] <args>
-R选项递归返回目录列表
args参数指定路径

# 如果指定路径下为目录,返回结果格式如下
permissions userid groupid modification_date modification_time dirname

$ hdfs dfs -ls /
Found 12 items
drwxr-xr-x - root supergroup 0 2018-07-31 21:02 /export
drwxr-xr-x - root supergroup 0 2018-07-18 21:37 /flink
drwxr-xr-x - SANNAHA supergroup 0 2018-07-19 17:32 /flink-checkpoint
drwxr-xr-x - SANNAHA supergroup 0 2018-07-19 20:09 /flink-checkpoint-0719
drwxr-xr-x - root supergroup 0 2018-07-18 20:26 /hbase

# 如果指定路径下为文件,返回结果格式及示例如下
permissions number_of_replicas userid groupid filesize modification_date modification_time filename

$ hdfs dfs -ls /export/servers/exporthive
-rwxr-xr-x 3 root supergroup 11 2018-07-31 21:03 /export/servers/exporthive/000000_0

创建目录

mkdir 命令格式及示例:

1
2
3
4
5
6
hdfs dfs -mkdir [-p] <paths>
-p选项与Linux相似,沿路径创建父目录
paths参数将路径uri作为参数并创建目录

$ hdfs dfs -mkdir -p /user/hadoop/dir1
$ hdfs dfs -mkdir hdfs://nn1.example.com/user/hadoop/dir hdfs://nn2.example.com/user/hadoop/dir

测试目录文件是否存在

test 命令格式及示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
hdfs dfs -test -[defsz] <path>

选项:
-d: if the path is a directory, return 0.
-e: if the path exists, return 0.
-f: if the path is a file, return 0.
-s: if the path is not empty, return 0.
-z: if the file is zero length, return 0.

# 查看是否存在目录
$ hdfs dfs -test -d /user/hive/warehouse/sannaha.db/tb_test
$ echo $?
0

文件系统间复制文件

将一个或多个文件从本地上传到 HDFS:

1
2
3
4
5
6
7
# put 命令格式
hdfs dfs -put <localsrc>... <dst>

# 如果上传时保留原文件的名称,该命令可简化为 hdfs dfs -put localfile /user/hadoop
$ hdfs dfs -put localfile /user/hadoop/hadoopfile
$ hdfs dfs -put localfile1 localfile2 /user/hadoop/hadoopdir
$ hdfs dfs -put localfile hdfs://nn.example.com/hadoop/hadoopfile

将一个或多个文件从 HDFS 下载到本地:

1
2
3
4
5
6
7
# get 命令格式
hdfs dfs -get <dst>... <localsrc>

# 如果下载时保留原文件的名称,该命令可简化为 hdfs dfs -get /user/hadoop/hadoopfile ./
$ hdfs dfs -get /user/hadoop/hadoopfile localfile
$ hdfs dfs -get /user/hadoop/localfile1 /user/hadoop/localfile2 ./
$ hdfs dfs -get hdfs://nn.example.com/hadoop/hadoopfile localfile

将单个或多个文件从本地移动到 HDFS,移动后本地文件会被删除:

1
2
# moveFromLocal 命令格式
$ hdfs dfs -moveFromLocal <localsrc>... <dst>

将多个本地小文件合并成一个大文件上传到 HDFS:

1
$ hdfs dfs -appendToFile localfile1 localfile2 /mergefile

将多个 HDFS 文件合并成一个大文件下载到本地:

1
2
$ hdfs dfs -getmerge /config/*.xml ./hello.xml
$ hdfs dfs -getmerge /config ./hello.xml

文件系统内复制和移动

mv 命令格式及示例:

1
2
3
4
5
# 将文件从源移动到目标。此命令允许多个源,在这种情况下,目标需要是目录。不允许跨文件系统移动文件
hdfs dfs -mv URI [URI ...] <dest>

$ hdfs dfs -mv /user/hadoop/file1 /user/hadoop/file2
$ hdfs dfs -mv hdfs://nn.example.com/file1 hdfs://nn.example.com/file2 hdfs://nn.example.com/file3 hdfs://nn.example.com/dir1

cp 命令格式及示例:

1
2
3
4
5
6
7
8
9
10
# 复制文件/文件夹,可以覆盖,可以保留原有权限信息
hdfs dfs -cp [-f] [-p | -p[topax]] URI [URI ...] <dest>
-f 选项在目标已存在的情况下将覆盖目标
-p选项将保留文件属性[topx](时间戳timestamps,所有权ownership,权限permission,访问控制列表ACL,文件系统的扩展属性XAttr)
如果指定了-p且没有参数,则保留时间戳,所有权和权限
如果指定了-pa,则还保留权限,因为ACL是一组超级权限
确定是否保留原始命名空间扩展属性与-p标志无关

$ hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2
$ hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir

查看内容

cat 命令格式及示例:

1
2
3
4
hdfs dfs -cat URI [URI ...]

$ hdfs dfs -cat hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
$ hdfs dfs -cat file:///file3 /user/hadoop/file4

追加内容

appendToFile 命令格式及示例:

1
2
3
4
5
6
# 追加一个或者多个文件到hdfs指定文件中
hdfs dfs -appendToFile <localsrc> ... <dst>

$ hdfs dfs -appendToFile localfile /user/hadoop/hadoopfile
$ hdfs dfs -appendToFile localfile1 localfile2 /user/hadoop/hadoopfile
$ hdfs dfs -appendToFile localfile hdfs://nn.example.com/hadoop/hadoopfile

删除

rm 命令格式及示例:

1
2
3
4
5
6
7
8
# 删除指定路径的文件
hdfs dfs -rm [-f] [-r|-R] [-skipTrash] URI [URI ...]
-f选项在目标文件不存在的情况下不显示错误信息
-R选项以递归方式删除目录及其下的所有内容
-r选项等效于-R
-skipTrash选项将立即删除指定的文件,并绕过垃圾桶(如果已启用)

$ hdfs dfs -rm hdfs://nn.example.com/file /user/hadoop/emptydir

修改权限

chmod 命令格式及示例:

1
2
3
4
hdfs dfs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...]
-R选项将以递归方式进行更改。

$ hdfs dfs -chmod 600 /config/core-site.xml

修改所有者

chown 命令格式:

1
2
hdfs dfs -chown [-R] [OWNER][:[GROUP]] URI [URI ]
-R选项将以递归方式进行更改。

清空回收站

expunge 命令格式:

1
hdfs dfs -expunge

分布式拷贝

DistCp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。 它使用 MapReduce 实现文件分发、错误处理、恢复以及报告生成。DistCp 启动的 MapReduce 作业没有 reduce 任务,只通过 map 任务完成文件或目录的复制。

命令格式及示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 格式
hadoop distcp OPTIONS [source_path...] <target_path>

常用选项:
-append:追加,如果可能的话将复用目标路径中现有的数据,并追加新数据
-update:更新,仅复制缺少的文件或目录
-overwrite:覆盖,无条件覆盖目标文件
-delete :删除,从目标路径中删除源路径中不存在的文件
-i:复制时忽略失败
-f <arg>:使用存储源路径的列表文件
-m <arg>:同时复制的最大 map 数量
-p <arg>:复制时保留状态信息,如果 -p 后不指定参数,则保留复本、块大小、用户、组、权限、校验和以及时间戳
参数可选 rbugpcaxt (replication, block-size, user, group, permission, checksum-type, ACL, XATTR, timestamps)
-bandwidth <arg>:指定每个 map 的带宽,单位为 MB

# 示例
# 基本用法,如果两个集群的 hadoop 版本相同,分布式拷贝时可以使用 hdfs 协议
hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/foo/bar
# 源路径可以有多个,目标路径只能有一个
hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/foo/bar hdfs://nn3:8020/foo/bar
# 如果两个集群的 hadoop 版本不同,则需要使用 hftp 协议
hadoop distcp hftp://nn1:50070/foo/bar hdfs://nn2/foo/bar

# 指定 MapReduce 任务名,限制 map 数量及带宽
$ hadoop distcp -Dmapreduce.job.name=syncHiveData -bandwidth 1000 -m 10 \
hdfs://Cluster1/user/hive/warehouse/sannaha.db/tb_test \
hdfs://Cluster2/user/hive/warehouse/sannaha.db/tb_test

源路径可以有多个,目标路径只能有一个;如果两个集群的 hadoop 版本相同,分布式拷贝时可以使用 hdfs 协议,否则需要使用 hftp 协议:

1
2


Java API

依赖包

由于 CDH 版本的软件涉及版权的问题,没有将所有的 jar 包托管到 Maven 仓库当中去,而是托管在了 Cloudera 自己的服务器上面,所以需要手动指定仓库地址。

使用 CDH5 Maven 仓库
适用于CDH 5.14.x版本的 Maven Artifacts
使用 CDH6 Maven 仓库
适用于CDH 6.1.x版本的 Maven Artifacts

pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-mr1-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.0-cdh5.14.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!--<verbal>true</verbal>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
<!--<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>moe.sannaha.hadoop.db.DBToHDFS</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>-->
</plugins>
</build>

HadoopURL方式获取数据

从 Hadoop 读取文件,最简单的方法是使用 java.net.URL 对象打开数据流,从中读取数据。

为了让 Java 程序识别 Hadoop 的 HDFS URL 形式,需要通过 FsUrlStreamHandlerFactory 实例调用 java.net.URL对象的setURLStreamHandlerFactory()方法。每个 Java 虚拟机只能调用一次这个方法,因此通常在静态方法中调用。如果程序的其他组件已经声明了一个 FsUrlStreamHandlerFactory实例,就无法再使用这种方法从 HDFS 读取数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package moe.sannaha.hdfsapitest;

import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

public class GetFileByURL {
static {
//注册HDFS的URL,让Java代码能够识别
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) {
InputStream inputStream = null;
FileOutputStream outputStream = null;
//指定访问文件的URL地址
String url = "hdfs://node01:8020/test/input/test.txt";
try {
//打开数据流
inputStream = new URL(url).openStream();
outputStream = new FileOutputStream(new File("e:/test.txt"));
//调用 Hadoop 中简洁的 IOUtils 类中的 copyBytes() 方法,参数为:输入流,输出流,复制缓冲区大小,复制结束是否关闭数据流
//输出到控制台
IOUtils.copyBytes(inputStream,System.out,4096,false);
//输出到本地文件
IOUtils.copyBytes(inputStream,outputStream,4096,false);
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(inputStream);
IOUtils.closeStream(outputStream);
}
}
}

FileSystem方式获取数据

通过 URL 方式获取 HDFS 文件流虽然简单,但存在局限。因此需要 FileSystem API 打开文件的数据流。

获取FileSystem

在 Java 中获取 FileSystem,主要涉及以下几个类:

  • Configuration:该类的对象封装了客户端或服务器的配置
  • FileSystem:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作
  • FileSystem.get(Configuration conf):返回配置文件core-site.xml中指定的文件系统。如果配置文件中没有指定,此时使用的不再是分布式文件系统,而是本地文件系统(LocalFileSystem)
  • FileSystem.get(URI uri, Configuration conf):通过给定的 URI 方案和权限来确定要使用的文件系统,如没有指定则返回默认文件系统
  • FileSystem.get(URI uri, Configuration conf, String user):作为给定用户来访问文件系统。如果 HDFS 启用了权限控制,即 hdfs-site.xmldfs.permissions 的参数值为 true,则可能需要“伪造用户”来实现文件读写

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class GetFileSystem {
//使用默认文件系统。可以在程序中调用 Configuration 对象的 set() 方法修改默认的文件系统
@Test
public void getFileSystem1() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", "hdfs://node01:8020");
FileSystem fileSystem = FileSystem.get(configuration);
System.out.println(fileSystem.toString());
//DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_619133888_1, ugi=SANNAHA (auth:SIMPLE)]]
}

//使用给定的 URI 来确定要使用的文件系统
@Test
public void getFileSystem1() throws IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), configuration, "JavaProgram");
System.out.println(fileSystem.toString());
//DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_647853457_1, ugi=JavaProgram (auth:SIMPLE)]]
}

//作为给定用户来访问文件系统
@Test
public void getFileSystem3() throws IOException, InterruptedException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), configuration, "JavaProgram");
System.out.println(fileSystem.toString());
//DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1426236787_1, ugi=JavaProgram (auth:SIMPLE)]]
}
}

获取输入流

有了 FileSystem 实例后,可以调用 open()方法获取文件的输入流,用于从 HDFS 获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//获取HDFS文件的输入流,标准输出到控制台
public class FileSystemTest {
public static void main(String[] args) throws IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), configuration);
InputStream inputStream = null;
try {
inputStream = fileSystem.open(new Path("hdfs://node01:8020/test/input/test.txt"));
IOUtils.copyBytes(inputStream, System.out, 4096, false); //hello
} finally {
IOUtils.closeStream(inputStream);
}
}
}

open()方法返回的是 FSDataInputStream对象(而非标准的java.io类对象),这个类是继承了 java.io.DataInputStream的一个特殊类,实现了Seekable接口,提供了一个设置偏移量的方法seek(),以及一个查询当前位置相对于文件起始位置偏移量的查询方法getPos(),支持随机访问,因此可以从流的任意位置开始读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//使用seek()方法设置偏移量,从指定偏移量处读取文件
public class FileSystemTest {
public static void main(String[] args) throws IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), configuration);
InputStream inputStream = null;
//创建FSDataInputStream对象
FSDataInputStream inputStream = null;
try {
inputStream = fileSystem.open(new Path("hdfs://node01:8020/test/input/test.txt"));
IOUtils.copyBytes(inputStream, System.out, 4096, false); //hello
//设置偏移量返回到距文件起始位置2个字节的位置
inputStream.seek(2);
//查询偏移量
long pos = inputStream.getPos();
System.out.println(pos); //2
IOUtils.copyBytes(inputStream, System.out, 4096, false); //llo
} finally {
IOUtils.closeStream(inputStream);
}
}
}

FSDataInputStream 类也实现了 PositionedReadable 接口,提供了read()方法,从指定偏移量处读取文件的一部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//使用read()方法设从指定偏移量处读取文件的一部分
public class FileSystemTest {
public static void main(String[] args) throws IOException {
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), configuration);
InputStream inputStream = null;
//创建FSDataInputStream对象
FSDataInputStream inputStream = null;
byte[] bytes = new byte[10];
try {
inputStream = fileSystem.open(new Path("hdfs://node01:8020/test/input/test.txt"));
IOUtils.copyBytes(inputStream, System.out, 4096, false); //hello
//从距离文件起始偏移量2字节处开始读,读取3个字节,存入缓冲区bytes的,指定偏移量1字节处
inputStream.read(2, bytes, 1, 3);
String string = new String(bytes);
System.out.printf(string); //□llo□
} finally {
IOUtils.closeStream(inputStream);
}
}
}

获取输出流

FileSystem 类有一系列新建文件的方法,最简单的方法是使用create()方法给准备建的文件指定一个 Path 对象,然后返回一个用于写入数据的输出流,用于往 HDFS 写入数据。如果路径在 HDFS 中不存在,create()方法能够自动创建父目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CopyLocalfileToHDFS {
public static void main(String[] args) throws IOException {
String localPath = "e:/workspace/test.txt";
String dstPath = "hdfs://node01:8020/test/output/test.txt";
BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(localPath));
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(URI.create(dstPath), configuration);
//获取输出流。其中Progressable用于传递回调接口,显示数据写入datanode的进度,可省
FSDataOutputStream outputStream = fileSystem.create(new Path(dstPath), new Progressable() {
@Override
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(inputStream, outputStream, 4096, true);
}
}

create()方法返回的是FSDataOutputStream对象,与FSDataInputStream对象相似,也有用于查询当前位置相对于文件起始位置偏移量的方法getPos()但由于 HDFS 只允许对在文件末尾追加数据,因此写入时定位无意义。

创建目录

FileSystem 提供了创建目录的方法 mkdirs(),和 java.io.File类的 mkdirs()方法一样,可以自动创建父目录。

1
2
3
4
5
6
@Test
public void mkdirs() throws URISyntaxException, IOException {
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
fileSystem.mkdirs(new Path("/test/createdir"));
fileSystem.close();
}

遍历所有文件

使用官方提供的 RemoteIterator 迭代器获取所有的文件或文件夹:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package moe.sannaha.hdfsapitest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class GetFileList {
@Test
public void listMyFiles() throws Exception {
//获取 FileSystem 对象
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
//调用 listFiles() 方法获取 RemoteIterator 迭代器,第一个参数指定遍历的路径,第二个参数表示是否要递归
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true);
//通过 RemoteIterator 得到所有的文件或者文件夹
while (locatedFileStatusRemoteIterator.hasNext()) {
LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
System.out.println(next.getPath().toString());
}
fileSystem.close();
}
}

也可以通过 FileSystem 提供的listStatus()方法,自己实现递归遍历获取所有文件或文件夹:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class GetFileList2 {
@Test
public void listFile() throws Exception {
//获取 FileSystem 对象
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
//调用 listFiles() 方法获取 fileStatuses,参数指定遍历的路径
FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/"));
//遍历 fileStatuses
for (FileStatus fileStatus : fileStatuses) {
//判断 fileStatuses 是否为文件夹
if (fileStatus.isDirectory()) {
Path path = fileStatus.getPath();
listAllFiles(fileSystem, path);
} else {
System.out.println("文件路径为" + fileStatus.getPath().toString());
}
}
}

/**
* 根据给定路径和文件系统,递归遍历该路径下的所有目录,打印所有文件的路径
* @param fileSystem
* @param path
* @throws Exception
*/
public void listAllFiles(FileSystem fileSystem, Path path) throws Exception {
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
listAllFiles(fileSystem, fileStatus.getPath());
} else {
Path path1 = fileStatus.getPath();
System.out.println("文件路径为" + path1);
}
}
}
}

文件下载

1
2
3
4
5
6
@Test
public void copyToLocal() throws IOException {
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), new Configuration());
fileSystem.copyToLocalFile(new Path("hdfs://node01:8020/test/output/test.txt"),new Path("file:///e:/workspace/test.txt"));
fileSystem.close();
}

文件上传

1
2
3
4
5
6
@Test
public void copyFromLocal() throws IOException {
FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020"), new Configuration());
fileSystem.copyFromLocalFile(new Path("file:///e:/workspace/test.txt"),new Path("hdfs://node01:8020/test/input/test.txt"));
fileSystem.close();
}

小文件合并上传

Hadoop 存储的每一个文件都对应一条元数据,如果 Hadoop 中有大量小文件,会大大增加集群管理元数据的内存压力。如果可以建议在上传时将小文件合并成大文件,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//将本地文件系统中的多个小文件,并合并成一个大文件,上传到HDFS
@Test
public void mergeFile() throws Exception {
//获取 FileSystem 对象
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
FSDataOutputStream outputStream = fileSystem.create(new Path("/hello/mydir/test/bigfile.log"));
//获取 LocalFileSystem 对象
LocalFileSystem local = FileSystem.getLocal(new Configuration());
//通过本地文件系统获取 fileStatuses
FileStatus[] fileStatuses = local.listStatus(new Path("file:///E:\\logs"));
//遍历 fileStatuses,获取每个小文件的输入流,输出到HDFS
for (FileStatus fileStatus : fileStatuses) {
FSDataInputStream inputStream = local.open(fileStatus.getPath());
IOUtils.copyBytes(inputStream, outputStream, 4096, false);
IOUtils.closeStream(inputStream);
}
//关闭连接,释放资源
IOUtils.closeStream(outputStream);
local.close();
fileSystem.close();
}

winutils ERROR问题

在 IDEA 运行 MapReduce 程序时,产生 winutils ERROR 的原因是 Windows 下缺少的 Hadoop 环境。编译 Windows 版本的 Hadoop 程序后,配置系统环境变量如下(注意路径中不能存在空格,Program Files可用PROGRA~1代替):

1
2
3
4
5
6
- JAVA_HOME=C:\Program Files\Java\jdk1.8.0_221
+ JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_221
HADOOP_HOME=D:\hadoop-2.6.0-cdh5.14.0
Path=%JAVA_HOME%\bin
Path=%HADOOP_HOME%\bin
Path=%HADOOP_HOME%\sbin

参考资料

《Hadoop权威指南 第四版》
Hadoop分布式文件系统:架构和设计
HDFS - Checkpoint
hadoop2.X如何将namenode与SecondaryNameNode分开配置

  • 本文作者: SANNAHA
  • 本文链接: https://sannaha.moe/HDFS/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!