Implementing Hadoop client functionality with pycurl

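I'm currently testing a Hadoop feature, which means interacting with Hadoop constantly. My first implementation used Python's subprocess module to call the command-line tools that ship with Hadoop.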

I. The command-line syntax provided by Hadoop

The available hadoop fs [cmd] commands are:

```
hadoop fs [-fs <local | file system URI>] [-conf <configuration file>]
[-D <property=value>] [-ls <path>] [-lsr <path>] [-du <path>]
[-dus <path>] [-mv <src> <dst>] [-cp <src> <dst>] [-rm [-skipTrash] <src>]
[-rmr [-skipTrash] <src>] [-put <localsrc> ... <dst>] [-copyFromLocal <localsrc> ... <dst>]
[-moveFromLocal <localsrc> ... <dst>] [-get [-ignoreCrc] [-crc] <src> <localdst>]
[-getmerge <src> <localdst> [addnl]] [-cat <src>]
[-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>] [-moveToLocal <src> <localdst>]
[-mkdir <path>] [-report] [-setrep [-R] [-w] <rep> <path/file>]
[-touchz <path>] [-test -[ezd] <path>] [-stat [format] <path>]
[-tail [-f] <path>] [-text <path>]
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
[-chown [-R] [OWNER][:[GROUP]] PATH...]
[-chgrp [-R] GROUP PATH...]
[-count[-q] <path>]
[-help [cmd]]
```

As you can see, the command set is quite powerful, covering all kinds of file and directory operations.
For example, to list the files under the HDFS root directory:

```
# hadoop fs -ls hdfs://192.168.0.112:50081/
drwx---r-x   - test test          0 2013-03-08 11:20 /static
drwx---r-x   - test test          0 2013-02-19 15:40 /system
drwxrwxrwx   - test test          0 2013-01-22 18:42 /video
```
I won't go through the other commands one by one; the help output (hadoop fs -help) explains them well enough.
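The subprocess approach mentioned at the start can be sketched as below, assuming the hadoop binary is on PATH. The helper names hadoop_fs_argv, run_hadoop_fs and parse_ls are hypothetical, and the parser assumes the eight-column -ls layout shown above (it skips the "Found N items" header that -ls prints). Note that every run_hadoop_fs call still launches a fresh JVM.

```python
import subprocess


def hadoop_fs_argv(cmd, *args):
    """Build the argument list for `hadoop fs -<cmd> <args...>`."""
    return ["hadoop", "fs", "-" + cmd, *args]


def run_hadoop_fs(cmd, *args):
    """Run one hadoop fs command and return its stdout (raises on a non-zero exit)."""
    result = subprocess.run(hadoop_fs_argv(cmd, *args),
                            capture_output=True, text=True, check=True)
    return result.stdout


def parse_ls_line(line):
    """Split one -ls entry into fields (assumes 8 whitespace-separated columns)."""
    perms, repl, owner, group, size, date, time_, path = line.split(None, 7)
    return {"permission": perms, "replication": repl, "owner": owner,
            "group": group, "size": int(size),
            "modified": date + " " + time_, "path": path}


def parse_ls(output):
    """Parse full -ls output, keeping only entry lines (they start with 'd' or '-')."""
    return [parse_ls_line(l) for l in output.splitlines() if l[:1] in ("d", "-")]
```

Usage would look like parse_ls(run_hadoop_fs("ls", "hdfs://192.168.0.112:50081/")).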
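The problem is that every command starts a new JVM, which puts a heavy load on the machine running it. With many commands in flight, top shows the java processes at 99% CPU, which seriously affects everything else on that machine. That led to the approach below.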
II. Hadoop's web (REST) interface
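Looking through the official client APIs, I found that Hadoop provides a web REST API (WebHDFS) that can easily be driven with curl. The official documentation is at: http://hadoop.apache.org/docs/stable/webhdfs.html
It describes the usage in detail.
With curl you can perform the basic operations on files and directories in HDFS.
The official documentation currently lists: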
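1. Create and write a file
2. Append to a file
3. Open and read a file
4. Create a directory
5. Rename a file or directory
6. Delete a file or directory
7. Get the status of a file or directory
8. List a directory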
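Each of the eight operations above corresponds to a WebHDFS op value and an HTTP method, per the WebHDFS documentation. The sketch below records that mapping and builds request URLs from it; webhdfs_url is a hypothetical helper, and the optional user.name parameter is how WebHDFS identifies the caller when Hadoop security is off.

```python
from urllib.parse import quote, urlencode

# WebHDFS op -> HTTP method, in the order of the list above.
WEBHDFS_OPS = {
    "CREATE": "PUT",         # 1. create and write a file (two steps, via a redirect)
    "APPEND": "POST",        # 2. append to a file (two steps, via a redirect)
    "OPEN": "GET",           # 3. open and read a file
    "MKDIRS": "PUT",         # 4. create a directory
    "RENAME": "PUT",         # 5. rename a file or directory
    "DELETE": "DELETE",      # 6. delete a file or directory
    "GETFILESTATUS": "GET",  # 7. status of a file or directory
    "LISTSTATUS": "GET",     # 8. list a directory
}


def webhdfs_url(host, port, path, op, user=None, **params):
    """Return (method, url) for one WebHDFS call; extra params go into the query string."""
    if op not in WEBHDFS_OPS:
        raise ValueError("unknown WebHDFS op: %s" % op)
    query = {"op": op}
    if user is not None:
        query["user.name"] = user
    query.update(params)
    url = "http://%s:%s/webhdfs/v1%s?%s" % (host, port, quote(path), urlencode(query))
    return WEBHDFS_OPS[op], url
```

Building the URL with urlencode also escapes values such as the destination path, so the query string is well-formed however it is later passed to curl.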
Some concrete examples:
a. Get the status of a directory
```
# curl -i http://192.168.0.112:50071/webhdfs/v1/?op=GETFILESTATUS
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)

{"FileStatus":{"accessTime":0,"blockSize":0,"group":"TEST","length":0,"modificationTime":1362812718704,"owner":"TEST","pathSuffix":"","permission":"705","replication":0,"type":"DIRECTORY"}}
```
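Since the response body is plain JSON, Python's json module can decode it directly. The sketch below turns the sample response above into ls-style fields; describe_status is a hypothetical helper, and it assumes permission is the octal mode string shown (only the last three digits are used) and that modificationTime is in milliseconds since the Unix epoch.

```python
import json
from datetime import datetime, timezone


def describe_status(body):
    """Summarize a GETFILESTATUS response as (ls-style mode, owner, mtime in UTC)."""
    st = json.loads(body)["FileStatus"]
    # "705" -> one octal digit each for owner, group and other.
    digits = st["permission"].zfill(3)[-3:]
    perms = "".join(
        ("r" if d & 4 else "-") + ("w" if d & 2 else "-") + ("x" if d & 1 else "-")
        for d in map(int, digits))
    kind = "d" if st["type"] == "DIRECTORY" else "-"
    # modificationTime is milliseconds since the epoch.
    mtime = datetime.fromtimestamp(st["modificationTime"] / 1000, tz=timezone.utc)
    return kind + perms, st["owner"], mtime
```

For the response above this yields the mode drwx---r-x, the same form the hadoop fs -ls listing uses.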

 

b. Rename a directory

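Quote the URL, otherwise the shell treats & as the background operator and drops the destination parameter:

```
# curl -i -X PUT "http://192.168.0.112:50071/webhdfs/v1/test?op=RENAME&destination=/test1"
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked

{"boolean":true}
```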
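RENAME, MKDIRS and DELETE all answer with a one-field JSON object like the one above. Decoding it is more reliable than searching the body for the substring "true", which could also match an error message that merely contains that word. A sketch, with boolean_result as a hypothetical helper name:

```python
import json


def boolean_result(body):
    """Return the value of a WebHDFS {"boolean": ...} response.

    Anything that is not such a JSON object (an error page, a RemoteException
    body, an empty string) counts as failure.
    """
    try:
        data = json.loads(body)
    except ValueError:
        return False
    return isinstance(data, dict) and data.get("boolean") is True
```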
 
I won't cover the other operations here; see the official documentation for details.
 
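III. An idea prompted by curl
My program runs in Python, and calling the curl command line would again mean shelling out to an external command. Python has plenty of modules, though, so a Python curl library (pycurl) should make it easy to operate on HDFS files and directories directly from Python.
After some research I wrote a basic WebHadoop class. The basic functionality is roughly done; other features will come later.
The code is as follows: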
 
 
```python
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""A library to access the Hadoop WebHDFS REST API.

Make sure HTTP (WebHDFS) access is enabled on your Hadoop cluster.
"""
'''
author : liran
date   : 2013-03-11

Thanks to: xwu
           Wuhan Yunya Technology Co., Ltd. (武汉云雅科技有限公司)
'''
import io
import json
import os
import re
import sys
import tempfile
from urllib.parse import quote, urlencode

import pycurl


class WebHadoop(object):
    def __init__(self, host, port, username, logger, prefix="/webhdfs/v1"):
        self.host = host
        self.port = port
        self.user = username
        self.logger = logger
        self.prefix = prefix
        self.status = None
        self.url = "http://%s:%s" % (host, port)
        self.url_path = self.url + self.prefix

    def _op_url(self, path, op, **params):
        """Build <url_path><path>?op=OP&user.name=USER&...; user.name
        identifies the caller to WebHDFS when Hadoop security is off."""
        query = {"op": op, "user.name": self.user}
        query.update(params)
        return "%s%s?%s" % (self.url_path, quote(path), urlencode(query))

    def _perform(self, url, method="GET", follow=True, sink=None, upload=None):
        """Run one HTTP request and store its status in self.status (None on a
        transport error). Returns (body_text, raw_header_text).
        sink:   optional file object that receives the body instead of memory.
        upload: optional (fileobj, size) pair sent as the PUT body."""
        body, head = io.BytesIO(), io.BytesIO()
        self.status = None
        c = pycurl.Curl()
        try:
            c.setopt(pycurl.URL, url)
            c.setopt(pycurl.HTTPHEADER, ["Accept:"])
            c.setopt(pycurl.WRITEFUNCTION, (sink if sink is not None else body).write)
            c.setopt(pycurl.HEADERFUNCTION, head.write)
            c.setopt(pycurl.FOLLOWLOCATION, 1 if follow else 0)
            c.setopt(pycurl.MAXREDIRS, 5)
            c.setopt(pycurl.CONNECTTIMEOUT, 60)
            if upload is not None:
                fileobj, size = upload
                c.setopt(pycurl.UPLOAD, 1)  # over HTTP, UPLOAD sends a PUT
                c.setopt(pycurl.READFUNCTION, fileobj.read)
                c.setopt(pycurl.INFILESIZE, size)
            else:
                c.setopt(pycurl.CUSTOMREQUEST, method)
            c.perform()
            self.status = c.getinfo(pycurl.HTTP_CODE)
        except pycurl.error as e:
            self.logger.error("Request to %s failed: %s" % (url, e))
        finally:
            c.close()
        return (body.getvalue().decode("utf-8", "replace"),
                head.getvalue().decode("iso-8859-1"))

    def _boolean(self, body):
        """Value of a {"boolean": ...} response; False if the body is not one."""
        try:
            data = json.loads(body)
        except ValueError:
            return False
        return isinstance(data, dict) and data.get("boolean") is True

    def checklink(self):
        """Scrape the namenode's dfsnodelist.jsp page and return the number of
        live datanodes; exit with status 255 if it is unreachable or zero."""
        checkurl = self.url + "/dfsnodelist.jsp?whatNodes=LIVE"
        body, _ = self._perform(checkurl)
        self.Write_Debug_Log(self.status, checkurl)
        if self.status != 200:
            self.logger.error("Sorry, can not get the hadoop http link.")
            sys.exit(255)
        results = re.findall(r"Live Datanodes :(.*)</a", body)
        if not results or results[0].strip() == "0":
            self.logger.error("Sorry, there are no live datanodes in the Hadoop cluster!")
            sys.exit(255)
        return results[0]

    def lsdir(self, path):
        """List a directory (op=LISTSTATUS); return a list of FileStatus dicts or False."""
        lsdir_url = self._op_url(path, "LISTSTATUS")
        body, _ = self._perform(lsdir_url)
        if self.status == 200:
            return json.loads(body)["FileStatuses"]["FileStatus"]
        self.logger.error("Sorry, can not list the dir or file status!")
        self.Write_Debug_Log(self.status, lsdir_url)
        return False

    def lsfile(self, path):
        """Return the FileStatus dict of a file (op=GETFILESTATUS), or False."""
        lsfile_url = self._op_url(path, "GETFILESTATUS")
        body, _ = self._perform(lsfile_url)
        if self.status != 200:
            self.logger.error("Sorry, can not list the dir or file status!")
            self.Write_Debug_Log(self.status, lsfile_url)
            return False
        status = json.loads(body)["FileStatus"]
        if status["type"] == "DIRECTORY":
            self.logger.error("Sorry, %s is actually a directory!" % path)
            return False
        return status

    def mkdir(self, path, permission="755"):
        """Create a directory (op=MKDIRS, HTTP PUT); return True on success."""
        mkdir_url = self._op_url(path, "MKDIRS", permission=permission)
        body, _ = self._perform(mkdir_url, method="PUT")
        if self.status == 200 and self._boolean(body):
            self.logger.info("Successfully created dir %s in the hadoop cluster." % path)
            return True
        self.logger.error("Sorry, can't create dir %s in the hadoop cluster." % path)
        self.Write_Debug_Log(self.status, mkdir_url)
        return False

    def remove(self, path, recursive="true"):
        """Delete a file or directory (op=DELETE, HTTP DELETE); return True on success."""
        remove_url = self._op_url(path, "DELETE", recursive=recursive)
        body, _ = self._perform(remove_url, method="DELETE")
        if self.status == 200 and self._boolean(body):
            self.logger.info("Successfully deleted %s in the hadoop cluster." % path)
            return True
        self.logger.error("Sorry, can't delete %s; maybe it does not exist." % path)
        self.Write_Debug_Log(self.status, remove_url)
        return False

    def rename(self, src, dst):
        """Rename src to dst (op=RENAME, HTTP PUT); return True on success."""
        rename_url = self._op_url(src, "RENAME", destination=dst)
        body, _ = self._perform(rename_url, method="PUT")
        if self.status == 200 and self._boolean(body):
            self.logger.info("Successfully renamed %s to %s." % (src, dst))
            return True
        self.logger.error("Sorry, can't rename %s; maybe it does not exist." % src)
        self.Write_Debug_Log(self.status, rename_url)
        return False

    def put_file(self, local_path, hdfs_path, overwrite="true", permission="755", buffersize="128"):
        """Upload a local file with the two-step op=CREATE; return True on HTTP 201."""
        if not os.path.isfile(local_path):
            self.logger.error("Sorry, %s does not exist or is not a file." % local_path)
            return False
        put_url = self._op_url(hdfs_path, "CREATE", overwrite=overwrite,
                               permission=permission, buffersize=buffersize)
        # Step 1: ask the namenode where to write. Send no body and do not follow
        # the 307 redirect; following it would already create the file on the
        # datanode (with whatever body was sent).
        _, headers = self._perform(put_url, method="PUT", follow=False)
        match = re.search(r"^Location:[ \t]*(\S+)", headers, re.IGNORECASE | re.MULTILINE)
        if self.status != 307 or match is None:
            self.Write_Debug_Log(self.status, put_url)
            return False
        # Step 2: stream the file to the datanode URL from the Location header.
        with open(local_path, "rb") as f:
            self._perform(match.group(1), upload=(f, os.path.getsize(local_path)))
        if self.status != 201:
            self.Write_Debug_Log(self.status, match.group(1))
            return False
        self.logger.info("Successfully put file into hdfs %s" % hdfs_path)
        return True

    def append(self, local_path, hdfs_path, buffersize=None):
        """Not implemented yet (WebHDFS APPEND is a two-step HTTP POST)."""
        pass

    def get_file(self, local_path, hdfs_path, buffersize="128"):
        """Download hdfs_path into local_path (op=OPEN, redirect followed); True on 200."""
        get_url = self._op_url(hdfs_path, "OPEN", buffersize=buffersize)
        with open(local_path, "wb") as f:
            self._perform(get_url, sink=f)
        if self.status != 200:
            self.Write_Debug_Log(self.status, get_url)
            return False
        self.logger.info("Successfully got file %s from hdfs" % hdfs_path)
        return True

    def cat_file(self, hdfs_path, buffersize="128"):
        """Print the contents of an HDFS file (op=OPEN); return True on HTTP 200."""
        cat_url = self._op_url(hdfs_path, "OPEN", buffersize=buffersize)
        body, _ = self._perform(cat_url)
        if self.status != 200:
            self.Write_Debug_Log(self.status, cat_url)
            return False
        print(body)
        return True

    def copy_in_hdfs(self, src, dst, overwrite="true", permission="755", buffersize="128"):
        """Copy src to dst inside HDFS by downloading to a temp file and re-uploading."""
        fd, tmpfile = tempfile.mkstemp(prefix="copy_inhdfs_")
        os.close(fd)
        try:
            return (self.get_file(tmpfile, src, buffersize)
                    and self.put_file(tmpfile, dst, overwrite, permission, buffersize))
        finally:
            os.remove(tmpfile)

    def Write_Debug_Log(self, status, url):
        """Log a failed request; 200 and 201 count as success."""
        if status not in (200, 201):
            self.logger.error('Url : "%s" ,Exit code : %s' % (url, status))
            self.logger.error("fetch a error ,but don't quit")
```
  404.      

 

Compared with the Java-based command-line tool, the curl-based implementation still has some shortcomings:

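1. No native copy within HDFS (copy_in_hdfs works around this by downloading to a temporary file and uploading it again)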

2. No directory upload or download

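3. In testing, uploading through the shell fails if the file already exists, as expected. Uploading through curl, however, only succeeded with overwrite=true, and I don't know why. A likely cause: if the first CREATE request follows the namenode's 307 redirect (FOLLOWLOCATION) while carrying a request body, the datanode already creates the file from that body, so the real upload that follows finds the file in place and is rejected unless overwrite=true.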
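WebHDFS CREATE is a two-step operation: the first PUT goes to the namenode, without a body and without following redirects, and comes back as 307 with a Location header naming a datanode; the file data is then PUT to that URL, which answers 201 Created. The sketch below covers the step that is easy to get wrong, pulling that Location header out of the raw header text (for example, what a pycurl HEADERFUNCTION collected). location_from_headers is a hypothetical helper.

```python
import re


def location_from_headers(raw_headers):
    """Return the Location URL from raw HTTP response headers, or None if absent."""
    match = re.search(r"^Location:[ \t]*(\S+)", raw_headers,
                      re.IGNORECASE | re.MULTILINE)
    return match.group(1) if match else None
```

Because the regex runs in MULTILINE mode, ^ matches at the start of every header line, and \S+ stops before the trailing \r of each CRLF-terminated line.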

The one clear advantage is speed: commands run dramatically faster.

For the same directory-listing command:

```
# time hadoop fs -ls hdfs://192.168.0.112:50081/
real    0m10.916s
user    0m4.082s
sys     0m6.799s

# time curl -i http://192.168.0.112:50071/webhdfs/v1/?op=LISTSTATUS
real    0m0.005s
user    0m0.002s
sys     0m0.000s
```
 
Calling pycurl from Python, the run time should be around 0.01 s.
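The 0.01 s figure is an estimate rather than a measurement. One simple way to measure it from inside Python is the standard timeit module; average_seconds is a hypothetical helper that reports the mean wall-clock time per call.

```python
import timeit


def average_seconds(func, repeat=10):
    """Mean wall-clock seconds per call of func(), measured with timeit."""
    return timeit.timeit(func, number=repeat) / repeat
```

With a WebHadoop instance named client (hypothetical), average_seconds(lambda: client.lsdir("/")) would give the per-call cost of a LISTSTATUS request, directly comparable to the timings above.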
 
 
A big speedup. The class is still a work in progress.
More to come! :)