本章涵盖
开发用于并行计算的功能性API
代数方式接近API
定义泛型组合器
由于现代计算机的每个CPU有多个内核,并且通常有多个CPU,因此以可以利用这种并行处理能力的方式设计程序比以往任何时候都更加重要。但是以并行方式运行的程序的交互是复杂的,而且众所周知,执行线程之间的传统通信机制-共享可变内存-很难推理。这很容易导致程序出现争用条件和死锁,不容易测试,并且不能很好地扩展。
在本章中,我们将构建一个纯函数库,用于创建并行和异步计算。我们将通过仅使用纯函数来描述并行程序固有的复杂性。这将使我们能够使用替换模型来简化我们的推理,并希望使并发计算既简单又愉快。
valoutputList=parMap(inputList)(f)
为了实现这一目标,我们将迭代工作。我们将从我们希望我们的库处理的简单用例开始,然后开发一个促进此用例的接口;只有这样,我们才会考虑这个接口的实现应该是什么。随着我们不断完善我们的设计,我们将在接口和实现之间摇摆不定,同时通过越来越复杂的用例更好地了解领域和设计空间。我们将强调代数推理,并介绍API可以用遵守特定定律的代数来描述的想法。
为什么要设计我们自己的库?为什么不直接在scala.并发包中使用Scala标准库附带的并发原语呢?这部分是出于教学目的—我们想向您展示设计自己的实用库是多么容易。但还有另一个原因:我们希望鼓励这样一种观点,即没有现有的图书馆是权威的或无法重新审查的,即使由专家设计并标记为标准。做其他人做的事情有一定的安全性,但传统的东西不一定是最实用的。大多数库都包含许多任意的设计选择,其中许多是无意中做出的。当你从头开始时,你可以重新审视设计库的所有基本假设,走一条不同的道路,并发现其他人可能甚至没有考虑过的问题空间。因此,您可能会得出更适合您目的的自己的设计。在这种情况下,我们的基本假设是我们的库绝对不允许副作用。
我们将在本章中编写大量代码,主要作为读者的练习。与往常一样,您可以在本书随附的可下载内容中找到答案。
7.1选择数据类型和函数当你开始设计一个函数库时,你通常会对你希望能够做什么有一些一般的想法,而设计过程中的困难在于提炼这些想法并找到一个支持你想要的功能的数据类型。在我们的例子中,我们希望能够创建并行计算,但这到底意味着什么?让我们尝试将其细化为可以通过检查一个简单的、可并行化的计算来实现的东西:对整数列表求和。通常的左折如下:
defsum(ints:Seq[Int]):Int=(0)((a,b)=a+b)
这里Seq是标准库中列表和其他序列的超类。重要的是,它有一个foldLeft方法。我们可以使用分而治之算法,而不是按顺序折叠(请参阅以下列表)。
示例7.1使用分而治之算法对列表求和
defsum(ints:IndexedSeq[Int]):Int=①=1(0)②elseval(l,r)=(/2)③sum(l)+sum(r)④
(1)IndexedSeq是标准库中随机存取序列的超类,如Vector。与列表不同,这些序列提供了一种有效的splitAt方法,用于在特定索引处将它们分成两部分。
(4)headOption是在Scala中的所有集合上定义的方法。我们在第章中看到了这个函数。
(3)使用splitAt函数将序列分成两半
(4)递归地将两半相加,并将结果相加
我们使用splitAt函数将序列分成两半,递归地对两半求和,然后合并它们的结果。与基于foldLeft的实现不同,此实现可以并行化;两半可以并行相加。
简单示例的重要性
7.1.1并行计算的数据类型查看行sum(l)+sum(r),它递归地调用两半的sum。仅通过查看这一行,我们可以看到,我们可能选择表示并行计算的任何数据类型都需要能够包含结果。该结果将具有一些有意义的类型(在本例中为Int),我们需要某种方法来提取此结果。让我们将这些新发现的知识应用到我们的设计中。现在,我们可以为我们的结果发明一个容器类型,Par[A](用于并行),并立法规定我们需要的函数的存在:
defunit[A](a:=A):Par[A]—获取未计算的A,返回可能在单独的线程中评估它的计算。我们称它为单元,因为从某种意义上说,它创建了一个仅包装单个值的并行单元。
defget[A](a:Par[A]):A—从并行计算中提取结果值。
我们真的能做到吗?是的,当然!现在,我们不需要担心我们需要什么其他函数,Par的内部表示可能是什么,或者这些功能是如何实现的。我们只是通过检查我们的简单示例来读取必要的数据类型和函数。现在让我们更新此示例。
示例7.2使用我们的自定义数据类型更新总和
defsum(ints:IndexedSeq[Int]):Int==1(0)elseval(l,r)=(/2)valsumL:Par[Int]=(sum(l))①valsumR:Par[Int]=(sum(r))②(sumL)+(sumR)③
(1)并行计算左半部分
(2)并行计算右半部分
(3)提取结果并求和
我们已经将两个递归调用包装为对单位的调用求和,并且我们调用get以从两个子计算中提取两个结果。
直接使用并发原语的问题
和Runnable呢?让我们来看看这些类。以下是他们的API的部分摘录,转录到Scala中:
traitRunnable:defrun:UnitclassThread(r:Runnable):defstart:Unit①defjoin:Unit②
(1)开始在单独的线程中运行r
(2)阻塞调用线程,直到r完成运行
我们已经可以看到这两种类型的一个问题:没有一个方法返回有意义的值。因此,如果我们想从Runnable中获取任何信息,它必须有一些副作用,比如改变我们可以检查的某些状态。
这对组合性不利;我们不能一般地操纵Runnable对象,因为我们总是需要了解它们的内部行为。线程还有一个缺点,即它直接映射到操作系统线程上,而操作系统线程是一种稀缺资源。最好创建尽可能多的逻辑线程,然后再将这些线程映射到实际的操作系统线程上。
这种事情可以通过并发之类的东西来处理。未来、执行服务或类似。我们为什么不直接使用它们?以下是他们API的一部分:
classExecutorService:defsubmit[A](a:Callable[A]):Future[A]traitFuture[A]:defget:A
虽然它们在物理线程上抽象方面提供了巨大的帮助,但这些原语的抽象级别仍然比我们在本章中要创建的库低得多。例如,对的调用会阻塞调用线程,直到ExecutorService完成执行它,并且其API不提供编写期货的方法。当然,我们可以在这些工具之上构建库的实现(事实上,这是我们在本章后面最终要做的事情),但它们并没有提供我们想要直接从函数式程序中使用的模块化和组合API。
我们现在可以选择单位的含义并得到;Unit可以立即在单独的(逻辑)线程中开始计算其参数,1或者它可以简单地保留其参数,直到调用get并开始计算。但请注意,在这个例子中,如果我们想获得任何程度的并行性,我们要求unit开始并发计算其参数并立即返回。你能明白为什么吗?阿拉伯数字
但是,如果单位开始同时评估其参数,那么调用get可以说会破坏引用透明度。我们可以通过用它们的定义替换sumL和sumR来看到这一点;如果我们这样做,我们仍然得到相同的结果,但我们的程序不再是并行的:
((sum(l)))+((sum(r)))
如果单元立即开始评估其参数,get将等待该评估完成。这意味着如果我们简单地内联sumL和sumR变量,+号的两侧将不会并行运行。我们可以看到该单位具有明确的副作用,但仅限于获得.也就是说,在这种情况下,unit只是返回一个Par[Int],表示异步计算。但是一旦我们通过了那个Par来获得,我们明确地等待它,暴露了副作用。因此,我们似乎希望避免调用get,或者至少将调用它推迟到最后。我们希望能够组合异步计算,而无需等待它们完成。
在我们继续之前,请花点时间检查一下我们所做的工作。首先,我们想出了一个简单、几乎微不足道的例子。接下来,我们稍微探讨了这个例子,以揭示设计选择。然后,通过一些实验,我们发现了一个选项的有趣结果,并在此过程中了解了我们问题域的性质!整个设计过程是一系列这些小冒险。你不需要任何特殊的许可证来进行这种探索,也不需要成为函数式编程方面的专家。只要潜入,看看你发现了什么。
7.1.2组合并行计算让我们看看是否可以避免上述组合单元并获得的陷阱。如果我们不调用get,这意味着我们的sum函数必须返回一个Par[Int]。这种变化揭示了什么后果?同样,让我们发明具有所需签名的函数:
defsum(ints:IndexedSeq[Int]):Par[Int]==1((0))elseval(l,r)=(/2)(sum(l),sum(r))(_+_)
习题7.1
——————————————————————————————
是一个新的高阶函数,用于组合两个并行计算的结果。它的签名是什么?尽可能提供最通用的签名(不假设它仅适用于Int)。
请注意,我们不再在递归情况下调用unit,并且不清楚unit是否应该懒惰地接受它的参数。在这个例子中,懒惰地接受论点似乎不会提供任何好处,但也许情况并非总是如此。让我们稍后再回到这个问题。
那么map2呢——它应该懒惰地接受它的论点吗?对于map2来说,并行运行计算的两端是有意义的,给每一端平等的运行机会(map2参数的顺序很重要似乎是任意的,因为我们只是希望map2表明正在组合的两个计算是独立的,可以并行运行)。什么选择让我们实现这个意义?作为一个简单的测试用例,考虑如果map2在两个参数中都很严格并且我们正在评估sum(IndexedSeq(1,2,3,4))会发生什么。花点时间完成并理解以下(有点风格化的)程序跟踪。
清单7.3程序跟踪求和
sum(IndexedSeq(1,2,3,4))map2(sum(IndexedSeq(1,2)),sum(IndexedSeq(3,4)))(_+_)map2(map2(sum(IndexedSeq(1)),sum(IndexedSeq(2)))(_+_),sum(IndexedSeq(3,4)))(_+_)map2(map2(unit(1),unit(2))(_+_),sum(IndexedSeq(3,4)))(_+_)map2(map2(unit(1),unit(2))(_+_),map2(sum(IndexedSeq(3)),sum(IndexedSeq(4)))(_+_))(_+_)
在这个跟踪中,为了计算sum(x),我们将x代入sum的定义中,就像我们在前面的章节中所做的那样。因为map2是严格的,并且Scala从左到右计算参数,所以每当我们遇到map2(sum(x),sum(y))(_+_)时,我们必须递归地计算sum(x)等等。这有一个相当不幸的后果,要求我们先严格构造求和树的整个左半部分,然后再(严格)构造右半部分。这里sum(IndexedSeq(1,2))在我们考虑sum(IndexedSeq(3,4))之前完全展开。如果map2并行评估其参数(使用用于实现并行性的任何资源,如线程池),这意味着我们计算的左半部分将在我们开始构建计算的右半部分之前开始执行。
如果我们保持map2严格但不让它立即开始执行怎么办?这有帮助吗?如果map2没有立即开始评估,这意味着Par值只是构建了需要并行计算的内容的描述。在我们评估此描述之前,实际上什么都不会发生,也许使用类似get的函数。问题是,如果我们严格地构建我们的描述,它们将是相当重量级的对象。回顾我们的跟踪,我们的描述必须包含要执行的操作的完整树:
map2(map2(unit(1),unit(2))(_+_),map2(unit(3),unit(4))(_+_))(_+_)
无论我们使用什么数据结构来存储此描述,它都可能比原始列表本身占用更多的空间。如果我们的描述更轻量级,那就太好了。
似乎我们应该让map2变得懒惰,并让它立即开始并行执行双方。这也解决了任何一方都不优先于另一方的问题。
7.1.3显式分叉我们的最新选择仍然有些不对劲。我们是否总是想并行评估map2的两个参数?应该不会。考虑这个简单的假设示例:
((1),(1))(_+_)
在这种情况下,我们碰巧知道我们正在组合的两个计算将执行得如此之快,以至于生成一个单独的逻辑线程来评估它们没有多大意义,但是我们的API没有为我们提供任何提供此类信息的方法。也就是说,我们当前的API对于计算何时从主线程分叉非常明确;程序员无法指定这种分叉应该发生在哪里。如果我们使分叉更明确怎么办?我们可以通过发明另一个函数来做到这一点,deffork[A](a:=Par[A]):Par[A],我们可以认为这意味着给定的Par应该在单独的逻辑线程中运行:
defsum(ints:IndexedSeq[Int]):Par[Int]==1((0))elseval(l,r)=(/2)((sum(l)),(sum(r)))(_+_)
使用fork,我们现在可以使map2变得严格,如果程序员愿意,可以由他们来包装参数。像fork这样的函数解决了过于严格地实例化并行计算的问题,但更根本的是,它将并行性明确地置于程序员的控制之下。我们在这里解决两个问题:首先,我们需要某种方法来指示两个并行任务的结果应该合并。除此之外,我们可以选择是否应异步执行特定任务。通过将这些问题分开,我们避免了将任何类型的并行性全局策略附加到map2和我们编写的其他操作上,这意味着对哪种全局策略最好做出艰难(最终是武断的)选择。
现在让我们回到单位应该严格还是懒惰的问题。使用fork,我们现在可以使单位严格,而不会损失任何表现力。它的非严格版本——我们称之为lazyUnit——可以使用unit和fork来实现:
defunit[A](a:A):Par[A]deflazyUnit[A](a:=A):Par[A]=fork(unit(a))
函数lazyUnit是派生组合子的简单示例,与原始组合器(如unit)相反。我们在这里非正式地使用术语组合器来指代在设计时考虑组合的函数和值,允许从单一用途部件逐步构建有趣的功能。我们能够根据其他操作来定义lazyUnit;稍后,当我们为Par选择一个表示时,lazyUnit不需要知道有关该表示的任何信息——它对Par的唯一了解将来自在Par上定义的操作分支和单元。3
我们知道我们希望fork发出信号,表明它的参数在单独的逻辑线程中得到评估,但我们仍然有一个问题,即它是否应该在被调用时立即开始这样做,或者保留它的参数,以便在稍后强制计算时在逻辑线程中进行评估,使用get之类的东西。换句话说,评估应该是分叉还是get的责任?评估应该是急切还是懒惰?当您不确定要分配给API中某些函数的含义时,您可以随时继续设计过程;在以后的某个时候,不同含义选择的权衡可能会变得清晰。在这里,我们利用了一个有用的技巧:我们将考虑实现fork和获取各种含义需要什么样的信息。
如果fork立即开始并行评估其参数,则实现必须清楚地直接或间接地知道如何创建线程或将任务提交到某种线程池。此外,这意味着线程池(或我们用于实现并行性的任何资源)必须(全局)可访问并正确初始化,无论我们想要调用fork。4这意味着我们失去了控制用于程序不同部分的并行策略的能力。虽然拥有用于执行并行任务的全局资源本身并没有错,但我们可以想象,对在哪里使用哪些实现进行更精细的控制会很有用(例如,我们可能希望大型应用程序的每个子系统都拥有自己的具有不同参数的线程池)。赋予get创建线程和提交执行任务的责任似乎更合适。
请注意,得出这些结论并不需要确切地知道forkandget将如何实现,甚至不需要知道Par的表示形式是什么。我们只是非正式地推理了实际生成并行任务所需的信息类型,并检查了让Par值知道此信息的后果。
相反,如果fork只是保留其未计算的参数直到以后,它不需要访问实现并行性的机制;它只是获取一个未评估的Par并将其标记为并发评估。现在让我们假设这个意思是叉子。有了这个模型,Par本身不需要知道如何实际实现并行性。它更像是对并行计算的描述,稍后由get函数之类的东西解释。这与以前相比发生了转变,当时我们认为Par是一个值的容器,我们可以在它可用时简单地获得它。现在,它更像是我们可以运行的一流程序。因此,让我们重命名我们的get函数以运行,并指示这是实际实现并行性的地方:
extension[A](pa:Par[A])defrun:A
因为Par现在只是一个纯数据结构,所以run需要有一些实现并行性的方法,无论是生成新线程、将任务委托给线程池,还是使用其他机制。
7.2选取表示通过探索这个简单的例子并思考不同选择的后果,我们已经勾勒出以下API。
清单7.4ParAPI的基本草图
defunit[A](a:A):Par[A]①extension[A](pa:Par[A])defmap2[B,C](pb:Par[B])(f:(A,B)=C):Par[C]②deffork[A](a:=Par[A]):Par[A]③deflazyUnit[A](a:=A):Par[A]=fork(unit(a))④extension[A](pa:Par[A])defrun:A⑤
(1)创建一个计算,该计算立即生成值a
(2)将两个并行计算的结果与二进制函数相结合
(3)标记运行并发评估的计算
(4)将表达式a包装起来,以便通过运行进行并发计算
(5)完全评估给定的Par,根据fork的请求生成并行计算并提取结果值
我们还松散地为这些不同的函数分配了含义:
unit—将常量值提升为并行计算
map2—将两个并行计算的结果与二进制函数相结合
fork—标记并发评估的计算-在运行强制之前不会进行评估
lazyUnit—将其未计算的参数包装在Par中,并将其标记为并发计算
run—通过执行计算从Par中提取值
在勾勒出API的任何时候,您都可以开始考虑出现的抽象类型的可能表示形式。
习题7.2
——————————————————————————————
在继续之前,请尝试为Par提出可以实现我们API功能的表示。
让我们看看我们是否可以想出一个表示。我们知道运行需要以某种方式执行异步任务。我们可以编写自己的低级API,但是在Java标准库中已经有一个类可以使用:。以下是它的API,摘录并转录为Scala:
classExecutorService:defsubmit[A](a:Callable[A]):Future[A]traitCallable[A]:defcall:A①traitFuture[A]:defget:Adefget(timeout:Long,unit:TimeUnit):Adefcancel(evenIfRunning:Boolean):BooleandefisDone:BooleandefisCancelled:Boolean
(1)本质上只是一个懒惰的A
所以ExecutorService让我们提交一个可调用的值(在Scala中,我们可能只是使用一个惰性参数来提交)并返回一个相应的future,它是可能在单独线程中运行的计算的句柄。我们可以使用其get方法从Future中获取一个值(该方法阻止当前线程直到该值可用),并且它具有一些额外的取消功能(在阻塞一定时间后抛出异常等等)。
让我们尝试假设我们的run函数可以访问ExecutorService,看看这是否暗示了Par的表示:
extension[A](pa:Par[A])defrun(s:ExecutorService):A
Par[A]最简单的模型可能是ExecutorService=A。这将使运行变得微不足道。但是,将等待计算多长时间或是否取消计算的决定推迟到run的调用者可能会很好。所以Par[A]变成了ExecutorService=Future[A],并且run只是返回Future:
opaquetypePar[A]=ExecutorService=Future[A]①extension[A](pa:Par[A])defrun(s:ExecutorService):Future[A]=pa(s)
(1)Par被定义为不透明类型,尽管我们可以使用常规类型别名或包装函数的case类,就像上一章中的State一样。使用不透明类型可提供内部表示形式的封装,同时避免不必要的分配。
请注意,由于Par由需要ExecutorService的函数表示,因此在提供此ExecutorService之前,Future的创建实际上不会发生。真的有那么简单吗?让我们假设它是现在的,如果我们发现它不允许我们想要的某些功能,请修改我们的模型。
7.2.1优化接口到目前为止,我们的工作方式有点人为。在实践中,设计API和选择表示形式之间没有如此明确的界限,而且一个不一定先于另一个。表示的想法可以为API提供信息,API可以为表示的选择提供信息,并且很自然地在这两个视角之间流畅地切换,在出现问题时运行实验,构建原型等等。
我们将在本节中专门探讨我们的API。虽然我们在考虑一个简单的例子中得到了很多好处,但在我们添加任何新的基元操作之前,让我们更多地了解使用我们已经拥有的基元操作可以表达的内容。有了我们的原语和对它们的意义的选择,我们为自己开辟了一个小宇宙。我们现在开始发现在这个宇宙中可以表达哪些想法。这可以而且应该是一个流动的过程;我们可以随时改变宇宙的规则,从根本上改变我们的表示或引入新的原始人,并探索我们的创造物的行为方式。
让我们从实现到目前为止开发的API的功能开始。现在我们有了Par的表示,它的第一个破解应该很简单。下面是使用我们选择的Par表示的简单实现。
清单7.5Par的基本实现
objectPar:defunit[A](a:A):Par[A]=es=UnitFuture(a)①privatecaseclassUnitFuture[A](get:A)extsFuture[A]:defisDone=truedefget(timeout:Long,units:TimeUnit)=getdefisCancelled=falsedefcancel(evenIfRunning:Boolean):Boolean=falseextension[A](pa:Par[A])defmap2[B,C](pb:Par[B])(f:(A,B)=C):Par[C]=②(es:ExecutorService)=③valfutureA=a(es)valfutureB=b(es)UnitFuture(f(,))④deffork[A](a:=Par[A]):Par[A]=⑤es=(newCallable[A]{defcall=a(es).get})(1)unit表示为返回UnitFuture的函数,它是Future的简单实现,只包装一个常量值。它根本不使用执行器服务;它总是完成的,不能取消。它的get方法只是返回我们给它的值。
(2)map2不会在单独的逻辑线程中评估对f的调用,这符合我们的设计选择,即让fork成为API中用于控制并行性的唯一函数。如果我们希望f的评估发生在单独的线程中,我们总是可以做fork((pb)(f))。
(3)我们从ExecutorService返回一个函数到Future[C]。es上的类型归属在这里是可选的;Scala可以从Par的定义中推断出来。
(2)map的这种实现不遵守超时。它只是将执行器服务传递给两个面值,等待期货af和bf的结果,对它们应用f,并将它们包装在UnitFuture中。为了尊重超时,我们需要一个新的Future实现来记录评估af所花费的时间,然后从分配给评估bf的可用时间中减去该时间。
(5)这是fork最简单,最自然的实现,但它存在一些问题;首先,外部Callable将阻止等待内部任务完成。由于此阻塞占用了线程池中的线程,或者支持ExecutorService的任何资源,这意味着我们正在失去一些潜在的并行性。从本质上讲,当一个线程就足够了时,我们使用两个线程。这是实现中一个更严重问题的症状,我们将在本章后面讨论这个问题。
我们应该注意,Future没有纯粹的功能界面。这是我们不希望我们库的用户直接与Future打交道的部分原因。但重要的是,即使Future上的方法依赖于副作用,我们的整个ParAPI仍然是纯粹的。只有在用户调用运行并且实现收到执行器服务之后,我们才会公开Future机器。因此,我们的用户编程到一个纯粹的接口,其实现仍然依赖于一天结束时的效果。但由于我们的API保持纯净,因此这些影响不是副作用。在第4部分中,我们将详细讨论这种区别。
习题7.3
——————————————————————————————
困难:修复map2的实现,使其遵守Future上的超时协定。
习题7.4
——————————————————————————————
此API已启用一组丰富的操作。下面是一个简单的示例。使用lazyUnit编写一个函数,将任何函数A=B转换为异步计算其结果的函数:
defasyncF[A,B](f:A=B):A=Par[B]
我们现有的组合器还能表达什么?让我们看一个更具体的例子。
假设我们有一个Par[List[Int]]表示生成List[Int]的并行计算,并且我们想将其转换为结果排序的Par[List[Int]]:
defsortPar(parList:Par[List[Int]]):Par[List[Int]]
当然,我们可以运行Par,对结果列表进行排序,并将其重新打包到带有单位的Par中—但我们希望避免调用run。我们拥有的唯一另一个允许我们以任何方式操纵Par值的组合器是map2.因此,如果我们将parList传递给map2的一侧,我们将能够访问内部的列表并对其进行排序,我们可以将我们想要的任何内容传递给map2的另一侧,所以让我们只传递一个no-op:
defsortPar(parList:Par[List[Int]]):Par[List[Int]]=(unit(()))((a,_)=)
我们现在可以告诉Par[List[Int]]我们希望对该列表进行排序,但我们不妨进一步概括这一点。我们可以提升任何类型A=B的函数,成为取Par[A]并返回Par[B]的函数,我们可以将任何函数映射到Par上:
extension[A](pa:Par[A])defmap[B](f:A=B):Par[B]=(unit(()))((a,_)=f(a))
例如,sortPar现在只是这样:
defsortPar(parList:Par[List[Int]])=(_.sorted)
这句话言简意赅。我们只是组合了操作以使类型对齐。然而,如果你看一下map2和unit的实现,应该很清楚,map的这种实现意味着一些明智的东西。
将一个虚假的值unit(())作为参数传递给map2只是为了忽略它的值,这是作弊吗?一点也不!事实上,我们可以根据map2实现map,但不能反过来,只是表明map2严格比map更强大。当我们设计库时,这种事情经常发生;一个看似原始的函数通常可以使用一些更强大的原始函数来表达。
我们还可以使用我们的API实现什么?我们可以并行映射列表吗?与map2不同,它结合了两个并行计算,parmap(我们称之为它)需要组合N个并行计算。似乎这应该以某种方式表达:
defparMap[A,B](ps:List[A])(f:A=B):Par[List[B]]
我们总是可以将parMap写成一个新的基元。请记住,Par[A]只是ExecutorService=Future[A]的别名。
将操作实现为新的基元并没有错。在某些情况下,我们甚至可以通过假设我们正在处理的数据类型的基础表示来更有效地实现操作。但是现在,我们有兴趣探索使用我们现有的API可以表达哪些操作,并掌握我们定义的各种操作之间的关系。在第3部分中,当我们展示如何跨库抽象常见模式时,了解哪些组合子是真正的基元将变得更加重要。5
让我们看看在现有组合器方面我们可以在多大程度上实现parMap:
defparMap[A,B](ps:List[A])(f:A=B):Par[List[B]]=valfbs:List[Par[B]]=(asyncF(f))
请记住,asyncF通过分叉并行计算来生成结果,将A=B转换为A=Par[B]。因此,我们可以很容易地分叉我们的N个并行计算,但我们需要一些方法来收集它们的结果。我们被困住了吗?好吧,仅通过检查类型,我们可以看到我们需要某种方式将我们的List[Par[B]]转换为parMap返回类型所需的Par[List[B]]。
习题7.5
——————————————————————————————
写这个函数,叫做序列。不需要额外的基元;不要调用运行:
defsequence[A](ps:List[Par[A]]):Par[List[A]]
一旦我们有了序列,我们就可以完成parMap的实现:
defparMap[A,B](ps:List[A])(f:A=B):Par[List[B]]=fork:valfbs:List[Par[B]]=(asyncF(f))sequence(fbs)
请注意,我们已将我们的实现包装在对fork的调用中。通过此实现,parMap将立即返回,即使对于庞大的输入列表也是如此。当我们稍后调用run时,它将分叉单个异步计算,该计算本身生成N个并行计算,然后等待这些计算完成,将它们的结果收集到一个列表中。相反,如果我们省略了对fork的调用,调用parMap将在调用序列之前首先创建fbs列表,从而在调用线程上执行一些计算。
习题7.6
——————————————————————————————
实现parFilter,它并行过滤列表的元素:
defparFilter[A](as:List[A])(f:A=Boolean):Par[List[A]]
你能想到其他有用的函数来写吗?尝试编写一些自己的并行计算,看看哪些计算可以在没有其他基元的情况下表达。以下是一些可以尝试的想法:
我们在本章开头写的并行求和函数是否有更通用的版本?尝试使用它来并行查找IndexedSeq的最大值。
编写一个函数,该函数采用段落列表(List[String])并并行返回所有段落中的单词总数。寻找推广此函数的方法。
根据map3实现map4、map5和map2。
7.3API的代数如上一节所示,我们通常只需写下所需操作的类型签名,然后将类型跟踪到实现中,就可以走得更远。当以这种方式工作时,我们几乎可以忘记具体的领域(例如,当我们根据map2和unit实现map时),而只专注于排列类型。这不是作弊;这是一种自然的推理风格,类似于简化代数方程时的推理。我们将API视为代数6或一组抽象的操作,以及一组我们假设为真的定律或属性,并且只是按照此代数指定的规则进行形式符号操作。
到目前为止,我们一直在非正式地推理我们的API。这并没有错,但退后一步并正式确定您希望为您的API保留(或想要保留)哪些法律可能会有所帮助。7在不知不觉中,你可能已经在心理上建立了一个你期望的属性或规律的模型。实际上,将这些内容写下来并使它们精确,可以突出设计选择,这些选择在非正式推理时不会很明显。
7.3.1映射定律像任何设计选择一样,选择法律会产生后果;它对操作的含义施加约束,确定可能的实现选择,并影响哪些其他属性可以为真。让我们看一个例子,在这个例子中,我们将编造一个看似合理的可能定律。如果我们为库编写测试,这可以用作测试用例:
unit(1).map(_+1)==unit(2)
我们的意思是,使用_+1函数映射在单位(1)上在某种意义上等同于unit(2)。(法律通常是这样开始的——作为我们期望持有的身份的具体例子8。它们在什么意义上是等价的?这是一个有趣的问题。现在,假设两个Par对象是等效的,如果对于任何有效的ExecutorService参数,它们的Future结果具有相同的值。
我们可以使用如下函数检查这是否适用于特定的执行器服务:
defequal[A](e:ExecutorService)(p:Par[A],p2:Par[A]):Boolean=p(e).get==p2(e).get
法律和功能有很多共同点。正如我们可以概括函数一样,我们可以概括规律。例如,上述内容可以这样概括:
unit(x).map(f)==unit(f(x))
这里我们说这种等式应该适用于x和f的任何选择,而不仅仅是1和_+1函数。这种平等对我们的实施施加了一些限制;例如,我们的Unit实现不能检查它收到的值,并决定在输入为42时返回结果为1的并行计算——它只能传递它收到的任何内容。同样,对于我们的执行器服务,当我们向它提交可调用对象执行时,它不能根据它收到的值做出任何假设或改变行为。更具体地说,该定律不允许在映射和单元的实现中进行向下转换或isInstanceOf检查(通常分组在术语类型化下)。
就像我们努力用更简单的函数来定义函数一样,每个函数只做一件事,我们可以用更简单的定律来定义定律,每个定律只说一件事。让我们看看是否可以进一步简化这条法律。我们说我们希望这条定律适用于x和f的任何选择。如果我们用恒等函数代替f,就会发生一些有趣的事情。9我们可以简化等式的两边,得到一个相当简单的新定律:10
unit(x).map(f)==unit(f(x))①unit(x).map(id)==unit(id(x))②unit(x).map(id)==unit(x)③(id)==y④
(1)初始定律
(2)用恒等函数代替f。
(3)简化。
(4)将两边的单位(x)替换为y。
迷人!我们新的,更简单的法律只谈论地图;显然,提到单位是一个无关紧要的细节。为了深入了解这项新法律的内容,让我们考虑一下地图不能做什么。比如说,它不能在将函数应用于结果之前抛出异常并使计算崩溃(你能明白为什么这违反了法律吗?它所能做的就是将函数f应用于y的结果,当然,当该函数为id时,y不受影响。11更有趣的是,给定(id)==y,我们可以在另一个方向上执行替换,以恢复我们原来的、更复杂的定律。(试试吧!从逻辑上讲,我们可以自由地这样做,因为map对于它接收的不同函数类型,它的行为不可能有所不同。因此,给定(id)==y,则unit(x).map(f)==unit(f(x))一定为真。由于我们免费获得这个第二定律或定理,仅仅因为映射的参数性,它有时被称为自由定理。12
习题7.7
——————————————————————————————
困难:给定(id)==y,(g).map(f)==(fcomposeg)是一个自由定理。(这有时被称为映射融合,它可以用作优化;我们可以将其折叠到第一个映射中,而不是生成单独的并行计算来计算第二个映射。13你能证明吗?
7.3.2分叉定律尽管这一切很有趣,但这条特殊的法律并没有多大限制我们的实施。你可能一直在假设这些属性,甚至没有意识到它(在map,unit或的实现中有任何特殊情况或让map随机抛出异常会很奇怪)。让我们考虑一个更强的属性——fork不应该影响并行计算的结果:
fork(x)==x,
这似乎显然应该适用于我们的实现,并且它显然是一个理想的属性,符合我们对fork应该如何工作的期望。fork(x)应该做与x相同的事情,但异步地——在与主线程分开的逻辑线程中。如果这条定律并不总是成立,那么我们必须以某种方式知道何时在不改变含义的情况下进行调用是安全的,而无需类型系统的任何帮助。
令人惊讶的是,这个简单的属性对我们fork的实现施加了强烈的约束。在你写下这样的法律之后,摘下你的实施者帽子,戴上你的调试器帽子,并尝试违反你的法律。仔细考虑任何可能的极端情况,尝试提出反例,甚至构建一个非正式的证据,证明法律成立——至少足够彻底地说服一个持怀疑态度的程序员同行。
7.3.3违法:一个微妙的错误让我们试试这种思维模式。我们期待fork(x)==x用于x的所有选择和任何ExecutorService的选择。我们对x可能是什么有很好的了解;它是一些使用叉、单位和map2(以及从这些派生的其他组合子)的表达式。执行人服务怎么样?它有哪些可能的实现方式?在类中有一个很好的不同实现列表。执行程序(有关更多信息,请参阅API:)。
习题7.8
——————————————————————————————
困难:查看Executors中的各种静态方法,以了解存在的ExecutorService的不同实现。然后,在继续之前,请返回并重新审视您的fork实现,并尝试找到一个反例或说服自己法律适用于您的实现。
为什么关于代码和证明的法律很重要
陈述和证明有关API的属性似乎不寻常。这当然不是普通编程中通常做的事情。为什么它在FP中很重要?
在函数式编程中,很容易并且期望将常见功能分解为可以组合的通用、可重用的组件。副作用会损害组合性,但更一般地说,任何隐藏的或带外的假设或行为阻止我们将组件(无论是功能还是其他任何东西)视为黑匣子,都会使组合变得困难或不可能。
在我们的fork定律的例子中,我们可以看到,如果我们假设的定律不成立,我们的许多通用组合器,如parMap,将不再合理(它们的使用可能是危险的,因为它们可能会,取决于它们使用的更广泛的并行计算,导致死锁)。
为我们的API提供具有有意义和辅助推理的定律的代数,使API对客户端更有用,但也意味着我们可以将API的对象视为黑盒。正如我们将在第3部分看到的,这对于我们在编写的不同库中分解常见模式的能力至关重要。
实际上,在大多数fork实现中都会出现一个相当微妙的问题。当使用由有界大小的线程池支持的ExecutorService时(请参阅),很容易遇到死锁。14假设我们有一个由线程池支持的ExecutorService,其中最大线程数为1。尝试使用我们当前的实现运行以下示例:
vala=lazyUnit(42+1)vales=(1)println((es)(a,fork(a)))
大多数fork实现都会导致此代码死锁。你能明白为什么吗?让我们再看看我们的fork实现:
deffork[A](a:=Par[A]):Par[A]=es=(newCallable[A]{defcall=a(es).get})①(1)等待一个可赎回在另一个可赎回中的结果
请注意,我们首先提交可调用对象,在该可调用对象中,我们将向执行者服务提交另一个可调用对象并阻止其结果(回想一下,a(es)将向执行者服务提交可调用对象并取回未来)。如果我们的线程池大小为1,这是一个问题。外部可调用对象由唯一线程提交和拾取;在该线程中,在它完成之前,我们提交并阻止等待另一个可调用对象的结果,但没有线程可用于运行此可调用对象。它们在互相等待,因此我们的代码陷入僵局。
习题7.9
——————————————————————————————
困难:表明在这种fork实现的情况下,任何固定大小的线程池都可以死锁。
当你找到这样的反例时,你有两个选择:你可以尝试修复你的实现,使法律成立,或者你可以稍微完善你的定律,更明确地说明它成立的条件(你可以简单地规定你需要可以无限增长的线程池)。即使这是一个很好的练习;它强制您记录以前隐式的不变量或假设。
我们可以修复fork以在固定大小的线程池上工作吗?让我们看一个不同的实现:
deffork[A](fa:=Par[A]):Par[A]=es=fa(es)
这当然避免了僵局。唯一的问题是我们实际上并没有分叉一个单独的逻辑线程来评估fa.所以对于一些ExecutorService的fork(hugeComputing)(es)会在主线程中运行hugeComputing,这正是我们想要通过调用fork来避免的。不过,这仍然是一个有用的组合器,因为它允许我们延迟计算的实例化,直到实际需要它。让我们给它起个名字延迟:
defdelay[A](fa:=Par[A]):Par[A]=es=fa(es)
但我们真的希望能够在固定大小的线程池上运行任意计算。为此,我们需要选择Par的不同表示形式。
7.3.4使用Actos的完全非阻塞Par实现在本节中,我们将开发一个完全无阻塞的Par实现,适用于固定大小的线程池。由于这对于我们讨论功能设计各个方面的总体目标并不重要,因此如果您愿意,可以跳到下一节。否则,请继续阅读。
当前表示的本质问题是,如果没有当前线程阻塞其get方法,我们就无法从Future中获取值。不以这种方式泄漏资源的Par表示必须是非阻塞的,因为fork和map2的实现绝不能像那样调用阻塞当前线程的方法。正确编写这样的实现可能具有挑战性。幸运的是,我们有我们的法律来测试我们的实施,我们只需要把它做好一次。之后,我们库的用户可以享受一个可组合和抽象的API,每次都做正确的事情。
在下面的代码中,您不需要确切了解它的每个部分发生了什么。我们只是想用真实的代码向您展示尊重法律的Par的正确表示可能是什么样子的。
基本思想
我们如何实现Par的非阻塞表示?这个想法很简单。与其将Par转换为,我们可以从中获取一个值(这需要阻塞),我们将引入我们自己的Future版本,通过它我们可以注册一个回调,当结果准备就绪时将被调用。这是视角的轻微转变:
opaquetypeFuture[+A]=(A=Unit)=Unit①opaquetypePar[+A]=ExecutorService=Future[A]②
(1)将类型A=Unit的函数作为参数并返回Unit的函数
(2)Par看起来是一样的,但我们使用的是新的非阻塞Future而不是中的Future。
我们的Par类型看起来相同,除了我们现在使用的是新版本的Future,它的API与并发中的API不同。我们的未来不是调用get来从我们的未来获得结果,而是封装一个接收另一个函数的函数-一个期望A并返回一个单位的函数。A=Unit函数有时称为延续或回调。
通过这种编码,当我们将ExecutorService应用于表示Par[A]的函数时,我们得到一个新函数:(A=单位)=单位。然后,我们可以通过传递处理生成的A值的回调来调用它。每当计算A时,都会调用我们的回调,而不是立即调用。
对纯API使用本地副作用
我们在这里定义的Future类型是相当必要的—A=单位。这样的函数只能用于使用给定的A执行一些副作用,因为我们当然没有使用返回的结果。当使用像Future这样的类型时,我们是否仍在进行函数式编程?是的,但我们正在利用使用副作用作为纯函数式API的实现细节的常用技术。我们可以侥幸逃脱,因为我们使用的副作用对于使用Par的代码是无法观察到的。请注意,Future的函数表示是不透明的,不能由外部代码调用。
当我们完成非阻塞Par的其余实现时,您可能希望说服自己,外部代码无法观察到所使用的副作用。第14章更详细地讨论了局部效应的概念,可观察性以及我们对纯度和参考透明度定义的微妙之处,但就目前而言,非正式的理解是可以的。
有了Par的这种表示,让我们看看如何首先实现run函数,我们将它更改为只返回一个A。由于它从Par[A]到A,它必须构造一个延续并将其传递给未来。
清单7.6实现Par运行
extension[A](pa:Par[A])defrun(es:ExecutorService):A=valref=newAtomicReference[A]①vallatch=newCountDownLatch(1)②pa(es){a=(a);}③④⑤(1)用于存储结果的可变线程安全引用(有关这些类的更多信息,请参阅包)
(2)允许线程等待,直到其countDown方法被调用一定次数。在这里,当我们从p收到类型A的值时,将调用一次countDown方法,我们希望运行实现阻止,直到发生这种情况。
(3)当我们收到值时,它会设置结果并释放锁存器。
(4)等待结果可用且闩锁松开
(5)一旦我们传递了闩锁,我们就知道ref已经设置好了,我们返回它的值
应该注意的是,运行会在等待闩锁时阻塞调用线程。不可能编写不阻塞的运行实现。由于它需要返回一个类型为A的值,它需要等待该值可用才能返回。出于这个原因,我们希望API的用户在想要等待结果之前避免调用run。我们甚至可以从我们的API中完全删除run,并在Par上公开apply方法,以便用户可以注册异步回调。这当然是一个有效的设计选择,但我们现在将保留我们的API。
让我们看一个创建Par的示例。最简单的一个是单位:
defunit[A](a:A):Par[A]=es=cb=cb(a)①
(1)它只是将值传递给延续。请注意,不需要执行器服务。
由于单位已经有一个可用的A类型的值,它需要做的就是调用延续cb,传递它这个值。例如,如果该延续是来自我们运行实现的延续,这将释放闩锁并立即提供结果。
叉子呢?这就是我们介绍实际并行性的地方:
deffork[A](a:=Par[A]):Par[A]=es=cb=eval(es)(a(es)(cb))①defeval(es:ExecutorService)(r:=Unit):Unit=②(newCallable[Unit]{defcall=r})(1)评估分叉A的评估并立即返回。回调将在另一个线程上异步调用。
(2)一个帮助程序函数,用于使用某些执行器服务异步评估操作
当fork返回的未来收到它的延续cb时,它会分叉一个任务来计算按名称参数a。一旦参数被评估并调用以生成Future[A],我们就注册cb以在该Future具有其结果A时被调用。
地图2呢?召回签名:
extension[A](pa:Par[A])defmap2[B,C](pb:Par[B])(f:(A,B)=C):Par[C]
在这里,非阻塞实现要棘手得多。从概念上讲,我们希望map2并行运行两个Par参数。当两个结果都到达时,我们要调用f,然后将结果C传递给延续。但是这里有几个竞争条件需要担心,并且仅使用并发的低级原语很难实现正确的非阻塞实现。
演员简介
为了实现map2,我们将使用一个非阻塞并发原语,称为actor。Actor本质上是一个并发进程,不会经常占用线程。相反,它仅在收到消息时占用线程。重要的是,尽管多个线程可能同时向一个参与者发送消息,但该参与者一次只处理一条消息,将其他消息排队等待后续处理。这使得它们在编写必须由多个线程访问的棘手代码时作为并发原语很有用,否则这些代码很容易出现争用条件或死锁。
最好用一个例子来说明这一点。许多actor的实现都非常适合我们的目的,包括流行的Akka库中的那个(见),但为了简单起见,我们将使用文件中章节代码中包含的我们自己的最小actor实现:
*scalavals=(4)①s:=scalavalechoer=Actor[String](s):|msg=println(s"Gotmessage:'$msg'")②echoer:[String]=
(1)参与者使用执行器服务来处理消息到达时,因此我们在这里创建一个。
(2)这是一个非常简单的actor,它只是回显它收到的字符串消息。请注意,我们提供s,一个执行器服务,用于处理消息。
我们来试试这个Actor:
scalaechoer!"hello"①Gotmessage:'hello'scala②scalaechoer!"goodbye"③Gotmessage:'goodbye'scalaechoer!"You'rejustrepeatingeverythingIsay,aren'tyou?"Gotmessage:'You'rejustrepeatingeverythingIsay,aren'tyou?'
(1)它向演员发送“你好”消息。
(2)请注意,echoer此时不占用线程,因为它没有进一步的消息要处理。
(3)它向演员发送“再见”信息。参与者的反应是将任务提交到其执行器服务来处理该消息。
了解Actor实现根本不重要。正确、高效的实现是相当微妙的,但如果你好奇,请参阅章节代码中的文件。实现不到100行普通Scala代码。15
通过参与者实现map2
现在,我们可以使用Actor实现map2,以从两个参数中收集结果。代码很简单,无需担心竞争条件,因为我们知道Actor一次只会处理一条消息。
7.7使用Actor实现map2
extension[A](p:Par[A])defmap2[B,C](p2:Par[B])(f:(A,B)=C):Par[C]=es=cb=varar:Option[A]=None①varbr:Option[B]=None//thisimplementationisalittletooliberalinforkingofthreads-//itforksanewlogicalthreadfortheactorandforstack-safety,//forksevaluationofthecallback`cb`valcombiner=Actor[Either[A,B]](es):②caseLeft(a)=(es)(cb(f(a,)))③elsear=Some(a)caseRight(b)=(es)(cb(f(,b)))④elsebr=Some(b)p(es)(a=combiner!Left(a))⑤p2(es)(b=combiner!Right(b))
(1)两个可变变量用于存储两个结果。
(2)等待两个结果的参与者,将它们与f组合,并将结果传递给cb
(3)如果A结果先出现,则将其存储在ar中并等待B。如果A结果排在最后,并且我们已经有了B,它会调用f和两个结果,并将结果C传递给回调cb。
(4)类似地,如果B结果首先出现,它会将其存储在br中并等待A。如果B结果排在最后并且我们已经有了A,它会调用f和两个结果,并将结果C传递给回调cb。
鉴于这些实现,我们现在应该能够运行任意复杂度的Par值,而不必担心线程耗尽,即使参与者只能访问单个JVM线程。
让我们在REPL中尝试一下:
,fpinscala➥.=((1,100000))((_))p:[List[Double]]=functionscalavalx=((2))x:List[Double]=List(1.0,1.40951,1.7320508075688772,2.0,2.23606797749979,2.449489742783178,2.64575,2.8284271247461903,3.0,3.1622776601683795,3.3166247903554,3.46410
这将调用fork大约100,000次,启动大约100,000个演员,一次将结果组合两个。由于我们的非阻塞Actor实现,我们不需要100,000个JVM线程。
匪夷所思。我们的分叉定律现在适用于固定大小的线程池。
习题7.10
——————————————————————————————
困难:我们的非阻塞表示目前根本不处理错误。如果我们的计算在任何时候抛出异常,则运行实现的锁存器永远不会倒计时,并且异常会被简单地吞噬。你能解决这个问题吗?
退一步说,本节的目的不一定是找出fork的最佳非阻塞实现,而是表明法律很重要。它们为我们在考虑图书馆设计时提供了另一个角度。如果我们没有尝试编写API的一些定律,我们可能直到很久以后才在第一次实现中发现线程资源泄漏。
通常,在为API选择法律时,可以考虑多种方法。你可以从那里思考你的概念模型和推理,以假设应该成立的规律。你也可以发明你认为可能有用或有启发性的定律(就像我们对叉子定律所做的那样),看看是否有可能和明智地确保它们适用于你的模型。最后,您可以查看您的实施情况,并在此基础上提出您期望持有的法律。16
7.4将组合器提炼到最一般的形式功能设计是一个迭代过程。写下API并至少有一个原型实现后,请尝试将其用于逐渐更复杂或更现实的方案;有时你会发现这些场景需要新的组合器。在直接进入实现之前,最好先看看是否可以将所需的组合器细化为最通用的形式。可能你只需要一些更通用的组合器的特定情况。
关于本节中的练习
本节中的练习和答案使用我们原始的Par[A]的更简单(阻塞)表示。如果您想使用我们在上一节中开发的非阻塞实现来完成练习和答案,请参阅练习和答案项目中的文件。
让我们看一个例子。假设我们希望一个函数根据初始计算的结果在两个分叉计算之间进行选择:
defchoice[A](cond:Par[Boolean])(t:Par[A],f:Par[A]):Par[A]
这将构造一个计算,如果cond结果为真,则从t开始,如果cond结果为假,则使用f。我们当然可以通过阻塞cond的结果,然后使用此结果来确定是运行t还是f来实现这一点。下面是一个简单的阻塞实现:17
defchoice[A](cond:Par[Boolean])(t:Par[A],f:Par[A]):Par[A]=es=(es).getthent(es)①elsef(es)
(1)请注意,我们正在阻止cond的结果。
但在我们继续之前,让我们考虑一下这个组合器。它在做什么?它正在运行cond,然后当结果可用时,它运行t或f。这似乎是合理的,但让我们看看我们是否可以想到一些变化来理解这个组合器的本质。事实上,我们使用布尔值并且只在两种可能的并行计算中选择,t和f,这里有一些相当武断的事实。为什么只有两个?如果能够根据第一个计算的结果在两个并行计算之间进行选择很有用,那么在N个计算之间进行选择应该很有用:
defchoiceN[A](n:Par[Int])(choices:List[Par[A]]):Par[A]
假设choiceN运行n,然后使用它从options中选择并行计算。这比选择更笼统一些。
习题7.11
——————————————————————————————
实现选择N,然后根据选择N进行选择。
请注意我们到目前为止所做的工作;我们已经将我们原来的组合器Choice改进为ChoiceN,结果证明它更通用,能够表达Choice以及Choice不支持的其他用例。但是,让我们继续看看我们是否可以将选择细化为更通用的组合器。
习题7.12
——————————————————————————————
关于选择N仍然有一些相当武断的东西:List的选择似乎过于具体。为什么我们拥有什么样的容器很重要?例如,如果我们有一个计算列表而不是计算列表,该怎么办?18
defchoiceMap[K,V](key:Par[K])(choices:Map[K,Par[V]]):Par[V]
一组可能选项的映射编码感觉过于具体,就像List一样。如果我们看一下我们的choiceMap实现,我们可以看到我们并没有真正使用Map的API。实际上,Map[A,Par[B]]用于提供一个函数:A=Par[B]。现在我们已经发现,回顾choice和choiceN,我们可以看到,对于选择,这对参数被用作布尔=Par[A]类型的函数(布尔值选择两个Par[A]参数之一),对于choiceN,该列表被用作Int=Par[A]类型的函数!让我们做一个更通用的签名来统一它们:
extension[A](pa:Par[A])defchooser[B](choices:A=Par[B]):Par[B]
习题7.13
——————————————————————————————
实现这个新的基元选择器,然后用它来实现选择和选择N。
每当你泛化这样的函数时,当你完成时,请批判性地看看你的泛化函数。尽管该功能可能是由某些特定用例激发的,但签名和实现可能具有更一般的含义。在这种情况下,选择器可能不再是此操作最合适的名称,它实际上非常通用-它是一个并行计算,运行时将运行初始计算,其结果用于确定第二个计算。在第一次计算的结果可用之前,甚至不需要存在第二个计算。它不需要存储在容器中,例如列表or地图.也许它是使用第一次计算的结果从整块布生成的。这个函数经常出现在函数库中,通常称为绑定或flatMap:
extension[A](pa:Par[A])defflatMap[B](f:A=Par[B]):Par[B]
flatMap真的是最原始的函数吗,或者我们可以进一步泛化吗?让我们再玩一玩。flatMap这个名字暗示了这样一个事实,即此操作可以分解为两个步骤:将f:A=Par[B]映射到我们的Par[A]上,这会生成一个Par[Par[B]],然后将这个嵌套的Par[Par[B]]展平化为Par[B]。但这很有趣;它表明我们需要做的就是添加一个更简单的组合器——我们称之为连接——用于将Par[Par[X]]转换为Par[X],以便选择任何X:
defjoin[A](ppa:Par[Par[A]]):Par[A]
同样,我们只是遵循类型。我们有一个示例,它需要一个具有给定签名的函数,因此我们将其变为存在。现在它存在,我们可以考虑签名的含义。我们称之为join,因为从概念上讲,它是一个并行计算,运行时将执行内部计算,等待它完成(很像),然后返回其结果。
习题7.14
——————————————————————————————
实现联接。你能看到如何使用连接实现平面地图吗?你能使用flatMap实现连接吗?
我们将到此为止,但鼓励您进一步探索这个代数。尝试更复杂的示例,发现新的组合器,看看你发现了什么!以下是需要考虑的一些问题:
你能实现一个与map2具有相同签名但使用flatMap和单位的函数吗?它的含义与map2有何不同?
你能想到与代数的其他原语相关的定律吗?
是否有不能用这个代数表达的并行计算?你能想到任何甚至不能通过向代数添加新原语来表达的计算吗?
认识代数的表现力和局限性
当你练习更多的函数式编程时,你将培养的技能之一是能够识别哪些函数可以从代数中表达,以及该代数的局限性是什么。例如,在前面的例子中,一开始可能并不明显,像选择这样的函数不能纯粹用map、map2和unit来表示,选择可能并不明显,选择只是flatMap的一个特例。随着时间的推移,像这样的观察会很快到来,你也会更好地发现如何修改你的代数,使一些需要的组合子可表达。这些技能将对您的所有API设计工作有所帮助。
作为一个实际的考虑,能够将API简化为一组最小的基元函数非常有用。正如我们之前在现有组合器中实现parMap时所指出的,原始组合器经常封装一些相当棘手的逻辑,重用它们意味着我们不必复制这个逻辑。
7.5结论我们现在已经完成了一个库的设计,用于以纯函数方式定义并行和异步计算。虽然这个领域很有趣,但本章的主要目标是为你提供一个了解功能设计过程的窗口,了解你可能遇到的问题类型,以及处理这些问题的想法。
在下一章中,我们将研究一个完全不同的领域,为该领域提供另一个蜿蜒的API之旅,并进一步了解功能设计。
总结现有的图书馆都无法重新审查。大多数库都包含任意设计选择。尝试构建替代库可能会导致发现有关问题空间的新事物。
简单的例子让我们专注于问题域的本质,而不是迷失在偶然的细节中。
并行和异步计算可以以纯函数方式建模。
构建计算的描述以及运行计算的单独解释器允许将计算视为值,然后可以与其他计算组合。
API设计的一种有效技术是调用类型和实现,尝试实现这些类型和实现,进行调整和迭代。
Par[A]类型描述了一种计算,该计算可以在多个线程上评估部分或全部计算。
Par值可以变换并与许多熟悉的操作相结合,例如map、flatMap和序列。
将API视为代数并定义约束实现的定律既是有价值的设计工具,也是有效的测试技术。
将API划分为一组最小的基元函数和一组组合器函数可促进重用和理解。
参与者是基于消息传递的非阻塞并发原语。Actor不是纯粹的功能,但可用于实现纯功能API,如非阻塞Par的map2实现所示。
7.9练习答案答案7.1
——————————————————————————————
让我们为每个并行计算引入一个类型参数,以及组合每个并行计算结果的函数的输出:
defmap2[A,B,C](pa:Par[A],pb:Par[B])(f:(A,B)=C):Par[C]
或者,我们可以将map2定义为Par[A]上的扩展方法:
extension[A](pa:Par[A])defmap2[B,C](pb:Par[B])(f:(A,B)=C):Par[C]
答案7.2
——————————————————————————————
答案将在练习7.2之后立即讨论。
答案7.3
——————————————————————————————
我们最初的map2实现等待两个期货完成,然后返回一个包含最终结果的UnitFuture。在调用者看到未来之前,我们已经等待了两个组成计算完成!
而不是使用UnitFuture,我们需要开始组成计算并立即返回引用它们的复合未来。然后,调用方可以在Future上使用超时的get重载或任何其他方法。
为了实现get的超时变体,我们需要在我们开始的每个期货上调用它。我们可以使用提供的超时等待第一个结果,并测量完成所需的时间。然后,我们可以等待第二个结果,将超时减少等待第一个结果的时间:
extension[A](pa:Par[A])defmap2Timeouts[B,C](pb:Par[B])(f:(A,B)=C):Par[C]=es=newFuture[C]:privatevalfutureA=pa(es)①privatevalfutureB=pb(es)@volatileprivatevarcache:Option[C]=NonedefisDone=()=get(,)defget(timeout:Long,units:TimeUnit)=valtimeoutNs=(timeout,units)valstarted==(timeoutNs,)②valelapsed==(timeoutNs-elapsed,➥)③valc=f(a,b)cache=Some(c)cdefisCancelled=||(evenIfRunning:Boolean)=(evenIfRunning)||(evenIfRunning)
(1)当未来被创造出来时,我们立即开始组成计算。
(2)等待futureA完成,如果未在请求的时间内完成,则超时。
(3)等待未来B完成,但只等待请求的超时与等待未来A完成的已用时间之间的差异。
答案7.4
——————————————————————————————
我们返回一个匿名函数,在收到A类型的值时,立即调用lazyUnit,传递f(a):
defasyncF[A,B](f:A=B):A=Par[B]=a=lazyUnit(f(a))
由于lazyUnit采用按名称参数,因此尚未计算f(a)。要了解为什么lazyUnit(f(a))实现了所需的功能,让我们逐步替换定义:
a=lazyUnit(f(a))a=fork(unit(f(a)))①a=es=(newCallbable[B]{defrun=unit(f(a))(es).get})②a=es=(newCallbable[B]{defrun=(es=UnitFuture(f(a)))(es).get})③a=es=(newCallbable[B]{defrun=UnitFuture(f(a)).get})④a=es=(newCallbable[B]{defrun=f(a)})⑤(1)替换懒惰单位。
(2)替代叉。
(3)替代单位。
(4)将es应用于es=UnitFuture(f(a))。
(5)替换单位未来#获取。
我们剩下一个函数,它接收A并返回Par,该函数将作业提交给在操作系统线程上运行时计算f(a)的执行器。
答案7.5
——————————————————————————————
到目前为止,我们已经实现了很多次序列-一个使用map2的foldRight。因此,让我们稍微复制、粘贴和调整类型:
defsequence[A](pas:List[Par[A]]):Par[List[A]]=(unit([A]))((pa,acc)=(acc)(_::_))
但是,在这种情况下,我们可以做得更好,使用类似于我们使用的技术总和.让我们将计算分为两半,并并行计算每半。由于我们再次需要对集合中的元素进行有效的随机访问,因此我们将首先编写一个与IndexedSeq[Par[A]]一起使用的版本:
defsequenceBalanced[A](pas:IndexedSeq[Par[A]]):Par[IndexedSeq[A]]=()==1(a=IndexedSeq(a))elseval(l,r)=(/2)sequenceBalanced(l).map2(sequenceBalanced(r))(_++_)
然后我们可以根据序列平衡来实现序列:
defsequence[A](pas:List[Par[A]]):Par[List[A]]=sequenceBalanced().map(_.toList)
答案7.6
——————————————————————————————
我们可以首先使用提供的谓词过滤列表,并将结果列表提升到Par中:
defparFilter[A](as:List[A])(f:A=Boolean):Par[List[A]]=unit((f))
但是,此解决方案没有任何并行性。我们可以尝试通过使用lazyUnit而不是unit来解决这个问题:
defparFilter[A](as:List[A])(f:A=Boolean):Par[List[A]]=lazyUnit((f))
这要好一些,因为过滤是在不同的逻辑线程上完成的,但所有过滤都是在同一逻辑线程上完成的。我们真的希望谓词的每个调用都有它自己的逻辑线程。就像在parMap中一样,我们可以使用asyncF,但这一次,我们不是直接传递f,而是传递一个匿名函数,该函数将A转换为List[A]—如果谓词通过,则为单个元素列表,否则为空列表:
defparFilter[A](as:List[A])(f:A=Boolean):Par[List[A]]=fork:①valpars:List[Par[List[A]]]=(asyncF(a=iff(a)thenList(a)elseNil))②sequence(pars).map(_.flatten)③
(1)就像在parMap中一样,我们立即分叉,因此对原始列表的映射是在单独的逻辑线程上完成的,而不是在调用者的线程上完成的。
(2)我们使用asyncF将A=List[A]函数转换为A=Par[List[A]]函数。
(3)sequence(pars)返回一个Par[List[List[A]]],所以我们映射它并展平内部嵌套列表。
答案7.7
——————————————————————————————
我们将从我们试图证明的相等开始,然后进行代数替换,直到我们证明等式的两边简化为相同的值:
(g).map(f)==(fcomposeg)①(id).map(fcomposeg)==((fcomposeg)composeid)②(id).map(fcomposeg)==(fcomposeg)③(fcomposeg)==(fcomposeg)④
(1)初始定律
(2)将恒等函数替换为g,f在两侧将g组成f。
(3)将右侧的(f组成g)撰写id简化为f组成g。
(4)将左侧的(id)简化为y。
答案7.8
——————————————————————————————
考虑使用只有一个线程的固定线程池时会发生什么情况。下一节将更详细地探讨这一点。
答案7.9
——————————————————————————————
任何固定大小的线程池都可以通过运行fork(fork(fork(x)))形式的表达式来死锁,其中至少比池中的线程多一个分支。池中的每个线程都会阻塞对.get的调用,从而导致所有线程都被阻塞,而另一个逻辑线程正在等待运行,从而解决所有等待。
答案7.10
——————————————————————————————
我们将在第13章回到这个问题。
答案7.11
——————————————————————————————
我们通过运行n并等待结果来计算选择的索引。然后,我们在该索引的选项中查找Par并运行它:
defchoiceN[A](n:Par[Int])(choices:List[Par[A]]):Par[A]=es=valindex=(es).get%(index).run(es)
根据choiceN实现选择涉及将条件转换为索引,我们可以通过map.在这里,我们选择将真实案例索引设为0,将错误案例索引设为1:
defchoice[A](cond:Par[Boolean])(t:Par[A],f:Par[A]):Par[A]=choiceN((b=ifbthen0else1))(List(t,f))
答案7.12
——————————————————————————————
选择映射的实现与选择N的实现几乎相同;我们不是使用索引在列表中查找Par,而是使用键在Map中进行查找:
defchoiceMap[K,V](key:Par[K])(choices:Map[K,Par[V]]):Par[V]=es=valk=(es).getchoices(k).run
如果你愿意,停止阅读这里,看看你是否可以想出一个新的、更通用的组合器,你可以实现choice、choiceN和choiceMap。
答案7.13
——————————————————————————————
实现几乎与choiceN和choiceMap相同,唯一的区别是选择查找是如何完成的:
extension[A](pa:Par[A])defchooser[B](choices:A=Par[B]):Par[B]=es=vala=(es).getchoices(a).run
现在可以通过选择器实现选择,方法是传递一个函数,该函数在COND返回true时选择t,否则选择f:
defchoice[A](cond:Par[Boolean])(t:Par[A],f:Par[A]):Par[A]=(b=ifbthentelsef)
同样,可以通过传递一个在选项列表中执行查找的函数来实现choiceN。
defchoiceN[A](n:Par[Int])(choices:List[Par[A]]):Par[A]=(i=choices(i%))
答案7.14
——————————————————————————————
我们首先运行外部Par并等待它完成。然后我们运行生成的内部Par:
defjoin[A](ppa:Par[Par[A]]):Par[A]=es=(es).(es)
为了通过连接实现flatMap,我们首先在初始pa上映射f,给我们一个Par[Par[B]]。然后我们加入:
extension[A](pa:Par[A])defflatMap[B](f:A=Par[B]):Par[B]=join((f))
在flatMap中实现join有点棘手。flatMap允许我们将Par[X]转换为Par[Y],给定函数X=Par[Y]。诀窍是取X=Par[A]和Y=A.因此,我们需要一个函数Par[A]=Par[A],它是恒等函数:
defjoin[A](ppa:Par[Par[A]]):Par[A]=(identity)
1在本章中,我们将非正式地使用术语逻辑线程来表示与程序的主执行线程同时运行的计算。逻辑线程和操作系统线程之间不需要一对一的对应关系;例如,我们可能通过线程池将大量逻辑线程映射到较少数量的操作系统线程上。
2Scala中的函数参数从左到右严格计算,所以如果单元延迟执行直到调用get,那么我们将生成并行计算并等待它完成,然后再生成第二个并行计算。这意味着计算实际上是顺序的!
3这种对表示的漠不关心暗示了这些操作实际上更通用,可以抽象为适用于Par以外的类型。我们将在第3部分中详细探讨此主题。
4这很像第1章中Cafe示例中的buyCoffee方法访问信用卡处理系统的方式。
5在这种情况下,还有另一个很好的理由不将parMap实现为新的原语:正确执行此操作具有挑战性,特别是如果我们想正确尊重超时。通常情况下,原始组合器封装了一些相当棘手的逻辑,重用它们意味着我们不必复制这个逻辑。
6我们的意思是一个或多个集合的数学意义上的代数,以及对这些集合的对象进行操作的函数集合,以及一组公理。公理是假定为真的陈述,我们可以从中推导出其他也必须为真的定理。在我们的例子中,集合是特定的类型,如Par[A]和List[Par[A]],函数是像map2、单位和序列这样的运算。
7关于这一点,我们将在本书的其余部分有更多的话要说。在下一章中,我们将设计一个声明式测试库,让我们定义我们期望函数满足的属性,并自动生成测试用例来检查这些属性。在第3部分中,我们将介绍仅由定律集指定的抽象接口。
8这里我们指的是数学意义上的恒等式,即两个表达式相同或等价的陈述。
9恒等函数定义为defid[A](a:A):A=a。
10这与求解代数方程时可能做的替换和简化相同。
11我们说map需要保持结构,因为它不会改变并行计算的结构,只会改变计算中的值。
12自由定理的概念是由菲利普·瓦德勒(PhilipWadler)在他的经典论文“免费定理!()。
13我们对Par的表示无法实现此优化,因为它是一个不透明的函数。例如,(g)返回一个新的Par,它是一个黑盒——当我们对该结果调用.map(f)时,我们已经失去了用于构造(g)的部分的知识:即y、map和g。我们所看到的只是不透明的函数,因此无法提取出g来与f组成。如果将Par化为数据类型(例如,各种操作的枚举),那么我们可以模式匹配并发现应用此规则的机会。您可能想尝试自己尝试这个想法。
14在下一章中,我们将编写一个用于测试的组合器库,它可以帮助自动发现此类问题。
15actor实现中的主要棘手之处与多个线程可能同时向actor发送消息这一事实有关。实现需要确保一次处理一个消息,以及最终处理发送给参与者的所有消息,而不是无限期地排队。即便如此,代码最终还是很短。
16最后一种产生规律的方法可能是最薄弱的,因为即使实施有缺陷或需要各种不寻常的附带条件,使组合变得困难,也很容易让法律反映实施。
17请参阅非阻塞实现章节代码中的。
18Map[K,V](参见API:)是Scala标准库中的纯函数式数据结构。它将K类型的键与V类型的值以一对一的关系相关联,并允许我们通过关联的键查找值。