重慶分公司,新征程啟航
為企業提供網站建設、域名注冊、服務器等服務
為企業提供網站建設、域名注冊、服務器等服務
1. 概述
成都創新互聯長期為數千家客戶提供的網站建設服務,團隊從業經驗10年,關注不同地域、不同群體,并針對不同對象提供差異化的產品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯網生態環境。為梅河口企業提供專業的成都網站設計、做網站,梅河口網站改版等技術服務。擁有十余年豐富建站經驗和眾多成功案例,為您定制開發。
在傳統數據庫(如:MySQL)中,JOIN操作是非常常見且非常耗時的。而在HADOOP中進行JOIN操作,同樣常見且耗時,由于Hadoop的獨特設計思想,當進行JOIN操作時,有一些特殊的技巧。
本文首先介紹了Hadoop上通常的JOIN實現方法,然后給出了幾種針對不同輸入數據集的優化方法。
2. 常見的join方法介紹
假設要進行join的數據分別來自File1和File2.
2.1 reduce side join
reduce side join是一種最簡單的join方式,其主要思想如下:
在map階段,map函數同時讀取兩個文件File1和File2,為了區分兩種來源的key/value數據對,對每條數據打一個標簽(tag),比如:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不同文件中的數據打標簽。
在reduce階段,reduce函數獲取key相同的來自File1和File2文件的value list, 然后對于同一個key,對File1和File2中的數據進行join(笛卡爾乘積)。即:reduce階段進行實際的連接操作。
REF:hadoop join之reduce side join
http://blog.csdn.net/huashetianzu/article/details/7819244
2.2 map side join
之所以存在reduce side join,是因為在map階段不能獲取所有需要的join字段,即:同一個key對應的字段可能位于不同map中。Reduce side join是非常低效的,因為shuffle階段要進行大量的數據傳輸。
Map side join是針對以下場景進行的優化:兩個待連接表中,有一個表非常大,而另一個表非常小,以至于小表可以直接存放到內存中。這樣,我們可以將小表復制多份,讓每個map task內存中存在一份(比如存放到hash table中),然后只掃描大表:對于大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,如果有,則連接后輸出即可。
為了支持文件的復制,Hadoop提供了一個類DistributedCache,使用該類的方法如下:
(1)用戶使用靜態方法DistributedCache.addCacheFile()指定要復制的文件,它的參數是文件的URI(如果是HDFS上的文件,可以這樣:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口號)。JobTracker在作業啟動之前會獲取這個URI列表,并將相應的文件拷貝到各個TaskTracker的本地磁盤上。(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,并使用標準的文件讀寫API讀取相應的文件。
REF:hadoop join之map side join
http://blog.csdn.net/huashetianzu/article/details/7821674
2.3 Semi Join
Semi Join,也叫半連接,是從分布式數據庫中借鑒過來的方法。它的產生動機是:對于reduce side join,跨機器的數據傳輸量非常大,這成了join操作的一個瓶頸,如果能夠在map端過濾掉不會參加join操作的數據,則可以大大節省網絡IO。
實現方法很簡單:選取一個小表,假設是File1,將其參與join的key抽取出來,保存到文件File3中,File3文件一般很小,可以放到內存中。在map階段,使用DistributedCache將File3復制到各個TaskTracker上,然后將File2中不在File3中的key對應的記錄過濾掉,剩下的reduce階段的工作與reduce side join相同。
更多關于半連接的介紹,可參考:半連接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html
REF:hadoop join之semi join
http://blog.csdn.net/huashetianzu/article/details/7823326
2.4 reduce side join + BloomFilter
在某些情況下,SemiJoin抽取出來的小表的key集合在內存中仍然存放不下,這時候可以使用BloomFiler以節省空間。
BloomFilter最常見的作用是:判斷某個元素是否在一個集合里面。它最重要的兩個方法是:add() 和contains()。最大的特點是不會存在 false negative,即:如果contains()返回false,則該元素一定不在集合中,但會存在一定的 false positive,即:如果contains()返回true,則該元素一定可能在集合中。
因而可將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(但是在小表中的記錄一定不會過濾掉),這沒關系,只不過增加了少量的網絡IO而已。
更多關于BloomFilter的介紹,可參考:http://blog.csdn.net/jiaomeng/article/details/1495500
3. 二次排序
在Hadoop中,默認情況下是按照key進行排序,如果要按照value進行排序怎么辦?即:對于同一個key,reduce函數接收到的value list是按照value排序的。這種應用需求在join操作中很常見,比如,希望相同的key中,小表對應的value排在前面。
有兩種方法進行二次排序,分別為:buffer and in memory sort和 value-to-key conversion。
對于buffer and in memory sort,主要思想是:在reduce()函數中,將某個key對應的所有value保存下來,然后進行排序。 這種方法最大的缺點是:可能會造成out of memory。
對于value-to-key conversion,主要思想是:將key和部分value拼接成一個組合key(實現WritableComparable接口或者調用setSortComparatorClass函數),這樣reduce獲取的結果便是先按key排序,后按value排序的結果,需要注意的是,用戶需要自己實現Paritioner,以便只按照key進行數據劃分。Hadoop顯式的支持二次排序,在Configuration類中有個setGroupingComparatorClass()方法,可用于設置排序group的key值,具體參考:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html
4. 后記
最近一直在找工作,由于簡歷上寫了熟悉Hadoop,所以幾乎每個面試官都會問一些Hadoop相關的東西,而 Hadoop上Join的實現就成了一道必問的問題,而極個別公司還會涉及到DistributedCache原理以及怎樣利用DistributedCache進行Join操作。為了更好地應對這些面試官,特整理此文章。
5. 參考資料
(1) 書籍《Data-Intensive Text Processing with MapReduce》 page 60~67 Jimmy Lin and Chris Dyer,University of Maryland, College Park
(2) 書籍《Hadoop In Action》page 107~131
(3) mapreduce的二次排序 SecondarySort:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html
(4) 半連接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html
(5) BloomFilter介紹:http://blog.csdn.net/jiaomeng/article/details/1495500
(6)本文來自:http://dongxicheng.org/mapreduce/hadoop-join-two-tables/