Hadoop 分布式文件系统(HDFS)被设计成适合运行在通用硬件上的分布式文件系统。它是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。
概述 前提和设计目标 硬件错误 硬件错误是常态而不是异常 。HDFS可能由成百上千的服务器所构成,每个服务器上存储着文件系统的部分数据。我们面对的现实是构成系统的组件数目是巨大的,而且任一组件都有可能失效,这意味着总是有一部分HDFS的组件是不工作的。因此错误检测和快速、自动的恢复是HDFS最核心的架构目标。
流式数据访问 运行在HDFS上的应用和普通的应用不同,需要流式访问它们的数据集。HDFS的设计中更多的考虑到了数据批处理,而不是用户交互处理。比之数据访问的低延迟问题,更关键的在于数据访问的高吞吐量。
大规模数据集 运行在HDFS上的应用具有很大的数据集。HDFS上的一个典型文件大小一般都在G字节至T字节。因此,HDFS被调节以支持大文件存储。它应该能提供整体上高的数据传输带宽,能在一个集群里扩展到数百个节点。一个单一的HDFS实例应该能支撑数以千万计的文件。
简单的一致性模型 HDFS应用需要一个“一次写入多次读取” 的文件访问模型。一个文件经过创建、写入和关闭之后就不需要改变。这一假设简化了数据一致性问题,并且使高吞吐量的数据访问成为可能。Map/Reduce应用或者网络爬虫应用都非常适合这个模型。目前还有计划在将来扩充这个模型,使之支持文件的附加写操作。
“移动计算比移动数据更划算” 一个应用请求的计算,离它操作的数据越近就越高效,在数据达到海量级别的时候更是如此。因为这样就能降低网络阻塞的影响,提高系统数据的吞吐量。将计算移动到数据附近,比之将数据移动到应用所在显然更好。HDFS为应用提供了将它们自己移动到数据附近的接口。
架构与概念
HDFS 采用 master/slave 架构。一个 HDFS 集群由一个活动的 NameNode 和**多个 DataNode **组成。
NameNode 是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问 。
NameNode负责操作文件元数据,DataNode 负责处理文件内容的读写请求 ,在 NameNode 的统一调度下进行数据块的创建、删除和复制。跟文件内容相关的数据流不经过 NameNode,只询问要与哪个 DataNode 联系。
Namenode 控制复本存放在哪些 DataNode 上,根据全局情况作出块放置决定。读取文件时 NameNode 尽量让 Client 就近读取复本 ,以降低读取网络开销和读取延时。
NameNode 周期性的从集群中的每个 DataNode 接收心跳信号和块状态报告 ,接收到心跳信号意味着 DataNode 节点工作正常,块状态报告包含了一个该 DataNode 所有的数据列表。
NameNode
DataNode
存储元数据
存储文件内容
元数据保存在内存中
文件内容保存在磁盘
保存文件、block、DataNode 之间的映射关系
维护 block id 到 DataNode 本地文件的映射关系
名字空间 (namespace)
HDFS支持传统的层次型文件组织结构。用户或者应用程序可以创建目录,然后将文件保存在这些目录里。文件系统名字空间的层次结构和大多数现有的文件系统类似:用户可以创建、删除、移动或重命名文件。当前,HDFS不支持用户磁盘配额和访问权限控制,也不支持硬链接和软链接。但是HDFS架构并不妨碍实现这些特性。
NameNode 负责维护文件系统的名字空间,任何对文件系统名字空间或属性的修改都将被 NameNode 记录下来。应用程序可以设置HDFS保存的文件的复本数目。文件复本的数目称为文件的复本系数,这个信息也是由 NameNode 保存的。
数据复制
HDFS被设计成能够在一个大集群中跨机器可靠地存储超大文件。它将每个文件存储成一系列的数据块,除了最后一个,所有的数据块都是同样大小的。为了容错,文件的所有数据块都会有复本。每个文件的数据块大小和复本系数都是可配置的。应用程序可以指定某个文件的复本数目。复本系数可以在文件创建的时候指定,也可以在之后改变。HDFS中的文件都是一次性写入的,并且严格要求在任何时候只能有一个写入者。
NameNode 全权管理数据块的复制,它周期性地从集群中的每个 DataNode 接收心跳信号和块状态报告(BlockReport)。接收到心跳信号意味着该 DataNode 节点工作正常。块状态报告包含了一个该 DataNode 上所有数据块的列表。
安全模式
NameNode 启动后会进入一个称为安全模式的特殊状态,在这种状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求。处于安全模式的 NameNode 是不会进行数据块的复制的。NameNode 从所有的 DataNode 接收心跳信号和块状态报告。块状态报告包括了某个 DataNode 所有的数据块列表。每个数据块都有一个指定的最小复本数。当 NameNode 检测确认某个数据块的复本数目达到这个最小值,那么该数据块就会被认为是复本安全(safely replicated)的;在一定百分比(这个参数可配置)的数据块被 NameNode 检测确认是安全之后(加上一个额外的30秒等待时间),NameNode 将退出安全模式状态。接下来它会确定还有哪些数据块的复本没有达到指定数目,并将这些数据块复制到其他 DataNode 上。
文件的删除和恢复
当用户或应用程序删除某个文件时,这个文件并没有立刻从 HDFS 中删除。实际上,HDFS 会将这个文件重命名转移到 /trash
目录。只要文件还在 /trash
目录中,该文件就可以被迅速地恢复。文件在 /trash
中保存的时间是可配置的,当超过这个时间时,NameNode 就会将该文件从名字空间中删除。删除文件会使得该文件相关的数据块被释放。注意,从用户删除文件到HDFS空闲空间的增加之间会有一定时间的延迟。
只要被删除的文件还在 /trash
目录中,用户就可以恢复这个文件。/trash
目录仅仅保存被删除文件的最后复本。在 /trash
目录上HDFS会应用一个特殊策略来自动删除文件,目前的默认策略是删除 /trash
中保留时间超过6小时的文件。
分块存储 HDFS 中的文件在物理上是分块(block)存储的,在 hadoop 1.x
版本中默认大小是 64M,在 hadoop 2.x
版本中默认大小是 128M。为了容错,文件的所有 block 都会有复本。block 的大小和默认复本系数可以通过hdfs-site.xml
配置参数来指定:
/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml 1 2 3 4 5 6 7 8 9 <property > <name > dfs.replication</name > <value > 3</value > </property > <property > <name > dfs.blocksize</name > <value > 134217728</value > </property >
抽象成数据块后,一个文件的大小,可以大于集群中任意一个硬盘的大小 。一个文件的块并不需要全部存储在同一个硬盘上,可以分布在集群中任意一个硬盘上。
使用抽象块而非整个文件作为存储单元,大大简化了系统的设计 。简化设计,对于故障种类繁多的分布式系统来说尤为重要。以块为单位,一方面简化存储管理,因为块大小是固定的,所以一个硬盘放多少个块是非常容易计算的;另一方面,也消除了元数据的顾虑,因为 block 仅仅是存储的一块数据,其文件的元数据(例如权限等)就不需要跟数据块一起存储,可以交由其他系统来处理。
块非常适合用于数据备份进而提供数据容错能力和可用性 。
块缓存 通常 DataNode 从磁盘 中读取块,但对于访问频繁的文件,其对应的块可能被显式地缓存在 DataNode 的内存 中,以堆外块缓存的形式存在。默认情况下,一个块仅缓存在一个 DataNode 的内存中,当然可以针对每个文件配置 DataNode 的数量。作业调度器(用于MapReduce、Spark等框架的)通过在缓存块的 DataNode 上运行任务,可以利用块缓存的优势提高读操作的性能。
例如,连接(join)操作中使用的一个小的查询表就是块缓存的一个很好的候选。通过在缓存池(cache pool)中增加一个cache directive
来告诉 NameNode 需要缓存哪些文件及存多久。缓存池是一个拥有管理缓存权限和资源使用的管理性分组。
文件权限 HDFS 的文件权限机制与 Linux 系统的文件权限机制类似。HDFS 相信你告诉我你是谁,你就是谁。HDFS 文件权限的目的是防止好人做错事,而不是阻止坏人做坏事 。HDFS 可以通过hdfs-site.xml
配置参数来决定是否启用权限控制:
/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml 1 2 3 4 <property > <name > dfs.permissions</name > <value > false</value > </property >
元数据与CheckPoint 在 Hadoop 中,所有的元数据信息都保存在了 FSImage
与 Edits
文件当中,这两个文件就记录了所有的数据的元数据信息,元数据信息的保存路径可以通过 hdfs-site.xml
配置参数来指定:
/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml 1 2 3 4 5 6 7 8 <property > <name > dfs.namenode.name.dir</name > <value > file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/namenodeDatas</value > </property > <property > <name > dfs.namenode.edits.dir</name > <value > file:///export/servers/hadoop-2.6.0-cdh5.14.0/hadoopDatas/dfs/nn/edits</value > </property >
NameNode 启动后,加载元数据存储文件 FSImage
和事务日志文件 Edits
到内存。
DataNode 上报 block 信息到 NameNode,也存储在内存中。
FSImage
:HDFS 文件系统元数据存储在名为 “FSImage” 的文件中。它包含:
整个文件系统命名空间
block 到文件的映射
文件系统属性
Edits
:NameNode 会使用叫作 EditLog 的事务日志持续记录下发生在文件系统元数据上的每一个变化。内存中的元数据在变化前会将操作记录在 Edits
文件中,以防止数据丢失。
首先需要明确:我们使用的最完整的元数据信息存储在 NameNode 的内存中 。由于内存数据的易失性所以需要磁盘文件作备份。
FSImage
文件是作为元数据备份存储在磁盘中,但备份到磁盘需要时间,为了避免备份时新产生的元数据信息丢失,使用 Edits
文件通过预写日志记录元数据的变化。为了避免 Edits
文件大小无限膨胀下去,利用 SecondaryNameNode 或 JournalNode 来将 Edits
文件合并到 FSImage
文件中去。
SecondaryNameNode 是如何辅助管理 FSImage 文件与 Edits 文件的?
FSImage
文件与 Edits
文件的合并过程如下:
SecondaryNameNode 通知 NameNode 切换 Edits 文件,NameNode 会生成一个新的 Edits 文件
SecondaryNameNode 从 NameNode 中获得 FSImage 文件和 Edits 文件(通过 HTTP GET)
SecondaryNameNode 将 FSImage 文件载入内存,然后开始合并 Edits 文件,合并之后成为新的 FSImage 文件
SecondaryNameNode 将新的 FSImage 文件发回给 NameNode
NameNode 用新的 FSImage 文件替换旧的 FSImage 文件
可以通过修改 core-site.xml
文件改变 CheckPoint 触发条件,但一般来说不做修改:
fs.checkpoint.period
:表示多长时间记录一次 FSImage,默认是1小时。
fs.checkpoint.size
:表示一次记录的大小,默认是 64M。
hadoop-2.6.0-cdh5.14.0/etc/hadoop/core-site.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <property > <name > fs.checkpoint.period</name > <value > 3600</value > <description > The number of seconds between two periodic checkpoints. </description > </property > <property > <name > fs.checkpoint.size</name > <value > 67108864</value > <description > The size of the current edit log (in bytes) that triggers a periodic checkpoint even if the fs.checkpoint.period hasn’t expired. </description > </property >
如何配置 SecondaryNameNode 实现与 NameNode 分离?
SecondaryNameNode 在合并 Edits 和 FSImage 时需要消耗的内存和 NameNode 差不多,所以一般把 NameNode 和SecondaryNameNode 放在不同的节点上。
通过创建 masters
文件指定 SecondaryNameNode 节点(可以指定多个,一行一个):
hadoop-2.6.0-cdh5.14.0/etc/hadoop/masters
注:该配置文件的名字容易引起误会,名为masters
实则用来配置 SecondaryNameNode。
修改 hdfs-site.xml
文件:
hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml 1 2 3 4 <property > <name > dfs.namenode.http-address</name > <value > node02:50070</value > </property >
如何查看 FsImage 文件和 Edits 文件中的内容?
Offline Image Viewer Guide Offline Edits Viewer Guide
读写过程 文件读取 HDFS 文件读取过程如下:
文件读取在底层上的过程如下:
Client 通过调用 DistributedFileSystem 对象的 open()
方法来打开 HDFS 文件;
DistributedFileSystem 通过远程过程调用(RPC)来调用 NameNode 以确定文件起始 block 的位置。对于每一个 block,NameNode 返回存有该 block 复本的 DataNode 地址,并根据集群的网络拓扑结构计算出这些 DataNode 与 Client 的距离,然后进行排序。DistributedFileSystem 类返回一个 FSDataInputStream 对象给 Client 用于读取数据,继承自 FSDataInputStream 的 DFSInputStream 对象管理着 NN 和 DN 的 I/O ;
Client 对 DFSInputStream 调用 read()
方法;
DFSInputStream 与存储着文件起始 block 且距离最近的 DataNode 建立连接,将数据从 DataNode 传输给 Client。
读到 block 末端时,DFSInputStream 关闭与该 DataNode 的连接,然后寻找存储着下一个 block 的最佳 DataNode。这一过程对于 Client 来说是透明的,在 Client 看来它一直在读取一个连续的流;
Client 读取完所有的数据,FSDataInputStream 调用 close()
方法关闭输入流。
读取数据时,如果 DFSInputStream 在与 DataNode 通信时出现错误,会尝试从另一个拥有该 block 复本的距离最近的 DataNode 继续读取,同时记录那个故障的 DataNode,保证以后不会反复读取该节点上后续的 block;DFSInputStream 还会通过校验和(checksum)验证数据的完整性,如果发现有损坏的 block,也会尝试从其他 DataNode 读取复本,同时将被损坏的 block 通知给 NameNode。
文件写入
Client 通过 DistributedFileSystem 对象调用 create()
来新建文件;
DistributedFileSystem 对象向 NameNode 发起 RPC 调用在文件系统的命名空间新建一个文件。NameNode 检查目标文件是否已存在以及 Client 是否有权限。如果检查通过,NameNode 为创建新文件记下一条记录,DistributedFileSystem 向 Client 返回一个 FSDataOutputStream 对象,再由 FSDataOutputStream 封装一个 DFSOutPutStream 对象,由该对象负责处理 DataNode 和 NameNode 之间的通信;
DFSOutPutStream 将 Client 写入的数据分成一个个数据包(Packet,默认为 64 KB),并写入内部的数据队列 (data queue);
数据队列由 DFSOutputStream 对象中的 DataStreamer 线程处理,它挑选出适合存储数据复本的一组 DataNode,要求 NameNode 分配新的数据块,这组 DataNode 构成一个管线(pipeline)。假设复本数为 3,则管线有 3 个节点,DataStreamer 将数据包流式传输到管线中第 1 个 DataNode,该 DataNode 将数据包发送给管线中的第 2 个 DataNode,第 2 个 DataNode 再发送给第 3 个DataNode;
DFSOutPutStream 还维护着一个确认队列 (ack queue),数据包被发送到管线中的第一个 DataNode 后,会被移动到确认队列。DataNode 收到数据包后会返回一个确认回执,当 DataStreamer 收到管线中所有 DataNode 的确认信息后,才会将该数据包从确认队列中删除;
Client 完成数据的写入后,对 FSDataOutputStream 调用 close()
方法。该操作会将剩余的所有数据包写入 DataNode 管线,收到确认回执后向 NameNode 告知文件写入完成。
如果 DataNode 在数据写入期间发生故障,会先关闭管线,将确认队列中所有数据包添加回数据队列的最前端,以确保故障节点下游的 DataNode 不会漏掉数据包;为正常存储在另一 DataNode 的当前数据块指定一个新的标识,并将该标识传送给 NameNode,以便故障 DataNode 在恢复后可以删除存储的部分数据块。从管线中删除故障的 DataNode,基于 2 个正常 DataNode 构建一条新管线用于写入剩下的 block。如果复本数不足,NameNode 会在另一个 DataNode 上创建新的复本。
NameNode 如何选择在哪个 DateNode 存储复本?
Hadoop 在设计时考虑到数据存储的安全与高效,数据文件默认在 HDFS 上存放3份:1 份存放在 Client 节点上,2 份存放在另外某一个机架中的两个节点上。这样不仅能提供很好的稳定性(block 存储在两个机架中)并实现很好的负载均衡,包括写入带宽(写入操作只需经过一个交换机)、读取性能(可以从两个机架中选择读取)和集群中 block 的均匀分布(客户端只在本地机架写入一个 block)。
HDFS基准测试 测试写入速度 测试 HDFS 的文件写入性能:
1 2 3 4 5 yarn jar /export /servers/hadoop-2.6.0-cdh5.14.0/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0.jar TestDFSIO -write -nrFiles 10 -fileSize 10MB yarn dfs -text /benchmarks/TestDFSIO/io_write/part-00000
测试读取速度 测试 HDFS 的文件读取性能:
1 2 3 4 5 yarn jar /export /servers/hadoop-2.6.0-cdh5.14.0/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.6.0-cdh5.14.0.jar TestDFSIO -read -nrFiles 10 -fileSize 10MB yarn dfs -text /benchmarks/TestDFSIO/io_read/part-00000
清除测试数据 1 yarn jar /export/servers/hadoop-2 .6 .0 -cdh5 .14 .0 /share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2 .6 .0 -cdh5 .14 .0 .jar TestDFSIO -clean
命令行API 查看文件系统状况 运行 DFS 管理客户端,查看状况报告:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 $ hdfs dfsadmin -report Configured Capacity: 103422541824 (96.32 GB) Present Capacity: 78714851328 (73.31 GB) DFS Remaining: 74144059392 (69.05 GB) DFS Used: 4570791936 (4.26 GB) DFS Used%: 5.81% Replicated Blocks: Under replicated blocks: 0 Blocks with corrupt replicas: 0 Missing blocks: 0 Missing blocks (with replication factor 1): 0 Low redundancy blocks with highest priority to recover: 0 Pending deletion blocks: 0 Erasure Coded Block Groups: Low redundancy block groups: 0 Block groups with corrupt internal blocks: 0 Missing block groups: 0 Low redundancy blocks with highest priority to recover: 0 Pending deletion blocks: 0 ------------------------------------------------- Live datanodes (3): Name: 192.168.153.200:9866 (devcdh1.cdh.com) Hostname: devcdh1.cdh.com Rack: /default Decommission Status : Normal Configured Capacity: 34474180608 (32.11 GB) DFS Used: 1523503104 (1.42 GB) Non DFS Used: 15918665728 (14.83 GB) DFS Remaining: 17032011776 (15.86 GB) DFS Used%: 4.42% DFS Remaining%: 49.41% Configured Cache Capacity: 0 (0 B) Cache Used: 0 (0 B) Cache Remaining: 0 (0 B) Cache Used%: 100.00% Cache Remaining%: 0.00% Xceivers: 2 Last contact: Tue Jul 14 14:13:34 CST 2020 Last Block Report: Tue Jul 14 14:02:28 CST 2020 Name: 192.168.153.201:9866 (devcdh2.cdh.com) Hostname: devcdh2.cdh.com Rack: /default Decommission Status : Normal Configured Capacity: 34474180608 (32.11 GB) DFS Used: 1523789824 (1.42 GB) Non DFS Used: 4306612224 (4.01 GB) DFS Remaining: 28643778560 (26.68 GB) DFS Used%: 4.42% DFS Remaining%: 83.09% Configured Cache Capacity: 432013312 (412 MB) Cache Used: 0 (0 B) Cache Remaining: 432013312 (412 MB) Cache Used%: 0.00% Cache Remaining%: 100.00% Xceivers: 2 Last contact: Tue Jul 14 14:13:33 CST 2020 Last Block Report: Tue Jul 14 14:02:27 CST 2020 Name: 192.168.153.202:9866 (devcdh3.cdh.com) Hostname: devcdh3.cdh.com Rack: /default Decommission Status : Normal Configured Capacity: 34474180608 (32.11 GB) DFS Used: 1523499008 (1.42 GB) Non DFS Used: 4482412544 (4.17 GB) DFS Remaining: 28468269056 (26.51 GB) DFS Used%: 4.42% DFS Remaining%: 82.58% Configured Cache Capacity: 432013312 (412 MB) Cache Used: 0 (0 B) Cache Remaining: 432013312 (412 MB) Cache Used%: 0.00% Cache Remaining%: 100.00% Xceivers: 2 Last contact: Tue Jul 14 14:13:36 CST 2020 Last Block Report: Tue Jul 14 14:02:27 CST 2020
进行 DFS 文件系统检查:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 $ hdfs fsck / Connecting to namenode via http://devcdh1.cdh.com:9870/fsck?ugi=root&path=%2F FSCK started by root (auth:SIMPLE) from /192.168.153.200 for path / at Tue Jul 14 14:17:22 CST 2020 Status: HEALTHY Number of data-nodes: 3 Number of racks: 1 Total dirs : 953 Total symlinks: 0 Replicated Blocks: Total size: 1511151131 B Total files: 154 Total blocks (validated): 155 (avg. block size 9749362 B) Minimally replicated blocks: 155 (100.0 %) Over-replicated blocks: 0 (0.0 %) Under-replicated blocks: 0 (0.0 %) Mis-replicated blocks: 0 (0.0 %) Default replication factor: 3 Average block replication: 2.9741936 Missing blocks: 0 Corrupt blocks: 0 Missing replicas: 0 (0.0 %) Erasure Coded Block Groups: Total size: 0 B Total files: 0 Total block groups (validated): 0 Minimally erasure-coded block groups: 0 Over-erasure-coded block groups: 0 Under-erasure-coded block groups: 0 Unsatisfactory placement block groups: 0 Average block group size: 0.0 Missing block groups: 0 Corrupt block groups: 0 Missing internal blocks: 0 FSCK ended at Tue Jul 14 14:17:22 CST 2020 in 21 milliseconds The filesystem under path '/' is HEALTHY
查看目录和文件大小:
1 2 3 4 5 $ hdfs dfs -du -h / 562.4 M 1.6 G /hdfsdata 54 162 /hivetable 7.2 M 21.5 M /tmp 871.6 M 2.6 G /user
注意:前后两个文件大小分别是指逻辑空间和物理空间,物理空间 = 逻辑空间 * 复本数。
查看目录 ls
命令格式及示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 hdfs dfs -ls [-R] <args> -R选项递归返回目录列表 args参数指定路径 permissions userid groupid modification_date modification_time dirname $ hdfs dfs -ls / Found 12 items drwxr-xr-x - root supergroup 0 2018-07-31 21:02 /export drwxr-xr-x - root supergroup 0 2018-07-18 21:37 /flink drwxr-xr-x - SANNAHA supergroup 0 2018-07-19 17:32 /flink-checkpoint drwxr-xr-x - SANNAHA supergroup 0 2018-07-19 20:09 /flink-checkpoint-0719 drwxr-xr-x - root supergroup 0 2018-07-18 20:26 /hbase permissions number_of_replicas userid groupid filesize modification_date modification_time filename $ hdfs dfs -ls /export /servers/exporthive -rwxr-xr-x 3 root supergroup 11 2018-07-31 21:03 /export /servers/exporthive/000000_0
创建目录 mkdir
命令格式及示例:
1 2 3 4 5 6 hdfs dfs -mkdir [-p] <paths> -p选项与Linux相似,沿路径创建父目录 paths参数将路径uri作为参数并创建目录 $ hdfs dfs -mkdir -p /user/hadoop/dir1 $ hdfs dfs -mkdir hdfs://nn1.example.com/user/hadoop/dir hdfs://nn2.example.com/user/hadoop/dir
测试目录文件是否存在 test
命令格式及示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 hdfs dfs -test -[defsz] <path> 选项: -d: if the path is a directory, return 0. -e: if the path exists, return 0. -f: if the path is a file, return 0. -s: if the path is not empty, return 0. -z: if the file is zero length, return 0. $ hdfs dfs -test -d /user/hive/warehouse/sannaha.db/tb_test $ echo $? 0
文件系统间复制文件 将一个或多个文件从本地上传到 HDFS:
1 2 3 4 5 6 7 hdfs dfs -put <localsrc>... <dst> $ hdfs dfs -put localfile /user/hadoop/hadoopfile $ hdfs dfs -put localfile1 localfile2 /user/hadoop/hadoopdir $ hdfs dfs -put localfile hdfs://nn.example.com/hadoop/hadoopfile
将一个或多个文件从 HDFS 下载到本地:
1 2 3 4 5 6 7 hdfs dfs -get <dst>... <localsrc> $ hdfs dfs -get /user/hadoop/hadoopfile localfile $ hdfs dfs -get /user/hadoop/localfile1 /user/hadoop/localfile2 ./ $ hdfs dfs -get hdfs://nn.example.com/hadoop/hadoopfile localfile
将单个或多个文件从本地移动到 HDFS,移动后本地文件会被删除:
1 2 $ hdfs dfs -moveFromLocal <localsrc>... <dst>
将多个本地小文件合并成一个大文件上传到 HDFS:
1 $ hdfs dfs -appendToFile localfile1 localfile2 /mergefile
将多个 HDFS 文件合并成一个大文件下载到本地:
1 2 $ hdfs dfs -getmerge /config/*.xml ./hello.xml $ hdfs dfs -getmerge /config ./hello.xml
文件系统内复制和移动 mv
命令格式及示例:
1 2 3 4 5 hdfs dfs -mv URI [URI ...] <dest> $ hdfs dfs -mv /user/hadoop/file1 /user/hadoop/file2 $ hdfs dfs -mv hdfs://nn.example.com/file1 hdfs://nn.example.com/file2 hdfs://nn.example.com/file3 hdfs://nn.example.com/dir1
cp
命令格式及示例:
1 2 3 4 5 6 7 8 9 10 hdfs dfs -cp [-f] [-p | -p[topax]] URI [URI ...] <dest> -f 选项在目标已存在的情况下将覆盖目标 -p选项将保留文件属性[topx](时间戳timestamps,所有权ownership,权限permission,访问控制列表ACL,文件系统的扩展属性XAttr) 如果指定了-p且没有参数,则保留时间戳,所有权和权限 如果指定了-pa,则还保留权限,因为ACL是一组超级权限 确定是否保留原始命名空间扩展属性与-p标志无关 $ hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2 $ hdfs dfs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir
查看内容 cat
命令格式及示例:
1 2 3 4 hdfs dfs -cat URI [URI ...] $ hdfs dfs -cat hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2 $ hdfs dfs -cat file:///file3 /user/hadoop/file4
追加内容 appendToFile
命令格式及示例:
1 2 3 4 5 6 hdfs dfs -appendToFile <localsrc> ... <dst> $ hdfs dfs -appendToFile localfile /user/hadoop/hadoopfile $ hdfs dfs -appendToFile localfile1 localfile2 /user/hadoop/hadoopfile $ hdfs dfs -appendToFile localfile hdfs://nn.example.com/hadoop/hadoopfile
删除 rm
命令格式及示例:
1 2 3 4 5 6 7 8 hdfs dfs -rm [-f] [-r|-R] [-skipTrash] URI [URI ...] -f选项在目标文件不存在的情况下不显示错误信息 -R选项以递归方式删除目录及其下的所有内容 -r选项等效于-R -skipTrash选项将立即删除指定的文件,并绕过垃圾桶(如果已启用) $ hdfs dfs -rm hdfs://nn.example.com/file /user/hadoop/emptydir
修改权限 chmod
命令格式及示例:
1 2 3 4 hdfs dfs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...] -R选项将以递归方式进行更改。 $ hdfs dfs -chmod 600 /config/core-site.xml
修改所有者 chown
命令格式:
1 2 hdfs dfs -chown [-R] [OWNER][:[GROUP]] URI [URI ] -R选项将以递归方式进行更改。
清空回收站 expunge
命令格式:
分布式拷贝 DistCp(分布式拷贝)是用于大规模集群内部和集群之间拷贝的工具。 它使用 MapReduce 实现文件分发、错误处理、恢复以及报告生成。DistCp 启动的 MapReduce 作业没有 reduce 任务,只通过 map 任务完成文件或目录的复制。
命令格式及示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 hadoop distcp OPTIONS [source_path...] <target_path> 常用选项: -append:追加,如果可能的话将复用目标路径中现有的数据,并追加新数据 -update:更新,仅复制缺少的文件或目录 -overwrite:覆盖,无条件覆盖目标文件 -delete :删除,从目标路径中删除源路径中不存在的文件 -i:复制时忽略失败 -f <arg>:使用存储源路径的列表文件 -m <arg>:同时复制的最大 map 数量 -p <arg>:复制时保留状态信息,如果 -p 后不指定参数,则保留复本、块大小、用户、组、权限、校验和以及时间戳 参数可选 rbugpcaxt (replication, block-size, user, group, permission, checksum-type, ACL, XATTR, timestamps) -bandwidth <arg>:指定每个 map 的带宽,单位为 MB hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/foo/bar hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/foo/bar hdfs://nn3:8020/foo/bar hadoop distcp hftp://nn1:50070/foo/bar hdfs://nn2/foo/bar $ hadoop distcp -Dmapreduce.job.name=syncHiveData -bandwidth 1000 -m 10 \ hdfs://Cluster1/user/hive/warehouse/sannaha.db/tb_test \ hdfs://Cluster2/user/hive/warehouse/sannaha.db/tb_test
源路径可以有多个,目标路径只能有一个;如果两个集群的 hadoop 版本相同,分布式拷贝时可以使用 hdfs 协议,否则需要使用 hftp 协议:
Java API 依赖包 由于 CDH 版本的软件涉及版权的问题,没有将所有的 jar 包托管到 Maven 仓库当中去,而是托管在了 Cloudera 自己的服务器上面,所以需要手动指定仓库地址。
使用 CDH5 Maven 仓库 适用于CDH 5.14.x版本的 Maven Artifacts 使用 CDH6 Maven 仓库 适用于CDH 6.1.x版本的 Maven Artifacts
pom.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 <repositories > <repository > <id > cloudera</id > <url > https://repository.cloudera.com/artifactory/cloudera-repos/</url > </repository > </repositories > <dependencies > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-client</artifactId > <version > 2.6.0-mr1-cdh5.14.0</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-common</artifactId > <version > 2.6.0-cdh5.14.0</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-hdfs</artifactId > <version > 2.6.0-cdh5.14.0</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-mapreduce-client-core</artifactId > <version > 2.6.0-cdh5.14.0</version > </dependency > <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > <version > 4.11</version > <scope > test</scope > </dependency > <dependency > <groupId > org.testng</groupId > <artifactId > testng</artifactId > <version > RELEASE</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.0</version > <configuration > <source > 1.8</source > <target > 1.8</target > <encoding > UTF-8</encoding > </configuration > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-shade-plugin</artifactId > <version > 2.4.3</version > <executions > <execution > <phase > package</phase > <goals > <goal > shade</goal > </goals > <configuration > <minimizeJar > true</minimizeJar > </configuration > </execution > </executions > </plugin > </plugins > </build >
HadoopURL方式获取数据 从 Hadoop 读取文件,最简单的方法是使用 java.net.URL
对象打开数据流,从中读取数据。
为了让 Java 程序识别 Hadoop 的 HDFS URL 形式,需要通过 FsUrlStreamHandlerFactory
实例调用 java.net.URL
对象的setURLStreamHandlerFactory()
方法。每个 Java 虚拟机只能调用一次 这个方法,因此通常在静态方法中调用。如果程序的其他组件已经声明了一个 FsUrlStreamHandlerFactory
实例,就无法再使用这种方法从 HDFS 读取数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package moe.sannaha.hdfsapitest;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.InputStream;import java.net.URL;public class GetFileByURL { static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main (String[] args) { InputStream inputStream = null ; FileOutputStream outputStream = null ; String url = "hdfs://node01:8020/test/input/test.txt" ; try { inputStream = new URL(url).openStream(); outputStream = new FileOutputStream(new File("e:/test.txt" )); IOUtils.copyBytes(inputStream,System.out,4096 ,false ); IOUtils.copyBytes(inputStream,outputStream,4096 ,false ); } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeStream(inputStream); IOUtils.closeStream(outputStream); } } }
FileSystem方式获取数据 通过 URL 方式获取 HDFS 文件流虽然简单,但存在局限。因此需要 FileSystem API 打开文件的数据流。
获取FileSystem 在 Java 中获取 FileSystem,主要涉及以下几个类:
Configuration:该类的对象封装了客户端或服务器的配置
FileSystem:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作
FileSystem.get(Configuration conf):返回配置文件core-site.xml
中指定的文件系统。如果配置文件中没有指定,此时使用的不再是分布式文件系统,而是本地文件系统(LocalFileSystem)
FileSystem.get(URI uri, Configuration conf):通过给定的 URI 方案和权限来确定要使用的文件系统,如没有指定则返回默认文件系统
FileSystem.get(URI uri, Configuration conf, String user):作为给定用户来访问文件系统。如果 HDFS 启用了权限控制,即 hdfs-site.xml
中 dfs.permissions
的参数值为 true
,则可能需要“伪造用户”来实现文件读写
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class GetFileSystem { @Test public void getFileSystem1 () throws IOException { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS" , "hdfs://node01:8020" ); FileSystem fileSystem = FileSystem.get(configuration); System.out.println(fileSystem.toString()); } @Test public void getFileSystem1 () throws IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020" ), configuration, "JavaProgram" ); System.out.println(fileSystem.toString()); } @Test public void getFileSystem3 () throws IOException, InterruptedException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020" ), configuration, "JavaProgram" ); System.out.println(fileSystem.toString()); } }
获取输入流 有了 FileSystem 实例后,可以调用 open()
方法获取文件的输入流,用于从 HDFS 获取数据 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class FileSystemTest { public static void main (String[] args) throws IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020" ), configuration); InputStream inputStream = null ; try { inputStream = fileSystem.open(new Path("hdfs://node01:8020/test/input/test.txt" )); IOUtils.copyBytes(inputStream, System.out, 4096 , false ); } finally { IOUtils.closeStream(inputStream); } } }
open()
方法返回的是 FSDataInputStream
对象(而非标准的java.io
类对象),这个类是继承了 java.io.DataInputStream
的一个特殊类,实现了Seekable
接口,提供了一个设置偏移量的方法seek()
,以及一个查询当前位置相对于文件起始位置偏移量的查询方法getPos()
,支持随机访问,因此可以从流的任意位置开始读取数据 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class FileSystemTest { public static void main (String[] args) throws IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020" ), configuration); InputStream inputStream = null ; FSDataInputStream inputStream = null ; try { inputStream = fileSystem.open(new Path("hdfs://node01:8020/test/input/test.txt" )); IOUtils.copyBytes(inputStream, System.out, 4096 , false ); inputStream.seek(2 ); long pos = inputStream.getPos(); System.out.println(pos); IOUtils.copyBytes(inputStream, System.out, 4096 , false ); } finally { IOUtils.closeStream(inputStream); } } }
FSDataInputStream
类也实现了 PositionedReadable
接口,提供了read()
方法,从指定偏移量处读取文件的一部分 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class FileSystemTest { public static void main (String[] args) throws IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020" ), configuration); InputStream inputStream = null ; FSDataInputStream inputStream = null ; byte [] bytes = new byte [10 ]; try { inputStream = fileSystem.open(new Path("hdfs://node01:8020/test/input/test.txt" )); IOUtils.copyBytes(inputStream, System.out, 4096 , false ); inputStream.read(2 , bytes, 1 , 3 ); String string = new String(bytes); System.out.printf(string); } finally { IOUtils.closeStream(inputStream); } } }
获取输出流 FileSystem 类有一系列新建文件的方法,最简单的方法是使用create()
方法给准备建的文件指定一个 Path 对象,然后返回一个用于写入数据的输出流,用于往 HDFS 写入数据 。如果路径在 HDFS 中不存在,create()
方法能够自动创建父目录。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class CopyLocalfileToHDFS { public static void main (String[] args) throws IOException { String localPath = "e:/workspace/test.txt" ; String dstPath = "hdfs://node01:8020/test/output/test.txt" ; BufferedInputStream inputStream = new BufferedInputStream(new FileInputStream(localPath)); Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(URI.create(dstPath), configuration); FSDataOutputStream outputStream = fileSystem.create(new Path(dstPath), new Progressable() { @Override public void progress () { System.out.print("." ); } }); IOUtils.copyBytes(inputStream, outputStream, 4096 , true ); } }
create()
方法返回的是FSDataOutputStream
对象,与FSDataInputStream
对象相似,也有用于查询当前位置相对于文件起始位置偏移量的方法getPos()
,但由于 HDFS 只允许对在文件末尾追加数据 ,因此写入时定位无意义。
创建目录 FileSystem 提供了创建目录的方法 mkdirs()
,和 java.io.File
类的 mkdirs()
方法一样,可以自动创建父目录。
1 2 3 4 5 6 @Test public void mkdirs () throws URISyntaxException, IOException { FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020" ), new Configuration()); fileSystem.mkdirs(new Path("/test/createdir" )); fileSystem.close(); }
遍历所有文件 使用官方提供的 RemoteIterator
迭代器获取所有的文件或文件夹:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package moe.sannaha.hdfsapitest;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*;import org.junit.Test;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;public class GetFileList { @Test public void listMyFiles () throws Exception { FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020" ), new Configuration()); RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/" ), true ); while (locatedFileStatusRemoteIterator.hasNext()) { LocatedFileStatus next = locatedFileStatusRemoteIterator.next(); System.out.println(next.getPath().toString()); } fileSystem.close(); } }
也可以通过 FileSystem 提供的listStatus()
方法,自己实现递归遍历获取所有文件或文件夹:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class GetFileList2 { @Test public void listFile () throws Exception { FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020" ), new Configuration()); FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/" )); for (FileStatus fileStatus : fileStatuses) { if (fileStatus.isDirectory()) { Path path = fileStatus.getPath(); listAllFiles(fileSystem, path); } else { System.out.println("文件路径为" + fileStatus.getPath().toString()); } } } public void listAllFiles (FileSystem fileSystem, Path path) throws Exception { FileStatus[] fileStatuses = fileSystem.listStatus(path); for (FileStatus fileStatus : fileStatuses) { if (fileStatus.isDirectory()) { listAllFiles(fileSystem, fileStatus.getPath()); } else { Path path1 = fileStatus.getPath(); System.out.println("文件路径为" + path1); } } } }
文件下载 1 2 3 4 5 6 @Test public void copyToLocal () throws IOException { FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020" ), new Configuration()); fileSystem.copyToLocalFile(new Path("hdfs://node01:8020/test/output/test.txt" ),new Path("file:///e:/workspace/test.txt" )); fileSystem.close(); }
文件上传 1 2 3 4 5 6 @Test public void copyFromLocal () throws IOException { FileSystem fileSystem = FileSystem.get(URI.create("hdfs://node01:8020" ), new Configuration()); fileSystem.copyFromLocalFile(new Path("file:///e:/workspace/test.txt" ),new Path("hdfs://node01:8020/test/input/test.txt" )); fileSystem.close(); }
小文件合并上传 Hadoop 存储的每一个文件都对应一条元数据,如果 Hadoop 中有大量小文件,会大大增加集群管理元数据的内存压力。如果可以建议在上传时将小文件合并成大文件,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Test public void mergeFile () throws Exception { FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020" ), new Configuration()); FSDataOutputStream outputStream = fileSystem.create(new Path("/hello/mydir/test/bigfile.log" )); LocalFileSystem local = FileSystem.getLocal(new Configuration()); FileStatus[] fileStatuses = local.listStatus(new Path("file:///E:\\logs" )); for (FileStatus fileStatus : fileStatuses) { FSDataInputStream inputStream = local.open(fileStatus.getPath()); IOUtils.copyBytes(inputStream, outputStream, 4096 , false ); IOUtils.closeStream(inputStream); } IOUtils.closeStream(outputStream); local.close(); fileSystem.close(); }
winutils ERROR问题 在 IDEA 运行 MapReduce 程序时,产生 winutils ERROR 的原因是 Windows 下缺少的 Hadoop 环境。编译 Windows 版本的 Hadoop 程序后,配置系统环境变量如下(注意路径中不能存在空格,Program Files
可用PROGRA~1
代替):
1 2 3 4 5 6 - JAVA_HOME=C:\Program Files\Java\jdk1.8.0_221 + JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_221 HADOOP_HOME=D:\hadoop-2.6.0-cdh5.14.0 Path=%JAVA_HOME%\bin Path=%HADOOP_HOME%\bin Path=%HADOOP_HOME%\sbin
参考资料
《Hadoop权威指南 第四版》 Hadoop分布式文件系统:架构和设计 HDFS - Checkpoint hadoop2.X如何将namenode与SecondaryNameNode分开配置