/** * 递归遍历hdfs中所有的文件路径 */ @Test public void getAllHdfsFilePath() throws URISyntaxException, IOException { //获取fs的客户端 FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration()); Path path = new Path("/"); FileStatus[] fileStatuses = fileSystem.listStatus(path); //循环遍历fileStatuses,如果是文件,打印文件的路径,如果是文件夹,继续递归进去 for (FileStatus fileStatus : fileStatuses){ if (fileStatus.isDirectory()){//文件夹 getDirectoryFiles(fileSystem,fileStatus); }else{ //文件 System.out.println(fileStatus.getPath().toString()); } } //方法二: System.out.println("方法二:利用官方提供API"); RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true); while (locatedFileStatusRemoteIterator.hasNext()){ LocatedFileStatus next = locatedFileStatusRemoteIterator.next(); System.out.println(next.getPath()); } //关闭fs的客户端 fileSystem.close(); } /** * 递归获取文件路径 */ public void getDirectoryFiles(FileSystem fileSystem,FileStatus fileStatus) throws IOException { //通过fileStatus获取文件夹路径 Path path = fileStatus.getPath(); //该fileStatus必定为一个文件夹 FileStatus[] fileStatuses = fileSystem.listStatus(path); for (FileStatus status:fileStatuses){ if (fileStatus.isDirectory()){ getDirectoryFiles(fileSystem,status); }else{ System.out.println(fileStatus.getPath().toString()); } } } /** * 下载hdfs文件到本地 */ @Test public void copyHdfsToLocal() throws Exception { FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration()); FSDataInputStream inputStream = fileSystem.open(new Path("hdfs://node01:8020/aa/haha2.txt")); FileOutputStream outputStream = new FileOutputStream(new File("d:\\install-log.txt")); IOUtils.copy(inputStream,outputStream); IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(outputStream); //方法二:利用官方API //有报错:java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor; fileSystem.copyToLocalFile(new Path("hdfs://node01:8020/aa/haha2.txt"),new Path("file:///d:\\install-log2.txt")); fileSystem.close(); } /** * hdfs上面创建文件夹 */ @Test public void createHdfsDir() throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration()); fileSystem.mkdirs(new Path("/aa/bb/cc/")); fileSystem.close(); } /** * hdfs的文件上传 */ @Test public void uploadFileToHdfs() throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration()); //注:new Path()中的字符串参数如果省略file:///或hdfs://的话,默认会在参数前添加hdfs://node01:8020,即,默认是hdfs路径 fileSystem.copyFromLocalFile(false,new Path("file:///d:\\output.txt"),new Path("/aa/bb/cc")); //第二种方法:通过流的方式 //输出流,负责将数据输出到hdfs的路径上面去 FSDataOutputStream outputStream = fileSystem.create(new Path("/aa/bb/cc/empSel.hdfs")); //通过输入流读取本地文件系统的文件 InputStream inputStream = new FileInputStream(new File("d:\\empSel.txt")); IOUtils.copy(inputStream,outputStream); IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(outputStream); fileSystem.close(); } /** * hdfs的权限校验机制 */ @Test public void hdfsPermission() throws Exception{ /* 在所有节点的hdfs-site.xml中设置开启权限验证: <property> <name>dfs.permissions</name> <value>true</value> </property> 普通的filesystem,执行时会报错:org.apache.hadoop.security.AccessControlException: Permission denied: user=Administrator, access=READ, inode="/config/core-site.xml":root:supergroup:-rw------- FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration()); */ //通过伪造用户来获取分布式文件系统的客户端 FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration(), "root"); //从hdfs上下载文件到本地 FSDataInputStream inputStream = fileSystem.open(new Path("/config/core-site.xml")); FileOutputStream outputStream = new FileOutputStream(new File("d:\\core-site.txt")); IOUtils.copy(inputStream,outputStream); IOUtils.closeQuietly(inputStream); IOUtils.closeQuietly(outputStream);// fileSystem.copyFromLocalFile(new Path("file:///d:\\transferIndex.txt"),new Path("/aa/bb/cc/"));// fileSystem.delete(new Path("/aa/bb/cc/"),false); fileSystem.close(); } /** * hdfs在上传小文件的时候进行合并 * 在我们的hdfs 的shell命令模式下,可以通过命令行将很多的hdfs文件合并成一个大文件下载到本地: * hdfs dfs -getmerge /config/*.xml ./hello.xml * 上传时也能将小文件合并到一个大文件里面去。 */ @Test public void mergeFile()throws Exception{ //获取分布式文件系统 FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.8.100:8020"), new Configuration(),"root"); FSDataOutputStream outputStream = fileSystem.create(new Path("/bigFile.xml")); //获取本地所有小文件的输入流 //首先获取本地文件系统 LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration()); FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("file:///D:\\上传小文件合并")); for (FileStatus fileStatus:fileStatuses){ Path path = fileStatus.getPath(); FSDataInputStream fsDataInputStream = localFileSystem.open(path); IOUtils.copy(fsDataInputStream,outputStream); IOUtils.closeQuietly(fsDataInputStream); } IOUtils.closeQuietly(outputStream); fileSystem.close(); localFileSystem.close(); }
转载于:https://www.cnblogs.com/mediocreWorld/p/10952959.html
相关资源:java操作Hadoop源码之HDFS Java API操作-创建目录