本文共 8396 字,大约阅读时间需要 27 分钟。
Plan:
分布式文件系统与HDFS;HDFS体系结构与基本概念;HDFS的shell操作;java接口及常用api;HADOOP的RPC机制;HDFS源码分析;远程debug
自己设计一分布式文件系统?
Distributed File System
URI格式是scheme://authority/path。HDFS的scheme是hdfs,对本地文件系统,scheme是file。其中scheme和authority参数都是可选的,如果未加指定,就会使用配置中指定的默认scheme。
例如:/parent/child可以表示成hdfs://namenode:namenodePort/parent/child,或者更简单的/parent/child(假设配置文件中指定的默认文件系统是hdfs://namenode:namenodePort)
#hadoop fs -mkdir /test1 在根目录创建一个目录test1
#hadoop fs -get /test/test.txt .
或#hadoop fs -copyToLocal /test/test.txt .
dfs.block.size
hdfs-site.xml的dfs.replication属性
HDFS的java访问接口——FileSystem
HDFS的FileSystem读取文件
/**
 * Obtains a FileSystem client for the HDFS instance at hdfs://hadoop240:9000,
 * using a default Configuration.
 *
 * @return the FileSystem returned by FileSystem.get for that URI
 * @throws URISyntaxException if the hard-coded URI cannot be parsed
 * @throws IOException if FileSystem.get fails
 */
private static FileSystem getFileSystem() throws URISyntaxException, IOException {
    Configuration conf = new Configuration();
    URI uri = new URI("hdfs://hadoop240:9000");
    return FileSystem.get(uri, conf);
}

/**
 * Reads the file /aaa by calling fileSystem.open(path) and copies its
 * content to System.out.
 *
 * @throws Exception if the file system cannot be obtained or the file
 *         cannot be opened or copied
 */
private static void readFile() throws Exception {
    FileSystem fileSystem = getFileSystem();
    // The path deliberately carries no scheme/authority so that it resolves
    // against the file system returned by getFileSystem(); a fully qualified
    // URI naming another host is expected to be rejected by FileSystem's
    // path check ("Wrong FS").
    FSDataInputStream openStream = fileSystem.open(new Path("/aaa"));
    try {
        // close=false: copyBytes leaves both streams open, so System.out
        // stays usable and the input stream is closed below.
        IOUtils.copyBytes(openStream, System.out, 1024, false);
    } finally {
        // Close the input stream even when the copy fails.
        IOUtils.closeStream(openStream);
    }
}
HDFS的FileSystem目录
/**
 * Creates the directory /bbb by calling fileSystem.mkdirs(path).
 *
 * @throws Exception if the file system cannot be obtained or the
 *         directory cannot be created
 */
private static void mkdir() throws Exception {
    FileSystem fileSystem = getFileSystem();
    // No scheme/authority in the path: it resolves against the file system
    // returned by getFileSystem() instead of naming a host of its own.
    fileSystem.mkdirs(new Path("/bbb"));
}

/**
 * Deletes the directory /bbb by calling fileSystem.delete(path, recursive).
 *
 * @throws Exception if the file system cannot be obtained or the
 *         delete call fails
 */
private static void rmdir() throws Exception {
    FileSystem fileSystem = getFileSystem();
    // The recursive flag is passed explicitly; the single-argument
    // delete(Path) overload is deprecated in the Hadoop FileSystem API.
    fileSystem.delete(new Path("/bbb"), true);
}
HDFS的FileSystem遍历目录
/**
 * Lists the entries of the root directory using FileSystem.listStatus(path)
 * and prints one line per entry: a directory/file label followed by the
 * entry's path. Per-entry details come from the FileStatus objects.
 *
 * @throws Exception if the file system cannot be obtained or the
 *         listing fails
 */
private static void list() throws Exception {
    FileSystem fileSystem = getFileSystem();
    // No scheme/authority in the path: it resolves against the file system
    // returned by getFileSystem() instead of naming a host of its own.
    FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));
    for (FileStatus fileStatus : listStatus) {
        String isDir = fileStatus.isDir() ? "目录" : "文件";
        String name = fileStatus.getPath().toString();
        System.out.println(isDir + " " + name);
    }
}
FileSystem
Remote Procedure Call
/**
 * RPC protocol interface shared by the server and the client.
 */
public interface Bizable extends VersionedProtocol {
    /**
     * Builds a greeting for the given name.
     *
     * @param name the name to greet
     * @return the greeting text
     */
    public abstract String hello(String name);
}

/**
 * Server-side implementation of {@link Bizable}.
 */
class Biz implements Bizable {
    /**
     * Logs that the method was invoked and returns "hello " followed by the name.
     */
    @Override
    public String hello(String name) {
        System.out.println("被调用了");
        return "hello " + name;
    }

    /**
     * Logs and returns the protocol version declared in {@link MyServer#VERSION}.
     * Both arguments are ignored.
     */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        System.out.println("Biz.getProtocalVersion()=" + MyServer.VERSION);
        return MyServer.VERSION;
    }
}

/**
 * Starts an RPC server on 127.0.0.1 that serves a {@link Biz} instance.
 */
public class MyServer {
    /** Port the server listens on; also read by the client. */
    public static final int PORT = 3242;
    /** Protocol version reported by the server and requested by the client. */
    public static final long VERSION = 23234L;

    public static void main(String[] args) throws IOException {
        final Server server = RPC.getServer(new Biz(), "127.0.0.1", PORT, new Configuration());
        server.start();
    }
}

/**
 * Connects to the server at 127.0.0.1:{@link MyServer#PORT}, calls
 * {@link Bizable#hello(String)} once and prints the result.
 */
public class MyClient {
    public static void main(String[] args) throws IOException {
        final InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", MyServer.PORT);
        final Bizable proxy = (Bizable) RPC.getProxy(Bizable.class, MyServer.VERSION,
                inetSocketAddress, new Configuration());
        try {
            final String ret = proxy.hello("吴超");
            System.out.println(ret);
        } finally {
            // Release the proxy even when the remote call fails.
            RPC.stopProxy(proxy);
        }
    }
}