深入淺析Java7中的新特性forkjoin
深入淺析Java7中的新特性forkjoin?很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。
創(chuàng)新互聯(lián),為您提供重慶網(wǎng)站建設(shè)、重慶網(wǎng)站制作、網(wǎng)站營(yíng)銷推廣、網(wǎng)站開發(fā)設(shè)計(jì),對(duì)服務(wù)資質(zhì)代辦等多個(gè)行業(yè)擁有豐富的網(wǎng)站建設(shè)及推廣經(jīng)驗(yàn)。創(chuàng)新互聯(lián)網(wǎng)站建設(shè)公司成立于2013年,提供專業(yè)網(wǎng)站制作報(bào)價(jià)服務(wù),我們深知市場(chǎng)的競(jìng)爭(zhēng)激烈,認(rèn)真對(duì)待每位客戶,為客戶提供賞心悅目的作品。 與客戶共同發(fā)展進(jìn)步,是我們永遠(yuǎn)的責(zé)任!
Java7引入了Fork Join的概念,來(lái)更好的支持并行運(yùn)算。顧名思義,F(xiàn)ork Join類似與流程語(yǔ)言的分支,合并的概念。也就是說(shuō)Java7 SE原生支持了在一個(gè)主線程中開辟多個(gè)分支線程,并且根據(jù)分支線程的邏輯來(lái)等待(或者不等待)匯集,當(dāng)然你也可以fork的某一個(gè)分支線程中再開辟Fork Join,這也就可以實(shí)現(xiàn)Fork Join的嵌套。
有兩個(gè)核心類ForkJoinPool和ForkJoinTask。
ForkJoinPool實(shí)現(xiàn)了ExecutorService接口,起到線程池的作用。所以他的用法和Executor框架的使用時(shí)一樣的,當(dāng)然Fork Join本身就是Executor框架的擴(kuò)展。ForkJoinPool有3個(gè)關(guān)鍵的方法,來(lái)啟動(dòng)線程,execute(…),invoke(…),submit(…)。具體描述如下:
|
首先,用戶需要?jiǎng)?chuàng)建一個(gè)自己的ForkJoinTask。代碼如下:
public class MyForkJoinTask extends ForkJoinTask { /** * */ private static final long serialVersionUID = 1L; private V value; private boolean success = false; @Override public V getRawResult() { return value; } @Override protected void setRawResult(V value) { this.value = value; } @Override protected boolean exec() { System.out.println("exec"); return this.success; } public boolean isSuccess() { return success; } public void setSuccess(boolean isSuccess) { this.success = isSuccess; } }
測(cè)試ForkJoinPool.invoke(…):
@Test public void testForkJoinInvoke() throws InterruptedException, ExecutionException { ForkJoinPool forkJoinPool = new ForkJoinPool(); MyForkJoinTask task = new MyForkJoinTask(); task.setSuccess(true); task.setRawResult("test"); String invokeResult = forkJoinPool.invoke(task); assertEquals(invokeResult, "test"); } @Test public void testForkJoinInvoke2() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask task = new MyForkJoinTask(); new Thread(new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { } task.complete("test"); } }).start(); // exec()返回值是false,此處阻塞,直到另一個(gè)線程調(diào)用了task.complete(...) String result = forkJoinPool.invoke(task); System.out.println(result); } @Test public void testForkJoinSubmit() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask task = new MyForkJoinTask(); task.setSuccess(true); // 是否在此任務(wù)運(yùn)行完畢后結(jié)束阻塞 ForkJoinTask result = forkJoinPool.submit(task); result.get(); // 如果exec()返回值是false,在此處會(huì)阻塞,直到調(diào)用complete }
測(cè)試ForkJoinPool.submit(…):
@Test public void testForkJoinSubmit() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask task = new MyForkJoinTask(); task.setSuccess(true); // 是否在此任務(wù)運(yùn)行完畢后結(jié)束阻塞 ForkJoinTask result = forkJoinPool.submit(task); result.get(); // 如果exec()返回值是false,在此處會(huì)阻塞,直到調(diào)用complete } @Test public void testForkJoinSubmit2() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask task = new MyForkJoinTask(); forkJoinPool.submit(task); Thread.sleep(1000); } @Test public void testForkJoinSubmit3() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask task = new MyForkJoinTask(); new Thread(new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { } task.complete("test"); } }).start(); ForkJoinTask result = forkJoinPool.submit(task); // exec()返回值是false,此處阻塞,直到另一個(gè)線程調(diào)用了task.complete(...) result.get(); Thread.sleep(1000); }
測(cè)試ForkJoinPool.execute(…):
@Test public void testForkJoinExecute() throws InterruptedException, ExecutionException { ForkJoinPool forkJoinPool = new ForkJoinPool(); MyForkJoinTask task = new MyForkJoinTask(); forkJoinPool.execute(task); // 異步執(zhí)行,無(wú)視task.exec()返回值。 }
在實(shí)際情況中,很多時(shí)候我們都需要面對(duì)經(jīng)典的“分治”問(wèn)題。要解決這類問(wèn)題,主要任務(wù)通常被分解為多個(gè)任務(wù)塊(分解階段),其后每一小塊任務(wù)被獨(dú)立并行計(jì)算。一旦計(jì)算任務(wù)完成,每一快的結(jié)果會(huì)被合并或者解決(解決階段)。ForkJoinTask天然就是為了支持“分治”問(wèn)題的。
分支/合并的完整過(guò)程如下:
下面列舉一個(gè)分治算法的實(shí)例。
import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class MaximumFinder extends RecursiveTask{ private static final int SEQUENTIAL_THRESHOLD = 5; private final int[] data; private final int start; private final int end; public MaximumFinder(int[] data, int start, int end) { this.data = data; this.start = start; this.end = end; } public MaximumFinder(int[] data) { this(data, 0, data.length); } @Override protected Integer compute() { final int length = end - start; if (length < SEQUENTIAL_THRESHOLD) { return computeDirectly(); } final int split = length / 2; final MaximumFinder left = new MaximumFinder(data, start, start + split); left.fork(); final MaximumFinder right = new MaximumFinder(data, start + split, end); return Math.max(right.compute(), left.join()); } private Integer computeDirectly() { System.out.println(Thread.currentThread() + ' computing: ' + start + ' to ' + end); int max = Integer.MIN_VALUE; for (int i = start; i < end; i++) { if (data[i] > max) { max = data[i]; } } return max; } public static void main(String[] args) { // create a random data set final int[] data = new int[1000]; final Random random = new Random(); for (int i = 0; i < data.length; i++) { data[i] = random.nextInt(100); } // submit the task to the pool final ForkJoinPool pool = new ForkJoinPool(4); final MaximumFinder finder = new MaximumFinder(data); System.out.println(pool.invoke(finder)); } }
看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對(duì)創(chuàng)新互聯(lián)的支持。
分享文章:深入淺析Java7中的新特性forkjoin
URL鏈接:http://www.xueling.net.cn/article/jogdpc.html